jpayne@69: # Copyright 2002 by Andrew Dalke. All rights reserved. jpayne@69: # Revisions 2007-2016 copyright by Peter Cock. All rights reserved. jpayne@69: # Revisions 2008 copyright by Cymon J. Cox. All rights reserved. jpayne@69: # jpayne@69: # This file is part of the Biopython distribution and governed by your jpayne@69: # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". jpayne@69: # Please see the LICENSE file that should have been included as part of this jpayne@69: # package. jpayne@69: # jpayne@69: # Note that BioSQL (including the database schema and scripts) is jpayne@69: # available and licensed separately. Please consult www.biosql.org jpayne@69: jpayne@69: """Load biopython objects into a BioSQL database for persistent storage. jpayne@69: jpayne@69: This code makes it possible to store biopython objects in a relational jpayne@69: database and then retrieve them back. You shouldn't use any of the jpayne@69: classes in this module directly. Rather, call the load() method on jpayne@69: a database object. jpayne@69: """ jpayne@69: # standard modules jpayne@69: from time import gmtime, strftime jpayne@69: jpayne@69: # biopython jpayne@69: from Bio.SeqUtils.CheckSum import crc64 jpayne@69: from Bio import Entrez jpayne@69: from Bio.Seq import UndefinedSequenceError jpayne@69: from Bio.SeqFeature import UnknownPosition jpayne@69: jpayne@69: jpayne@69: class DatabaseLoader: jpayne@69: """Object used to load SeqRecord objects into a BioSQL database.""" jpayne@69: jpayne@69: def __init__(self, adaptor, dbid, fetch_NCBI_taxonomy=False): jpayne@69: """Initialize with connection information for the database. jpayne@69: jpayne@69: Creating a DatabaseLoader object is normally handled via the jpayne@69: BioSeqDatabase DBServer object, for example:: jpayne@69: jpayne@69: from BioSQL import BioSeqDatabase jpayne@69: server = BioSeqDatabase.open_database(driver="MySQLdb", jpayne@69: user="gbrowse", jpayne@69: passwd="biosql", jpayne@69: host="localhost", jpayne@69: db="test_biosql") jpayne@69: try: jpayne@69: db = server["test"] jpayne@69: except KeyError: jpayne@69: db = server.new_database("test", jpayne@69: description="For testing GBrowse") jpayne@69: jpayne@69: """ jpayne@69: self.adaptor = adaptor jpayne@69: self.dbid = dbid jpayne@69: self.fetch_NCBI_taxonomy = fetch_NCBI_taxonomy jpayne@69: jpayne@69: def load_seqrecord(self, record): jpayne@69: """Load a Biopython SeqRecord into the database.""" jpayne@69: bioentry_id = self._load_bioentry_table(record) jpayne@69: self._load_bioentry_date(record, bioentry_id) jpayne@69: self._load_biosequence(record, bioentry_id) jpayne@69: self._load_comment(record, bioentry_id) jpayne@69: self._load_dbxrefs(record, bioentry_id) jpayne@69: references = record.annotations.get("references", ()) jpayne@69: for reference, rank in zip(references, list(range(len(references)))): jpayne@69: self._load_reference(reference, rank, bioentry_id) jpayne@69: self._load_annotations(record, bioentry_id) jpayne@69: for seq_feature_num in range(len(record.features)): jpayne@69: seq_feature = record.features[seq_feature_num] jpayne@69: self._load_seqfeature(seq_feature, seq_feature_num, bioentry_id) jpayne@69: jpayne@69: def _get_ontology_id(self, name, definition=None): jpayne@69: """Return identifier for the named ontology (PRIVATE). jpayne@69: jpayne@69: This looks through the onotology table for a the given entry name. jpayne@69: If it is not found, a row is added for this ontology (using the jpayne@69: definition if supplied). In either case, the id corresponding to jpayne@69: the provided name is returned, so that you can reference it in jpayne@69: another table. jpayne@69: """ jpayne@69: oids = self.adaptor.execute_and_fetch_col0( jpayne@69: "SELECT ontology_id FROM ontology WHERE name = %s", (name,) jpayne@69: ) jpayne@69: if oids: jpayne@69: return oids[0] jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO ontology(name, definition) VALUES (%s, %s)", (name, definition) jpayne@69: ) jpayne@69: return self.adaptor.last_id("ontology") jpayne@69: jpayne@69: def _get_term_id(self, name, ontology_id=None, definition=None, identifier=None): jpayne@69: """Get the id that corresponds to a term (PRIVATE). jpayne@69: jpayne@69: This looks through the term table for a the given term. If it jpayne@69: is not found, a new id corresponding to this term is created. jpayne@69: In either case, the id corresponding to that term is returned, so jpayne@69: that you can reference it in another table. jpayne@69: jpayne@69: The ontology_id should be used to disambiguate the term. jpayne@69: """ jpayne@69: # try to get the term id jpayne@69: sql = "SELECT term_id FROM term WHERE name = %s" jpayne@69: fields = [name] jpayne@69: if ontology_id: jpayne@69: sql += " AND ontology_id = %s" jpayne@69: fields.append(ontology_id) jpayne@69: id_results = self.adaptor.execute_and_fetchall(sql, fields) jpayne@69: # something is wrong jpayne@69: if len(id_results) > 1: jpayne@69: raise ValueError(f"Multiple term ids for {name}: {id_results!r}") jpayne@69: elif len(id_results) == 1: jpayne@69: return id_results[0][0] jpayne@69: else: jpayne@69: sql = ( jpayne@69: "INSERT INTO term (name, definition," jpayne@69: " identifier, ontology_id)" jpayne@69: " VALUES (%s, %s, %s, %s)" jpayne@69: ) jpayne@69: self.adaptor.execute(sql, (name, definition, identifier, ontology_id)) jpayne@69: return self.adaptor.last_id("term") jpayne@69: jpayne@69: def _add_dbxref(self, dbname, accession, version): jpayne@69: """Insert a dbxref and return its id (PRIVATE).""" jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO dbxref(dbname, accession, version) VALUES (%s, %s, %s)", jpayne@69: (dbname, accession, version), jpayne@69: ) jpayne@69: return self.adaptor.last_id("dbxref") jpayne@69: jpayne@69: def _get_taxon_id(self, record): jpayne@69: """Get the taxon id for this record (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - record - a SeqRecord object jpayne@69: jpayne@69: This searches the taxon/taxon_name tables using the jpayne@69: NCBI taxon ID, scientific name and common name to find jpayne@69: the matching taxon table entry's id. jpayne@69: jpayne@69: If the species isn't in the taxon table, and we have at jpayne@69: least the NCBI taxon ID, scientific name or common name, jpayne@69: at least a minimal stub entry is created in the table. jpayne@69: jpayne@69: Returns the taxon id (database key for the taxon table, jpayne@69: not an NCBI taxon ID), or None if the taxonomy information jpayne@69: is missing. jpayne@69: jpayne@69: See also the BioSQL script load_ncbi_taxonomy.pl which jpayne@69: will populate and update the taxon/taxon_name tables jpayne@69: with the latest information from the NCBI. jpayne@69: """ jpayne@69: # To find the NCBI taxid, first check for a top level annotation jpayne@69: ncbi_taxon_id = None jpayne@69: if "ncbi_taxid" in record.annotations: jpayne@69: # Could be a list of IDs. jpayne@69: if isinstance(record.annotations["ncbi_taxid"], list): jpayne@69: if len(record.annotations["ncbi_taxid"]) == 1: jpayne@69: ncbi_taxon_id = record.annotations["ncbi_taxid"][0] jpayne@69: else: jpayne@69: ncbi_taxon_id = record.annotations["ncbi_taxid"] jpayne@69: if not ncbi_taxon_id: jpayne@69: # Secondly, look for a source feature jpayne@69: for f in record.features: jpayne@69: if f.type == "source": jpayne@69: quals = getattr(f, "qualifiers", {}) jpayne@69: if "db_xref" in quals: jpayne@69: for db_xref in f.qualifiers["db_xref"]: jpayne@69: if db_xref.startswith("taxon:"): jpayne@69: ncbi_taxon_id = int(db_xref[6:]) jpayne@69: break jpayne@69: if ncbi_taxon_id: jpayne@69: break jpayne@69: jpayne@69: try: jpayne@69: scientific_name = record.annotations["organism"][:255] jpayne@69: except KeyError: jpayne@69: scientific_name = None jpayne@69: try: jpayne@69: common_name = record.annotations["source"][:255] jpayne@69: except KeyError: jpayne@69: common_name = None jpayne@69: # Note: The maximum length for taxon names in the schema is 255. jpayne@69: # Cropping it now should help in getting a match when searching, jpayne@69: # and avoids an error if we try and add these to the database. jpayne@69: jpayne@69: if ncbi_taxon_id: jpayne@69: # Good, we have the NCBI taxon to go on - this is unambiguous :) jpayne@69: # Note that the scientific name and common name will only be jpayne@69: # used if we have to record a stub entry. jpayne@69: return self._get_taxon_id_from_ncbi_taxon_id( jpayne@69: ncbi_taxon_id, scientific_name, common_name jpayne@69: ) jpayne@69: jpayne@69: if not common_name and not scientific_name: jpayne@69: # Nothing to go on... and there is no point adding jpayne@69: # a new entry to the database. We'll just leave this jpayne@69: # sequence's taxon as a NULL in the database. jpayne@69: return None jpayne@69: jpayne@69: # Next, we'll try to find a match based on the species name jpayne@69: # (stored in GenBank files as the organism and/or the source). jpayne@69: if scientific_name: jpayne@69: taxa = self.adaptor.execute_and_fetch_col0( jpayne@69: "SELECT taxon_id FROM taxon_name" jpayne@69: " WHERE name_class = 'scientific name' AND name = %s", jpayne@69: (scientific_name,), jpayne@69: ) jpayne@69: if taxa: jpayne@69: # Good, mapped the scientific name to a taxon table entry jpayne@69: return taxa[0] jpayne@69: jpayne@69: # Last chance... jpayne@69: if common_name: jpayne@69: taxa = self.adaptor.execute_and_fetch_col0( jpayne@69: "SELECT DISTINCT taxon_id FROM taxon_name WHERE name = %s", jpayne@69: (common_name,), jpayne@69: ) jpayne@69: # Its natural that several distinct taxa will have the same common jpayne@69: # name - in which case we can't resolve the taxon uniquely. jpayne@69: if len(taxa) > 1: jpayne@69: raise ValueError( jpayne@69: "Taxa: %d species have name %r" % (len(taxa), common_name) jpayne@69: ) jpayne@69: if taxa: jpayne@69: # Good, mapped the common name to a taxon table entry jpayne@69: return taxa[0] jpayne@69: jpayne@69: # At this point, as far as we can tell, this species isn't jpayne@69: # in the taxon table already. So we'll have to add it. jpayne@69: # We don't have an NCBI taxonomy ID, so if we do record just jpayne@69: # a stub entry, there is no simple way to fix this later. jpayne@69: # jpayne@69: # TODO - Should we try searching the NCBI taxonomy using the jpayne@69: # species name? jpayne@69: # jpayne@69: # OK, let's try inserting the species. jpayne@69: # Chances are we don't have enough information ... jpayne@69: # Furthermore, it won't be in the hierarchy. jpayne@69: jpayne@69: lineage = [] jpayne@69: for c in record.annotations.get("taxonomy", []): jpayne@69: lineage.append([None, None, c]) jpayne@69: if lineage: jpayne@69: lineage[-1][1] = "genus" jpayne@69: lineage.append([None, "species", record.annotations["organism"]]) jpayne@69: # XXX do we have them? jpayne@69: if "subspecies" in record.annotations: jpayne@69: lineage.append([None, "subspecies", record.annotations["subspecies"]]) jpayne@69: if "variant" in record.annotations: jpayne@69: lineage.append([None, "varietas", record.annotations["variant"]]) jpayne@69: lineage[-1][0] = ncbi_taxon_id jpayne@69: jpayne@69: left_value = self.adaptor.execute_one("SELECT MAX(left_value) FROM taxon")[0] jpayne@69: if not left_value: jpayne@69: left_value = 0 jpayne@69: left_value += 1 jpayne@69: jpayne@69: # XXX -- Brad: Fixing this for now in an ugly way because jpayne@69: # I am getting overlaps for right_values. I need to dig into this jpayne@69: # more to actually understand how it works. I'm not sure it is jpayne@69: # actually working right anyhow. jpayne@69: right_start_value = self.adaptor.execute_one( jpayne@69: "SELECT MAX(right_value) FROM taxon" jpayne@69: )[0] jpayne@69: if not right_start_value: jpayne@69: right_start_value = 0 jpayne@69: right_value = right_start_value + 2 * len(lineage) - 1 jpayne@69: jpayne@69: parent_taxon_id = None jpayne@69: for taxon in lineage: jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO taxon(parent_taxon_id, ncbi_taxon_id, node_rank," jpayne@69: " left_value, right_value)" jpayne@69: " VALUES (%s, %s, %s, %s, %s)", jpayne@69: (parent_taxon_id, taxon[0], taxon[1], left_value, right_value), jpayne@69: ) jpayne@69: taxon_id = self.adaptor.last_id("taxon") jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO taxon_name(taxon_id, name, name_class)" jpayne@69: "VALUES (%s, %s, 'scientific name')", jpayne@69: (taxon_id, taxon[2][:255]), jpayne@69: ) jpayne@69: # Note the name field is limited to 255, some SwissProt files jpayne@69: # have a multi-species name which can be longer. So truncate this. jpayne@69: left_value += 1 jpayne@69: right_value -= 1 jpayne@69: parent_taxon_id = taxon_id jpayne@69: if common_name: jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO taxon_name(taxon_id, name, name_class)" jpayne@69: "VALUES (%s, %s, 'common name')", jpayne@69: (taxon_id, common_name), jpayne@69: ) jpayne@69: jpayne@69: return taxon_id jpayne@69: jpayne@69: def _fix_name_class(self, entrez_name): jpayne@69: """Map Entrez name terms to those used in taxdump (PRIVATE). jpayne@69: jpayne@69: We need to make this conversion to match the taxon_name.name_class jpayne@69: values used by the BioSQL load_ncbi_taxonomy.pl script. jpayne@69: jpayne@69: e.g.:: jpayne@69: jpayne@69: "ScientificName" -> "scientific name", jpayne@69: "EquivalentName" -> "equivalent name", jpayne@69: "Synonym" -> "synonym", jpayne@69: jpayne@69: """ jpayne@69: # Add any special cases here: jpayne@69: # jpayne@69: # known = {} jpayne@69: # try: jpayne@69: # return known[entrez_name] jpayne@69: # except KeyError: jpayne@69: # pass jpayne@69: jpayne@69: # Try automatically by adding spaces before each capital jpayne@69: def add_space(letter): jpayne@69: """Add a space before a capital letter.""" jpayne@69: if letter.isupper(): jpayne@69: return " " + letter.lower() jpayne@69: else: jpayne@69: return letter jpayne@69: jpayne@69: answer = "".join(add_space(letter) for letter in entrez_name).strip() jpayne@69: if answer != answer.lower(): jpayne@69: raise ValueError( jpayne@69: f"Expected processed entrez_name, '{answer}' to only have lower case letters." jpayne@69: ) jpayne@69: return answer jpayne@69: jpayne@69: def _update_left_right_taxon_values(self, left_value): jpayne@69: """Update the left and right taxon values in the table (PRIVATE).""" jpayne@69: if not left_value: jpayne@69: return jpayne@69: # Due to the UNIQUE constraint on the left and right values in the taxon jpayne@69: # table we cannot simply update them through an SQL statement as we risk jpayne@69: # colliding values. Instead we must select all of the rows that we want to jpayne@69: # update, modify the values in python and then update the rows jpayne@69: # self.adaptor.execute("UPDATE taxon SET right_value = right_value + 2 " jpayne@69: # "WHERE right_value >= %s", (left_value,)) jpayne@69: # self.adaptor.execute("UPDATE taxon SET left_value = left_value + 2 " jpayne@69: # "WHERE left_value > %s", (left_value,)) jpayne@69: jpayne@69: rows = self.adaptor.execute_and_fetchall( jpayne@69: "SELECT left_value, right_value, taxon_id FROM taxon " jpayne@69: "WHERE right_value >= %s or left_value > %s", jpayne@69: (left_value, left_value), jpayne@69: ) jpayne@69: jpayne@69: right_rows = [] jpayne@69: left_rows = [] jpayne@69: for row in rows: jpayne@69: new_right = row[1] jpayne@69: new_left = row[0] jpayne@69: if new_right >= left_value: jpayne@69: new_right += 2 jpayne@69: jpayne@69: if new_left > left_value: jpayne@69: new_left += 2 jpayne@69: right_rows.append((new_right, row[2])) jpayne@69: left_rows.append((new_left, row[2])) jpayne@69: jpayne@69: # sort the rows based on the value from largest to smallest jpayne@69: # should ensure no overlaps jpayne@69: right_rows = sorted(right_rows, key=lambda x: x[0], reverse=True) jpayne@69: left_rows = sorted(left_rows, key=lambda x: x[0], reverse=True) jpayne@69: jpayne@69: self.adaptor.executemany( jpayne@69: "UPDATE taxon SET left_value = %s WHERE taxon_id = %s", left_rows jpayne@69: ) jpayne@69: self.adaptor.executemany( jpayne@69: "UPDATE taxon SET right_value = %s WHERE taxon_id = %s", right_rows jpayne@69: ) jpayne@69: jpayne@69: def _get_taxon_id_from_ncbi_taxon_id( jpayne@69: self, ncbi_taxon_id, scientific_name=None, common_name=None jpayne@69: ): jpayne@69: """Get the taxon id for record from NCBI taxon ID (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - ncbi_taxon_id - string containing an NCBI taxon id jpayne@69: - scientific_name - string, used if a stub entry is recorded jpayne@69: - common_name - string, used if a stub entry is recorded jpayne@69: jpayne@69: This searches the taxon table using ONLY the NCBI taxon ID jpayne@69: to find the matching taxon table entry's ID (database key). jpayne@69: jpayne@69: If the species isn't in the taxon table, and the fetch_NCBI_taxonomy jpayne@69: flag is true, Biopython will attempt to go online using Bio.Entrez jpayne@69: to fetch the official NCBI lineage, recursing up the tree until an jpayne@69: existing entry is found in the database or the full lineage has been jpayne@69: fetched. jpayne@69: jpayne@69: Otherwise the NCBI taxon ID, scientific name and common name are jpayne@69: recorded as a minimal stub entry in the taxon and taxon_name tables. jpayne@69: Any partial information about the lineage from the SeqRecord is NOT jpayne@69: recorded. This should mean that (re)running the BioSQL script jpayne@69: load_ncbi_taxonomy.pl can fill in the taxonomy lineage. jpayne@69: jpayne@69: Returns the taxon id (database key for the taxon table, not jpayne@69: an NCBI taxon ID). jpayne@69: """ jpayne@69: if not ncbi_taxon_id: jpayne@69: raise ValueError("Expected a non-empty value for ncbi_taxon_id.") jpayne@69: jpayne@69: taxon_id = self.adaptor.execute_and_fetch_col0( jpayne@69: "SELECT taxon_id FROM taxon WHERE ncbi_taxon_id = %s", (int(ncbi_taxon_id),) jpayne@69: ) jpayne@69: if taxon_id: jpayne@69: # Good, we have mapped the NCBI taxid to a taxon table entry jpayne@69: return taxon_id[0] jpayne@69: jpayne@69: # At this point, as far as we can tell, this species isn't jpayne@69: # in the taxon table already. So we'll have to add it. jpayne@69: jpayne@69: parent_taxon_id = None jpayne@69: rank = "species" jpayne@69: genetic_code = None jpayne@69: mito_genetic_code = None jpayne@69: parent_left_value = None jpayne@69: parent_right_value = None jpayne@69: left_value = None jpayne@69: right_value = None jpayne@69: species_names = [] jpayne@69: if scientific_name: jpayne@69: species_names.append(("scientific name", scientific_name)) jpayne@69: if common_name: jpayne@69: species_names.append(("common name", common_name)) jpayne@69: jpayne@69: if self.fetch_NCBI_taxonomy: jpayne@69: # Go online to get the parent taxon ID! jpayne@69: handle = Entrez.efetch(db="taxonomy", id=ncbi_taxon_id, retmode="XML") jpayne@69: taxonomic_record = Entrez.read(handle) jpayne@69: if len(taxonomic_record) == 1: jpayne@69: if taxonomic_record[0]["TaxId"] != str(ncbi_taxon_id): jpayne@69: raise ValueError( jpayne@69: f"ncbi_taxon_id different from parent taxon id. {ncbi_taxon_id} versus {taxonomic_record[0]['TaxId']}" jpayne@69: ) jpayne@69: jpayne@69: ( jpayne@69: parent_taxon_id, jpayne@69: parent_left_value, jpayne@69: parent_right_value, jpayne@69: ) = self._get_taxon_id_from_ncbi_lineage( jpayne@69: taxonomic_record[0]["LineageEx"] jpayne@69: ) jpayne@69: jpayne@69: left_value = parent_right_value jpayne@69: right_value = parent_right_value + 1 jpayne@69: jpayne@69: rank = str(taxonomic_record[0]["Rank"]) jpayne@69: jpayne@69: genetic_code = int(taxonomic_record[0]["GeneticCode"]["GCId"]) jpayne@69: jpayne@69: mito_genetic_code = int(taxonomic_record[0]["MitoGeneticCode"]["MGCId"]) jpayne@69: jpayne@69: species_names = [ jpayne@69: ("scientific name", str(taxonomic_record[0]["ScientificName"])) jpayne@69: ] jpayne@69: try: jpayne@69: for name_class, names in taxonomic_record[0]["OtherNames"].items(): jpayne@69: name_class = self._fix_name_class(name_class) jpayne@69: if not isinstance(names, list): jpayne@69: # The Entrez parser seems to return single entry jpayne@69: # lists as just a string which is annoying. jpayne@69: names = [names] jpayne@69: for name in names: jpayne@69: # Want to ignore complex things like ClassCDE jpayne@69: # entries jpayne@69: if isinstance(name, str): jpayne@69: species_names.append((name_class, name)) jpayne@69: except KeyError: jpayne@69: # OtherNames isn't always present, jpayne@69: # e.g. NCBI taxon 41205, Bromheadia finlaysoniana jpayne@69: pass jpayne@69: else: jpayne@69: pass jpayne@69: # If we are not allowed to go online, we will record the bare minimum; jpayne@69: # as long as the NCBI taxon id is present, then (re)running jpayne@69: # load_ncbi_taxonomy.pl should fill in the taxonomomy lineage jpayne@69: # (and update the species names). jpayne@69: # jpayne@69: # I am NOT going to try and record the lineage, even if it jpayne@69: # is in the record annotation as a list of names, as we won't jpayne@69: # know the NCBI taxon IDs for these parent nodes. jpayne@69: jpayne@69: self._update_left_right_taxon_values(left_value) jpayne@69: jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO taxon(parent_taxon_id, ncbi_taxon_id, node_rank," jpayne@69: " genetic_code, mito_genetic_code, left_value, right_value)" jpayne@69: " VALUES (%s, %s, %s, %s, %s, %s, %s)", jpayne@69: ( jpayne@69: parent_taxon_id, jpayne@69: ncbi_taxon_id, jpayne@69: rank, jpayne@69: genetic_code, jpayne@69: mito_genetic_code, jpayne@69: left_value, jpayne@69: right_value, jpayne@69: ), jpayne@69: ) jpayne@69: jpayne@69: taxon_id = self.adaptor.last_id("taxon") jpayne@69: jpayne@69: # Record the scientific name, common name, etc jpayne@69: for name_class, name in species_names: jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO taxon_name(taxon_id, name, name_class)" jpayne@69: " VALUES (%s, %s, %s)", jpayne@69: (taxon_id, name[:255], name_class), jpayne@69: ) jpayne@69: return taxon_id jpayne@69: jpayne@69: def _get_taxon_id_from_ncbi_lineage(self, taxonomic_lineage): jpayne@69: """Recursive method to get taxon ID from NCBI lineage (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - taxonomic_lineage - list of taxonomy dictionaries from Bio.Entrez jpayne@69: jpayne@69: First dictionary in list is the taxonomy root, highest would be jpayne@69: the species. Each dictionary includes: jpayne@69: jpayne@69: - TaxID (string, NCBI taxon id) jpayne@69: - Rank (string, e.g. "species", "genus", ..., "phylum", ...) jpayne@69: - ScientificName (string) jpayne@69: jpayne@69: (and that is all at the time of writing) jpayne@69: jpayne@69: This method will record all the lineage given, returning the taxon id jpayne@69: (database key, not NCBI taxon id) of the final entry (the species). jpayne@69: """ jpayne@69: ncbi_taxon_id = int(taxonomic_lineage[-1]["TaxId"]) jpayne@69: left_value = None jpayne@69: right_value = None jpayne@69: parent_left_value = None jpayne@69: parent_right_value = None jpayne@69: # Is this in the database already? Check the taxon table... jpayne@69: rows = self.adaptor.execute_and_fetchall( jpayne@69: "SELECT taxon_id, left_value, right_value FROM taxon" jpayne@69: " WHERE ncbi_taxon_id=%s" % ncbi_taxon_id jpayne@69: ) jpayne@69: if rows: jpayne@69: # we could verify that the Scientific Name etc in the database jpayne@69: # is the same and update it or print a warning if not... jpayne@69: if len(rows) != 1: jpayne@69: raise ValueError(f"Expected 1 reponse, got {len(rows)}") jpayne@69: return rows[0] jpayne@69: jpayne@69: # We have to record this. jpayne@69: if len(taxonomic_lineage) > 1: jpayne@69: # Use recursion to find out the taxon id (database key) of the jpayne@69: # parent. jpayne@69: ( jpayne@69: parent_taxon_id, jpayne@69: parent_left_value, jpayne@69: parent_right_value, jpayne@69: ) = self._get_taxon_id_from_ncbi_lineage(taxonomic_lineage[:-1]) jpayne@69: left_value = parent_right_value jpayne@69: right_value = parent_right_value + 1 jpayne@69: if not isinstance(parent_taxon_id, int): jpayne@69: raise ValueError( jpayne@69: f"Expected parent_taxon_id to be an int, got {parent_taxon_id}" jpayne@69: ) jpayne@69: else: jpayne@69: # we have reached the top of the lineage but no current taxonomy jpayne@69: # id has been found jpayne@69: parent_taxon_id = None jpayne@69: left_value = self.adaptor.execute_one("SELECT MAX(left_value) FROM taxon")[ jpayne@69: 0 jpayne@69: ] jpayne@69: if not left_value: jpayne@69: left_value = 0 jpayne@69: jpayne@69: right_value = left_value + 1 jpayne@69: jpayne@69: self._update_left_right_taxon_values(left_value) jpayne@69: jpayne@69: # INSERT new taxon jpayne@69: rank = str(taxonomic_lineage[-1].get("Rank")) jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO taxon(ncbi_taxon_id, parent_taxon_id, node_rank, " jpayne@69: "left_value, right_value) VALUES (%s, %s, %s, %s, %s)", jpayne@69: (ncbi_taxon_id, parent_taxon_id, rank, left_value, right_value), jpayne@69: ) jpayne@69: jpayne@69: taxon_id = self.adaptor.last_id("taxon") jpayne@69: # assert isinstance(taxon_id, int), repr(taxon_id) jpayne@69: # ... and its name in taxon_name jpayne@69: scientific_name = taxonomic_lineage[-1].get("ScientificName") jpayne@69: if scientific_name: jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO taxon_name(taxon_id, name, name_class) " jpayne@69: "VALUES (%s, %s, 'scientific name')", jpayne@69: (taxon_id, scientific_name[:255]), jpayne@69: ) jpayne@69: return taxon_id, left_value, right_value jpayne@69: jpayne@69: def _load_bioentry_table(self, record): jpayne@69: """Fill the bioentry table with sequence information (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - record - SeqRecord object to add to the database. jpayne@69: jpayne@69: """ jpayne@69: # get the pertinent info and insert it jpayne@69: jpayne@69: if record.id.count(".") == 1: # try to get a version from the id jpayne@69: # This assumes the string is something like "XXXXXXXX.123" jpayne@69: accession, version = record.id.split(".") jpayne@69: try: jpayne@69: version = int(version) jpayne@69: except ValueError: jpayne@69: accession = record.id jpayne@69: version = 0 jpayne@69: else: # otherwise just use a version of 0 jpayne@69: accession = record.id jpayne@69: version = 0 jpayne@69: jpayne@69: if ( jpayne@69: "accessions" in record.annotations jpayne@69: and isinstance(record.annotations["accessions"], list) jpayne@69: and record.annotations["accessions"] jpayne@69: ): jpayne@69: # Take the first accession (one if there is more than one) jpayne@69: accession = record.annotations["accessions"][0] jpayne@69: jpayne@69: # Find the taxon id (this is not just the NCBI Taxon ID) jpayne@69: # NOTE - If the species isn't defined in the taxon table, jpayne@69: # a new minimal entry is created. jpayne@69: taxon_id = self._get_taxon_id(record) jpayne@69: jpayne@69: if "gi" in record.annotations: jpayne@69: identifier = record.annotations["gi"] jpayne@69: else: jpayne@69: identifier = record.id jpayne@69: jpayne@69: # Allow description and division to default to NULL as in BioPerl. jpayne@69: description = getattr(record, "description", None) jpayne@69: division = record.annotations.get("data_file_division") jpayne@69: jpayne@69: sql = """ jpayne@69: INSERT INTO bioentry ( jpayne@69: biodatabase_id, jpayne@69: taxon_id, jpayne@69: name, jpayne@69: accession, jpayne@69: identifier, jpayne@69: division, jpayne@69: description, jpayne@69: version) jpayne@69: VALUES ( jpayne@69: %s, jpayne@69: %s, jpayne@69: %s, jpayne@69: %s, jpayne@69: %s, jpayne@69: %s, jpayne@69: %s, jpayne@69: %s)""" jpayne@69: # print(self.dbid, taxon_id, record.name, accession, identifier, \ jpayne@69: # division, description, version) jpayne@69: self.adaptor.execute( jpayne@69: sql, jpayne@69: ( jpayne@69: self.dbid, jpayne@69: taxon_id, jpayne@69: record.name, jpayne@69: accession, jpayne@69: identifier, jpayne@69: division, jpayne@69: description, jpayne@69: version, jpayne@69: ), jpayne@69: ) jpayne@69: # now retrieve the id for the bioentry jpayne@69: return self.adaptor.last_id("bioentry") jpayne@69: jpayne@69: def _load_bioentry_date(self, record, bioentry_id): jpayne@69: """Add the effective date of the entry into the database (PRIVATE). jpayne@69: jpayne@69: record - a SeqRecord object with an annotated date jpayne@69: bioentry_id - corresponding database identifier jpayne@69: """ jpayne@69: # dates are GenBank style, like: jpayne@69: # 14-SEP-2000 jpayne@69: date = record.annotations.get("date", strftime("%d-%b-%Y", gmtime()).upper()) jpayne@69: if isinstance(date, list): jpayne@69: date = date[0] jpayne@69: annotation_tags_id = self._get_ontology_id("Annotation Tags") jpayne@69: date_id = self._get_term_id("date_changed", annotation_tags_id) jpayne@69: sql = ( jpayne@69: "INSERT INTO bioentry_qualifier_value" jpayne@69: ' (bioentry_id, term_id, value, "rank")' jpayne@69: " VALUES (%s, %s, %s, 1)" jpayne@69: ) jpayne@69: self.adaptor.execute(sql, (bioentry_id, date_id, date)) jpayne@69: jpayne@69: def _load_biosequence(self, record, bioentry_id): jpayne@69: """Record SeqRecord's sequence and alphabet in DB (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - record - a SeqRecord object with a seq property jpayne@69: - bioentry_id - corresponding database identifier jpayne@69: jpayne@69: """ jpayne@69: if record.seq is None: jpayne@69: # The biosequence table entry is optional, so if we haven't jpayne@69: # got a sequence, we don't need to write to the table. jpayne@69: return jpayne@69: jpayne@69: molecule_type = record.annotations.get("molecule_type", "") jpayne@69: if "DNA" in molecule_type: jpayne@69: alphabet = "dna" jpayne@69: elif "RNA" in molecule_type: jpayne@69: alphabet = "rna" jpayne@69: elif "protein" in molecule_type: jpayne@69: alphabet = "protein" jpayne@69: else: jpayne@69: alphabet = "unknown" jpayne@69: jpayne@69: try: jpayne@69: seq_str = str(record.seq) jpayne@69: except UndefinedSequenceError: jpayne@69: seq_str = None jpayne@69: jpayne@69: sql = ( jpayne@69: "INSERT INTO biosequence (bioentry_id, version, " jpayne@69: "length, seq, alphabet) " jpayne@69: "VALUES (%s, 0, %s, %s, %s)" jpayne@69: ) jpayne@69: self.adaptor.execute(sql, (bioentry_id, len(record.seq), seq_str, alphabet)) jpayne@69: jpayne@69: def _load_comment(self, record, bioentry_id): jpayne@69: """Record a SeqRecord's annotated comment in the database (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - record - a SeqRecord object with an annotated comment jpayne@69: - bioentry_id - corresponding database identifier jpayne@69: jpayne@69: """ jpayne@69: comments = record.annotations.get("comment") jpayne@69: if not comments: jpayne@69: return jpayne@69: if not isinstance(comments, list): jpayne@69: # It should be a string then... jpayne@69: comments = [comments] jpayne@69: jpayne@69: for index, comment in enumerate(comments): jpayne@69: comment = comment.replace("\n", " ") jpayne@69: # TODO - Store each line as a separate entry? This would preserve jpayne@69: # the newlines, but we should check BioPerl etc to be consistent. jpayne@69: sql = ( jpayne@69: 'INSERT INTO comment (bioentry_id, comment_text, "rank")' jpayne@69: " VALUES (%s, %s, %s)" jpayne@69: ) jpayne@69: self.adaptor.execute(sql, (bioentry_id, comment, index + 1)) jpayne@69: jpayne@69: def _load_annotations(self, record, bioentry_id): jpayne@69: """Record a SeqRecord's misc annotations in the database (PRIVATE). jpayne@69: jpayne@69: The annotation strings are recorded in the bioentry_qualifier_value jpayne@69: table, except for special cases like the reference, comment and jpayne@69: taxonomy which are handled with their own tables. jpayne@69: jpayne@69: Arguments: jpayne@69: - record - a SeqRecord object with an annotations dictionary jpayne@69: - bioentry_id - corresponding database identifier jpayne@69: jpayne@69: """ jpayne@69: mono_sql = ( jpayne@69: "INSERT INTO bioentry_qualifier_value" jpayne@69: "(bioentry_id, term_id, value)" jpayne@69: " VALUES (%s, %s, %s)" jpayne@69: ) jpayne@69: many_sql = ( jpayne@69: "INSERT INTO bioentry_qualifier_value" jpayne@69: '(bioentry_id, term_id, value, "rank")' jpayne@69: " VALUES (%s, %s, %s, %s)" jpayne@69: ) jpayne@69: tag_ontology_id = self._get_ontology_id("Annotation Tags") jpayne@69: for key, value in record.annotations.items(): jpayne@69: if key in ["molecule_type", "references", "comment", "ncbi_taxid", "date"]: jpayne@69: # Handled separately jpayne@69: continue jpayne@69: term_id = self._get_term_id(key, ontology_id=tag_ontology_id) jpayne@69: if isinstance(value, (list, tuple)): jpayne@69: rank = 0 jpayne@69: for entry in value: jpayne@69: if isinstance(entry, (str, int)): jpayne@69: # Easy case jpayne@69: rank += 1 jpayne@69: self.adaptor.execute( jpayne@69: many_sql, (bioentry_id, term_id, str(entry), rank) jpayne@69: ) jpayne@69: else: jpayne@69: pass jpayne@69: elif isinstance(value, (str, int)): jpayne@69: # Have a simple single entry, leave rank as the DB default jpayne@69: self.adaptor.execute(mono_sql, (bioentry_id, term_id, str(value))) jpayne@69: else: jpayne@69: pass jpayne@69: # print("Ignoring annotation '%s' entry of type '%s'" \ jpayne@69: # % (key, type(value))) jpayne@69: jpayne@69: def _load_reference(self, reference, rank, bioentry_id): jpayne@69: """Record SeqRecord's annotated references in the database (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - record - a SeqRecord object with annotated references jpayne@69: - bioentry_id - corresponding database identifier jpayne@69: jpayne@69: """ jpayne@69: refs = None jpayne@69: if reference.medline_id: jpayne@69: refs = self.adaptor.execute_and_fetch_col0( jpayne@69: "SELECT reference_id" jpayne@69: " FROM reference JOIN dbxref USING (dbxref_id)" jpayne@69: " WHERE dbname = 'MEDLINE' AND accession = %s", jpayne@69: (reference.medline_id,), jpayne@69: ) jpayne@69: if not refs and reference.pubmed_id: jpayne@69: refs = self.adaptor.execute_and_fetch_col0( jpayne@69: "SELECT reference_id" jpayne@69: " FROM reference JOIN dbxref USING (dbxref_id)" jpayne@69: " WHERE dbname = 'PUBMED' AND accession = %s", jpayne@69: (reference.pubmed_id,), jpayne@69: ) jpayne@69: if not refs: jpayne@69: s = [] jpayne@69: for f in reference.authors, reference.title, reference.journal: jpayne@69: s.append(f or "") jpayne@69: crc = crc64("".join(s)) jpayne@69: refs = self.adaptor.execute_and_fetch_col0( jpayne@69: "SELECT reference_id FROM reference WHERE crc = %s", (crc,) jpayne@69: ) jpayne@69: if not refs: jpayne@69: if reference.medline_id: jpayne@69: dbxref_id = self._add_dbxref("MEDLINE", reference.medline_id, 0) jpayne@69: elif reference.pubmed_id: jpayne@69: dbxref_id = self._add_dbxref("PUBMED", reference.pubmed_id, 0) jpayne@69: else: jpayne@69: dbxref_id = None jpayne@69: authors = reference.authors or None jpayne@69: title = reference.title or None jpayne@69: # The location/journal field cannot be Null, so default jpayne@69: # to an empty string rather than None: jpayne@69: journal = reference.journal or "" jpayne@69: self.adaptor.execute( jpayne@69: "INSERT INTO reference (dbxref_id, location," jpayne@69: " title, authors, crc)" jpayne@69: " VALUES (%s, %s, %s, %s, %s)", jpayne@69: (dbxref_id, journal, title, authors, crc), jpayne@69: ) jpayne@69: reference_id = self.adaptor.last_id("reference") jpayne@69: else: jpayne@69: reference_id = refs[0] jpayne@69: jpayne@69: if reference.location: jpayne@69: start = 1 + int(str(reference.location[0].start)) jpayne@69: end = int(str(reference.location[0].end)) jpayne@69: else: jpayne@69: start = None jpayne@69: end = None jpayne@69: jpayne@69: sql = ( jpayne@69: "INSERT INTO bioentry_reference (bioentry_id, reference_id," jpayne@69: ' start_pos, end_pos, "rank") VALUES (%s, %s, %s, %s, %s)' jpayne@69: ) jpayne@69: self.adaptor.execute(sql, (bioentry_id, reference_id, start, end, rank + 1)) jpayne@69: jpayne@69: def _load_seqfeature(self, feature, feature_rank, bioentry_id): jpayne@69: """Load a biopython SeqFeature into the database (PRIVATE).""" jpayne@69: # records loaded from a gff file using BCBio.GFF will contain value jpayne@69: # of 2nd column of the gff as a feature qualifier. The BioSQL wiki jpayne@69: # suggests that the source should not go in with the other feature jpayne@69: # mappings but instead be put in the term table jpayne@69: # (http://www.biosql.org/wiki/Annotation_Mapping) jpayne@69: try: jpayne@69: source = feature.qualifiers["source"] jpayne@69: if isinstance(source, list): jpayne@69: source = source[0] jpayne@69: seqfeature_id = self._load_seqfeature_basic( jpayne@69: feature.type, feature_rank, bioentry_id, source=source jpayne@69: ) jpayne@69: except KeyError: jpayne@69: seqfeature_id = self._load_seqfeature_basic( jpayne@69: feature.type, feature_rank, bioentry_id jpayne@69: ) jpayne@69: jpayne@69: self._load_seqfeature_locations(feature, seqfeature_id) jpayne@69: self._load_seqfeature_qualifiers(feature.qualifiers, seqfeature_id) jpayne@69: jpayne@69: def _load_seqfeature_basic( jpayne@69: self, feature_type, feature_rank, bioentry_id, source="EMBL/GenBank/SwissProt" jpayne@69: ): jpayne@69: """Load the first tables of a seqfeature and returns the id (PRIVATE). jpayne@69: jpayne@69: This loads the "key" of the seqfeature (ie. CDS, gene) and jpayne@69: the basic seqfeature table itself. jpayne@69: """ jpayne@69: ontology_id = self._get_ontology_id("SeqFeature Keys") jpayne@69: seqfeature_key_id = self._get_term_id(feature_type, ontology_id=ontology_id) jpayne@69: source_cat_id = self._get_ontology_id("SeqFeature Sources") jpayne@69: source_term_id = self._get_term_id(source, ontology_id=source_cat_id) jpayne@69: jpayne@69: sql = ( jpayne@69: "INSERT INTO seqfeature (bioentry_id, type_term_id, " jpayne@69: 'source_term_id, "rank") VALUES (%s, %s, %s, %s)' jpayne@69: ) jpayne@69: self.adaptor.execute( jpayne@69: sql, (bioentry_id, seqfeature_key_id, source_term_id, feature_rank + 1) jpayne@69: ) jpayne@69: return self.adaptor.last_id("seqfeature") jpayne@69: jpayne@69: def _load_seqfeature_locations(self, feature, seqfeature_id): jpayne@69: """Load all of the locations for a SeqFeature into tables (PRIVATE). jpayne@69: jpayne@69: This adds the locations related to the SeqFeature into the jpayne@69: seqfeature_location table. Fuzzies are not handled right now. jpayne@69: For a simple location, ie (1..2), we have a single table row jpayne@69: with seq_start = 1, seq_end = 2, location_rank = 1. jpayne@69: jpayne@69: For split locations, ie (1..2, 3..4, 5..6) we would have three jpayne@69: row tables with:: jpayne@69: jpayne@69: start = 1, end = 2, rank = 1 jpayne@69: start = 3, end = 4, rank = 2 jpayne@69: start = 5, end = 6, rank = 3 jpayne@69: jpayne@69: """ jpayne@69: # TODO - Record an ontology for the locations (using location.term_id) jpayne@69: # which for now as in BioPerl we leave defaulting to NULL. jpayne@69: try: jpayne@69: if feature.location.operator != "join": jpayne@69: # e.g. order locations... we don't record "order" so it jpayne@69: # will become a "join" on reloading. What does BioPerl do? jpayne@69: import warnings jpayne@69: from Bio import BiopythonWarning jpayne@69: jpayne@69: warnings.warn( jpayne@69: "%s location operators are not fully supported" jpayne@69: % feature.location_operator, jpayne@69: BiopythonWarning, jpayne@69: ) jpayne@69: except AttributeError: jpayne@69: pass jpayne@69: # This will be a list of length one for a SimpleLocation: jpayne@69: parts = feature.location.parts jpayne@69: if parts and {loc.strand for loc in parts} == {-1}: jpayne@69: # To mimic prior behaviour of Biopython+BioSQL, reverse order jpayne@69: parts = parts[::-1] jpayne@69: # TODO - Check what BioPerl does; see also BioSeq.py code jpayne@69: for rank, loc in enumerate(parts): jpayne@69: self._insert_location(loc, rank + 1, seqfeature_id) jpayne@69: jpayne@69: def _insert_location(self, location, rank, seqfeature_id): jpayne@69: """Add SeqFeature location to seqfeature_location table (PRIVATE). jpayne@69: jpayne@69: TODO - Add location operator to location_qualifier_value? jpayne@69: """ jpayne@69: # convert biopython locations to the 1-based location system jpayne@69: # used in bioSQL jpayne@69: # XXX This could also handle fuzzies jpayne@69: jpayne@69: try: jpayne@69: start = int(location.start) + 1 jpayne@69: except TypeError: jpayne@69: # Handle SwissProt unknown position (?) jpayne@69: if isinstance(location.start, UnknownPosition): jpayne@69: start = None jpayne@69: else: jpayne@69: raise jpayne@69: jpayne@69: try: jpayne@69: end = int(location.end) jpayne@69: except TypeError: jpayne@69: # Handle SwissProt unknown position (?) jpayne@69: if isinstance(location.end, UnknownPosition): jpayne@69: end = None jpayne@69: else: jpayne@69: raise jpayne@69: jpayne@69: # Biopython uses None when we don't know strand information but jpayne@69: # BioSQL requires something (non null) and sets this as zero jpayne@69: # So we'll use the strand or 0 if Biopython spits out None jpayne@69: strand = location.strand or 0 jpayne@69: jpayne@69: # TODO - Record an ontology term for the location (location.term_id) jpayne@69: # which for now like BioPerl we'll leave as NULL. jpayne@69: # This might allow us to record "between" positions properly, but I jpayne@69: # don't really see how it could work for before/after fuzzy positions jpayne@69: loc_term_id = None jpayne@69: jpayne@69: if location.ref: jpayne@69: # sub_feature remote locations when they are in the same db as the jpayne@69: # current record do not have a value for ref_db, which SeqFeature jpayne@69: # object stores as None. BioSQL schema requires a varchar and is jpayne@69: # not NULL jpayne@69: dbxref_id = self._get_dbxref_id(location.ref_db or "", location.ref) jpayne@69: else: jpayne@69: dbxref_id = None jpayne@69: jpayne@69: sql = ( jpayne@69: "INSERT INTO location (seqfeature_id, dbxref_id, term_id," jpayne@69: 'start_pos, end_pos, strand, "rank") ' jpayne@69: "VALUES (%s, %s, %s, %s, %s, %s, %s)" jpayne@69: ) jpayne@69: self.adaptor.execute( jpayne@69: sql, (seqfeature_id, dbxref_id, loc_term_id, start, end, strand, rank) jpayne@69: ) jpayne@69: jpayne@69: """ jpayne@69: # See Bug 2677 jpayne@69: # TODO - Record the location_operator (e.g. "join" or "order") jpayne@69: # using the location_qualifier_value table (which we and BioPerl jpayne@69: # have historically left empty). jpayne@69: # Note this will need an ontology term for the location qualifier jpayne@69: # (location_qualifier_value.term_id) for which oddly the schema jpayne@69: # does not allow NULL. jpayne@69: if feature.location_operator: jpayne@69: #e.g. "join" (common), jpayne@69: #or "order" (see Tests/GenBank/protein_refseq2.gb) jpayne@69: location_id = self.adaptor.last_id('location') jpayne@69: loc_qual_term_id = None # Not allowed in BioSQL v1.0.1 jpayne@69: sql = ("INSERT INTO location_qualifier_value" jpayne@69: "(location_id, term_id, value) " jpayne@69: "VALUES (%s, %s, %s)") jpayne@69: self.adaptor.execute(sql, (location_id, loc_qual_term_id, jpayne@69: feature.location_operator)) jpayne@69: """ jpayne@69: jpayne@69: def _load_seqfeature_qualifiers(self, qualifiers, seqfeature_id): jpayne@69: """Insert feature's (key, value) pair qualifiers (PRIVATE). jpayne@69: jpayne@69: Qualifiers should be a dictionary of the form:: jpayne@69: jpayne@69: {key : [value1, value2]} jpayne@69: jpayne@69: """ jpayne@69: tag_ontology_id = self._get_ontology_id("Annotation Tags") jpayne@69: for qualifier_key in qualifiers: jpayne@69: # Treat db_xref qualifiers differently to sequence annotation jpayne@69: # qualifiers by populating the seqfeature_dbxref and dbxref jpayne@69: # tables. Other qualifiers go into the seqfeature_qualifier_value jpayne@69: # and (if new) term tables. jpayne@69: if qualifier_key != "db_xref": jpayne@69: qualifier_key_id = self._get_term_id( jpayne@69: qualifier_key, ontology_id=tag_ontology_id jpayne@69: ) jpayne@69: # now add all of the values to their table jpayne@69: entries = qualifiers[qualifier_key] jpayne@69: if not isinstance(entries, list): jpayne@69: # Could be a plain string, or an int or a float. jpayne@69: # However, we exect a list of strings here. jpayne@69: entries = [entries] jpayne@69: for qual_value_rank in range(len(entries)): jpayne@69: qualifier_value = entries[qual_value_rank] jpayne@69: sql = ( jpayne@69: "INSERT INTO seqfeature_qualifier_value " jpayne@69: ' (seqfeature_id, term_id, "rank", value) VALUES' jpayne@69: " (%s, %s, %s, %s)" jpayne@69: ) jpayne@69: self.adaptor.execute( jpayne@69: sql, jpayne@69: ( jpayne@69: seqfeature_id, jpayne@69: qualifier_key_id, jpayne@69: qual_value_rank + 1, jpayne@69: qualifier_value, jpayne@69: ), jpayne@69: ) jpayne@69: else: jpayne@69: # The dbxref_id qualifier/value sets go into the dbxref table jpayne@69: # as dbname, accession, version tuples, with dbxref.dbxref_id jpayne@69: # being automatically assigned, and into the seqfeature_dbxref jpayne@69: # table as seqfeature_id, dbxref_id, and rank tuples jpayne@69: self._load_seqfeature_dbxref(qualifiers[qualifier_key], seqfeature_id) jpayne@69: jpayne@69: def _load_seqfeature_dbxref(self, dbxrefs, seqfeature_id): jpayne@69: """Add SeqFeature's DB cross-references to the database (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - dbxrefs - List, dbxref data from the source file in the jpayne@69: format : jpayne@69: - seqfeature_id - Int, the identifier for the seqfeature in the jpayne@69: seqfeature table jpayne@69: jpayne@69: Insert dbxref qualifier data for a seqfeature into the jpayne@69: seqfeature_dbxref and, if required, dbxref tables. jpayne@69: The dbxref_id qualifier/value sets go into the dbxref table jpayne@69: as dbname, accession, version tuples, with dbxref.dbxref_id jpayne@69: being automatically assigned, and into the seqfeature_dbxref jpayne@69: table as seqfeature_id, dbxref_id, and rank tuples. jpayne@69: """ jpayne@69: # NOTE - In older versions of Biopython, we would map the GenBank jpayne@69: # db_xref "name", for example "GI" to "GeneIndex", and give a warning jpayne@69: # for any unknown terms. This was a long term maintenance problem, jpayne@69: # and differed from BioPerl and BioJava's implementation. See bug 2405 jpayne@69: for rank, value in enumerate(dbxrefs): jpayne@69: # Split the DB:accession format string at colons. We have to jpayne@69: # account for multiple-line and multiple-accession entries jpayne@69: try: jpayne@69: dbxref_data = value.replace(" ", "").replace("\n", "").split(":") jpayne@69: db = dbxref_data[0] jpayne@69: accessions = dbxref_data[1:] jpayne@69: except Exception: jpayne@69: raise ValueError(f"Parsing of db_xref failed: '{value}'") from None jpayne@69: # Loop over all the grabbed accessions, and attempt to fill the jpayne@69: # table jpayne@69: for accession in accessions: jpayne@69: # Get the dbxref_id value for the dbxref data jpayne@69: dbxref_id = self._get_dbxref_id(db, accession) jpayne@69: # Insert the seqfeature_dbxref data jpayne@69: self._get_seqfeature_dbxref(seqfeature_id, dbxref_id, rank + 1) jpayne@69: jpayne@69: def _get_dbxref_id(self, db, accession): jpayne@69: """Get DB cross-reference for accession (PRIVATE). jpayne@69: jpayne@69: Arguments: jpayne@69: - db - String, the name of the external database containing jpayne@69: the accession number jpayne@69: - accession - String, the accession of the dbxref data jpayne@69: jpayne@69: Finds and returns the dbxref_id for the passed data. The method jpayne@69: attempts to find an existing record first, and inserts the data jpayne@69: if there is no record. jpayne@69: """ jpayne@69: # Check for an existing record jpayne@69: sql = "SELECT dbxref_id FROM dbxref WHERE dbname = %s AND accession = %s" jpayne@69: dbxref_id = self.adaptor.execute_and_fetch_col0(sql, (db, accession)) jpayne@69: # If there was a record, return the dbxref_id, else create the jpayne@69: # record and return the created dbxref_id jpayne@69: if dbxref_id: jpayne@69: return dbxref_id[0] jpayne@69: return self._add_dbxref(db, accession, 0) jpayne@69: jpayne@69: def _get_seqfeature_dbxref(self, seqfeature_id, dbxref_id, rank): jpayne@69: """Get DB cross-reference, creating it if needed (PRIVATE). jpayne@69: jpayne@69: Check for a pre-existing seqfeature_dbxref entry with the passed jpayne@69: seqfeature_id and dbxref_id. If one does not exist, insert new jpayne@69: data. jpayne@69: """ jpayne@69: # Check for an existing record jpayne@69: sql = ( jpayne@69: "SELECT seqfeature_id, dbxref_id FROM seqfeature_dbxref " jpayne@69: "WHERE seqfeature_id = %s AND dbxref_id = %s" jpayne@69: ) jpayne@69: result = self.adaptor.execute_and_fetch_col0(sql, (seqfeature_id, dbxref_id)) jpayne@69: # If there was a record, return without executing anything, else create jpayne@69: # the record and return jpayne@69: if result: jpayne@69: return result jpayne@69: return self._add_seqfeature_dbxref(seqfeature_id, dbxref_id, rank) jpayne@69: jpayne@69: def _add_seqfeature_dbxref(self, seqfeature_id, dbxref_id, rank): jpayne@69: """Add DB cross-reference (PRIVATE). jpayne@69: jpayne@69: Insert a seqfeature_dbxref row and return the seqfeature_id and jpayne@69: dbxref_id jpayne@69: """ jpayne@69: sql = ( jpayne@69: "INSERT INTO seqfeature_dbxref " jpayne@69: '(seqfeature_id, dbxref_id, "rank") VALUES' jpayne@69: "(%s, %s, %s)" jpayne@69: ) jpayne@69: self.adaptor.execute(sql, (seqfeature_id, dbxref_id, rank)) jpayne@69: return (seqfeature_id, dbxref_id) jpayne@69: jpayne@69: def _load_dbxrefs(self, record, bioentry_id): jpayne@69: """Load any sequence level cross references into the database (PRIVATE). jpayne@69: jpayne@69: See table bioentry_dbxref. jpayne@69: """ jpayne@69: for rank, value in enumerate(record.dbxrefs): jpayne@69: # Split the DB:accession string at first colon. jpayne@69: # We have to cope with things like: jpayne@69: # "MGD:MGI:892" (db="MGD", accession="MGI:892") jpayne@69: # "GO:GO:123" (db="GO", accession="GO:123") jpayne@69: # jpayne@69: # Annoyingly I have seen the NCBI use both the style jpayne@69: # "GO:GO:123" and "GO:123" in different vintages. jpayne@69: newline_escape_count = value.count("\n") jpayne@69: if newline_escape_count != 0: jpayne@69: raise ValueError( jpayne@69: "Expected a single line in value, got {newline_escape_count}" jpayne@69: ) jpayne@69: try: jpayne@69: db, accession = value.split(":", 1) jpayne@69: db = db.strip() jpayne@69: accession = accession.strip() jpayne@69: except Exception: jpayne@69: raise ValueError(f"Parsing of dbxrefs list failed: '{value}'") from None jpayne@69: # Get the dbxref_id value for the dbxref data jpayne@69: dbxref_id = self._get_dbxref_id(db, accession) jpayne@69: # Insert the bioentry_dbxref data jpayne@69: self._get_bioentry_dbxref(bioentry_id, dbxref_id, rank + 1) jpayne@69: jpayne@69: def _get_bioentry_dbxref(self, bioentry_id, dbxref_id, rank): jpayne@69: """Get pre-existing db-xref, or create and return it (PRIVATE). jpayne@69: jpayne@69: Check for a pre-existing bioentry_dbxref entry with the passed jpayne@69: seqfeature_id and dbxref_id. If one does not exist, insert new jpayne@69: data jpayne@69: """ jpayne@69: # Check for an existing record jpayne@69: sql = ( jpayne@69: "SELECT bioentry_id, dbxref_id FROM bioentry_dbxref " jpayne@69: "WHERE bioentry_id = %s AND dbxref_id = %s" jpayne@69: ) jpayne@69: result = self.adaptor.execute_and_fetch_col0(sql, (bioentry_id, dbxref_id)) jpayne@69: # If there was a record, return without executing anything, else create jpayne@69: # the record and return jpayne@69: if result: jpayne@69: return result jpayne@69: return self._add_bioentry_dbxref(bioentry_id, dbxref_id, rank) jpayne@69: jpayne@69: def _add_bioentry_dbxref(self, bioentry_id, dbxref_id, rank): jpayne@69: """Insert a bioentry_dbxref row (PRIVATE). jpayne@69: jpayne@69: Returns the seqfeature_id and dbxref_id (PRIVATE). jpayne@69: """ jpayne@69: sql = ( jpayne@69: "INSERT INTO bioentry_dbxref " jpayne@69: '(bioentry_id,dbxref_id,"rank") VALUES ' jpayne@69: "(%s, %s, %s)" jpayne@69: ) jpayne@69: self.adaptor.execute(sql, (bioentry_id, dbxref_id, rank)) jpayne@69: return (bioentry_id, dbxref_id) jpayne@69: jpayne@69: jpayne@69: class DatabaseRemover: jpayne@69: """Complement the Loader functionality by fully removing a database. jpayne@69: jpayne@69: This probably isn't really useful for normal purposes, since you jpayne@69: can just do a:: jpayne@69: jpayne@69: DROP DATABASE db_name jpayne@69: jpayne@69: and then recreate the database. But, it's really useful for testing jpayne@69: purposes. jpayne@69: """ jpayne@69: jpayne@69: def __init__(self, adaptor, dbid): jpayne@69: """Initialize with a database id and adaptor connection.""" jpayne@69: self.adaptor = adaptor jpayne@69: self.dbid = dbid jpayne@69: jpayne@69: def remove(self): jpayne@69: """Remove everything related to the given database id.""" jpayne@69: sql = "DELETE FROM bioentry WHERE biodatabase_id = %s" jpayne@69: self.adaptor.execute(sql, (self.dbid,)) jpayne@69: sql = "DELETE FROM biodatabase WHERE biodatabase_id = %s" jpayne@69: self.adaptor.execute(sql, (self.dbid,))