jpayne@68: # Copyright 2002 by Andrew Dalke. All rights reserved. jpayne@68: # Revisions 2007-2016 copyright by Peter Cock. All rights reserved. jpayne@68: # Revisions 2008 copyright by Cymon J. Cox. All rights reserved. jpayne@68: # jpayne@68: # This file is part of the Biopython distribution and governed by your jpayne@68: # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". jpayne@68: # Please see the LICENSE file that should have been included as part of this jpayne@68: # package. jpayne@68: # jpayne@68: # Note that BioSQL (including the database schema and scripts) is jpayne@68: # available and licensed separately. Please consult www.biosql.org jpayne@68: jpayne@68: """Load biopython objects into a BioSQL database for persistent storage. jpayne@68: jpayne@68: This code makes it possible to store biopython objects in a relational jpayne@68: database and then retrieve them back. You shouldn't use any of the jpayne@68: classes in this module directly. Rather, call the load() method on jpayne@68: a database object. jpayne@68: """ jpayne@68: # standard modules jpayne@68: from time import gmtime, strftime jpayne@68: jpayne@68: # biopython jpayne@68: from Bio.SeqUtils.CheckSum import crc64 jpayne@68: from Bio import Entrez jpayne@68: from Bio.Seq import UndefinedSequenceError jpayne@68: from Bio.SeqFeature import UnknownPosition jpayne@68: jpayne@68: jpayne@68: class DatabaseLoader: jpayne@68: """Object used to load SeqRecord objects into a BioSQL database.""" jpayne@68: jpayne@68: def __init__(self, adaptor, dbid, fetch_NCBI_taxonomy=False): jpayne@68: """Initialize with connection information for the database. jpayne@68: jpayne@68: Creating a DatabaseLoader object is normally handled via the jpayne@68: BioSeqDatabase DBServer object, for example:: jpayne@68: jpayne@68: from BioSQL import BioSeqDatabase jpayne@68: server = BioSeqDatabase.open_database(driver="MySQLdb", jpayne@68: user="gbrowse", jpayne@68: passwd="biosql", jpayne@68: host="localhost", jpayne@68: db="test_biosql") jpayne@68: try: jpayne@68: db = server["test"] jpayne@68: except KeyError: jpayne@68: db = server.new_database("test", jpayne@68: description="For testing GBrowse") jpayne@68: jpayne@68: """ jpayne@68: self.adaptor = adaptor jpayne@68: self.dbid = dbid jpayne@68: self.fetch_NCBI_taxonomy = fetch_NCBI_taxonomy jpayne@68: jpayne@68: def load_seqrecord(self, record): jpayne@68: """Load a Biopython SeqRecord into the database.""" jpayne@68: bioentry_id = self._load_bioentry_table(record) jpayne@68: self._load_bioentry_date(record, bioentry_id) jpayne@68: self._load_biosequence(record, bioentry_id) jpayne@68: self._load_comment(record, bioentry_id) jpayne@68: self._load_dbxrefs(record, bioentry_id) jpayne@68: references = record.annotations.get("references", ()) jpayne@68: for reference, rank in zip(references, list(range(len(references)))): jpayne@68: self._load_reference(reference, rank, bioentry_id) jpayne@68: self._load_annotations(record, bioentry_id) jpayne@68: for seq_feature_num in range(len(record.features)): jpayne@68: seq_feature = record.features[seq_feature_num] jpayne@68: self._load_seqfeature(seq_feature, seq_feature_num, bioentry_id) jpayne@68: jpayne@68: def _get_ontology_id(self, name, definition=None): jpayne@68: """Return identifier for the named ontology (PRIVATE). jpayne@68: jpayne@68: This looks through the onotology table for a the given entry name. jpayne@68: If it is not found, a row is added for this ontology (using the jpayne@68: definition if supplied). In either case, the id corresponding to jpayne@68: the provided name is returned, so that you can reference it in jpayne@68: another table. jpayne@68: """ jpayne@68: oids = self.adaptor.execute_and_fetch_col0( jpayne@68: "SELECT ontology_id FROM ontology WHERE name = %s", (name,) jpayne@68: ) jpayne@68: if oids: jpayne@68: return oids[0] jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO ontology(name, definition) VALUES (%s, %s)", (name, definition) jpayne@68: ) jpayne@68: return self.adaptor.last_id("ontology") jpayne@68: jpayne@68: def _get_term_id(self, name, ontology_id=None, definition=None, identifier=None): jpayne@68: """Get the id that corresponds to a term (PRIVATE). jpayne@68: jpayne@68: This looks through the term table for a the given term. If it jpayne@68: is not found, a new id corresponding to this term is created. jpayne@68: In either case, the id corresponding to that term is returned, so jpayne@68: that you can reference it in another table. jpayne@68: jpayne@68: The ontology_id should be used to disambiguate the term. jpayne@68: """ jpayne@68: # try to get the term id jpayne@68: sql = "SELECT term_id FROM term WHERE name = %s" jpayne@68: fields = [name] jpayne@68: if ontology_id: jpayne@68: sql += " AND ontology_id = %s" jpayne@68: fields.append(ontology_id) jpayne@68: id_results = self.adaptor.execute_and_fetchall(sql, fields) jpayne@68: # something is wrong jpayne@68: if len(id_results) > 1: jpayne@68: raise ValueError(f"Multiple term ids for {name}: {id_results!r}") jpayne@68: elif len(id_results) == 1: jpayne@68: return id_results[0][0] jpayne@68: else: jpayne@68: sql = ( jpayne@68: "INSERT INTO term (name, definition," jpayne@68: " identifier, ontology_id)" jpayne@68: " VALUES (%s, %s, %s, %s)" jpayne@68: ) jpayne@68: self.adaptor.execute(sql, (name, definition, identifier, ontology_id)) jpayne@68: return self.adaptor.last_id("term") jpayne@68: jpayne@68: def _add_dbxref(self, dbname, accession, version): jpayne@68: """Insert a dbxref and return its id (PRIVATE).""" jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO dbxref(dbname, accession, version) VALUES (%s, %s, %s)", jpayne@68: (dbname, accession, version), jpayne@68: ) jpayne@68: return self.adaptor.last_id("dbxref") jpayne@68: jpayne@68: def _get_taxon_id(self, record): jpayne@68: """Get the taxon id for this record (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - record - a SeqRecord object jpayne@68: jpayne@68: This searches the taxon/taxon_name tables using the jpayne@68: NCBI taxon ID, scientific name and common name to find jpayne@68: the matching taxon table entry's id. jpayne@68: jpayne@68: If the species isn't in the taxon table, and we have at jpayne@68: least the NCBI taxon ID, scientific name or common name, jpayne@68: at least a minimal stub entry is created in the table. jpayne@68: jpayne@68: Returns the taxon id (database key for the taxon table, jpayne@68: not an NCBI taxon ID), or None if the taxonomy information jpayne@68: is missing. jpayne@68: jpayne@68: See also the BioSQL script load_ncbi_taxonomy.pl which jpayne@68: will populate and update the taxon/taxon_name tables jpayne@68: with the latest information from the NCBI. jpayne@68: """ jpayne@68: # To find the NCBI taxid, first check for a top level annotation jpayne@68: ncbi_taxon_id = None jpayne@68: if "ncbi_taxid" in record.annotations: jpayne@68: # Could be a list of IDs. jpayne@68: if isinstance(record.annotations["ncbi_taxid"], list): jpayne@68: if len(record.annotations["ncbi_taxid"]) == 1: jpayne@68: ncbi_taxon_id = record.annotations["ncbi_taxid"][0] jpayne@68: else: jpayne@68: ncbi_taxon_id = record.annotations["ncbi_taxid"] jpayne@68: if not ncbi_taxon_id: jpayne@68: # Secondly, look for a source feature jpayne@68: for f in record.features: jpayne@68: if f.type == "source": jpayne@68: quals = getattr(f, "qualifiers", {}) jpayne@68: if "db_xref" in quals: jpayne@68: for db_xref in f.qualifiers["db_xref"]: jpayne@68: if db_xref.startswith("taxon:"): jpayne@68: ncbi_taxon_id = int(db_xref[6:]) jpayne@68: break jpayne@68: if ncbi_taxon_id: jpayne@68: break jpayne@68: jpayne@68: try: jpayne@68: scientific_name = record.annotations["organism"][:255] jpayne@68: except KeyError: jpayne@68: scientific_name = None jpayne@68: try: jpayne@68: common_name = record.annotations["source"][:255] jpayne@68: except KeyError: jpayne@68: common_name = None jpayne@68: # Note: The maximum length for taxon names in the schema is 255. jpayne@68: # Cropping it now should help in getting a match when searching, jpayne@68: # and avoids an error if we try and add these to the database. jpayne@68: jpayne@68: if ncbi_taxon_id: jpayne@68: # Good, we have the NCBI taxon to go on - this is unambiguous :) jpayne@68: # Note that the scientific name and common name will only be jpayne@68: # used if we have to record a stub entry. jpayne@68: return self._get_taxon_id_from_ncbi_taxon_id( jpayne@68: ncbi_taxon_id, scientific_name, common_name jpayne@68: ) jpayne@68: jpayne@68: if not common_name and not scientific_name: jpayne@68: # Nothing to go on... and there is no point adding jpayne@68: # a new entry to the database. We'll just leave this jpayne@68: # sequence's taxon as a NULL in the database. jpayne@68: return None jpayne@68: jpayne@68: # Next, we'll try to find a match based on the species name jpayne@68: # (stored in GenBank files as the organism and/or the source). jpayne@68: if scientific_name: jpayne@68: taxa = self.adaptor.execute_and_fetch_col0( jpayne@68: "SELECT taxon_id FROM taxon_name" jpayne@68: " WHERE name_class = 'scientific name' AND name = %s", jpayne@68: (scientific_name,), jpayne@68: ) jpayne@68: if taxa: jpayne@68: # Good, mapped the scientific name to a taxon table entry jpayne@68: return taxa[0] jpayne@68: jpayne@68: # Last chance... jpayne@68: if common_name: jpayne@68: taxa = self.adaptor.execute_and_fetch_col0( jpayne@68: "SELECT DISTINCT taxon_id FROM taxon_name WHERE name = %s", jpayne@68: (common_name,), jpayne@68: ) jpayne@68: # Its natural that several distinct taxa will have the same common jpayne@68: # name - in which case we can't resolve the taxon uniquely. jpayne@68: if len(taxa) > 1: jpayne@68: raise ValueError( jpayne@68: "Taxa: %d species have name %r" % (len(taxa), common_name) jpayne@68: ) jpayne@68: if taxa: jpayne@68: # Good, mapped the common name to a taxon table entry jpayne@68: return taxa[0] jpayne@68: jpayne@68: # At this point, as far as we can tell, this species isn't jpayne@68: # in the taxon table already. So we'll have to add it. jpayne@68: # We don't have an NCBI taxonomy ID, so if we do record just jpayne@68: # a stub entry, there is no simple way to fix this later. jpayne@68: # jpayne@68: # TODO - Should we try searching the NCBI taxonomy using the jpayne@68: # species name? jpayne@68: # jpayne@68: # OK, let's try inserting the species. jpayne@68: # Chances are we don't have enough information ... jpayne@68: # Furthermore, it won't be in the hierarchy. jpayne@68: jpayne@68: lineage = [] jpayne@68: for c in record.annotations.get("taxonomy", []): jpayne@68: lineage.append([None, None, c]) jpayne@68: if lineage: jpayne@68: lineage[-1][1] = "genus" jpayne@68: lineage.append([None, "species", record.annotations["organism"]]) jpayne@68: # XXX do we have them? jpayne@68: if "subspecies" in record.annotations: jpayne@68: lineage.append([None, "subspecies", record.annotations["subspecies"]]) jpayne@68: if "variant" in record.annotations: jpayne@68: lineage.append([None, "varietas", record.annotations["variant"]]) jpayne@68: lineage[-1][0] = ncbi_taxon_id jpayne@68: jpayne@68: left_value = self.adaptor.execute_one("SELECT MAX(left_value) FROM taxon")[0] jpayne@68: if not left_value: jpayne@68: left_value = 0 jpayne@68: left_value += 1 jpayne@68: jpayne@68: # XXX -- Brad: Fixing this for now in an ugly way because jpayne@68: # I am getting overlaps for right_values. I need to dig into this jpayne@68: # more to actually understand how it works. I'm not sure it is jpayne@68: # actually working right anyhow. jpayne@68: right_start_value = self.adaptor.execute_one( jpayne@68: "SELECT MAX(right_value) FROM taxon" jpayne@68: )[0] jpayne@68: if not right_start_value: jpayne@68: right_start_value = 0 jpayne@68: right_value = right_start_value + 2 * len(lineage) - 1 jpayne@68: jpayne@68: parent_taxon_id = None jpayne@68: for taxon in lineage: jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO taxon(parent_taxon_id, ncbi_taxon_id, node_rank," jpayne@68: " left_value, right_value)" jpayne@68: " VALUES (%s, %s, %s, %s, %s)", jpayne@68: (parent_taxon_id, taxon[0], taxon[1], left_value, right_value), jpayne@68: ) jpayne@68: taxon_id = self.adaptor.last_id("taxon") jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO taxon_name(taxon_id, name, name_class)" jpayne@68: "VALUES (%s, %s, 'scientific name')", jpayne@68: (taxon_id, taxon[2][:255]), jpayne@68: ) jpayne@68: # Note the name field is limited to 255, some SwissProt files jpayne@68: # have a multi-species name which can be longer. So truncate this. jpayne@68: left_value += 1 jpayne@68: right_value -= 1 jpayne@68: parent_taxon_id = taxon_id jpayne@68: if common_name: jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO taxon_name(taxon_id, name, name_class)" jpayne@68: "VALUES (%s, %s, 'common name')", jpayne@68: (taxon_id, common_name), jpayne@68: ) jpayne@68: jpayne@68: return taxon_id jpayne@68: jpayne@68: def _fix_name_class(self, entrez_name): jpayne@68: """Map Entrez name terms to those used in taxdump (PRIVATE). jpayne@68: jpayne@68: We need to make this conversion to match the taxon_name.name_class jpayne@68: values used by the BioSQL load_ncbi_taxonomy.pl script. jpayne@68: jpayne@68: e.g.:: jpayne@68: jpayne@68: "ScientificName" -> "scientific name", jpayne@68: "EquivalentName" -> "equivalent name", jpayne@68: "Synonym" -> "synonym", jpayne@68: jpayne@68: """ jpayne@68: # Add any special cases here: jpayne@68: # jpayne@68: # known = {} jpayne@68: # try: jpayne@68: # return known[entrez_name] jpayne@68: # except KeyError: jpayne@68: # pass jpayne@68: jpayne@68: # Try automatically by adding spaces before each capital jpayne@68: def add_space(letter): jpayne@68: """Add a space before a capital letter.""" jpayne@68: if letter.isupper(): jpayne@68: return " " + letter.lower() jpayne@68: else: jpayne@68: return letter jpayne@68: jpayne@68: answer = "".join(add_space(letter) for letter in entrez_name).strip() jpayne@68: if answer != answer.lower(): jpayne@68: raise ValueError( jpayne@68: f"Expected processed entrez_name, '{answer}' to only have lower case letters." jpayne@68: ) jpayne@68: return answer jpayne@68: jpayne@68: def _update_left_right_taxon_values(self, left_value): jpayne@68: """Update the left and right taxon values in the table (PRIVATE).""" jpayne@68: if not left_value: jpayne@68: return jpayne@68: # Due to the UNIQUE constraint on the left and right values in the taxon jpayne@68: # table we cannot simply update them through an SQL statement as we risk jpayne@68: # colliding values. Instead we must select all of the rows that we want to jpayne@68: # update, modify the values in python and then update the rows jpayne@68: # self.adaptor.execute("UPDATE taxon SET right_value = right_value + 2 " jpayne@68: # "WHERE right_value >= %s", (left_value,)) jpayne@68: # self.adaptor.execute("UPDATE taxon SET left_value = left_value + 2 " jpayne@68: # "WHERE left_value > %s", (left_value,)) jpayne@68: jpayne@68: rows = self.adaptor.execute_and_fetchall( jpayne@68: "SELECT left_value, right_value, taxon_id FROM taxon " jpayne@68: "WHERE right_value >= %s or left_value > %s", jpayne@68: (left_value, left_value), jpayne@68: ) jpayne@68: jpayne@68: right_rows = [] jpayne@68: left_rows = [] jpayne@68: for row in rows: jpayne@68: new_right = row[1] jpayne@68: new_left = row[0] jpayne@68: if new_right >= left_value: jpayne@68: new_right += 2 jpayne@68: jpayne@68: if new_left > left_value: jpayne@68: new_left += 2 jpayne@68: right_rows.append((new_right, row[2])) jpayne@68: left_rows.append((new_left, row[2])) jpayne@68: jpayne@68: # sort the rows based on the value from largest to smallest jpayne@68: # should ensure no overlaps jpayne@68: right_rows = sorted(right_rows, key=lambda x: x[0], reverse=True) jpayne@68: left_rows = sorted(left_rows, key=lambda x: x[0], reverse=True) jpayne@68: jpayne@68: self.adaptor.executemany( jpayne@68: "UPDATE taxon SET left_value = %s WHERE taxon_id = %s", left_rows jpayne@68: ) jpayne@68: self.adaptor.executemany( jpayne@68: "UPDATE taxon SET right_value = %s WHERE taxon_id = %s", right_rows jpayne@68: ) jpayne@68: jpayne@68: def _get_taxon_id_from_ncbi_taxon_id( jpayne@68: self, ncbi_taxon_id, scientific_name=None, common_name=None jpayne@68: ): jpayne@68: """Get the taxon id for record from NCBI taxon ID (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - ncbi_taxon_id - string containing an NCBI taxon id jpayne@68: - scientific_name - string, used if a stub entry is recorded jpayne@68: - common_name - string, used if a stub entry is recorded jpayne@68: jpayne@68: This searches the taxon table using ONLY the NCBI taxon ID jpayne@68: to find the matching taxon table entry's ID (database key). jpayne@68: jpayne@68: If the species isn't in the taxon table, and the fetch_NCBI_taxonomy jpayne@68: flag is true, Biopython will attempt to go online using Bio.Entrez jpayne@68: to fetch the official NCBI lineage, recursing up the tree until an jpayne@68: existing entry is found in the database or the full lineage has been jpayne@68: fetched. jpayne@68: jpayne@68: Otherwise the NCBI taxon ID, scientific name and common name are jpayne@68: recorded as a minimal stub entry in the taxon and taxon_name tables. jpayne@68: Any partial information about the lineage from the SeqRecord is NOT jpayne@68: recorded. This should mean that (re)running the BioSQL script jpayne@68: load_ncbi_taxonomy.pl can fill in the taxonomy lineage. jpayne@68: jpayne@68: Returns the taxon id (database key for the taxon table, not jpayne@68: an NCBI taxon ID). jpayne@68: """ jpayne@68: if not ncbi_taxon_id: jpayne@68: raise ValueError("Expected a non-empty value for ncbi_taxon_id.") jpayne@68: jpayne@68: taxon_id = self.adaptor.execute_and_fetch_col0( jpayne@68: "SELECT taxon_id FROM taxon WHERE ncbi_taxon_id = %s", (int(ncbi_taxon_id),) jpayne@68: ) jpayne@68: if taxon_id: jpayne@68: # Good, we have mapped the NCBI taxid to a taxon table entry jpayne@68: return taxon_id[0] jpayne@68: jpayne@68: # At this point, as far as we can tell, this species isn't jpayne@68: # in the taxon table already. So we'll have to add it. jpayne@68: jpayne@68: parent_taxon_id = None jpayne@68: rank = "species" jpayne@68: genetic_code = None jpayne@68: mito_genetic_code = None jpayne@68: parent_left_value = None jpayne@68: parent_right_value = None jpayne@68: left_value = None jpayne@68: right_value = None jpayne@68: species_names = [] jpayne@68: if scientific_name: jpayne@68: species_names.append(("scientific name", scientific_name)) jpayne@68: if common_name: jpayne@68: species_names.append(("common name", common_name)) jpayne@68: jpayne@68: if self.fetch_NCBI_taxonomy: jpayne@68: # Go online to get the parent taxon ID! jpayne@68: handle = Entrez.efetch(db="taxonomy", id=ncbi_taxon_id, retmode="XML") jpayne@68: taxonomic_record = Entrez.read(handle) jpayne@68: if len(taxonomic_record) == 1: jpayne@68: if taxonomic_record[0]["TaxId"] != str(ncbi_taxon_id): jpayne@68: raise ValueError( jpayne@68: f"ncbi_taxon_id different from parent taxon id. {ncbi_taxon_id} versus {taxonomic_record[0]['TaxId']}" jpayne@68: ) jpayne@68: jpayne@68: ( jpayne@68: parent_taxon_id, jpayne@68: parent_left_value, jpayne@68: parent_right_value, jpayne@68: ) = self._get_taxon_id_from_ncbi_lineage( jpayne@68: taxonomic_record[0]["LineageEx"] jpayne@68: ) jpayne@68: jpayne@68: left_value = parent_right_value jpayne@68: right_value = parent_right_value + 1 jpayne@68: jpayne@68: rank = str(taxonomic_record[0]["Rank"]) jpayne@68: jpayne@68: genetic_code = int(taxonomic_record[0]["GeneticCode"]["GCId"]) jpayne@68: jpayne@68: mito_genetic_code = int(taxonomic_record[0]["MitoGeneticCode"]["MGCId"]) jpayne@68: jpayne@68: species_names = [ jpayne@68: ("scientific name", str(taxonomic_record[0]["ScientificName"])) jpayne@68: ] jpayne@68: try: jpayne@68: for name_class, names in taxonomic_record[0]["OtherNames"].items(): jpayne@68: name_class = self._fix_name_class(name_class) jpayne@68: if not isinstance(names, list): jpayne@68: # The Entrez parser seems to return single entry jpayne@68: # lists as just a string which is annoying. jpayne@68: names = [names] jpayne@68: for name in names: jpayne@68: # Want to ignore complex things like ClassCDE jpayne@68: # entries jpayne@68: if isinstance(name, str): jpayne@68: species_names.append((name_class, name)) jpayne@68: except KeyError: jpayne@68: # OtherNames isn't always present, jpayne@68: # e.g. NCBI taxon 41205, Bromheadia finlaysoniana jpayne@68: pass jpayne@68: else: jpayne@68: pass jpayne@68: # If we are not allowed to go online, we will record the bare minimum; jpayne@68: # as long as the NCBI taxon id is present, then (re)running jpayne@68: # load_ncbi_taxonomy.pl should fill in the taxonomomy lineage jpayne@68: # (and update the species names). jpayne@68: # jpayne@68: # I am NOT going to try and record the lineage, even if it jpayne@68: # is in the record annotation as a list of names, as we won't jpayne@68: # know the NCBI taxon IDs for these parent nodes. jpayne@68: jpayne@68: self._update_left_right_taxon_values(left_value) jpayne@68: jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO taxon(parent_taxon_id, ncbi_taxon_id, node_rank," jpayne@68: " genetic_code, mito_genetic_code, left_value, right_value)" jpayne@68: " VALUES (%s, %s, %s, %s, %s, %s, %s)", jpayne@68: ( jpayne@68: parent_taxon_id, jpayne@68: ncbi_taxon_id, jpayne@68: rank, jpayne@68: genetic_code, jpayne@68: mito_genetic_code, jpayne@68: left_value, jpayne@68: right_value, jpayne@68: ), jpayne@68: ) jpayne@68: jpayne@68: taxon_id = self.adaptor.last_id("taxon") jpayne@68: jpayne@68: # Record the scientific name, common name, etc jpayne@68: for name_class, name in species_names: jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO taxon_name(taxon_id, name, name_class)" jpayne@68: " VALUES (%s, %s, %s)", jpayne@68: (taxon_id, name[:255], name_class), jpayne@68: ) jpayne@68: return taxon_id jpayne@68: jpayne@68: def _get_taxon_id_from_ncbi_lineage(self, taxonomic_lineage): jpayne@68: """Recursive method to get taxon ID from NCBI lineage (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - taxonomic_lineage - list of taxonomy dictionaries from Bio.Entrez jpayne@68: jpayne@68: First dictionary in list is the taxonomy root, highest would be jpayne@68: the species. Each dictionary includes: jpayne@68: jpayne@68: - TaxID (string, NCBI taxon id) jpayne@68: - Rank (string, e.g. "species", "genus", ..., "phylum", ...) jpayne@68: - ScientificName (string) jpayne@68: jpayne@68: (and that is all at the time of writing) jpayne@68: jpayne@68: This method will record all the lineage given, returning the taxon id jpayne@68: (database key, not NCBI taxon id) of the final entry (the species). jpayne@68: """ jpayne@68: ncbi_taxon_id = int(taxonomic_lineage[-1]["TaxId"]) jpayne@68: left_value = None jpayne@68: right_value = None jpayne@68: parent_left_value = None jpayne@68: parent_right_value = None jpayne@68: # Is this in the database already? Check the taxon table... jpayne@68: rows = self.adaptor.execute_and_fetchall( jpayne@68: "SELECT taxon_id, left_value, right_value FROM taxon" jpayne@68: " WHERE ncbi_taxon_id=%s" % ncbi_taxon_id jpayne@68: ) jpayne@68: if rows: jpayne@68: # we could verify that the Scientific Name etc in the database jpayne@68: # is the same and update it or print a warning if not... jpayne@68: if len(rows) != 1: jpayne@68: raise ValueError(f"Expected 1 reponse, got {len(rows)}") jpayne@68: return rows[0] jpayne@68: jpayne@68: # We have to record this. jpayne@68: if len(taxonomic_lineage) > 1: jpayne@68: # Use recursion to find out the taxon id (database key) of the jpayne@68: # parent. jpayne@68: ( jpayne@68: parent_taxon_id, jpayne@68: parent_left_value, jpayne@68: parent_right_value, jpayne@68: ) = self._get_taxon_id_from_ncbi_lineage(taxonomic_lineage[:-1]) jpayne@68: left_value = parent_right_value jpayne@68: right_value = parent_right_value + 1 jpayne@68: if not isinstance(parent_taxon_id, int): jpayne@68: raise ValueError( jpayne@68: f"Expected parent_taxon_id to be an int, got {parent_taxon_id}" jpayne@68: ) jpayne@68: else: jpayne@68: # we have reached the top of the lineage but no current taxonomy jpayne@68: # id has been found jpayne@68: parent_taxon_id = None jpayne@68: left_value = self.adaptor.execute_one("SELECT MAX(left_value) FROM taxon")[ jpayne@68: 0 jpayne@68: ] jpayne@68: if not left_value: jpayne@68: left_value = 0 jpayne@68: jpayne@68: right_value = left_value + 1 jpayne@68: jpayne@68: self._update_left_right_taxon_values(left_value) jpayne@68: jpayne@68: # INSERT new taxon jpayne@68: rank = str(taxonomic_lineage[-1].get("Rank")) jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO taxon(ncbi_taxon_id, parent_taxon_id, node_rank, " jpayne@68: "left_value, right_value) VALUES (%s, %s, %s, %s, %s)", jpayne@68: (ncbi_taxon_id, parent_taxon_id, rank, left_value, right_value), jpayne@68: ) jpayne@68: jpayne@68: taxon_id = self.adaptor.last_id("taxon") jpayne@68: # assert isinstance(taxon_id, int), repr(taxon_id) jpayne@68: # ... and its name in taxon_name jpayne@68: scientific_name = taxonomic_lineage[-1].get("ScientificName") jpayne@68: if scientific_name: jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO taxon_name(taxon_id, name, name_class) " jpayne@68: "VALUES (%s, %s, 'scientific name')", jpayne@68: (taxon_id, scientific_name[:255]), jpayne@68: ) jpayne@68: return taxon_id, left_value, right_value jpayne@68: jpayne@68: def _load_bioentry_table(self, record): jpayne@68: """Fill the bioentry table with sequence information (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - record - SeqRecord object to add to the database. jpayne@68: jpayne@68: """ jpayne@68: # get the pertinent info and insert it jpayne@68: jpayne@68: if record.id.count(".") == 1: # try to get a version from the id jpayne@68: # This assumes the string is something like "XXXXXXXX.123" jpayne@68: accession, version = record.id.split(".") jpayne@68: try: jpayne@68: version = int(version) jpayne@68: except ValueError: jpayne@68: accession = record.id jpayne@68: version = 0 jpayne@68: else: # otherwise just use a version of 0 jpayne@68: accession = record.id jpayne@68: version = 0 jpayne@68: jpayne@68: if ( jpayne@68: "accessions" in record.annotations jpayne@68: and isinstance(record.annotations["accessions"], list) jpayne@68: and record.annotations["accessions"] jpayne@68: ): jpayne@68: # Take the first accession (one if there is more than one) jpayne@68: accession = record.annotations["accessions"][0] jpayne@68: jpayne@68: # Find the taxon id (this is not just the NCBI Taxon ID) jpayne@68: # NOTE - If the species isn't defined in the taxon table, jpayne@68: # a new minimal entry is created. jpayne@68: taxon_id = self._get_taxon_id(record) jpayne@68: jpayne@68: if "gi" in record.annotations: jpayne@68: identifier = record.annotations["gi"] jpayne@68: else: jpayne@68: identifier = record.id jpayne@68: jpayne@68: # Allow description and division to default to NULL as in BioPerl. jpayne@68: description = getattr(record, "description", None) jpayne@68: division = record.annotations.get("data_file_division") jpayne@68: jpayne@68: sql = """ jpayne@68: INSERT INTO bioentry ( jpayne@68: biodatabase_id, jpayne@68: taxon_id, jpayne@68: name, jpayne@68: accession, jpayne@68: identifier, jpayne@68: division, jpayne@68: description, jpayne@68: version) jpayne@68: VALUES ( jpayne@68: %s, jpayne@68: %s, jpayne@68: %s, jpayne@68: %s, jpayne@68: %s, jpayne@68: %s, jpayne@68: %s, jpayne@68: %s)""" jpayne@68: # print(self.dbid, taxon_id, record.name, accession, identifier, \ jpayne@68: # division, description, version) jpayne@68: self.adaptor.execute( jpayne@68: sql, jpayne@68: ( jpayne@68: self.dbid, jpayne@68: taxon_id, jpayne@68: record.name, jpayne@68: accession, jpayne@68: identifier, jpayne@68: division, jpayne@68: description, jpayne@68: version, jpayne@68: ), jpayne@68: ) jpayne@68: # now retrieve the id for the bioentry jpayne@68: return self.adaptor.last_id("bioentry") jpayne@68: jpayne@68: def _load_bioentry_date(self, record, bioentry_id): jpayne@68: """Add the effective date of the entry into the database (PRIVATE). jpayne@68: jpayne@68: record - a SeqRecord object with an annotated date jpayne@68: bioentry_id - corresponding database identifier jpayne@68: """ jpayne@68: # dates are GenBank style, like: jpayne@68: # 14-SEP-2000 jpayne@68: date = record.annotations.get("date", strftime("%d-%b-%Y", gmtime()).upper()) jpayne@68: if isinstance(date, list): jpayne@68: date = date[0] jpayne@68: annotation_tags_id = self._get_ontology_id("Annotation Tags") jpayne@68: date_id = self._get_term_id("date_changed", annotation_tags_id) jpayne@68: sql = ( jpayne@68: "INSERT INTO bioentry_qualifier_value" jpayne@68: ' (bioentry_id, term_id, value, "rank")' jpayne@68: " VALUES (%s, %s, %s, 1)" jpayne@68: ) jpayne@68: self.adaptor.execute(sql, (bioentry_id, date_id, date)) jpayne@68: jpayne@68: def _load_biosequence(self, record, bioentry_id): jpayne@68: """Record SeqRecord's sequence and alphabet in DB (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - record - a SeqRecord object with a seq property jpayne@68: - bioentry_id - corresponding database identifier jpayne@68: jpayne@68: """ jpayne@68: if record.seq is None: jpayne@68: # The biosequence table entry is optional, so if we haven't jpayne@68: # got a sequence, we don't need to write to the table. jpayne@68: return jpayne@68: jpayne@68: molecule_type = record.annotations.get("molecule_type", "") jpayne@68: if "DNA" in molecule_type: jpayne@68: alphabet = "dna" jpayne@68: elif "RNA" in molecule_type: jpayne@68: alphabet = "rna" jpayne@68: elif "protein" in molecule_type: jpayne@68: alphabet = "protein" jpayne@68: else: jpayne@68: alphabet = "unknown" jpayne@68: jpayne@68: try: jpayne@68: seq_str = str(record.seq) jpayne@68: except UndefinedSequenceError: jpayne@68: seq_str = None jpayne@68: jpayne@68: sql = ( jpayne@68: "INSERT INTO biosequence (bioentry_id, version, " jpayne@68: "length, seq, alphabet) " jpayne@68: "VALUES (%s, 0, %s, %s, %s)" jpayne@68: ) jpayne@68: self.adaptor.execute(sql, (bioentry_id, len(record.seq), seq_str, alphabet)) jpayne@68: jpayne@68: def _load_comment(self, record, bioentry_id): jpayne@68: """Record a SeqRecord's annotated comment in the database (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - record - a SeqRecord object with an annotated comment jpayne@68: - bioentry_id - corresponding database identifier jpayne@68: jpayne@68: """ jpayne@68: comments = record.annotations.get("comment") jpayne@68: if not comments: jpayne@68: return jpayne@68: if not isinstance(comments, list): jpayne@68: # It should be a string then... jpayne@68: comments = [comments] jpayne@68: jpayne@68: for index, comment in enumerate(comments): jpayne@68: comment = comment.replace("\n", " ") jpayne@68: # TODO - Store each line as a separate entry? This would preserve jpayne@68: # the newlines, but we should check BioPerl etc to be consistent. jpayne@68: sql = ( jpayne@68: 'INSERT INTO comment (bioentry_id, comment_text, "rank")' jpayne@68: " VALUES (%s, %s, %s)" jpayne@68: ) jpayne@68: self.adaptor.execute(sql, (bioentry_id, comment, index + 1)) jpayne@68: jpayne@68: def _load_annotations(self, record, bioentry_id): jpayne@68: """Record a SeqRecord's misc annotations in the database (PRIVATE). jpayne@68: jpayne@68: The annotation strings are recorded in the bioentry_qualifier_value jpayne@68: table, except for special cases like the reference, comment and jpayne@68: taxonomy which are handled with their own tables. jpayne@68: jpayne@68: Arguments: jpayne@68: - record - a SeqRecord object with an annotations dictionary jpayne@68: - bioentry_id - corresponding database identifier jpayne@68: jpayne@68: """ jpayne@68: mono_sql = ( jpayne@68: "INSERT INTO bioentry_qualifier_value" jpayne@68: "(bioentry_id, term_id, value)" jpayne@68: " VALUES (%s, %s, %s)" jpayne@68: ) jpayne@68: many_sql = ( jpayne@68: "INSERT INTO bioentry_qualifier_value" jpayne@68: '(bioentry_id, term_id, value, "rank")' jpayne@68: " VALUES (%s, %s, %s, %s)" jpayne@68: ) jpayne@68: tag_ontology_id = self._get_ontology_id("Annotation Tags") jpayne@68: for key, value in record.annotations.items(): jpayne@68: if key in ["molecule_type", "references", "comment", "ncbi_taxid", "date"]: jpayne@68: # Handled separately jpayne@68: continue jpayne@68: term_id = self._get_term_id(key, ontology_id=tag_ontology_id) jpayne@68: if isinstance(value, (list, tuple)): jpayne@68: rank = 0 jpayne@68: for entry in value: jpayne@68: if isinstance(entry, (str, int)): jpayne@68: # Easy case jpayne@68: rank += 1 jpayne@68: self.adaptor.execute( jpayne@68: many_sql, (bioentry_id, term_id, str(entry), rank) jpayne@68: ) jpayne@68: else: jpayne@68: pass jpayne@68: elif isinstance(value, (str, int)): jpayne@68: # Have a simple single entry, leave rank as the DB default jpayne@68: self.adaptor.execute(mono_sql, (bioentry_id, term_id, str(value))) jpayne@68: else: jpayne@68: pass jpayne@68: # print("Ignoring annotation '%s' entry of type '%s'" \ jpayne@68: # % (key, type(value))) jpayne@68: jpayne@68: def _load_reference(self, reference, rank, bioentry_id): jpayne@68: """Record SeqRecord's annotated references in the database (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - record - a SeqRecord object with annotated references jpayne@68: - bioentry_id - corresponding database identifier jpayne@68: jpayne@68: """ jpayne@68: refs = None jpayne@68: if reference.medline_id: jpayne@68: refs = self.adaptor.execute_and_fetch_col0( jpayne@68: "SELECT reference_id" jpayne@68: " FROM reference JOIN dbxref USING (dbxref_id)" jpayne@68: " WHERE dbname = 'MEDLINE' AND accession = %s", jpayne@68: (reference.medline_id,), jpayne@68: ) jpayne@68: if not refs and reference.pubmed_id: jpayne@68: refs = self.adaptor.execute_and_fetch_col0( jpayne@68: "SELECT reference_id" jpayne@68: " FROM reference JOIN dbxref USING (dbxref_id)" jpayne@68: " WHERE dbname = 'PUBMED' AND accession = %s", jpayne@68: (reference.pubmed_id,), jpayne@68: ) jpayne@68: if not refs: jpayne@68: s = [] jpayne@68: for f in reference.authors, reference.title, reference.journal: jpayne@68: s.append(f or "") jpayne@68: crc = crc64("".join(s)) jpayne@68: refs = self.adaptor.execute_and_fetch_col0( jpayne@68: "SELECT reference_id FROM reference WHERE crc = %s", (crc,) jpayne@68: ) jpayne@68: if not refs: jpayne@68: if reference.medline_id: jpayne@68: dbxref_id = self._add_dbxref("MEDLINE", reference.medline_id, 0) jpayne@68: elif reference.pubmed_id: jpayne@68: dbxref_id = self._add_dbxref("PUBMED", reference.pubmed_id, 0) jpayne@68: else: jpayne@68: dbxref_id = None jpayne@68: authors = reference.authors or None jpayne@68: title = reference.title or None jpayne@68: # The location/journal field cannot be Null, so default jpayne@68: # to an empty string rather than None: jpayne@68: journal = reference.journal or "" jpayne@68: self.adaptor.execute( jpayne@68: "INSERT INTO reference (dbxref_id, location," jpayne@68: " title, authors, crc)" jpayne@68: " VALUES (%s, %s, %s, %s, %s)", jpayne@68: (dbxref_id, journal, title, authors, crc), jpayne@68: ) jpayne@68: reference_id = self.adaptor.last_id("reference") jpayne@68: else: jpayne@68: reference_id = refs[0] jpayne@68: jpayne@68: if reference.location: jpayne@68: start = 1 + int(str(reference.location[0].start)) jpayne@68: end = int(str(reference.location[0].end)) jpayne@68: else: jpayne@68: start = None jpayne@68: end = None jpayne@68: jpayne@68: sql = ( jpayne@68: "INSERT INTO bioentry_reference (bioentry_id, reference_id," jpayne@68: ' start_pos, end_pos, "rank") VALUES (%s, %s, %s, %s, %s)' jpayne@68: ) jpayne@68: self.adaptor.execute(sql, (bioentry_id, reference_id, start, end, rank + 1)) jpayne@68: jpayne@68: def _load_seqfeature(self, feature, feature_rank, bioentry_id): jpayne@68: """Load a biopython SeqFeature into the database (PRIVATE).""" jpayne@68: # records loaded from a gff file using BCBio.GFF will contain value jpayne@68: # of 2nd column of the gff as a feature qualifier. The BioSQL wiki jpayne@68: # suggests that the source should not go in with the other feature jpayne@68: # mappings but instead be put in the term table jpayne@68: # (http://www.biosql.org/wiki/Annotation_Mapping) jpayne@68: try: jpayne@68: source = feature.qualifiers["source"] jpayne@68: if isinstance(source, list): jpayne@68: source = source[0] jpayne@68: seqfeature_id = self._load_seqfeature_basic( jpayne@68: feature.type, feature_rank, bioentry_id, source=source jpayne@68: ) jpayne@68: except KeyError: jpayne@68: seqfeature_id = self._load_seqfeature_basic( jpayne@68: feature.type, feature_rank, bioentry_id jpayne@68: ) jpayne@68: jpayne@68: self._load_seqfeature_locations(feature, seqfeature_id) jpayne@68: self._load_seqfeature_qualifiers(feature.qualifiers, seqfeature_id) jpayne@68: jpayne@68: def _load_seqfeature_basic( jpayne@68: self, feature_type, feature_rank, bioentry_id, source="EMBL/GenBank/SwissProt" jpayne@68: ): jpayne@68: """Load the first tables of a seqfeature and returns the id (PRIVATE). jpayne@68: jpayne@68: This loads the "key" of the seqfeature (ie. CDS, gene) and jpayne@68: the basic seqfeature table itself. jpayne@68: """ jpayne@68: ontology_id = self._get_ontology_id("SeqFeature Keys") jpayne@68: seqfeature_key_id = self._get_term_id(feature_type, ontology_id=ontology_id) jpayne@68: source_cat_id = self._get_ontology_id("SeqFeature Sources") jpayne@68: source_term_id = self._get_term_id(source, ontology_id=source_cat_id) jpayne@68: jpayne@68: sql = ( jpayne@68: "INSERT INTO seqfeature (bioentry_id, type_term_id, " jpayne@68: 'source_term_id, "rank") VALUES (%s, %s, %s, %s)' jpayne@68: ) jpayne@68: self.adaptor.execute( jpayne@68: sql, (bioentry_id, seqfeature_key_id, source_term_id, feature_rank + 1) jpayne@68: ) jpayne@68: return self.adaptor.last_id("seqfeature") jpayne@68: jpayne@68: def _load_seqfeature_locations(self, feature, seqfeature_id): jpayne@68: """Load all of the locations for a SeqFeature into tables (PRIVATE). jpayne@68: jpayne@68: This adds the locations related to the SeqFeature into the jpayne@68: seqfeature_location table. Fuzzies are not handled right now. jpayne@68: For a simple location, ie (1..2), we have a single table row jpayne@68: with seq_start = 1, seq_end = 2, location_rank = 1. jpayne@68: jpayne@68: For split locations, ie (1..2, 3..4, 5..6) we would have three jpayne@68: row tables with:: jpayne@68: jpayne@68: start = 1, end = 2, rank = 1 jpayne@68: start = 3, end = 4, rank = 2 jpayne@68: start = 5, end = 6, rank = 3 jpayne@68: jpayne@68: """ jpayne@68: # TODO - Record an ontology for the locations (using location.term_id) jpayne@68: # which for now as in BioPerl we leave defaulting to NULL. jpayne@68: try: jpayne@68: if feature.location.operator != "join": jpayne@68: # e.g. order locations... we don't record "order" so it jpayne@68: # will become a "join" on reloading. What does BioPerl do? jpayne@68: import warnings jpayne@68: from Bio import BiopythonWarning jpayne@68: jpayne@68: warnings.warn( jpayne@68: "%s location operators are not fully supported" jpayne@68: % feature.location_operator, jpayne@68: BiopythonWarning, jpayne@68: ) jpayne@68: except AttributeError: jpayne@68: pass jpayne@68: # This will be a list of length one for a SimpleLocation: jpayne@68: parts = feature.location.parts jpayne@68: if parts and {loc.strand for loc in parts} == {-1}: jpayne@68: # To mimic prior behaviour of Biopython+BioSQL, reverse order jpayne@68: parts = parts[::-1] jpayne@68: # TODO - Check what BioPerl does; see also BioSeq.py code jpayne@68: for rank, loc in enumerate(parts): jpayne@68: self._insert_location(loc, rank + 1, seqfeature_id) jpayne@68: jpayne@68: def _insert_location(self, location, rank, seqfeature_id): jpayne@68: """Add SeqFeature location to seqfeature_location table (PRIVATE). jpayne@68: jpayne@68: TODO - Add location operator to location_qualifier_value? jpayne@68: """ jpayne@68: # convert biopython locations to the 1-based location system jpayne@68: # used in bioSQL jpayne@68: # XXX This could also handle fuzzies jpayne@68: jpayne@68: try: jpayne@68: start = int(location.start) + 1 jpayne@68: except TypeError: jpayne@68: # Handle SwissProt unknown position (?) jpayne@68: if isinstance(location.start, UnknownPosition): jpayne@68: start = None jpayne@68: else: jpayne@68: raise jpayne@68: jpayne@68: try: jpayne@68: end = int(location.end) jpayne@68: except TypeError: jpayne@68: # Handle SwissProt unknown position (?) jpayne@68: if isinstance(location.end, UnknownPosition): jpayne@68: end = None jpayne@68: else: jpayne@68: raise jpayne@68: jpayne@68: # Biopython uses None when we don't know strand information but jpayne@68: # BioSQL requires something (non null) and sets this as zero jpayne@68: # So we'll use the strand or 0 if Biopython spits out None jpayne@68: strand = location.strand or 0 jpayne@68: jpayne@68: # TODO - Record an ontology term for the location (location.term_id) jpayne@68: # which for now like BioPerl we'll leave as NULL. jpayne@68: # This might allow us to record "between" positions properly, but I jpayne@68: # don't really see how it could work for before/after fuzzy positions jpayne@68: loc_term_id = None jpayne@68: jpayne@68: if location.ref: jpayne@68: # sub_feature remote locations when they are in the same db as the jpayne@68: # current record do not have a value for ref_db, which SeqFeature jpayne@68: # object stores as None. BioSQL schema requires a varchar and is jpayne@68: # not NULL jpayne@68: dbxref_id = self._get_dbxref_id(location.ref_db or "", location.ref) jpayne@68: else: jpayne@68: dbxref_id = None jpayne@68: jpayne@68: sql = ( jpayne@68: "INSERT INTO location (seqfeature_id, dbxref_id, term_id," jpayne@68: 'start_pos, end_pos, strand, "rank") ' jpayne@68: "VALUES (%s, %s, %s, %s, %s, %s, %s)" jpayne@68: ) jpayne@68: self.adaptor.execute( jpayne@68: sql, (seqfeature_id, dbxref_id, loc_term_id, start, end, strand, rank) jpayne@68: ) jpayne@68: jpayne@68: """ jpayne@68: # See Bug 2677 jpayne@68: # TODO - Record the location_operator (e.g. "join" or "order") jpayne@68: # using the location_qualifier_value table (which we and BioPerl jpayne@68: # have historically left empty). jpayne@68: # Note this will need an ontology term for the location qualifier jpayne@68: # (location_qualifier_value.term_id) for which oddly the schema jpayne@68: # does not allow NULL. jpayne@68: if feature.location_operator: jpayne@68: #e.g. "join" (common), jpayne@68: #or "order" (see Tests/GenBank/protein_refseq2.gb) jpayne@68: location_id = self.adaptor.last_id('location') jpayne@68: loc_qual_term_id = None # Not allowed in BioSQL v1.0.1 jpayne@68: sql = ("INSERT INTO location_qualifier_value" jpayne@68: "(location_id, term_id, value) " jpayne@68: "VALUES (%s, %s, %s)") jpayne@68: self.adaptor.execute(sql, (location_id, loc_qual_term_id, jpayne@68: feature.location_operator)) jpayne@68: """ jpayne@68: jpayne@68: def _load_seqfeature_qualifiers(self, qualifiers, seqfeature_id): jpayne@68: """Insert feature's (key, value) pair qualifiers (PRIVATE). jpayne@68: jpayne@68: Qualifiers should be a dictionary of the form:: jpayne@68: jpayne@68: {key : [value1, value2]} jpayne@68: jpayne@68: """ jpayne@68: tag_ontology_id = self._get_ontology_id("Annotation Tags") jpayne@68: for qualifier_key in qualifiers: jpayne@68: # Treat db_xref qualifiers differently to sequence annotation jpayne@68: # qualifiers by populating the seqfeature_dbxref and dbxref jpayne@68: # tables. Other qualifiers go into the seqfeature_qualifier_value jpayne@68: # and (if new) term tables. jpayne@68: if qualifier_key != "db_xref": jpayne@68: qualifier_key_id = self._get_term_id( jpayne@68: qualifier_key, ontology_id=tag_ontology_id jpayne@68: ) jpayne@68: # now add all of the values to their table jpayne@68: entries = qualifiers[qualifier_key] jpayne@68: if not isinstance(entries, list): jpayne@68: # Could be a plain string, or an int or a float. jpayne@68: # However, we exect a list of strings here. jpayne@68: entries = [entries] jpayne@68: for qual_value_rank in range(len(entries)): jpayne@68: qualifier_value = entries[qual_value_rank] jpayne@68: sql = ( jpayne@68: "INSERT INTO seqfeature_qualifier_value " jpayne@68: ' (seqfeature_id, term_id, "rank", value) VALUES' jpayne@68: " (%s, %s, %s, %s)" jpayne@68: ) jpayne@68: self.adaptor.execute( jpayne@68: sql, jpayne@68: ( jpayne@68: seqfeature_id, jpayne@68: qualifier_key_id, jpayne@68: qual_value_rank + 1, jpayne@68: qualifier_value, jpayne@68: ), jpayne@68: ) jpayne@68: else: jpayne@68: # The dbxref_id qualifier/value sets go into the dbxref table jpayne@68: # as dbname, accession, version tuples, with dbxref.dbxref_id jpayne@68: # being automatically assigned, and into the seqfeature_dbxref jpayne@68: # table as seqfeature_id, dbxref_id, and rank tuples jpayne@68: self._load_seqfeature_dbxref(qualifiers[qualifier_key], seqfeature_id) jpayne@68: jpayne@68: def _load_seqfeature_dbxref(self, dbxrefs, seqfeature_id): jpayne@68: """Add SeqFeature's DB cross-references to the database (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - dbxrefs - List, dbxref data from the source file in the jpayne@68: format : jpayne@68: - seqfeature_id - Int, the identifier for the seqfeature in the jpayne@68: seqfeature table jpayne@68: jpayne@68: Insert dbxref qualifier data for a seqfeature into the jpayne@68: seqfeature_dbxref and, if required, dbxref tables. jpayne@68: The dbxref_id qualifier/value sets go into the dbxref table jpayne@68: as dbname, accession, version tuples, with dbxref.dbxref_id jpayne@68: being automatically assigned, and into the seqfeature_dbxref jpayne@68: table as seqfeature_id, dbxref_id, and rank tuples. jpayne@68: """ jpayne@68: # NOTE - In older versions of Biopython, we would map the GenBank jpayne@68: # db_xref "name", for example "GI" to "GeneIndex", and give a warning jpayne@68: # for any unknown terms. This was a long term maintenance problem, jpayne@68: # and differed from BioPerl and BioJava's implementation. See bug 2405 jpayne@68: for rank, value in enumerate(dbxrefs): jpayne@68: # Split the DB:accession format string at colons. We have to jpayne@68: # account for multiple-line and multiple-accession entries jpayne@68: try: jpayne@68: dbxref_data = value.replace(" ", "").replace("\n", "").split(":") jpayne@68: db = dbxref_data[0] jpayne@68: accessions = dbxref_data[1:] jpayne@68: except Exception: jpayne@68: raise ValueError(f"Parsing of db_xref failed: '{value}'") from None jpayne@68: # Loop over all the grabbed accessions, and attempt to fill the jpayne@68: # table jpayne@68: for accession in accessions: jpayne@68: # Get the dbxref_id value for the dbxref data jpayne@68: dbxref_id = self._get_dbxref_id(db, accession) jpayne@68: # Insert the seqfeature_dbxref data jpayne@68: self._get_seqfeature_dbxref(seqfeature_id, dbxref_id, rank + 1) jpayne@68: jpayne@68: def _get_dbxref_id(self, db, accession): jpayne@68: """Get DB cross-reference for accession (PRIVATE). jpayne@68: jpayne@68: Arguments: jpayne@68: - db - String, the name of the external database containing jpayne@68: the accession number jpayne@68: - accession - String, the accession of the dbxref data jpayne@68: jpayne@68: Finds and returns the dbxref_id for the passed data. The method jpayne@68: attempts to find an existing record first, and inserts the data jpayne@68: if there is no record. jpayne@68: """ jpayne@68: # Check for an existing record jpayne@68: sql = "SELECT dbxref_id FROM dbxref WHERE dbname = %s AND accession = %s" jpayne@68: dbxref_id = self.adaptor.execute_and_fetch_col0(sql, (db, accession)) jpayne@68: # If there was a record, return the dbxref_id, else create the jpayne@68: # record and return the created dbxref_id jpayne@68: if dbxref_id: jpayne@68: return dbxref_id[0] jpayne@68: return self._add_dbxref(db, accession, 0) jpayne@68: jpayne@68: def _get_seqfeature_dbxref(self, seqfeature_id, dbxref_id, rank): jpayne@68: """Get DB cross-reference, creating it if needed (PRIVATE). jpayne@68: jpayne@68: Check for a pre-existing seqfeature_dbxref entry with the passed jpayne@68: seqfeature_id and dbxref_id. If one does not exist, insert new jpayne@68: data. jpayne@68: """ jpayne@68: # Check for an existing record jpayne@68: sql = ( jpayne@68: "SELECT seqfeature_id, dbxref_id FROM seqfeature_dbxref " jpayne@68: "WHERE seqfeature_id = %s AND dbxref_id = %s" jpayne@68: ) jpayne@68: result = self.adaptor.execute_and_fetch_col0(sql, (seqfeature_id, dbxref_id)) jpayne@68: # If there was a record, return without executing anything, else create jpayne@68: # the record and return jpayne@68: if result: jpayne@68: return result jpayne@68: return self._add_seqfeature_dbxref(seqfeature_id, dbxref_id, rank) jpayne@68: jpayne@68: def _add_seqfeature_dbxref(self, seqfeature_id, dbxref_id, rank): jpayne@68: """Add DB cross-reference (PRIVATE). jpayne@68: jpayne@68: Insert a seqfeature_dbxref row and return the seqfeature_id and jpayne@68: dbxref_id jpayne@68: """ jpayne@68: sql = ( jpayne@68: "INSERT INTO seqfeature_dbxref " jpayne@68: '(seqfeature_id, dbxref_id, "rank") VALUES' jpayne@68: "(%s, %s, %s)" jpayne@68: ) jpayne@68: self.adaptor.execute(sql, (seqfeature_id, dbxref_id, rank)) jpayne@68: return (seqfeature_id, dbxref_id) jpayne@68: jpayne@68: def _load_dbxrefs(self, record, bioentry_id): jpayne@68: """Load any sequence level cross references into the database (PRIVATE). jpayne@68: jpayne@68: See table bioentry_dbxref. jpayne@68: """ jpayne@68: for rank, value in enumerate(record.dbxrefs): jpayne@68: # Split the DB:accession string at first colon. jpayne@68: # We have to cope with things like: jpayne@68: # "MGD:MGI:892" (db="MGD", accession="MGI:892") jpayne@68: # "GO:GO:123" (db="GO", accession="GO:123") jpayne@68: # jpayne@68: # Annoyingly I have seen the NCBI use both the style jpayne@68: # "GO:GO:123" and "GO:123" in different vintages. jpayne@68: newline_escape_count = value.count("\n") jpayne@68: if newline_escape_count != 0: jpayne@68: raise ValueError( jpayne@68: "Expected a single line in value, got {newline_escape_count}" jpayne@68: ) jpayne@68: try: jpayne@68: db, accession = value.split(":", 1) jpayne@68: db = db.strip() jpayne@68: accession = accession.strip() jpayne@68: except Exception: jpayne@68: raise ValueError(f"Parsing of dbxrefs list failed: '{value}'") from None jpayne@68: # Get the dbxref_id value for the dbxref data jpayne@68: dbxref_id = self._get_dbxref_id(db, accession) jpayne@68: # Insert the bioentry_dbxref data jpayne@68: self._get_bioentry_dbxref(bioentry_id, dbxref_id, rank + 1) jpayne@68: jpayne@68: def _get_bioentry_dbxref(self, bioentry_id, dbxref_id, rank): jpayne@68: """Get pre-existing db-xref, or create and return it (PRIVATE). jpayne@68: jpayne@68: Check for a pre-existing bioentry_dbxref entry with the passed jpayne@68: seqfeature_id and dbxref_id. If one does not exist, insert new jpayne@68: data jpayne@68: """ jpayne@68: # Check for an existing record jpayne@68: sql = ( jpayne@68: "SELECT bioentry_id, dbxref_id FROM bioentry_dbxref " jpayne@68: "WHERE bioentry_id = %s AND dbxref_id = %s" jpayne@68: ) jpayne@68: result = self.adaptor.execute_and_fetch_col0(sql, (bioentry_id, dbxref_id)) jpayne@68: # If there was a record, return without executing anything, else create jpayne@68: # the record and return jpayne@68: if result: jpayne@68: return result jpayne@68: return self._add_bioentry_dbxref(bioentry_id, dbxref_id, rank) jpayne@68: jpayne@68: def _add_bioentry_dbxref(self, bioentry_id, dbxref_id, rank): jpayne@68: """Insert a bioentry_dbxref row (PRIVATE). jpayne@68: jpayne@68: Returns the seqfeature_id and dbxref_id (PRIVATE). jpayne@68: """ jpayne@68: sql = ( jpayne@68: "INSERT INTO bioentry_dbxref " jpayne@68: '(bioentry_id,dbxref_id,"rank") VALUES ' jpayne@68: "(%s, %s, %s)" jpayne@68: ) jpayne@68: self.adaptor.execute(sql, (bioentry_id, dbxref_id, rank)) jpayne@68: return (bioentry_id, dbxref_id) jpayne@68: jpayne@68: jpayne@68: class DatabaseRemover: jpayne@68: """Complement the Loader functionality by fully removing a database. jpayne@68: jpayne@68: This probably isn't really useful for normal purposes, since you jpayne@68: can just do a:: jpayne@68: jpayne@68: DROP DATABASE db_name jpayne@68: jpayne@68: and then recreate the database. But, it's really useful for testing jpayne@68: purposes. jpayne@68: """ jpayne@68: jpayne@68: def __init__(self, adaptor, dbid): jpayne@68: """Initialize with a database id and adaptor connection.""" jpayne@68: self.adaptor = adaptor jpayne@68: self.dbid = dbid jpayne@68: jpayne@68: def remove(self): jpayne@68: """Remove everything related to the given database id.""" jpayne@68: sql = "DELETE FROM bioentry WHERE biodatabase_id = %s" jpayne@68: self.adaptor.execute(sql, (self.dbid,)) jpayne@68: sql = "DELETE FROM biodatabase WHERE biodatabase_id = %s" jpayne@68: self.adaptor.execute(sql, (self.dbid,))