annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/BioSQL/Loader.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 # Copyright 2002 by Andrew Dalke. All rights reserved.
jpayne@68 2 # Revisions 2007-2016 copyright by Peter Cock. All rights reserved.
jpayne@68 3 # Revisions 2008 copyright by Cymon J. Cox. All rights reserved.
jpayne@68 4 #
jpayne@68 5 # This file is part of the Biopython distribution and governed by your
jpayne@68 6 # choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
jpayne@68 7 # Please see the LICENSE file that should have been included as part of this
jpayne@68 8 # package.
jpayne@68 9 #
jpayne@68 10 # Note that BioSQL (including the database schema and scripts) is
jpayne@68 11 # available and licensed separately. Please consult www.biosql.org
jpayne@68 12
jpayne@68 13 """Load biopython objects into a BioSQL database for persistent storage.
jpayne@68 14
jpayne@68 15 This code makes it possible to store biopython objects in a relational
jpayne@68 16 database and then retrieve them back. You shouldn't use any of the
jpayne@68 17 classes in this module directly. Rather, call the load() method on
jpayne@68 18 a database object.
jpayne@68 19 """
jpayne@68 20 # standard modules
jpayne@68 21 from time import gmtime, strftime
jpayne@68 22
jpayne@68 23 # biopython
jpayne@68 24 from Bio.SeqUtils.CheckSum import crc64
jpayne@68 25 from Bio import Entrez
jpayne@68 26 from Bio.Seq import UndefinedSequenceError
jpayne@68 27 from Bio.SeqFeature import UnknownPosition
jpayne@68 28
jpayne@68 29
jpayne@68 30 class DatabaseLoader:
jpayne@68 31 """Object used to load SeqRecord objects into a BioSQL database."""
jpayne@68 32
jpayne@68 33 def __init__(self, adaptor, dbid, fetch_NCBI_taxonomy=False):
jpayne@68 34 """Initialize with connection information for the database.
jpayne@68 35
jpayne@68 36 Creating a DatabaseLoader object is normally handled via the
jpayne@68 37 BioSeqDatabase DBServer object, for example::
jpayne@68 38
jpayne@68 39 from BioSQL import BioSeqDatabase
jpayne@68 40 server = BioSeqDatabase.open_database(driver="MySQLdb",
jpayne@68 41 user="gbrowse",
jpayne@68 42 passwd="biosql",
jpayne@68 43 host="localhost",
jpayne@68 44 db="test_biosql")
jpayne@68 45 try:
jpayne@68 46 db = server["test"]
jpayne@68 47 except KeyError:
jpayne@68 48 db = server.new_database("test",
jpayne@68 49 description="For testing GBrowse")
jpayne@68 50
jpayne@68 51 """
jpayne@68 52 self.adaptor = adaptor
jpayne@68 53 self.dbid = dbid
jpayne@68 54 self.fetch_NCBI_taxonomy = fetch_NCBI_taxonomy
jpayne@68 55
jpayne@68 56 def load_seqrecord(self, record):
jpayne@68 57 """Load a Biopython SeqRecord into the database."""
jpayne@68 58 bioentry_id = self._load_bioentry_table(record)
jpayne@68 59 self._load_bioentry_date(record, bioentry_id)
jpayne@68 60 self._load_biosequence(record, bioentry_id)
jpayne@68 61 self._load_comment(record, bioentry_id)
jpayne@68 62 self._load_dbxrefs(record, bioentry_id)
jpayne@68 63 references = record.annotations.get("references", ())
jpayne@68 64 for reference, rank in zip(references, list(range(len(references)))):
jpayne@68 65 self._load_reference(reference, rank, bioentry_id)
jpayne@68 66 self._load_annotations(record, bioentry_id)
jpayne@68 67 for seq_feature_num in range(len(record.features)):
jpayne@68 68 seq_feature = record.features[seq_feature_num]
jpayne@68 69 self._load_seqfeature(seq_feature, seq_feature_num, bioentry_id)
jpayne@68 70
jpayne@68 71 def _get_ontology_id(self, name, definition=None):
jpayne@68 72 """Return identifier for the named ontology (PRIVATE).
jpayne@68 73
jpayne@68 74 This looks through the onotology table for a the given entry name.
jpayne@68 75 If it is not found, a row is added for this ontology (using the
jpayne@68 76 definition if supplied). In either case, the id corresponding to
jpayne@68 77 the provided name is returned, so that you can reference it in
jpayne@68 78 another table.
jpayne@68 79 """
jpayne@68 80 oids = self.adaptor.execute_and_fetch_col0(
jpayne@68 81 "SELECT ontology_id FROM ontology WHERE name = %s", (name,)
jpayne@68 82 )
jpayne@68 83 if oids:
jpayne@68 84 return oids[0]
jpayne@68 85 self.adaptor.execute(
jpayne@68 86 "INSERT INTO ontology(name, definition) VALUES (%s, %s)", (name, definition)
jpayne@68 87 )
jpayne@68 88 return self.adaptor.last_id("ontology")
jpayne@68 89
jpayne@68 90 def _get_term_id(self, name, ontology_id=None, definition=None, identifier=None):
jpayne@68 91 """Get the id that corresponds to a term (PRIVATE).
jpayne@68 92
jpayne@68 93 This looks through the term table for a the given term. If it
jpayne@68 94 is not found, a new id corresponding to this term is created.
jpayne@68 95 In either case, the id corresponding to that term is returned, so
jpayne@68 96 that you can reference it in another table.
jpayne@68 97
jpayne@68 98 The ontology_id should be used to disambiguate the term.
jpayne@68 99 """
jpayne@68 100 # try to get the term id
jpayne@68 101 sql = "SELECT term_id FROM term WHERE name = %s"
jpayne@68 102 fields = [name]
jpayne@68 103 if ontology_id:
jpayne@68 104 sql += " AND ontology_id = %s"
jpayne@68 105 fields.append(ontology_id)
jpayne@68 106 id_results = self.adaptor.execute_and_fetchall(sql, fields)
jpayne@68 107 # something is wrong
jpayne@68 108 if len(id_results) > 1:
jpayne@68 109 raise ValueError(f"Multiple term ids for {name}: {id_results!r}")
jpayne@68 110 elif len(id_results) == 1:
jpayne@68 111 return id_results[0][0]
jpayne@68 112 else:
jpayne@68 113 sql = (
jpayne@68 114 "INSERT INTO term (name, definition,"
jpayne@68 115 " identifier, ontology_id)"
jpayne@68 116 " VALUES (%s, %s, %s, %s)"
jpayne@68 117 )
jpayne@68 118 self.adaptor.execute(sql, (name, definition, identifier, ontology_id))
jpayne@68 119 return self.adaptor.last_id("term")
jpayne@68 120
jpayne@68 121 def _add_dbxref(self, dbname, accession, version):
jpayne@68 122 """Insert a dbxref and return its id (PRIVATE)."""
jpayne@68 123 self.adaptor.execute(
jpayne@68 124 "INSERT INTO dbxref(dbname, accession, version) VALUES (%s, %s, %s)",
jpayne@68 125 (dbname, accession, version),
jpayne@68 126 )
jpayne@68 127 return self.adaptor.last_id("dbxref")
jpayne@68 128
jpayne@68 129 def _get_taxon_id(self, record):
jpayne@68 130 """Get the taxon id for this record (PRIVATE).
jpayne@68 131
jpayne@68 132 Arguments:
jpayne@68 133 - record - a SeqRecord object
jpayne@68 134
jpayne@68 135 This searches the taxon/taxon_name tables using the
jpayne@68 136 NCBI taxon ID, scientific name and common name to find
jpayne@68 137 the matching taxon table entry's id.
jpayne@68 138
jpayne@68 139 If the species isn't in the taxon table, and we have at
jpayne@68 140 least the NCBI taxon ID, scientific name or common name,
jpayne@68 141 at least a minimal stub entry is created in the table.
jpayne@68 142
jpayne@68 143 Returns the taxon id (database key for the taxon table,
jpayne@68 144 not an NCBI taxon ID), or None if the taxonomy information
jpayne@68 145 is missing.
jpayne@68 146
jpayne@68 147 See also the BioSQL script load_ncbi_taxonomy.pl which
jpayne@68 148 will populate and update the taxon/taxon_name tables
jpayne@68 149 with the latest information from the NCBI.
jpayne@68 150 """
jpayne@68 151 # To find the NCBI taxid, first check for a top level annotation
jpayne@68 152 ncbi_taxon_id = None
jpayne@68 153 if "ncbi_taxid" in record.annotations:
jpayne@68 154 # Could be a list of IDs.
jpayne@68 155 if isinstance(record.annotations["ncbi_taxid"], list):
jpayne@68 156 if len(record.annotations["ncbi_taxid"]) == 1:
jpayne@68 157 ncbi_taxon_id = record.annotations["ncbi_taxid"][0]
jpayne@68 158 else:
jpayne@68 159 ncbi_taxon_id = record.annotations["ncbi_taxid"]
jpayne@68 160 if not ncbi_taxon_id:
jpayne@68 161 # Secondly, look for a source feature
jpayne@68 162 for f in record.features:
jpayne@68 163 if f.type == "source":
jpayne@68 164 quals = getattr(f, "qualifiers", {})
jpayne@68 165 if "db_xref" in quals:
jpayne@68 166 for db_xref in f.qualifiers["db_xref"]:
jpayne@68 167 if db_xref.startswith("taxon:"):
jpayne@68 168 ncbi_taxon_id = int(db_xref[6:])
jpayne@68 169 break
jpayne@68 170 if ncbi_taxon_id:
jpayne@68 171 break
jpayne@68 172
jpayne@68 173 try:
jpayne@68 174 scientific_name = record.annotations["organism"][:255]
jpayne@68 175 except KeyError:
jpayne@68 176 scientific_name = None
jpayne@68 177 try:
jpayne@68 178 common_name = record.annotations["source"][:255]
jpayne@68 179 except KeyError:
jpayne@68 180 common_name = None
jpayne@68 181 # Note: The maximum length for taxon names in the schema is 255.
jpayne@68 182 # Cropping it now should help in getting a match when searching,
jpayne@68 183 # and avoids an error if we try and add these to the database.
jpayne@68 184
jpayne@68 185 if ncbi_taxon_id:
jpayne@68 186 # Good, we have the NCBI taxon to go on - this is unambiguous :)
jpayne@68 187 # Note that the scientific name and common name will only be
jpayne@68 188 # used if we have to record a stub entry.
jpayne@68 189 return self._get_taxon_id_from_ncbi_taxon_id(
jpayne@68 190 ncbi_taxon_id, scientific_name, common_name
jpayne@68 191 )
jpayne@68 192
jpayne@68 193 if not common_name and not scientific_name:
jpayne@68 194 # Nothing to go on... and there is no point adding
jpayne@68 195 # a new entry to the database. We'll just leave this
jpayne@68 196 # sequence's taxon as a NULL in the database.
jpayne@68 197 return None
jpayne@68 198
jpayne@68 199 # Next, we'll try to find a match based on the species name
jpayne@68 200 # (stored in GenBank files as the organism and/or the source).
jpayne@68 201 if scientific_name:
jpayne@68 202 taxa = self.adaptor.execute_and_fetch_col0(
jpayne@68 203 "SELECT taxon_id FROM taxon_name"
jpayne@68 204 " WHERE name_class = 'scientific name' AND name = %s",
jpayne@68 205 (scientific_name,),
jpayne@68 206 )
jpayne@68 207 if taxa:
jpayne@68 208 # Good, mapped the scientific name to a taxon table entry
jpayne@68 209 return taxa[0]
jpayne@68 210
jpayne@68 211 # Last chance...
jpayne@68 212 if common_name:
jpayne@68 213 taxa = self.adaptor.execute_and_fetch_col0(
jpayne@68 214 "SELECT DISTINCT taxon_id FROM taxon_name WHERE name = %s",
jpayne@68 215 (common_name,),
jpayne@68 216 )
jpayne@68 217 # Its natural that several distinct taxa will have the same common
jpayne@68 218 # name - in which case we can't resolve the taxon uniquely.
jpayne@68 219 if len(taxa) > 1:
jpayne@68 220 raise ValueError(
jpayne@68 221 "Taxa: %d species have name %r" % (len(taxa), common_name)
jpayne@68 222 )
jpayne@68 223 if taxa:
jpayne@68 224 # Good, mapped the common name to a taxon table entry
jpayne@68 225 return taxa[0]
jpayne@68 226
jpayne@68 227 # At this point, as far as we can tell, this species isn't
jpayne@68 228 # in the taxon table already. So we'll have to add it.
jpayne@68 229 # We don't have an NCBI taxonomy ID, so if we do record just
jpayne@68 230 # a stub entry, there is no simple way to fix this later.
jpayne@68 231 #
jpayne@68 232 # TODO - Should we try searching the NCBI taxonomy using the
jpayne@68 233 # species name?
jpayne@68 234 #
jpayne@68 235 # OK, let's try inserting the species.
jpayne@68 236 # Chances are we don't have enough information ...
jpayne@68 237 # Furthermore, it won't be in the hierarchy.
jpayne@68 238
jpayne@68 239 lineage = []
jpayne@68 240 for c in record.annotations.get("taxonomy", []):
jpayne@68 241 lineage.append([None, None, c])
jpayne@68 242 if lineage:
jpayne@68 243 lineage[-1][1] = "genus"
jpayne@68 244 lineage.append([None, "species", record.annotations["organism"]])
jpayne@68 245 # XXX do we have them?
jpayne@68 246 if "subspecies" in record.annotations:
jpayne@68 247 lineage.append([None, "subspecies", record.annotations["subspecies"]])
jpayne@68 248 if "variant" in record.annotations:
jpayne@68 249 lineage.append([None, "varietas", record.annotations["variant"]])
jpayne@68 250 lineage[-1][0] = ncbi_taxon_id
jpayne@68 251
jpayne@68 252 left_value = self.adaptor.execute_one("SELECT MAX(left_value) FROM taxon")[0]
jpayne@68 253 if not left_value:
jpayne@68 254 left_value = 0
jpayne@68 255 left_value += 1
jpayne@68 256
jpayne@68 257 # XXX -- Brad: Fixing this for now in an ugly way because
jpayne@68 258 # I am getting overlaps for right_values. I need to dig into this
jpayne@68 259 # more to actually understand how it works. I'm not sure it is
jpayne@68 260 # actually working right anyhow.
jpayne@68 261 right_start_value = self.adaptor.execute_one(
jpayne@68 262 "SELECT MAX(right_value) FROM taxon"
jpayne@68 263 )[0]
jpayne@68 264 if not right_start_value:
jpayne@68 265 right_start_value = 0
jpayne@68 266 right_value = right_start_value + 2 * len(lineage) - 1
jpayne@68 267
jpayne@68 268 parent_taxon_id = None
jpayne@68 269 for taxon in lineage:
jpayne@68 270 self.adaptor.execute(
jpayne@68 271 "INSERT INTO taxon(parent_taxon_id, ncbi_taxon_id, node_rank,"
jpayne@68 272 " left_value, right_value)"
jpayne@68 273 " VALUES (%s, %s, %s, %s, %s)",
jpayne@68 274 (parent_taxon_id, taxon[0], taxon[1], left_value, right_value),
jpayne@68 275 )
jpayne@68 276 taxon_id = self.adaptor.last_id("taxon")
jpayne@68 277 self.adaptor.execute(
jpayne@68 278 "INSERT INTO taxon_name(taxon_id, name, name_class)"
jpayne@68 279 "VALUES (%s, %s, 'scientific name')",
jpayne@68 280 (taxon_id, taxon[2][:255]),
jpayne@68 281 )
jpayne@68 282 # Note the name field is limited to 255, some SwissProt files
jpayne@68 283 # have a multi-species name which can be longer. So truncate this.
jpayne@68 284 left_value += 1
jpayne@68 285 right_value -= 1
jpayne@68 286 parent_taxon_id = taxon_id
jpayne@68 287 if common_name:
jpayne@68 288 self.adaptor.execute(
jpayne@68 289 "INSERT INTO taxon_name(taxon_id, name, name_class)"
jpayne@68 290 "VALUES (%s, %s, 'common name')",
jpayne@68 291 (taxon_id, common_name),
jpayne@68 292 )
jpayne@68 293
jpayne@68 294 return taxon_id
jpayne@68 295
jpayne@68 296 def _fix_name_class(self, entrez_name):
jpayne@68 297 """Map Entrez name terms to those used in taxdump (PRIVATE).
jpayne@68 298
jpayne@68 299 We need to make this conversion to match the taxon_name.name_class
jpayne@68 300 values used by the BioSQL load_ncbi_taxonomy.pl script.
jpayne@68 301
jpayne@68 302 e.g.::
jpayne@68 303
jpayne@68 304 "ScientificName" -> "scientific name",
jpayne@68 305 "EquivalentName" -> "equivalent name",
jpayne@68 306 "Synonym" -> "synonym",
jpayne@68 307
jpayne@68 308 """
jpayne@68 309 # Add any special cases here:
jpayne@68 310 #
jpayne@68 311 # known = {}
jpayne@68 312 # try:
jpayne@68 313 # return known[entrez_name]
jpayne@68 314 # except KeyError:
jpayne@68 315 # pass
jpayne@68 316
jpayne@68 317 # Try automatically by adding spaces before each capital
jpayne@68 318 def add_space(letter):
jpayne@68 319 """Add a space before a capital letter."""
jpayne@68 320 if letter.isupper():
jpayne@68 321 return " " + letter.lower()
jpayne@68 322 else:
jpayne@68 323 return letter
jpayne@68 324
jpayne@68 325 answer = "".join(add_space(letter) for letter in entrez_name).strip()
jpayne@68 326 if answer != answer.lower():
jpayne@68 327 raise ValueError(
jpayne@68 328 f"Expected processed entrez_name, '{answer}' to only have lower case letters."
jpayne@68 329 )
jpayne@68 330 return answer
jpayne@68 331
jpayne@68 332 def _update_left_right_taxon_values(self, left_value):
jpayne@68 333 """Update the left and right taxon values in the table (PRIVATE)."""
jpayne@68 334 if not left_value:
jpayne@68 335 return
jpayne@68 336 # Due to the UNIQUE constraint on the left and right values in the taxon
jpayne@68 337 # table we cannot simply update them through an SQL statement as we risk
jpayne@68 338 # colliding values. Instead we must select all of the rows that we want to
jpayne@68 339 # update, modify the values in python and then update the rows
jpayne@68 340 # self.adaptor.execute("UPDATE taxon SET right_value = right_value + 2 "
jpayne@68 341 # "WHERE right_value >= %s", (left_value,))
jpayne@68 342 # self.adaptor.execute("UPDATE taxon SET left_value = left_value + 2 "
jpayne@68 343 # "WHERE left_value > %s", (left_value,))
jpayne@68 344
jpayne@68 345 rows = self.adaptor.execute_and_fetchall(
jpayne@68 346 "SELECT left_value, right_value, taxon_id FROM taxon "
jpayne@68 347 "WHERE right_value >= %s or left_value > %s",
jpayne@68 348 (left_value, left_value),
jpayne@68 349 )
jpayne@68 350
jpayne@68 351 right_rows = []
jpayne@68 352 left_rows = []
jpayne@68 353 for row in rows:
jpayne@68 354 new_right = row[1]
jpayne@68 355 new_left = row[0]
jpayne@68 356 if new_right >= left_value:
jpayne@68 357 new_right += 2
jpayne@68 358
jpayne@68 359 if new_left > left_value:
jpayne@68 360 new_left += 2
jpayne@68 361 right_rows.append((new_right, row[2]))
jpayne@68 362 left_rows.append((new_left, row[2]))
jpayne@68 363
jpayne@68 364 # sort the rows based on the value from largest to smallest
jpayne@68 365 # should ensure no overlaps
jpayne@68 366 right_rows = sorted(right_rows, key=lambda x: x[0], reverse=True)
jpayne@68 367 left_rows = sorted(left_rows, key=lambda x: x[0], reverse=True)
jpayne@68 368
jpayne@68 369 self.adaptor.executemany(
jpayne@68 370 "UPDATE taxon SET left_value = %s WHERE taxon_id = %s", left_rows
jpayne@68 371 )
jpayne@68 372 self.adaptor.executemany(
jpayne@68 373 "UPDATE taxon SET right_value = %s WHERE taxon_id = %s", right_rows
jpayne@68 374 )
jpayne@68 375
jpayne@68 376 def _get_taxon_id_from_ncbi_taxon_id(
jpayne@68 377 self, ncbi_taxon_id, scientific_name=None, common_name=None
jpayne@68 378 ):
jpayne@68 379 """Get the taxon id for record from NCBI taxon ID (PRIVATE).
jpayne@68 380
jpayne@68 381 Arguments:
jpayne@68 382 - ncbi_taxon_id - string containing an NCBI taxon id
jpayne@68 383 - scientific_name - string, used if a stub entry is recorded
jpayne@68 384 - common_name - string, used if a stub entry is recorded
jpayne@68 385
jpayne@68 386 This searches the taxon table using ONLY the NCBI taxon ID
jpayne@68 387 to find the matching taxon table entry's ID (database key).
jpayne@68 388
jpayne@68 389 If the species isn't in the taxon table, and the fetch_NCBI_taxonomy
jpayne@68 390 flag is true, Biopython will attempt to go online using Bio.Entrez
jpayne@68 391 to fetch the official NCBI lineage, recursing up the tree until an
jpayne@68 392 existing entry is found in the database or the full lineage has been
jpayne@68 393 fetched.
jpayne@68 394
jpayne@68 395 Otherwise the NCBI taxon ID, scientific name and common name are
jpayne@68 396 recorded as a minimal stub entry in the taxon and taxon_name tables.
jpayne@68 397 Any partial information about the lineage from the SeqRecord is NOT
jpayne@68 398 recorded. This should mean that (re)running the BioSQL script
jpayne@68 399 load_ncbi_taxonomy.pl can fill in the taxonomy lineage.
jpayne@68 400
jpayne@68 401 Returns the taxon id (database key for the taxon table, not
jpayne@68 402 an NCBI taxon ID).
jpayne@68 403 """
jpayne@68 404 if not ncbi_taxon_id:
jpayne@68 405 raise ValueError("Expected a non-empty value for ncbi_taxon_id.")
jpayne@68 406
jpayne@68 407 taxon_id = self.adaptor.execute_and_fetch_col0(
jpayne@68 408 "SELECT taxon_id FROM taxon WHERE ncbi_taxon_id = %s", (int(ncbi_taxon_id),)
jpayne@68 409 )
jpayne@68 410 if taxon_id:
jpayne@68 411 # Good, we have mapped the NCBI taxid to a taxon table entry
jpayne@68 412 return taxon_id[0]
jpayne@68 413
jpayne@68 414 # At this point, as far as we can tell, this species isn't
jpayne@68 415 # in the taxon table already. So we'll have to add it.
jpayne@68 416
jpayne@68 417 parent_taxon_id = None
jpayne@68 418 rank = "species"
jpayne@68 419 genetic_code = None
jpayne@68 420 mito_genetic_code = None
jpayne@68 421 parent_left_value = None
jpayne@68 422 parent_right_value = None
jpayne@68 423 left_value = None
jpayne@68 424 right_value = None
jpayne@68 425 species_names = []
jpayne@68 426 if scientific_name:
jpayne@68 427 species_names.append(("scientific name", scientific_name))
jpayne@68 428 if common_name:
jpayne@68 429 species_names.append(("common name", common_name))
jpayne@68 430
jpayne@68 431 if self.fetch_NCBI_taxonomy:
jpayne@68 432 # Go online to get the parent taxon ID!
jpayne@68 433 handle = Entrez.efetch(db="taxonomy", id=ncbi_taxon_id, retmode="XML")
jpayne@68 434 taxonomic_record = Entrez.read(handle)
jpayne@68 435 if len(taxonomic_record) == 1:
jpayne@68 436 if taxonomic_record[0]["TaxId"] != str(ncbi_taxon_id):
jpayne@68 437 raise ValueError(
jpayne@68 438 f"ncbi_taxon_id different from parent taxon id. {ncbi_taxon_id} versus {taxonomic_record[0]['TaxId']}"
jpayne@68 439 )
jpayne@68 440
jpayne@68 441 (
jpayne@68 442 parent_taxon_id,
jpayne@68 443 parent_left_value,
jpayne@68 444 parent_right_value,
jpayne@68 445 ) = self._get_taxon_id_from_ncbi_lineage(
jpayne@68 446 taxonomic_record[0]["LineageEx"]
jpayne@68 447 )
jpayne@68 448
jpayne@68 449 left_value = parent_right_value
jpayne@68 450 right_value = parent_right_value + 1
jpayne@68 451
jpayne@68 452 rank = str(taxonomic_record[0]["Rank"])
jpayne@68 453
jpayne@68 454 genetic_code = int(taxonomic_record[0]["GeneticCode"]["GCId"])
jpayne@68 455
jpayne@68 456 mito_genetic_code = int(taxonomic_record[0]["MitoGeneticCode"]["MGCId"])
jpayne@68 457
jpayne@68 458 species_names = [
jpayne@68 459 ("scientific name", str(taxonomic_record[0]["ScientificName"]))
jpayne@68 460 ]
jpayne@68 461 try:
jpayne@68 462 for name_class, names in taxonomic_record[0]["OtherNames"].items():
jpayne@68 463 name_class = self._fix_name_class(name_class)
jpayne@68 464 if not isinstance(names, list):
jpayne@68 465 # The Entrez parser seems to return single entry
jpayne@68 466 # lists as just a string which is annoying.
jpayne@68 467 names = [names]
jpayne@68 468 for name in names:
jpayne@68 469 # Want to ignore complex things like ClassCDE
jpayne@68 470 # entries
jpayne@68 471 if isinstance(name, str):
jpayne@68 472 species_names.append((name_class, name))
jpayne@68 473 except KeyError:
jpayne@68 474 # OtherNames isn't always present,
jpayne@68 475 # e.g. NCBI taxon 41205, Bromheadia finlaysoniana
jpayne@68 476 pass
jpayne@68 477 else:
jpayne@68 478 pass
jpayne@68 479 # If we are not allowed to go online, we will record the bare minimum;
jpayne@68 480 # as long as the NCBI taxon id is present, then (re)running
jpayne@68 481 # load_ncbi_taxonomy.pl should fill in the taxonomomy lineage
jpayne@68 482 # (and update the species names).
jpayne@68 483 #
jpayne@68 484 # I am NOT going to try and record the lineage, even if it
jpayne@68 485 # is in the record annotation as a list of names, as we won't
jpayne@68 486 # know the NCBI taxon IDs for these parent nodes.
jpayne@68 487
jpayne@68 488 self._update_left_right_taxon_values(left_value)
jpayne@68 489
jpayne@68 490 self.adaptor.execute(
jpayne@68 491 "INSERT INTO taxon(parent_taxon_id, ncbi_taxon_id, node_rank,"
jpayne@68 492 " genetic_code, mito_genetic_code, left_value, right_value)"
jpayne@68 493 " VALUES (%s, %s, %s, %s, %s, %s, %s)",
jpayne@68 494 (
jpayne@68 495 parent_taxon_id,
jpayne@68 496 ncbi_taxon_id,
jpayne@68 497 rank,
jpayne@68 498 genetic_code,
jpayne@68 499 mito_genetic_code,
jpayne@68 500 left_value,
jpayne@68 501 right_value,
jpayne@68 502 ),
jpayne@68 503 )
jpayne@68 504
jpayne@68 505 taxon_id = self.adaptor.last_id("taxon")
jpayne@68 506
jpayne@68 507 # Record the scientific name, common name, etc
jpayne@68 508 for name_class, name in species_names:
jpayne@68 509 self.adaptor.execute(
jpayne@68 510 "INSERT INTO taxon_name(taxon_id, name, name_class)"
jpayne@68 511 " VALUES (%s, %s, %s)",
jpayne@68 512 (taxon_id, name[:255], name_class),
jpayne@68 513 )
jpayne@68 514 return taxon_id
jpayne@68 515
jpayne@68 516 def _get_taxon_id_from_ncbi_lineage(self, taxonomic_lineage):
jpayne@68 517 """Recursive method to get taxon ID from NCBI lineage (PRIVATE).
jpayne@68 518
jpayne@68 519 Arguments:
jpayne@68 520 - taxonomic_lineage - list of taxonomy dictionaries from Bio.Entrez
jpayne@68 521
jpayne@68 522 First dictionary in list is the taxonomy root, highest would be
jpayne@68 523 the species. Each dictionary includes:
jpayne@68 524
jpayne@68 525 - TaxID (string, NCBI taxon id)
jpayne@68 526 - Rank (string, e.g. "species", "genus", ..., "phylum", ...)
jpayne@68 527 - ScientificName (string)
jpayne@68 528
jpayne@68 529 (and that is all at the time of writing)
jpayne@68 530
jpayne@68 531 This method will record all the lineage given, returning the taxon id
jpayne@68 532 (database key, not NCBI taxon id) of the final entry (the species).
jpayne@68 533 """
jpayne@68 534 ncbi_taxon_id = int(taxonomic_lineage[-1]["TaxId"])
jpayne@68 535 left_value = None
jpayne@68 536 right_value = None
jpayne@68 537 parent_left_value = None
jpayne@68 538 parent_right_value = None
jpayne@68 539 # Is this in the database already? Check the taxon table...
jpayne@68 540 rows = self.adaptor.execute_and_fetchall(
jpayne@68 541 "SELECT taxon_id, left_value, right_value FROM taxon"
jpayne@68 542 " WHERE ncbi_taxon_id=%s" % ncbi_taxon_id
jpayne@68 543 )
jpayne@68 544 if rows:
jpayne@68 545 # we could verify that the Scientific Name etc in the database
jpayne@68 546 # is the same and update it or print a warning if not...
jpayne@68 547 if len(rows) != 1:
jpayne@68 548 raise ValueError(f"Expected 1 reponse, got {len(rows)}")
jpayne@68 549 return rows[0]
jpayne@68 550
jpayne@68 551 # We have to record this.
jpayne@68 552 if len(taxonomic_lineage) > 1:
jpayne@68 553 # Use recursion to find out the taxon id (database key) of the
jpayne@68 554 # parent.
jpayne@68 555 (
jpayne@68 556 parent_taxon_id,
jpayne@68 557 parent_left_value,
jpayne@68 558 parent_right_value,
jpayne@68 559 ) = self._get_taxon_id_from_ncbi_lineage(taxonomic_lineage[:-1])
jpayne@68 560 left_value = parent_right_value
jpayne@68 561 right_value = parent_right_value + 1
jpayne@68 562 if not isinstance(parent_taxon_id, int):
jpayne@68 563 raise ValueError(
jpayne@68 564 f"Expected parent_taxon_id to be an int, got {parent_taxon_id}"
jpayne@68 565 )
jpayne@68 566 else:
jpayne@68 567 # we have reached the top of the lineage but no current taxonomy
jpayne@68 568 # id has been found
jpayne@68 569 parent_taxon_id = None
jpayne@68 570 left_value = self.adaptor.execute_one("SELECT MAX(left_value) FROM taxon")[
jpayne@68 571 0
jpayne@68 572 ]
jpayne@68 573 if not left_value:
jpayne@68 574 left_value = 0
jpayne@68 575
jpayne@68 576 right_value = left_value + 1
jpayne@68 577
jpayne@68 578 self._update_left_right_taxon_values(left_value)
jpayne@68 579
jpayne@68 580 # INSERT new taxon
jpayne@68 581 rank = str(taxonomic_lineage[-1].get("Rank"))
jpayne@68 582 self.adaptor.execute(
jpayne@68 583 "INSERT INTO taxon(ncbi_taxon_id, parent_taxon_id, node_rank, "
jpayne@68 584 "left_value, right_value) VALUES (%s, %s, %s, %s, %s)",
jpayne@68 585 (ncbi_taxon_id, parent_taxon_id, rank, left_value, right_value),
jpayne@68 586 )
jpayne@68 587
jpayne@68 588 taxon_id = self.adaptor.last_id("taxon")
jpayne@68 589 # assert isinstance(taxon_id, int), repr(taxon_id)
jpayne@68 590 # ... and its name in taxon_name
jpayne@68 591 scientific_name = taxonomic_lineage[-1].get("ScientificName")
jpayne@68 592 if scientific_name:
jpayne@68 593 self.adaptor.execute(
jpayne@68 594 "INSERT INTO taxon_name(taxon_id, name, name_class) "
jpayne@68 595 "VALUES (%s, %s, 'scientific name')",
jpayne@68 596 (taxon_id, scientific_name[:255]),
jpayne@68 597 )
jpayne@68 598 return taxon_id, left_value, right_value
jpayne@68 599
jpayne@68 600 def _load_bioentry_table(self, record):
jpayne@68 601 """Fill the bioentry table with sequence information (PRIVATE).
jpayne@68 602
jpayne@68 603 Arguments:
jpayne@68 604 - record - SeqRecord object to add to the database.
jpayne@68 605
jpayne@68 606 """
jpayne@68 607 # get the pertinent info and insert it
jpayne@68 608
jpayne@68 609 if record.id.count(".") == 1: # try to get a version from the id
jpayne@68 610 # This assumes the string is something like "XXXXXXXX.123"
jpayne@68 611 accession, version = record.id.split(".")
jpayne@68 612 try:
jpayne@68 613 version = int(version)
jpayne@68 614 except ValueError:
jpayne@68 615 accession = record.id
jpayne@68 616 version = 0
jpayne@68 617 else: # otherwise just use a version of 0
jpayne@68 618 accession = record.id
jpayne@68 619 version = 0
jpayne@68 620
jpayne@68 621 if (
jpayne@68 622 "accessions" in record.annotations
jpayne@68 623 and isinstance(record.annotations["accessions"], list)
jpayne@68 624 and record.annotations["accessions"]
jpayne@68 625 ):
jpayne@68 626 # Take the first accession (one if there is more than one)
jpayne@68 627 accession = record.annotations["accessions"][0]
jpayne@68 628
jpayne@68 629 # Find the taxon id (this is not just the NCBI Taxon ID)
jpayne@68 630 # NOTE - If the species isn't defined in the taxon table,
jpayne@68 631 # a new minimal entry is created.
jpayne@68 632 taxon_id = self._get_taxon_id(record)
jpayne@68 633
jpayne@68 634 if "gi" in record.annotations:
jpayne@68 635 identifier = record.annotations["gi"]
jpayne@68 636 else:
jpayne@68 637 identifier = record.id
jpayne@68 638
jpayne@68 639 # Allow description and division to default to NULL as in BioPerl.
jpayne@68 640 description = getattr(record, "description", None)
jpayne@68 641 division = record.annotations.get("data_file_division")
jpayne@68 642
jpayne@68 643 sql = """
jpayne@68 644 INSERT INTO bioentry (
jpayne@68 645 biodatabase_id,
jpayne@68 646 taxon_id,
jpayne@68 647 name,
jpayne@68 648 accession,
jpayne@68 649 identifier,
jpayne@68 650 division,
jpayne@68 651 description,
jpayne@68 652 version)
jpayne@68 653 VALUES (
jpayne@68 654 %s,
jpayne@68 655 %s,
jpayne@68 656 %s,
jpayne@68 657 %s,
jpayne@68 658 %s,
jpayne@68 659 %s,
jpayne@68 660 %s,
jpayne@68 661 %s)"""
jpayne@68 662 # print(self.dbid, taxon_id, record.name, accession, identifier, \
jpayne@68 663 # division, description, version)
jpayne@68 664 self.adaptor.execute(
jpayne@68 665 sql,
jpayne@68 666 (
jpayne@68 667 self.dbid,
jpayne@68 668 taxon_id,
jpayne@68 669 record.name,
jpayne@68 670 accession,
jpayne@68 671 identifier,
jpayne@68 672 division,
jpayne@68 673 description,
jpayne@68 674 version,
jpayne@68 675 ),
jpayne@68 676 )
jpayne@68 677 # now retrieve the id for the bioentry
jpayne@68 678 return self.adaptor.last_id("bioentry")
jpayne@68 679
jpayne@68 680 def _load_bioentry_date(self, record, bioentry_id):
jpayne@68 681 """Add the effective date of the entry into the database (PRIVATE).
jpayne@68 682
jpayne@68 683 record - a SeqRecord object with an annotated date
jpayne@68 684 bioentry_id - corresponding database identifier
jpayne@68 685 """
jpayne@68 686 # dates are GenBank style, like:
jpayne@68 687 # 14-SEP-2000
jpayne@68 688 date = record.annotations.get("date", strftime("%d-%b-%Y", gmtime()).upper())
jpayne@68 689 if isinstance(date, list):
jpayne@68 690 date = date[0]
jpayne@68 691 annotation_tags_id = self._get_ontology_id("Annotation Tags")
jpayne@68 692 date_id = self._get_term_id("date_changed", annotation_tags_id)
jpayne@68 693 sql = (
jpayne@68 694 "INSERT INTO bioentry_qualifier_value"
jpayne@68 695 ' (bioentry_id, term_id, value, "rank")'
jpayne@68 696 " VALUES (%s, %s, %s, 1)"
jpayne@68 697 )
jpayne@68 698 self.adaptor.execute(sql, (bioentry_id, date_id, date))
jpayne@68 699
jpayne@68 700 def _load_biosequence(self, record, bioentry_id):
jpayne@68 701 """Record SeqRecord's sequence and alphabet in DB (PRIVATE).
jpayne@68 702
jpayne@68 703 Arguments:
jpayne@68 704 - record - a SeqRecord object with a seq property
jpayne@68 705 - bioentry_id - corresponding database identifier
jpayne@68 706
jpayne@68 707 """
jpayne@68 708 if record.seq is None:
jpayne@68 709 # The biosequence table entry is optional, so if we haven't
jpayne@68 710 # got a sequence, we don't need to write to the table.
jpayne@68 711 return
jpayne@68 712
jpayne@68 713 molecule_type = record.annotations.get("molecule_type", "")
jpayne@68 714 if "DNA" in molecule_type:
jpayne@68 715 alphabet = "dna"
jpayne@68 716 elif "RNA" in molecule_type:
jpayne@68 717 alphabet = "rna"
jpayne@68 718 elif "protein" in molecule_type:
jpayne@68 719 alphabet = "protein"
jpayne@68 720 else:
jpayne@68 721 alphabet = "unknown"
jpayne@68 722
jpayne@68 723 try:
jpayne@68 724 seq_str = str(record.seq)
jpayne@68 725 except UndefinedSequenceError:
jpayne@68 726 seq_str = None
jpayne@68 727
jpayne@68 728 sql = (
jpayne@68 729 "INSERT INTO biosequence (bioentry_id, version, "
jpayne@68 730 "length, seq, alphabet) "
jpayne@68 731 "VALUES (%s, 0, %s, %s, %s)"
jpayne@68 732 )
jpayne@68 733 self.adaptor.execute(sql, (bioentry_id, len(record.seq), seq_str, alphabet))
jpayne@68 734
jpayne@68 735 def _load_comment(self, record, bioentry_id):
jpayne@68 736 """Record a SeqRecord's annotated comment in the database (PRIVATE).
jpayne@68 737
jpayne@68 738 Arguments:
jpayne@68 739 - record - a SeqRecord object with an annotated comment
jpayne@68 740 - bioentry_id - corresponding database identifier
jpayne@68 741
jpayne@68 742 """
jpayne@68 743 comments = record.annotations.get("comment")
jpayne@68 744 if not comments:
jpayne@68 745 return
jpayne@68 746 if not isinstance(comments, list):
jpayne@68 747 # It should be a string then...
jpayne@68 748 comments = [comments]
jpayne@68 749
jpayne@68 750 for index, comment in enumerate(comments):
jpayne@68 751 comment = comment.replace("\n", " ")
jpayne@68 752 # TODO - Store each line as a separate entry? This would preserve
jpayne@68 753 # the newlines, but we should check BioPerl etc to be consistent.
jpayne@68 754 sql = (
jpayne@68 755 'INSERT INTO comment (bioentry_id, comment_text, "rank")'
jpayne@68 756 " VALUES (%s, %s, %s)"
jpayne@68 757 )
jpayne@68 758 self.adaptor.execute(sql, (bioentry_id, comment, index + 1))
jpayne@68 759
jpayne@68 760 def _load_annotations(self, record, bioentry_id):
jpayne@68 761 """Record a SeqRecord's misc annotations in the database (PRIVATE).
jpayne@68 762
jpayne@68 763 The annotation strings are recorded in the bioentry_qualifier_value
jpayne@68 764 table, except for special cases like the reference, comment and
jpayne@68 765 taxonomy which are handled with their own tables.
jpayne@68 766
jpayne@68 767 Arguments:
jpayne@68 768 - record - a SeqRecord object with an annotations dictionary
jpayne@68 769 - bioentry_id - corresponding database identifier
jpayne@68 770
jpayne@68 771 """
jpayne@68 772 mono_sql = (
jpayne@68 773 "INSERT INTO bioentry_qualifier_value"
jpayne@68 774 "(bioentry_id, term_id, value)"
jpayne@68 775 " VALUES (%s, %s, %s)"
jpayne@68 776 )
jpayne@68 777 many_sql = (
jpayne@68 778 "INSERT INTO bioentry_qualifier_value"
jpayne@68 779 '(bioentry_id, term_id, value, "rank")'
jpayne@68 780 " VALUES (%s, %s, %s, %s)"
jpayne@68 781 )
jpayne@68 782 tag_ontology_id = self._get_ontology_id("Annotation Tags")
jpayne@68 783 for key, value in record.annotations.items():
jpayne@68 784 if key in ["molecule_type", "references", "comment", "ncbi_taxid", "date"]:
jpayne@68 785 # Handled separately
jpayne@68 786 continue
jpayne@68 787 term_id = self._get_term_id(key, ontology_id=tag_ontology_id)
jpayne@68 788 if isinstance(value, (list, tuple)):
jpayne@68 789 rank = 0
jpayne@68 790 for entry in value:
jpayne@68 791 if isinstance(entry, (str, int)):
jpayne@68 792 # Easy case
jpayne@68 793 rank += 1
jpayne@68 794 self.adaptor.execute(
jpayne@68 795 many_sql, (bioentry_id, term_id, str(entry), rank)
jpayne@68 796 )
jpayne@68 797 else:
jpayne@68 798 pass
jpayne@68 799 elif isinstance(value, (str, int)):
jpayne@68 800 # Have a simple single entry, leave rank as the DB default
jpayne@68 801 self.adaptor.execute(mono_sql, (bioentry_id, term_id, str(value)))
jpayne@68 802 else:
jpayne@68 803 pass
jpayne@68 804 # print("Ignoring annotation '%s' entry of type '%s'" \
jpayne@68 805 # % (key, type(value)))
jpayne@68 806
jpayne@68 807 def _load_reference(self, reference, rank, bioentry_id):
jpayne@68 808 """Record SeqRecord's annotated references in the database (PRIVATE).
jpayne@68 809
jpayne@68 810 Arguments:
jpayne@68 811 - record - a SeqRecord object with annotated references
jpayne@68 812 - bioentry_id - corresponding database identifier
jpayne@68 813
jpayne@68 814 """
jpayne@68 815 refs = None
jpayne@68 816 if reference.medline_id:
jpayne@68 817 refs = self.adaptor.execute_and_fetch_col0(
jpayne@68 818 "SELECT reference_id"
jpayne@68 819 " FROM reference JOIN dbxref USING (dbxref_id)"
jpayne@68 820 " WHERE dbname = 'MEDLINE' AND accession = %s",
jpayne@68 821 (reference.medline_id,),
jpayne@68 822 )
jpayne@68 823 if not refs and reference.pubmed_id:
jpayne@68 824 refs = self.adaptor.execute_and_fetch_col0(
jpayne@68 825 "SELECT reference_id"
jpayne@68 826 " FROM reference JOIN dbxref USING (dbxref_id)"
jpayne@68 827 " WHERE dbname = 'PUBMED' AND accession = %s",
jpayne@68 828 (reference.pubmed_id,),
jpayne@68 829 )
jpayne@68 830 if not refs:
jpayne@68 831 s = []
jpayne@68 832 for f in reference.authors, reference.title, reference.journal:
jpayne@68 833 s.append(f or "<undef>")
jpayne@68 834 crc = crc64("".join(s))
jpayne@68 835 refs = self.adaptor.execute_and_fetch_col0(
jpayne@68 836 "SELECT reference_id FROM reference WHERE crc = %s", (crc,)
jpayne@68 837 )
jpayne@68 838 if not refs:
jpayne@68 839 if reference.medline_id:
jpayne@68 840 dbxref_id = self._add_dbxref("MEDLINE", reference.medline_id, 0)
jpayne@68 841 elif reference.pubmed_id:
jpayne@68 842 dbxref_id = self._add_dbxref("PUBMED", reference.pubmed_id, 0)
jpayne@68 843 else:
jpayne@68 844 dbxref_id = None
jpayne@68 845 authors = reference.authors or None
jpayne@68 846 title = reference.title or None
jpayne@68 847 # The location/journal field cannot be Null, so default
jpayne@68 848 # to an empty string rather than None:
jpayne@68 849 journal = reference.journal or ""
jpayne@68 850 self.adaptor.execute(
jpayne@68 851 "INSERT INTO reference (dbxref_id, location,"
jpayne@68 852 " title, authors, crc)"
jpayne@68 853 " VALUES (%s, %s, %s, %s, %s)",
jpayne@68 854 (dbxref_id, journal, title, authors, crc),
jpayne@68 855 )
jpayne@68 856 reference_id = self.adaptor.last_id("reference")
jpayne@68 857 else:
jpayne@68 858 reference_id = refs[0]
jpayne@68 859
jpayne@68 860 if reference.location:
jpayne@68 861 start = 1 + int(str(reference.location[0].start))
jpayne@68 862 end = int(str(reference.location[0].end))
jpayne@68 863 else:
jpayne@68 864 start = None
jpayne@68 865 end = None
jpayne@68 866
jpayne@68 867 sql = (
jpayne@68 868 "INSERT INTO bioentry_reference (bioentry_id, reference_id,"
jpayne@68 869 ' start_pos, end_pos, "rank") VALUES (%s, %s, %s, %s, %s)'
jpayne@68 870 )
jpayne@68 871 self.adaptor.execute(sql, (bioentry_id, reference_id, start, end, rank + 1))
jpayne@68 872
jpayne@68 873 def _load_seqfeature(self, feature, feature_rank, bioentry_id):
jpayne@68 874 """Load a biopython SeqFeature into the database (PRIVATE)."""
jpayne@68 875 # records loaded from a gff file using BCBio.GFF will contain value
jpayne@68 876 # of 2nd column of the gff as a feature qualifier. The BioSQL wiki
jpayne@68 877 # suggests that the source should not go in with the other feature
jpayne@68 878 # mappings but instead be put in the term table
jpayne@68 879 # (http://www.biosql.org/wiki/Annotation_Mapping)
jpayne@68 880 try:
jpayne@68 881 source = feature.qualifiers["source"]
jpayne@68 882 if isinstance(source, list):
jpayne@68 883 source = source[0]
jpayne@68 884 seqfeature_id = self._load_seqfeature_basic(
jpayne@68 885 feature.type, feature_rank, bioentry_id, source=source
jpayne@68 886 )
jpayne@68 887 except KeyError:
jpayne@68 888 seqfeature_id = self._load_seqfeature_basic(
jpayne@68 889 feature.type, feature_rank, bioentry_id
jpayne@68 890 )
jpayne@68 891
jpayne@68 892 self._load_seqfeature_locations(feature, seqfeature_id)
jpayne@68 893 self._load_seqfeature_qualifiers(feature.qualifiers, seqfeature_id)
jpayne@68 894
jpayne@68 895 def _load_seqfeature_basic(
jpayne@68 896 self, feature_type, feature_rank, bioentry_id, source="EMBL/GenBank/SwissProt"
jpayne@68 897 ):
jpayne@68 898 """Load the first tables of a seqfeature and returns the id (PRIVATE).
jpayne@68 899
jpayne@68 900 This loads the "key" of the seqfeature (ie. CDS, gene) and
jpayne@68 901 the basic seqfeature table itself.
jpayne@68 902 """
jpayne@68 903 ontology_id = self._get_ontology_id("SeqFeature Keys")
jpayne@68 904 seqfeature_key_id = self._get_term_id(feature_type, ontology_id=ontology_id)
jpayne@68 905 source_cat_id = self._get_ontology_id("SeqFeature Sources")
jpayne@68 906 source_term_id = self._get_term_id(source, ontology_id=source_cat_id)
jpayne@68 907
jpayne@68 908 sql = (
jpayne@68 909 "INSERT INTO seqfeature (bioentry_id, type_term_id, "
jpayne@68 910 'source_term_id, "rank") VALUES (%s, %s, %s, %s)'
jpayne@68 911 )
jpayne@68 912 self.adaptor.execute(
jpayne@68 913 sql, (bioentry_id, seqfeature_key_id, source_term_id, feature_rank + 1)
jpayne@68 914 )
jpayne@68 915 return self.adaptor.last_id("seqfeature")
jpayne@68 916
jpayne@68 917 def _load_seqfeature_locations(self, feature, seqfeature_id):
jpayne@68 918 """Load all of the locations for a SeqFeature into tables (PRIVATE).
jpayne@68 919
jpayne@68 920 This adds the locations related to the SeqFeature into the
jpayne@68 921 seqfeature_location table. Fuzzies are not handled right now.
jpayne@68 922 For a simple location, ie (1..2), we have a single table row
jpayne@68 923 with seq_start = 1, seq_end = 2, location_rank = 1.
jpayne@68 924
jpayne@68 925 For split locations, ie (1..2, 3..4, 5..6) we would have three
jpayne@68 926 row tables with::
jpayne@68 927
jpayne@68 928 start = 1, end = 2, rank = 1
jpayne@68 929 start = 3, end = 4, rank = 2
jpayne@68 930 start = 5, end = 6, rank = 3
jpayne@68 931
jpayne@68 932 """
jpayne@68 933 # TODO - Record an ontology for the locations (using location.term_id)
jpayne@68 934 # which for now as in BioPerl we leave defaulting to NULL.
jpayne@68 935 try:
jpayne@68 936 if feature.location.operator != "join":
jpayne@68 937 # e.g. order locations... we don't record "order" so it
jpayne@68 938 # will become a "join" on reloading. What does BioPerl do?
jpayne@68 939 import warnings
jpayne@68 940 from Bio import BiopythonWarning
jpayne@68 941
jpayne@68 942 warnings.warn(
jpayne@68 943 "%s location operators are not fully supported"
jpayne@68 944 % feature.location_operator,
jpayne@68 945 BiopythonWarning,
jpayne@68 946 )
jpayne@68 947 except AttributeError:
jpayne@68 948 pass
jpayne@68 949 # This will be a list of length one for a SimpleLocation:
jpayne@68 950 parts = feature.location.parts
jpayne@68 951 if parts and {loc.strand for loc in parts} == {-1}:
jpayne@68 952 # To mimic prior behaviour of Biopython+BioSQL, reverse order
jpayne@68 953 parts = parts[::-1]
jpayne@68 954 # TODO - Check what BioPerl does; see also BioSeq.py code
jpayne@68 955 for rank, loc in enumerate(parts):
jpayne@68 956 self._insert_location(loc, rank + 1, seqfeature_id)
jpayne@68 957
jpayne@68 958 def _insert_location(self, location, rank, seqfeature_id):
jpayne@68 959 """Add SeqFeature location to seqfeature_location table (PRIVATE).
jpayne@68 960
jpayne@68 961 TODO - Add location operator to location_qualifier_value?
jpayne@68 962 """
jpayne@68 963 # convert biopython locations to the 1-based location system
jpayne@68 964 # used in bioSQL
jpayne@68 965 # XXX This could also handle fuzzies
jpayne@68 966
jpayne@68 967 try:
jpayne@68 968 start = int(location.start) + 1
jpayne@68 969 except TypeError:
jpayne@68 970 # Handle SwissProt unknown position (?)
jpayne@68 971 if isinstance(location.start, UnknownPosition):
jpayne@68 972 start = None
jpayne@68 973 else:
jpayne@68 974 raise
jpayne@68 975
jpayne@68 976 try:
jpayne@68 977 end = int(location.end)
jpayne@68 978 except TypeError:
jpayne@68 979 # Handle SwissProt unknown position (?)
jpayne@68 980 if isinstance(location.end, UnknownPosition):
jpayne@68 981 end = None
jpayne@68 982 else:
jpayne@68 983 raise
jpayne@68 984
jpayne@68 985 # Biopython uses None when we don't know strand information but
jpayne@68 986 # BioSQL requires something (non null) and sets this as zero
jpayne@68 987 # So we'll use the strand or 0 if Biopython spits out None
jpayne@68 988 strand = location.strand or 0
jpayne@68 989
jpayne@68 990 # TODO - Record an ontology term for the location (location.term_id)
jpayne@68 991 # which for now like BioPerl we'll leave as NULL.
jpayne@68 992 # This might allow us to record "between" positions properly, but I
jpayne@68 993 # don't really see how it could work for before/after fuzzy positions
jpayne@68 994 loc_term_id = None
jpayne@68 995
jpayne@68 996 if location.ref:
jpayne@68 997 # sub_feature remote locations when they are in the same db as the
jpayne@68 998 # current record do not have a value for ref_db, which SeqFeature
jpayne@68 999 # object stores as None. BioSQL schema requires a varchar and is
jpayne@68 1000 # not NULL
jpayne@68 1001 dbxref_id = self._get_dbxref_id(location.ref_db or "", location.ref)
jpayne@68 1002 else:
jpayne@68 1003 dbxref_id = None
jpayne@68 1004
jpayne@68 1005 sql = (
jpayne@68 1006 "INSERT INTO location (seqfeature_id, dbxref_id, term_id,"
jpayne@68 1007 'start_pos, end_pos, strand, "rank") '
jpayne@68 1008 "VALUES (%s, %s, %s, %s, %s, %s, %s)"
jpayne@68 1009 )
jpayne@68 1010 self.adaptor.execute(
jpayne@68 1011 sql, (seqfeature_id, dbxref_id, loc_term_id, start, end, strand, rank)
jpayne@68 1012 )
jpayne@68 1013
jpayne@68 1014 """
jpayne@68 1015 # See Bug 2677
jpayne@68 1016 # TODO - Record the location_operator (e.g. "join" or "order")
jpayne@68 1017 # using the location_qualifier_value table (which we and BioPerl
jpayne@68 1018 # have historically left empty).
jpayne@68 1019 # Note this will need an ontology term for the location qualifier
jpayne@68 1020 # (location_qualifier_value.term_id) for which oddly the schema
jpayne@68 1021 # does not allow NULL.
jpayne@68 1022 if feature.location_operator:
jpayne@68 1023 #e.g. "join" (common),
jpayne@68 1024 #or "order" (see Tests/GenBank/protein_refseq2.gb)
jpayne@68 1025 location_id = self.adaptor.last_id('location')
jpayne@68 1026 loc_qual_term_id = None # Not allowed in BioSQL v1.0.1
jpayne@68 1027 sql = ("INSERT INTO location_qualifier_value"
jpayne@68 1028 "(location_id, term_id, value) "
jpayne@68 1029 "VALUES (%s, %s, %s)")
jpayne@68 1030 self.adaptor.execute(sql, (location_id, loc_qual_term_id,
jpayne@68 1031 feature.location_operator))
jpayne@68 1032 """
jpayne@68 1033
jpayne@68 1034 def _load_seqfeature_qualifiers(self, qualifiers, seqfeature_id):
jpayne@68 1035 """Insert feature's (key, value) pair qualifiers (PRIVATE).
jpayne@68 1036
jpayne@68 1037 Qualifiers should be a dictionary of the form::
jpayne@68 1038
jpayne@68 1039 {key : [value1, value2]}
jpayne@68 1040
jpayne@68 1041 """
jpayne@68 1042 tag_ontology_id = self._get_ontology_id("Annotation Tags")
jpayne@68 1043 for qualifier_key in qualifiers:
jpayne@68 1044 # Treat db_xref qualifiers differently to sequence annotation
jpayne@68 1045 # qualifiers by populating the seqfeature_dbxref and dbxref
jpayne@68 1046 # tables. Other qualifiers go into the seqfeature_qualifier_value
jpayne@68 1047 # and (if new) term tables.
jpayne@68 1048 if qualifier_key != "db_xref":
jpayne@68 1049 qualifier_key_id = self._get_term_id(
jpayne@68 1050 qualifier_key, ontology_id=tag_ontology_id
jpayne@68 1051 )
jpayne@68 1052 # now add all of the values to their table
jpayne@68 1053 entries = qualifiers[qualifier_key]
jpayne@68 1054 if not isinstance(entries, list):
jpayne@68 1055 # Could be a plain string, or an int or a float.
jpayne@68 1056 # However, we exect a list of strings here.
jpayne@68 1057 entries = [entries]
jpayne@68 1058 for qual_value_rank in range(len(entries)):
jpayne@68 1059 qualifier_value = entries[qual_value_rank]
jpayne@68 1060 sql = (
jpayne@68 1061 "INSERT INTO seqfeature_qualifier_value "
jpayne@68 1062 ' (seqfeature_id, term_id, "rank", value) VALUES'
jpayne@68 1063 " (%s, %s, %s, %s)"
jpayne@68 1064 )
jpayne@68 1065 self.adaptor.execute(
jpayne@68 1066 sql,
jpayne@68 1067 (
jpayne@68 1068 seqfeature_id,
jpayne@68 1069 qualifier_key_id,
jpayne@68 1070 qual_value_rank + 1,
jpayne@68 1071 qualifier_value,
jpayne@68 1072 ),
jpayne@68 1073 )
jpayne@68 1074 else:
jpayne@68 1075 # The dbxref_id qualifier/value sets go into the dbxref table
jpayne@68 1076 # as dbname, accession, version tuples, with dbxref.dbxref_id
jpayne@68 1077 # being automatically assigned, and into the seqfeature_dbxref
jpayne@68 1078 # table as seqfeature_id, dbxref_id, and rank tuples
jpayne@68 1079 self._load_seqfeature_dbxref(qualifiers[qualifier_key], seqfeature_id)
jpayne@68 1080
jpayne@68 1081 def _load_seqfeature_dbxref(self, dbxrefs, seqfeature_id):
jpayne@68 1082 """Add SeqFeature's DB cross-references to the database (PRIVATE).
jpayne@68 1083
jpayne@68 1084 Arguments:
jpayne@68 1085 - dbxrefs - List, dbxref data from the source file in the
jpayne@68 1086 format <database>:<accession>
jpayne@68 1087 - seqfeature_id - Int, the identifier for the seqfeature in the
jpayne@68 1088 seqfeature table
jpayne@68 1089
jpayne@68 1090 Insert dbxref qualifier data for a seqfeature into the
jpayne@68 1091 seqfeature_dbxref and, if required, dbxref tables.
jpayne@68 1092 The dbxref_id qualifier/value sets go into the dbxref table
jpayne@68 1093 as dbname, accession, version tuples, with dbxref.dbxref_id
jpayne@68 1094 being automatically assigned, and into the seqfeature_dbxref
jpayne@68 1095 table as seqfeature_id, dbxref_id, and rank tuples.
jpayne@68 1096 """
jpayne@68 1097 # NOTE - In older versions of Biopython, we would map the GenBank
jpayne@68 1098 # db_xref "name", for example "GI" to "GeneIndex", and give a warning
jpayne@68 1099 # for any unknown terms. This was a long term maintenance problem,
jpayne@68 1100 # and differed from BioPerl and BioJava's implementation. See bug 2405
jpayne@68 1101 for rank, value in enumerate(dbxrefs):
jpayne@68 1102 # Split the DB:accession format string at colons. We have to
jpayne@68 1103 # account for multiple-line and multiple-accession entries
jpayne@68 1104 try:
jpayne@68 1105 dbxref_data = value.replace(" ", "").replace("\n", "").split(":")
jpayne@68 1106 db = dbxref_data[0]
jpayne@68 1107 accessions = dbxref_data[1:]
jpayne@68 1108 except Exception:
jpayne@68 1109 raise ValueError(f"Parsing of db_xref failed: '{value}'") from None
jpayne@68 1110 # Loop over all the grabbed accessions, and attempt to fill the
jpayne@68 1111 # table
jpayne@68 1112 for accession in accessions:
jpayne@68 1113 # Get the dbxref_id value for the dbxref data
jpayne@68 1114 dbxref_id = self._get_dbxref_id(db, accession)
jpayne@68 1115 # Insert the seqfeature_dbxref data
jpayne@68 1116 self._get_seqfeature_dbxref(seqfeature_id, dbxref_id, rank + 1)
jpayne@68 1117
jpayne@68 1118 def _get_dbxref_id(self, db, accession):
jpayne@68 1119 """Get DB cross-reference for accession (PRIVATE).
jpayne@68 1120
jpayne@68 1121 Arguments:
jpayne@68 1122 - db - String, the name of the external database containing
jpayne@68 1123 the accession number
jpayne@68 1124 - accession - String, the accession of the dbxref data
jpayne@68 1125
jpayne@68 1126 Finds and returns the dbxref_id for the passed data. The method
jpayne@68 1127 attempts to find an existing record first, and inserts the data
jpayne@68 1128 if there is no record.
jpayne@68 1129 """
jpayne@68 1130 # Check for an existing record
jpayne@68 1131 sql = "SELECT dbxref_id FROM dbxref WHERE dbname = %s AND accession = %s"
jpayne@68 1132 dbxref_id = self.adaptor.execute_and_fetch_col0(sql, (db, accession))
jpayne@68 1133 # If there was a record, return the dbxref_id, else create the
jpayne@68 1134 # record and return the created dbxref_id
jpayne@68 1135 if dbxref_id:
jpayne@68 1136 return dbxref_id[0]
jpayne@68 1137 return self._add_dbxref(db, accession, 0)
jpayne@68 1138
jpayne@68 1139 def _get_seqfeature_dbxref(self, seqfeature_id, dbxref_id, rank):
jpayne@68 1140 """Get DB cross-reference, creating it if needed (PRIVATE).
jpayne@68 1141
jpayne@68 1142 Check for a pre-existing seqfeature_dbxref entry with the passed
jpayne@68 1143 seqfeature_id and dbxref_id. If one does not exist, insert new
jpayne@68 1144 data.
jpayne@68 1145 """
jpayne@68 1146 # Check for an existing record
jpayne@68 1147 sql = (
jpayne@68 1148 "SELECT seqfeature_id, dbxref_id FROM seqfeature_dbxref "
jpayne@68 1149 "WHERE seqfeature_id = %s AND dbxref_id = %s"
jpayne@68 1150 )
jpayne@68 1151 result = self.adaptor.execute_and_fetch_col0(sql, (seqfeature_id, dbxref_id))
jpayne@68 1152 # If there was a record, return without executing anything, else create
jpayne@68 1153 # the record and return
jpayne@68 1154 if result:
jpayne@68 1155 return result
jpayne@68 1156 return self._add_seqfeature_dbxref(seqfeature_id, dbxref_id, rank)
jpayne@68 1157
jpayne@68 1158 def _add_seqfeature_dbxref(self, seqfeature_id, dbxref_id, rank):
jpayne@68 1159 """Add DB cross-reference (PRIVATE).
jpayne@68 1160
jpayne@68 1161 Insert a seqfeature_dbxref row and return the seqfeature_id and
jpayne@68 1162 dbxref_id
jpayne@68 1163 """
jpayne@68 1164 sql = (
jpayne@68 1165 "INSERT INTO seqfeature_dbxref "
jpayne@68 1166 '(seqfeature_id, dbxref_id, "rank") VALUES'
jpayne@68 1167 "(%s, %s, %s)"
jpayne@68 1168 )
jpayne@68 1169 self.adaptor.execute(sql, (seqfeature_id, dbxref_id, rank))
jpayne@68 1170 return (seqfeature_id, dbxref_id)
jpayne@68 1171
jpayne@68 1172 def _load_dbxrefs(self, record, bioentry_id):
jpayne@68 1173 """Load any sequence level cross references into the database (PRIVATE).
jpayne@68 1174
jpayne@68 1175 See table bioentry_dbxref.
jpayne@68 1176 """
jpayne@68 1177 for rank, value in enumerate(record.dbxrefs):
jpayne@68 1178 # Split the DB:accession string at first colon.
jpayne@68 1179 # We have to cope with things like:
jpayne@68 1180 # "MGD:MGI:892" (db="MGD", accession="MGI:892")
jpayne@68 1181 # "GO:GO:123" (db="GO", accession="GO:123")
jpayne@68 1182 #
jpayne@68 1183 # Annoyingly I have seen the NCBI use both the style
jpayne@68 1184 # "GO:GO:123" and "GO:123" in different vintages.
jpayne@68 1185 newline_escape_count = value.count("\n")
jpayne@68 1186 if newline_escape_count != 0:
jpayne@68 1187 raise ValueError(
jpayne@68 1188 "Expected a single line in value, got {newline_escape_count}"
jpayne@68 1189 )
jpayne@68 1190 try:
jpayne@68 1191 db, accession = value.split(":", 1)
jpayne@68 1192 db = db.strip()
jpayne@68 1193 accession = accession.strip()
jpayne@68 1194 except Exception:
jpayne@68 1195 raise ValueError(f"Parsing of dbxrefs list failed: '{value}'") from None
jpayne@68 1196 # Get the dbxref_id value for the dbxref data
jpayne@68 1197 dbxref_id = self._get_dbxref_id(db, accession)
jpayne@68 1198 # Insert the bioentry_dbxref data
jpayne@68 1199 self._get_bioentry_dbxref(bioentry_id, dbxref_id, rank + 1)
jpayne@68 1200
jpayne@68 1201 def _get_bioentry_dbxref(self, bioentry_id, dbxref_id, rank):
jpayne@68 1202 """Get pre-existing db-xref, or create and return it (PRIVATE).
jpayne@68 1203
jpayne@68 1204 Check for a pre-existing bioentry_dbxref entry with the passed
jpayne@68 1205 seqfeature_id and dbxref_id. If one does not exist, insert new
jpayne@68 1206 data
jpayne@68 1207 """
jpayne@68 1208 # Check for an existing record
jpayne@68 1209 sql = (
jpayne@68 1210 "SELECT bioentry_id, dbxref_id FROM bioentry_dbxref "
jpayne@68 1211 "WHERE bioentry_id = %s AND dbxref_id = %s"
jpayne@68 1212 )
jpayne@68 1213 result = self.adaptor.execute_and_fetch_col0(sql, (bioentry_id, dbxref_id))
jpayne@68 1214 # If there was a record, return without executing anything, else create
jpayne@68 1215 # the record and return
jpayne@68 1216 if result:
jpayne@68 1217 return result
jpayne@68 1218 return self._add_bioentry_dbxref(bioentry_id, dbxref_id, rank)
jpayne@68 1219
jpayne@68 1220 def _add_bioentry_dbxref(self, bioentry_id, dbxref_id, rank):
jpayne@68 1221 """Insert a bioentry_dbxref row (PRIVATE).
jpayne@68 1222
jpayne@68 1223 Returns the seqfeature_id and dbxref_id (PRIVATE).
jpayne@68 1224 """
jpayne@68 1225 sql = (
jpayne@68 1226 "INSERT INTO bioentry_dbxref "
jpayne@68 1227 '(bioentry_id,dbxref_id,"rank") VALUES '
jpayne@68 1228 "(%s, %s, %s)"
jpayne@68 1229 )
jpayne@68 1230 self.adaptor.execute(sql, (bioentry_id, dbxref_id, rank))
jpayne@68 1231 return (bioentry_id, dbxref_id)
jpayne@68 1232
jpayne@68 1233
jpayne@68 1234 class DatabaseRemover:
jpayne@68 1235 """Complement the Loader functionality by fully removing a database.
jpayne@68 1236
jpayne@68 1237 This probably isn't really useful for normal purposes, since you
jpayne@68 1238 can just do a::
jpayne@68 1239
jpayne@68 1240 DROP DATABASE db_name
jpayne@68 1241
jpayne@68 1242 and then recreate the database. But, it's really useful for testing
jpayne@68 1243 purposes.
jpayne@68 1244 """
jpayne@68 1245
jpayne@68 1246 def __init__(self, adaptor, dbid):
jpayne@68 1247 """Initialize with a database id and adaptor connection."""
jpayne@68 1248 self.adaptor = adaptor
jpayne@68 1249 self.dbid = dbid
jpayne@68 1250
jpayne@68 1251 def remove(self):
jpayne@68 1252 """Remove everything related to the given database id."""
jpayne@68 1253 sql = "DELETE FROM bioentry WHERE biodatabase_id = %s"
jpayne@68 1254 self.adaptor.execute(sql, (self.dbid,))
jpayne@68 1255 sql = "DELETE FROM biodatabase WHERE biodatabase_id = %s"
jpayne@68 1256 self.adaptor.execute(sql, (self.dbid,))