comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/BioSQL/Loader.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 69:33d812a61356
1 # Copyright 2002 by Andrew Dalke. All rights reserved.
2 # Revisions 2007-2016 copyright by Peter Cock. All rights reserved.
3 # Revisions 2008 copyright by Cymon J. Cox. All rights reserved.
4 #
5 # This file is part of the Biopython distribution and governed by your
6 # choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
7 # Please see the LICENSE file that should have been included as part of this
8 # package.
9 #
10 # Note that BioSQL (including the database schema and scripts) is
11 # available and licensed separately. Please consult www.biosql.org
12
13 """Load biopython objects into a BioSQL database for persistent storage.
14
15 This code makes it possible to store biopython objects in a relational
16 database and then retrieve them back. You shouldn't use any of the
17 classes in this module directly. Rather, call the load() method on
18 a database object.
19 """
20 # standard modules
21 from time import gmtime, strftime
22
23 # biopython
24 from Bio.SeqUtils.CheckSum import crc64
25 from Bio import Entrez
26 from Bio.Seq import UndefinedSequenceError
27 from Bio.SeqFeature import UnknownPosition
28
29
30 class DatabaseLoader:
31 """Object used to load SeqRecord objects into a BioSQL database."""
32
33 def __init__(self, adaptor, dbid, fetch_NCBI_taxonomy=False):
34 """Initialize with connection information for the database.
35
36 Creating a DatabaseLoader object is normally handled via the
37 BioSeqDatabase DBServer object, for example::
38
39 from BioSQL import BioSeqDatabase
40 server = BioSeqDatabase.open_database(driver="MySQLdb",
41 user="gbrowse",
42 passwd="biosql",
43 host="localhost",
44 db="test_biosql")
45 try:
46 db = server["test"]
47 except KeyError:
48 db = server.new_database("test",
49 description="For testing GBrowse")
50
51 """
52 self.adaptor = adaptor
53 self.dbid = dbid
54 self.fetch_NCBI_taxonomy = fetch_NCBI_taxonomy
55
56 def load_seqrecord(self, record):
57 """Load a Biopython SeqRecord into the database."""
58 bioentry_id = self._load_bioentry_table(record)
59 self._load_bioentry_date(record, bioentry_id)
60 self._load_biosequence(record, bioentry_id)
61 self._load_comment(record, bioentry_id)
62 self._load_dbxrefs(record, bioentry_id)
63 references = record.annotations.get("references", ())
64 for reference, rank in zip(references, list(range(len(references)))):
65 self._load_reference(reference, rank, bioentry_id)
66 self._load_annotations(record, bioentry_id)
67 for seq_feature_num in range(len(record.features)):
68 seq_feature = record.features[seq_feature_num]
69 self._load_seqfeature(seq_feature, seq_feature_num, bioentry_id)
70
71 def _get_ontology_id(self, name, definition=None):
72 """Return identifier for the named ontology (PRIVATE).
73
74 This looks through the onotology table for a the given entry name.
75 If it is not found, a row is added for this ontology (using the
76 definition if supplied). In either case, the id corresponding to
77 the provided name is returned, so that you can reference it in
78 another table.
79 """
80 oids = self.adaptor.execute_and_fetch_col0(
81 "SELECT ontology_id FROM ontology WHERE name = %s", (name,)
82 )
83 if oids:
84 return oids[0]
85 self.adaptor.execute(
86 "INSERT INTO ontology(name, definition) VALUES (%s, %s)", (name, definition)
87 )
88 return self.adaptor.last_id("ontology")
89
90 def _get_term_id(self, name, ontology_id=None, definition=None, identifier=None):
91 """Get the id that corresponds to a term (PRIVATE).
92
93 This looks through the term table for a the given term. If it
94 is not found, a new id corresponding to this term is created.
95 In either case, the id corresponding to that term is returned, so
96 that you can reference it in another table.
97
98 The ontology_id should be used to disambiguate the term.
99 """
100 # try to get the term id
101 sql = "SELECT term_id FROM term WHERE name = %s"
102 fields = [name]
103 if ontology_id:
104 sql += " AND ontology_id = %s"
105 fields.append(ontology_id)
106 id_results = self.adaptor.execute_and_fetchall(sql, fields)
107 # something is wrong
108 if len(id_results) > 1:
109 raise ValueError(f"Multiple term ids for {name}: {id_results!r}")
110 elif len(id_results) == 1:
111 return id_results[0][0]
112 else:
113 sql = (
114 "INSERT INTO term (name, definition,"
115 " identifier, ontology_id)"
116 " VALUES (%s, %s, %s, %s)"
117 )
118 self.adaptor.execute(sql, (name, definition, identifier, ontology_id))
119 return self.adaptor.last_id("term")
120
121 def _add_dbxref(self, dbname, accession, version):
122 """Insert a dbxref and return its id (PRIVATE)."""
123 self.adaptor.execute(
124 "INSERT INTO dbxref(dbname, accession, version) VALUES (%s, %s, %s)",
125 (dbname, accession, version),
126 )
127 return self.adaptor.last_id("dbxref")
128
129 def _get_taxon_id(self, record):
130 """Get the taxon id for this record (PRIVATE).
131
132 Arguments:
133 - record - a SeqRecord object
134
135 This searches the taxon/taxon_name tables using the
136 NCBI taxon ID, scientific name and common name to find
137 the matching taxon table entry's id.
138
139 If the species isn't in the taxon table, and we have at
140 least the NCBI taxon ID, scientific name or common name,
141 at least a minimal stub entry is created in the table.
142
143 Returns the taxon id (database key for the taxon table,
144 not an NCBI taxon ID), or None if the taxonomy information
145 is missing.
146
147 See also the BioSQL script load_ncbi_taxonomy.pl which
148 will populate and update the taxon/taxon_name tables
149 with the latest information from the NCBI.
150 """
151 # To find the NCBI taxid, first check for a top level annotation
152 ncbi_taxon_id = None
153 if "ncbi_taxid" in record.annotations:
154 # Could be a list of IDs.
155 if isinstance(record.annotations["ncbi_taxid"], list):
156 if len(record.annotations["ncbi_taxid"]) == 1:
157 ncbi_taxon_id = record.annotations["ncbi_taxid"][0]
158 else:
159 ncbi_taxon_id = record.annotations["ncbi_taxid"]
160 if not ncbi_taxon_id:
161 # Secondly, look for a source feature
162 for f in record.features:
163 if f.type == "source":
164 quals = getattr(f, "qualifiers", {})
165 if "db_xref" in quals:
166 for db_xref in f.qualifiers["db_xref"]:
167 if db_xref.startswith("taxon:"):
168 ncbi_taxon_id = int(db_xref[6:])
169 break
170 if ncbi_taxon_id:
171 break
172
173 try:
174 scientific_name = record.annotations["organism"][:255]
175 except KeyError:
176 scientific_name = None
177 try:
178 common_name = record.annotations["source"][:255]
179 except KeyError:
180 common_name = None
181 # Note: The maximum length for taxon names in the schema is 255.
182 # Cropping it now should help in getting a match when searching,
183 # and avoids an error if we try and add these to the database.
184
185 if ncbi_taxon_id:
186 # Good, we have the NCBI taxon to go on - this is unambiguous :)
187 # Note that the scientific name and common name will only be
188 # used if we have to record a stub entry.
189 return self._get_taxon_id_from_ncbi_taxon_id(
190 ncbi_taxon_id, scientific_name, common_name
191 )
192
193 if not common_name and not scientific_name:
194 # Nothing to go on... and there is no point adding
195 # a new entry to the database. We'll just leave this
196 # sequence's taxon as a NULL in the database.
197 return None
198
199 # Next, we'll try to find a match based on the species name
200 # (stored in GenBank files as the organism and/or the source).
201 if scientific_name:
202 taxa = self.adaptor.execute_and_fetch_col0(
203 "SELECT taxon_id FROM taxon_name"
204 " WHERE name_class = 'scientific name' AND name = %s",
205 (scientific_name,),
206 )
207 if taxa:
208 # Good, mapped the scientific name to a taxon table entry
209 return taxa[0]
210
211 # Last chance...
212 if common_name:
213 taxa = self.adaptor.execute_and_fetch_col0(
214 "SELECT DISTINCT taxon_id FROM taxon_name WHERE name = %s",
215 (common_name,),
216 )
217 # Its natural that several distinct taxa will have the same common
218 # name - in which case we can't resolve the taxon uniquely.
219 if len(taxa) > 1:
220 raise ValueError(
221 "Taxa: %d species have name %r" % (len(taxa), common_name)
222 )
223 if taxa:
224 # Good, mapped the common name to a taxon table entry
225 return taxa[0]
226
227 # At this point, as far as we can tell, this species isn't
228 # in the taxon table already. So we'll have to add it.
229 # We don't have an NCBI taxonomy ID, so if we do record just
230 # a stub entry, there is no simple way to fix this later.
231 #
232 # TODO - Should we try searching the NCBI taxonomy using the
233 # species name?
234 #
235 # OK, let's try inserting the species.
236 # Chances are we don't have enough information ...
237 # Furthermore, it won't be in the hierarchy.
238
239 lineage = []
240 for c in record.annotations.get("taxonomy", []):
241 lineage.append([None, None, c])
242 if lineage:
243 lineage[-1][1] = "genus"
244 lineage.append([None, "species", record.annotations["organism"]])
245 # XXX do we have them?
246 if "subspecies" in record.annotations:
247 lineage.append([None, "subspecies", record.annotations["subspecies"]])
248 if "variant" in record.annotations:
249 lineage.append([None, "varietas", record.annotations["variant"]])
250 lineage[-1][0] = ncbi_taxon_id
251
252 left_value = self.adaptor.execute_one("SELECT MAX(left_value) FROM taxon")[0]
253 if not left_value:
254 left_value = 0
255 left_value += 1
256
257 # XXX -- Brad: Fixing this for now in an ugly way because
258 # I am getting overlaps for right_values. I need to dig into this
259 # more to actually understand how it works. I'm not sure it is
260 # actually working right anyhow.
261 right_start_value = self.adaptor.execute_one(
262 "SELECT MAX(right_value) FROM taxon"
263 )[0]
264 if not right_start_value:
265 right_start_value = 0
266 right_value = right_start_value + 2 * len(lineage) - 1
267
268 parent_taxon_id = None
269 for taxon in lineage:
270 self.adaptor.execute(
271 "INSERT INTO taxon(parent_taxon_id, ncbi_taxon_id, node_rank,"
272 " left_value, right_value)"
273 " VALUES (%s, %s, %s, %s, %s)",
274 (parent_taxon_id, taxon[0], taxon[1], left_value, right_value),
275 )
276 taxon_id = self.adaptor.last_id("taxon")
277 self.adaptor.execute(
278 "INSERT INTO taxon_name(taxon_id, name, name_class)"
279 "VALUES (%s, %s, 'scientific name')",
280 (taxon_id, taxon[2][:255]),
281 )
282 # Note the name field is limited to 255, some SwissProt files
283 # have a multi-species name which can be longer. So truncate this.
284 left_value += 1
285 right_value -= 1
286 parent_taxon_id = taxon_id
287 if common_name:
288 self.adaptor.execute(
289 "INSERT INTO taxon_name(taxon_id, name, name_class)"
290 "VALUES (%s, %s, 'common name')",
291 (taxon_id, common_name),
292 )
293
294 return taxon_id
295
296 def _fix_name_class(self, entrez_name):
297 """Map Entrez name terms to those used in taxdump (PRIVATE).
298
299 We need to make this conversion to match the taxon_name.name_class
300 values used by the BioSQL load_ncbi_taxonomy.pl script.
301
302 e.g.::
303
304 "ScientificName" -> "scientific name",
305 "EquivalentName" -> "equivalent name",
306 "Synonym" -> "synonym",
307
308 """
309 # Add any special cases here:
310 #
311 # known = {}
312 # try:
313 # return known[entrez_name]
314 # except KeyError:
315 # pass
316
317 # Try automatically by adding spaces before each capital
318 def add_space(letter):
319 """Add a space before a capital letter."""
320 if letter.isupper():
321 return " " + letter.lower()
322 else:
323 return letter
324
325 answer = "".join(add_space(letter) for letter in entrez_name).strip()
326 if answer != answer.lower():
327 raise ValueError(
328 f"Expected processed entrez_name, '{answer}' to only have lower case letters."
329 )
330 return answer
331
332 def _update_left_right_taxon_values(self, left_value):
333 """Update the left and right taxon values in the table (PRIVATE)."""
334 if not left_value:
335 return
336 # Due to the UNIQUE constraint on the left and right values in the taxon
337 # table we cannot simply update them through an SQL statement as we risk
338 # colliding values. Instead we must select all of the rows that we want to
339 # update, modify the values in python and then update the rows
340 # self.adaptor.execute("UPDATE taxon SET right_value = right_value + 2 "
341 # "WHERE right_value >= %s", (left_value,))
342 # self.adaptor.execute("UPDATE taxon SET left_value = left_value + 2 "
343 # "WHERE left_value > %s", (left_value,))
344
345 rows = self.adaptor.execute_and_fetchall(
346 "SELECT left_value, right_value, taxon_id FROM taxon "
347 "WHERE right_value >= %s or left_value > %s",
348 (left_value, left_value),
349 )
350
351 right_rows = []
352 left_rows = []
353 for row in rows:
354 new_right = row[1]
355 new_left = row[0]
356 if new_right >= left_value:
357 new_right += 2
358
359 if new_left > left_value:
360 new_left += 2
361 right_rows.append((new_right, row[2]))
362 left_rows.append((new_left, row[2]))
363
364 # sort the rows based on the value from largest to smallest
365 # should ensure no overlaps
366 right_rows = sorted(right_rows, key=lambda x: x[0], reverse=True)
367 left_rows = sorted(left_rows, key=lambda x: x[0], reverse=True)
368
369 self.adaptor.executemany(
370 "UPDATE taxon SET left_value = %s WHERE taxon_id = %s", left_rows
371 )
372 self.adaptor.executemany(
373 "UPDATE taxon SET right_value = %s WHERE taxon_id = %s", right_rows
374 )
375
376 def _get_taxon_id_from_ncbi_taxon_id(
377 self, ncbi_taxon_id, scientific_name=None, common_name=None
378 ):
379 """Get the taxon id for record from NCBI taxon ID (PRIVATE).
380
381 Arguments:
382 - ncbi_taxon_id - string containing an NCBI taxon id
383 - scientific_name - string, used if a stub entry is recorded
384 - common_name - string, used if a stub entry is recorded
385
386 This searches the taxon table using ONLY the NCBI taxon ID
387 to find the matching taxon table entry's ID (database key).
388
389 If the species isn't in the taxon table, and the fetch_NCBI_taxonomy
390 flag is true, Biopython will attempt to go online using Bio.Entrez
391 to fetch the official NCBI lineage, recursing up the tree until an
392 existing entry is found in the database or the full lineage has been
393 fetched.
394
395 Otherwise the NCBI taxon ID, scientific name and common name are
396 recorded as a minimal stub entry in the taxon and taxon_name tables.
397 Any partial information about the lineage from the SeqRecord is NOT
398 recorded. This should mean that (re)running the BioSQL script
399 load_ncbi_taxonomy.pl can fill in the taxonomy lineage.
400
401 Returns the taxon id (database key for the taxon table, not
402 an NCBI taxon ID).
403 """
404 if not ncbi_taxon_id:
405 raise ValueError("Expected a non-empty value for ncbi_taxon_id.")
406
407 taxon_id = self.adaptor.execute_and_fetch_col0(
408 "SELECT taxon_id FROM taxon WHERE ncbi_taxon_id = %s", (int(ncbi_taxon_id),)
409 )
410 if taxon_id:
411 # Good, we have mapped the NCBI taxid to a taxon table entry
412 return taxon_id[0]
413
414 # At this point, as far as we can tell, this species isn't
415 # in the taxon table already. So we'll have to add it.
416
417 parent_taxon_id = None
418 rank = "species"
419 genetic_code = None
420 mito_genetic_code = None
421 parent_left_value = None
422 parent_right_value = None
423 left_value = None
424 right_value = None
425 species_names = []
426 if scientific_name:
427 species_names.append(("scientific name", scientific_name))
428 if common_name:
429 species_names.append(("common name", common_name))
430
431 if self.fetch_NCBI_taxonomy:
432 # Go online to get the parent taxon ID!
433 handle = Entrez.efetch(db="taxonomy", id=ncbi_taxon_id, retmode="XML")
434 taxonomic_record = Entrez.read(handle)
435 if len(taxonomic_record) == 1:
436 if taxonomic_record[0]["TaxId"] != str(ncbi_taxon_id):
437 raise ValueError(
438 f"ncbi_taxon_id different from parent taxon id. {ncbi_taxon_id} versus {taxonomic_record[0]['TaxId']}"
439 )
440
441 (
442 parent_taxon_id,
443 parent_left_value,
444 parent_right_value,
445 ) = self._get_taxon_id_from_ncbi_lineage(
446 taxonomic_record[0]["LineageEx"]
447 )
448
449 left_value = parent_right_value
450 right_value = parent_right_value + 1
451
452 rank = str(taxonomic_record[0]["Rank"])
453
454 genetic_code = int(taxonomic_record[0]["GeneticCode"]["GCId"])
455
456 mito_genetic_code = int(taxonomic_record[0]["MitoGeneticCode"]["MGCId"])
457
458 species_names = [
459 ("scientific name", str(taxonomic_record[0]["ScientificName"]))
460 ]
461 try:
462 for name_class, names in taxonomic_record[0]["OtherNames"].items():
463 name_class = self._fix_name_class(name_class)
464 if not isinstance(names, list):
465 # The Entrez parser seems to return single entry
466 # lists as just a string which is annoying.
467 names = [names]
468 for name in names:
469 # Want to ignore complex things like ClassCDE
470 # entries
471 if isinstance(name, str):
472 species_names.append((name_class, name))
473 except KeyError:
474 # OtherNames isn't always present,
475 # e.g. NCBI taxon 41205, Bromheadia finlaysoniana
476 pass
477 else:
478 pass
479 # If we are not allowed to go online, we will record the bare minimum;
480 # as long as the NCBI taxon id is present, then (re)running
481 # load_ncbi_taxonomy.pl should fill in the taxonomomy lineage
482 # (and update the species names).
483 #
484 # I am NOT going to try and record the lineage, even if it
485 # is in the record annotation as a list of names, as we won't
486 # know the NCBI taxon IDs for these parent nodes.
487
488 self._update_left_right_taxon_values(left_value)
489
490 self.adaptor.execute(
491 "INSERT INTO taxon(parent_taxon_id, ncbi_taxon_id, node_rank,"
492 " genetic_code, mito_genetic_code, left_value, right_value)"
493 " VALUES (%s, %s, %s, %s, %s, %s, %s)",
494 (
495 parent_taxon_id,
496 ncbi_taxon_id,
497 rank,
498 genetic_code,
499 mito_genetic_code,
500 left_value,
501 right_value,
502 ),
503 )
504
505 taxon_id = self.adaptor.last_id("taxon")
506
507 # Record the scientific name, common name, etc
508 for name_class, name in species_names:
509 self.adaptor.execute(
510 "INSERT INTO taxon_name(taxon_id, name, name_class)"
511 " VALUES (%s, %s, %s)",
512 (taxon_id, name[:255], name_class),
513 )
514 return taxon_id
515
516 def _get_taxon_id_from_ncbi_lineage(self, taxonomic_lineage):
517 """Recursive method to get taxon ID from NCBI lineage (PRIVATE).
518
519 Arguments:
520 - taxonomic_lineage - list of taxonomy dictionaries from Bio.Entrez
521
522 First dictionary in list is the taxonomy root, highest would be
523 the species. Each dictionary includes:
524
525 - TaxID (string, NCBI taxon id)
526 - Rank (string, e.g. "species", "genus", ..., "phylum", ...)
527 - ScientificName (string)
528
529 (and that is all at the time of writing)
530
531 This method will record all the lineage given, returning the taxon id
532 (database key, not NCBI taxon id) of the final entry (the species).
533 """
534 ncbi_taxon_id = int(taxonomic_lineage[-1]["TaxId"])
535 left_value = None
536 right_value = None
537 parent_left_value = None
538 parent_right_value = None
539 # Is this in the database already? Check the taxon table...
540 rows = self.adaptor.execute_and_fetchall(
541 "SELECT taxon_id, left_value, right_value FROM taxon"
542 " WHERE ncbi_taxon_id=%s" % ncbi_taxon_id
543 )
544 if rows:
545 # we could verify that the Scientific Name etc in the database
546 # is the same and update it or print a warning if not...
547 if len(rows) != 1:
548 raise ValueError(f"Expected 1 reponse, got {len(rows)}")
549 return rows[0]
550
551 # We have to record this.
552 if len(taxonomic_lineage) > 1:
553 # Use recursion to find out the taxon id (database key) of the
554 # parent.
555 (
556 parent_taxon_id,
557 parent_left_value,
558 parent_right_value,
559 ) = self._get_taxon_id_from_ncbi_lineage(taxonomic_lineage[:-1])
560 left_value = parent_right_value
561 right_value = parent_right_value + 1
562 if not isinstance(parent_taxon_id, int):
563 raise ValueError(
564 f"Expected parent_taxon_id to be an int, got {parent_taxon_id}"
565 )
566 else:
567 # we have reached the top of the lineage but no current taxonomy
568 # id has been found
569 parent_taxon_id = None
570 left_value = self.adaptor.execute_one("SELECT MAX(left_value) FROM taxon")[
571 0
572 ]
573 if not left_value:
574 left_value = 0
575
576 right_value = left_value + 1
577
578 self._update_left_right_taxon_values(left_value)
579
580 # INSERT new taxon
581 rank = str(taxonomic_lineage[-1].get("Rank"))
582 self.adaptor.execute(
583 "INSERT INTO taxon(ncbi_taxon_id, parent_taxon_id, node_rank, "
584 "left_value, right_value) VALUES (%s, %s, %s, %s, %s)",
585 (ncbi_taxon_id, parent_taxon_id, rank, left_value, right_value),
586 )
587
588 taxon_id = self.adaptor.last_id("taxon")
589 # assert isinstance(taxon_id, int), repr(taxon_id)
590 # ... and its name in taxon_name
591 scientific_name = taxonomic_lineage[-1].get("ScientificName")
592 if scientific_name:
593 self.adaptor.execute(
594 "INSERT INTO taxon_name(taxon_id, name, name_class) "
595 "VALUES (%s, %s, 'scientific name')",
596 (taxon_id, scientific_name[:255]),
597 )
598 return taxon_id, left_value, right_value
599
600 def _load_bioentry_table(self, record):
601 """Fill the bioentry table with sequence information (PRIVATE).
602
603 Arguments:
604 - record - SeqRecord object to add to the database.
605
606 """
607 # get the pertinent info and insert it
608
609 if record.id.count(".") == 1: # try to get a version from the id
610 # This assumes the string is something like "XXXXXXXX.123"
611 accession, version = record.id.split(".")
612 try:
613 version = int(version)
614 except ValueError:
615 accession = record.id
616 version = 0
617 else: # otherwise just use a version of 0
618 accession = record.id
619 version = 0
620
621 if (
622 "accessions" in record.annotations
623 and isinstance(record.annotations["accessions"], list)
624 and record.annotations["accessions"]
625 ):
626 # Take the first accession (one if there is more than one)
627 accession = record.annotations["accessions"][0]
628
629 # Find the taxon id (this is not just the NCBI Taxon ID)
630 # NOTE - If the species isn't defined in the taxon table,
631 # a new minimal entry is created.
632 taxon_id = self._get_taxon_id(record)
633
634 if "gi" in record.annotations:
635 identifier = record.annotations["gi"]
636 else:
637 identifier = record.id
638
639 # Allow description and division to default to NULL as in BioPerl.
640 description = getattr(record, "description", None)
641 division = record.annotations.get("data_file_division")
642
643 sql = """
644 INSERT INTO bioentry (
645 biodatabase_id,
646 taxon_id,
647 name,
648 accession,
649 identifier,
650 division,
651 description,
652 version)
653 VALUES (
654 %s,
655 %s,
656 %s,
657 %s,
658 %s,
659 %s,
660 %s,
661 %s)"""
662 # print(self.dbid, taxon_id, record.name, accession, identifier, \
663 # division, description, version)
664 self.adaptor.execute(
665 sql,
666 (
667 self.dbid,
668 taxon_id,
669 record.name,
670 accession,
671 identifier,
672 division,
673 description,
674 version,
675 ),
676 )
677 # now retrieve the id for the bioentry
678 return self.adaptor.last_id("bioentry")
679
680 def _load_bioentry_date(self, record, bioentry_id):
681 """Add the effective date of the entry into the database (PRIVATE).
682
683 record - a SeqRecord object with an annotated date
684 bioentry_id - corresponding database identifier
685 """
686 # dates are GenBank style, like:
687 # 14-SEP-2000
688 date = record.annotations.get("date", strftime("%d-%b-%Y", gmtime()).upper())
689 if isinstance(date, list):
690 date = date[0]
691 annotation_tags_id = self._get_ontology_id("Annotation Tags")
692 date_id = self._get_term_id("date_changed", annotation_tags_id)
693 sql = (
694 "INSERT INTO bioentry_qualifier_value"
695 ' (bioentry_id, term_id, value, "rank")'
696 " VALUES (%s, %s, %s, 1)"
697 )
698 self.adaptor.execute(sql, (bioentry_id, date_id, date))
699
700 def _load_biosequence(self, record, bioentry_id):
701 """Record SeqRecord's sequence and alphabet in DB (PRIVATE).
702
703 Arguments:
704 - record - a SeqRecord object with a seq property
705 - bioentry_id - corresponding database identifier
706
707 """
708 if record.seq is None:
709 # The biosequence table entry is optional, so if we haven't
710 # got a sequence, we don't need to write to the table.
711 return
712
713 molecule_type = record.annotations.get("molecule_type", "")
714 if "DNA" in molecule_type:
715 alphabet = "dna"
716 elif "RNA" in molecule_type:
717 alphabet = "rna"
718 elif "protein" in molecule_type:
719 alphabet = "protein"
720 else:
721 alphabet = "unknown"
722
723 try:
724 seq_str = str(record.seq)
725 except UndefinedSequenceError:
726 seq_str = None
727
728 sql = (
729 "INSERT INTO biosequence (bioentry_id, version, "
730 "length, seq, alphabet) "
731 "VALUES (%s, 0, %s, %s, %s)"
732 )
733 self.adaptor.execute(sql, (bioentry_id, len(record.seq), seq_str, alphabet))
734
735 def _load_comment(self, record, bioentry_id):
736 """Record a SeqRecord's annotated comment in the database (PRIVATE).
737
738 Arguments:
739 - record - a SeqRecord object with an annotated comment
740 - bioentry_id - corresponding database identifier
741
742 """
743 comments = record.annotations.get("comment")
744 if not comments:
745 return
746 if not isinstance(comments, list):
747 # It should be a string then...
748 comments = [comments]
749
750 for index, comment in enumerate(comments):
751 comment = comment.replace("\n", " ")
752 # TODO - Store each line as a separate entry? This would preserve
753 # the newlines, but we should check BioPerl etc to be consistent.
754 sql = (
755 'INSERT INTO comment (bioentry_id, comment_text, "rank")'
756 " VALUES (%s, %s, %s)"
757 )
758 self.adaptor.execute(sql, (bioentry_id, comment, index + 1))
759
760 def _load_annotations(self, record, bioentry_id):
761 """Record a SeqRecord's misc annotations in the database (PRIVATE).
762
763 The annotation strings are recorded in the bioentry_qualifier_value
764 table, except for special cases like the reference, comment and
765 taxonomy which are handled with their own tables.
766
767 Arguments:
768 - record - a SeqRecord object with an annotations dictionary
769 - bioentry_id - corresponding database identifier
770
771 """
772 mono_sql = (
773 "INSERT INTO bioentry_qualifier_value"
774 "(bioentry_id, term_id, value)"
775 " VALUES (%s, %s, %s)"
776 )
777 many_sql = (
778 "INSERT INTO bioentry_qualifier_value"
779 '(bioentry_id, term_id, value, "rank")'
780 " VALUES (%s, %s, %s, %s)"
781 )
782 tag_ontology_id = self._get_ontology_id("Annotation Tags")
783 for key, value in record.annotations.items():
784 if key in ["molecule_type", "references", "comment", "ncbi_taxid", "date"]:
785 # Handled separately
786 continue
787 term_id = self._get_term_id(key, ontology_id=tag_ontology_id)
788 if isinstance(value, (list, tuple)):
789 rank = 0
790 for entry in value:
791 if isinstance(entry, (str, int)):
792 # Easy case
793 rank += 1
794 self.adaptor.execute(
795 many_sql, (bioentry_id, term_id, str(entry), rank)
796 )
797 else:
798 pass
799 elif isinstance(value, (str, int)):
800 # Have a simple single entry, leave rank as the DB default
801 self.adaptor.execute(mono_sql, (bioentry_id, term_id, str(value)))
802 else:
803 pass
804 # print("Ignoring annotation '%s' entry of type '%s'" \
805 # % (key, type(value)))
806
807 def _load_reference(self, reference, rank, bioentry_id):
808 """Record SeqRecord's annotated references in the database (PRIVATE).
809
810 Arguments:
811 - record - a SeqRecord object with annotated references
812 - bioentry_id - corresponding database identifier
813
814 """
815 refs = None
816 if reference.medline_id:
817 refs = self.adaptor.execute_and_fetch_col0(
818 "SELECT reference_id"
819 " FROM reference JOIN dbxref USING (dbxref_id)"
820 " WHERE dbname = 'MEDLINE' AND accession = %s",
821 (reference.medline_id,),
822 )
823 if not refs and reference.pubmed_id:
824 refs = self.adaptor.execute_and_fetch_col0(
825 "SELECT reference_id"
826 " FROM reference JOIN dbxref USING (dbxref_id)"
827 " WHERE dbname = 'PUBMED' AND accession = %s",
828 (reference.pubmed_id,),
829 )
830 if not refs:
831 s = []
832 for f in reference.authors, reference.title, reference.journal:
833 s.append(f or "<undef>")
834 crc = crc64("".join(s))
835 refs = self.adaptor.execute_and_fetch_col0(
836 "SELECT reference_id FROM reference WHERE crc = %s", (crc,)
837 )
838 if not refs:
839 if reference.medline_id:
840 dbxref_id = self._add_dbxref("MEDLINE", reference.medline_id, 0)
841 elif reference.pubmed_id:
842 dbxref_id = self._add_dbxref("PUBMED", reference.pubmed_id, 0)
843 else:
844 dbxref_id = None
845 authors = reference.authors or None
846 title = reference.title or None
847 # The location/journal field cannot be Null, so default
848 # to an empty string rather than None:
849 journal = reference.journal or ""
850 self.adaptor.execute(
851 "INSERT INTO reference (dbxref_id, location,"
852 " title, authors, crc)"
853 " VALUES (%s, %s, %s, %s, %s)",
854 (dbxref_id, journal, title, authors, crc),
855 )
856 reference_id = self.adaptor.last_id("reference")
857 else:
858 reference_id = refs[0]
859
860 if reference.location:
861 start = 1 + int(str(reference.location[0].start))
862 end = int(str(reference.location[0].end))
863 else:
864 start = None
865 end = None
866
867 sql = (
868 "INSERT INTO bioentry_reference (bioentry_id, reference_id,"
869 ' start_pos, end_pos, "rank") VALUES (%s, %s, %s, %s, %s)'
870 )
871 self.adaptor.execute(sql, (bioentry_id, reference_id, start, end, rank + 1))
872
873 def _load_seqfeature(self, feature, feature_rank, bioentry_id):
874 """Load a biopython SeqFeature into the database (PRIVATE)."""
875 # records loaded from a gff file using BCBio.GFF will contain value
876 # of 2nd column of the gff as a feature qualifier. The BioSQL wiki
877 # suggests that the source should not go in with the other feature
878 # mappings but instead be put in the term table
879 # (http://www.biosql.org/wiki/Annotation_Mapping)
880 try:
881 source = feature.qualifiers["source"]
882 if isinstance(source, list):
883 source = source[0]
884 seqfeature_id = self._load_seqfeature_basic(
885 feature.type, feature_rank, bioentry_id, source=source
886 )
887 except KeyError:
888 seqfeature_id = self._load_seqfeature_basic(
889 feature.type, feature_rank, bioentry_id
890 )
891
892 self._load_seqfeature_locations(feature, seqfeature_id)
893 self._load_seqfeature_qualifiers(feature.qualifiers, seqfeature_id)
894
895 def _load_seqfeature_basic(
896 self, feature_type, feature_rank, bioentry_id, source="EMBL/GenBank/SwissProt"
897 ):
898 """Load the first tables of a seqfeature and returns the id (PRIVATE).
899
900 This loads the "key" of the seqfeature (ie. CDS, gene) and
901 the basic seqfeature table itself.
902 """
903 ontology_id = self._get_ontology_id("SeqFeature Keys")
904 seqfeature_key_id = self._get_term_id(feature_type, ontology_id=ontology_id)
905 source_cat_id = self._get_ontology_id("SeqFeature Sources")
906 source_term_id = self._get_term_id(source, ontology_id=source_cat_id)
907
908 sql = (
909 "INSERT INTO seqfeature (bioentry_id, type_term_id, "
910 'source_term_id, "rank") VALUES (%s, %s, %s, %s)'
911 )
912 self.adaptor.execute(
913 sql, (bioentry_id, seqfeature_key_id, source_term_id, feature_rank + 1)
914 )
915 return self.adaptor.last_id("seqfeature")
916
917 def _load_seqfeature_locations(self, feature, seqfeature_id):
918 """Load all of the locations for a SeqFeature into tables (PRIVATE).
919
920 This adds the locations related to the SeqFeature into the
921 seqfeature_location table. Fuzzies are not handled right now.
922 For a simple location, ie (1..2), we have a single table row
923 with seq_start = 1, seq_end = 2, location_rank = 1.
924
925 For split locations, ie (1..2, 3..4, 5..6) we would have three
926 row tables with::
927
928 start = 1, end = 2, rank = 1
929 start = 3, end = 4, rank = 2
930 start = 5, end = 6, rank = 3
931
932 """
933 # TODO - Record an ontology for the locations (using location.term_id)
934 # which for now as in BioPerl we leave defaulting to NULL.
935 try:
936 if feature.location.operator != "join":
937 # e.g. order locations... we don't record "order" so it
938 # will become a "join" on reloading. What does BioPerl do?
939 import warnings
940 from Bio import BiopythonWarning
941
942 warnings.warn(
943 "%s location operators are not fully supported"
944 % feature.location_operator,
945 BiopythonWarning,
946 )
947 except AttributeError:
948 pass
949 # This will be a list of length one for a SimpleLocation:
950 parts = feature.location.parts
951 if parts and {loc.strand for loc in parts} == {-1}:
952 # To mimic prior behaviour of Biopython+BioSQL, reverse order
953 parts = parts[::-1]
954 # TODO - Check what BioPerl does; see also BioSeq.py code
955 for rank, loc in enumerate(parts):
956 self._insert_location(loc, rank + 1, seqfeature_id)
957
958 def _insert_location(self, location, rank, seqfeature_id):
959 """Add SeqFeature location to seqfeature_location table (PRIVATE).
960
961 TODO - Add location operator to location_qualifier_value?
962 """
963 # convert biopython locations to the 1-based location system
964 # used in bioSQL
965 # XXX This could also handle fuzzies
966
967 try:
968 start = int(location.start) + 1
969 except TypeError:
970 # Handle SwissProt unknown position (?)
971 if isinstance(location.start, UnknownPosition):
972 start = None
973 else:
974 raise
975
976 try:
977 end = int(location.end)
978 except TypeError:
979 # Handle SwissProt unknown position (?)
980 if isinstance(location.end, UnknownPosition):
981 end = None
982 else:
983 raise
984
985 # Biopython uses None when we don't know strand information but
986 # BioSQL requires something (non null) and sets this as zero
987 # So we'll use the strand or 0 if Biopython spits out None
988 strand = location.strand or 0
989
990 # TODO - Record an ontology term for the location (location.term_id)
991 # which for now like BioPerl we'll leave as NULL.
992 # This might allow us to record "between" positions properly, but I
993 # don't really see how it could work for before/after fuzzy positions
994 loc_term_id = None
995
996 if location.ref:
997 # sub_feature remote locations when they are in the same db as the
998 # current record do not have a value for ref_db, which SeqFeature
999 # object stores as None. BioSQL schema requires a varchar and is
1000 # not NULL
1001 dbxref_id = self._get_dbxref_id(location.ref_db or "", location.ref)
1002 else:
1003 dbxref_id = None
1004
1005 sql = (
1006 "INSERT INTO location (seqfeature_id, dbxref_id, term_id,"
1007 'start_pos, end_pos, strand, "rank") '
1008 "VALUES (%s, %s, %s, %s, %s, %s, %s)"
1009 )
1010 self.adaptor.execute(
1011 sql, (seqfeature_id, dbxref_id, loc_term_id, start, end, strand, rank)
1012 )
1013
1014 """
1015 # See Bug 2677
1016 # TODO - Record the location_operator (e.g. "join" or "order")
1017 # using the location_qualifier_value table (which we and BioPerl
1018 # have historically left empty).
1019 # Note this will need an ontology term for the location qualifier
1020 # (location_qualifier_value.term_id) for which oddly the schema
1021 # does not allow NULL.
1022 if feature.location_operator:
1023 #e.g. "join" (common),
1024 #or "order" (see Tests/GenBank/protein_refseq2.gb)
1025 location_id = self.adaptor.last_id('location')
1026 loc_qual_term_id = None # Not allowed in BioSQL v1.0.1
1027 sql = ("INSERT INTO location_qualifier_value"
1028 "(location_id, term_id, value) "
1029 "VALUES (%s, %s, %s)")
1030 self.adaptor.execute(sql, (location_id, loc_qual_term_id,
1031 feature.location_operator))
1032 """
1033
1034 def _load_seqfeature_qualifiers(self, qualifiers, seqfeature_id):
1035 """Insert feature's (key, value) pair qualifiers (PRIVATE).
1036
1037 Qualifiers should be a dictionary of the form::
1038
1039 {key : [value1, value2]}
1040
1041 """
1042 tag_ontology_id = self._get_ontology_id("Annotation Tags")
1043 for qualifier_key in qualifiers:
1044 # Treat db_xref qualifiers differently to sequence annotation
1045 # qualifiers by populating the seqfeature_dbxref and dbxref
1046 # tables. Other qualifiers go into the seqfeature_qualifier_value
1047 # and (if new) term tables.
1048 if qualifier_key != "db_xref":
1049 qualifier_key_id = self._get_term_id(
1050 qualifier_key, ontology_id=tag_ontology_id
1051 )
1052 # now add all of the values to their table
1053 entries = qualifiers[qualifier_key]
1054 if not isinstance(entries, list):
1055 # Could be a plain string, or an int or a float.
1056 # However, we exect a list of strings here.
1057 entries = [entries]
1058 for qual_value_rank in range(len(entries)):
1059 qualifier_value = entries[qual_value_rank]
1060 sql = (
1061 "INSERT INTO seqfeature_qualifier_value "
1062 ' (seqfeature_id, term_id, "rank", value) VALUES'
1063 " (%s, %s, %s, %s)"
1064 )
1065 self.adaptor.execute(
1066 sql,
1067 (
1068 seqfeature_id,
1069 qualifier_key_id,
1070 qual_value_rank + 1,
1071 qualifier_value,
1072 ),
1073 )
1074 else:
1075 # The dbxref_id qualifier/value sets go into the dbxref table
1076 # as dbname, accession, version tuples, with dbxref.dbxref_id
1077 # being automatically assigned, and into the seqfeature_dbxref
1078 # table as seqfeature_id, dbxref_id, and rank tuples
1079 self._load_seqfeature_dbxref(qualifiers[qualifier_key], seqfeature_id)
1080
1081 def _load_seqfeature_dbxref(self, dbxrefs, seqfeature_id):
1082 """Add SeqFeature's DB cross-references to the database (PRIVATE).
1083
1084 Arguments:
1085 - dbxrefs - List, dbxref data from the source file in the
1086 format <database>:<accession>
1087 - seqfeature_id - Int, the identifier for the seqfeature in the
1088 seqfeature table
1089
1090 Insert dbxref qualifier data for a seqfeature into the
1091 seqfeature_dbxref and, if required, dbxref tables.
1092 The dbxref_id qualifier/value sets go into the dbxref table
1093 as dbname, accession, version tuples, with dbxref.dbxref_id
1094 being automatically assigned, and into the seqfeature_dbxref
1095 table as seqfeature_id, dbxref_id, and rank tuples.
1096 """
1097 # NOTE - In older versions of Biopython, we would map the GenBank
1098 # db_xref "name", for example "GI" to "GeneIndex", and give a warning
1099 # for any unknown terms. This was a long term maintenance problem,
1100 # and differed from BioPerl and BioJava's implementation. See bug 2405
1101 for rank, value in enumerate(dbxrefs):
1102 # Split the DB:accession format string at colons. We have to
1103 # account for multiple-line and multiple-accession entries
1104 try:
1105 dbxref_data = value.replace(" ", "").replace("\n", "").split(":")
1106 db = dbxref_data[0]
1107 accessions = dbxref_data[1:]
1108 except Exception:
1109 raise ValueError(f"Parsing of db_xref failed: '{value}'") from None
1110 # Loop over all the grabbed accessions, and attempt to fill the
1111 # table
1112 for accession in accessions:
1113 # Get the dbxref_id value for the dbxref data
1114 dbxref_id = self._get_dbxref_id(db, accession)
1115 # Insert the seqfeature_dbxref data
1116 self._get_seqfeature_dbxref(seqfeature_id, dbxref_id, rank + 1)
1117
1118 def _get_dbxref_id(self, db, accession):
1119 """Get DB cross-reference for accession (PRIVATE).
1120
1121 Arguments:
1122 - db - String, the name of the external database containing
1123 the accession number
1124 - accession - String, the accession of the dbxref data
1125
1126 Finds and returns the dbxref_id for the passed data. The method
1127 attempts to find an existing record first, and inserts the data
1128 if there is no record.
1129 """
1130 # Check for an existing record
1131 sql = "SELECT dbxref_id FROM dbxref WHERE dbname = %s AND accession = %s"
1132 dbxref_id = self.adaptor.execute_and_fetch_col0(sql, (db, accession))
1133 # If there was a record, return the dbxref_id, else create the
1134 # record and return the created dbxref_id
1135 if dbxref_id:
1136 return dbxref_id[0]
1137 return self._add_dbxref(db, accession, 0)
1138
1139 def _get_seqfeature_dbxref(self, seqfeature_id, dbxref_id, rank):
1140 """Get DB cross-reference, creating it if needed (PRIVATE).
1141
1142 Check for a pre-existing seqfeature_dbxref entry with the passed
1143 seqfeature_id and dbxref_id. If one does not exist, insert new
1144 data.
1145 """
1146 # Check for an existing record
1147 sql = (
1148 "SELECT seqfeature_id, dbxref_id FROM seqfeature_dbxref "
1149 "WHERE seqfeature_id = %s AND dbxref_id = %s"
1150 )
1151 result = self.adaptor.execute_and_fetch_col0(sql, (seqfeature_id, dbxref_id))
1152 # If there was a record, return without executing anything, else create
1153 # the record and return
1154 if result:
1155 return result
1156 return self._add_seqfeature_dbxref(seqfeature_id, dbxref_id, rank)
1157
1158 def _add_seqfeature_dbxref(self, seqfeature_id, dbxref_id, rank):
1159 """Add DB cross-reference (PRIVATE).
1160
1161 Insert a seqfeature_dbxref row and return the seqfeature_id and
1162 dbxref_id
1163 """
1164 sql = (
1165 "INSERT INTO seqfeature_dbxref "
1166 '(seqfeature_id, dbxref_id, "rank") VALUES'
1167 "(%s, %s, %s)"
1168 )
1169 self.adaptor.execute(sql, (seqfeature_id, dbxref_id, rank))
1170 return (seqfeature_id, dbxref_id)
1171
1172 def _load_dbxrefs(self, record, bioentry_id):
1173 """Load any sequence level cross references into the database (PRIVATE).
1174
1175 See table bioentry_dbxref.
1176 """
1177 for rank, value in enumerate(record.dbxrefs):
1178 # Split the DB:accession string at first colon.
1179 # We have to cope with things like:
1180 # "MGD:MGI:892" (db="MGD", accession="MGI:892")
1181 # "GO:GO:123" (db="GO", accession="GO:123")
1182 #
1183 # Annoyingly I have seen the NCBI use both the style
1184 # "GO:GO:123" and "GO:123" in different vintages.
1185 newline_escape_count = value.count("\n")
1186 if newline_escape_count != 0:
1187 raise ValueError(
1188 "Expected a single line in value, got {newline_escape_count}"
1189 )
1190 try:
1191 db, accession = value.split(":", 1)
1192 db = db.strip()
1193 accession = accession.strip()
1194 except Exception:
1195 raise ValueError(f"Parsing of dbxrefs list failed: '{value}'") from None
1196 # Get the dbxref_id value for the dbxref data
1197 dbxref_id = self._get_dbxref_id(db, accession)
1198 # Insert the bioentry_dbxref data
1199 self._get_bioentry_dbxref(bioentry_id, dbxref_id, rank + 1)
1200
1201 def _get_bioentry_dbxref(self, bioentry_id, dbxref_id, rank):
1202 """Get pre-existing db-xref, or create and return it (PRIVATE).
1203
1204 Check for a pre-existing bioentry_dbxref entry with the passed
1205 seqfeature_id and dbxref_id. If one does not exist, insert new
1206 data
1207 """
1208 # Check for an existing record
1209 sql = (
1210 "SELECT bioentry_id, dbxref_id FROM bioentry_dbxref "
1211 "WHERE bioentry_id = %s AND dbxref_id = %s"
1212 )
1213 result = self.adaptor.execute_and_fetch_col0(sql, (bioentry_id, dbxref_id))
1214 # If there was a record, return without executing anything, else create
1215 # the record and return
1216 if result:
1217 return result
1218 return self._add_bioentry_dbxref(bioentry_id, dbxref_id, rank)
1219
1220 def _add_bioentry_dbxref(self, bioentry_id, dbxref_id, rank):
1221 """Insert a bioentry_dbxref row (PRIVATE).
1222
1223 Returns the seqfeature_id and dbxref_id (PRIVATE).
1224 """
1225 sql = (
1226 "INSERT INTO bioentry_dbxref "
1227 '(bioentry_id,dbxref_id,"rank") VALUES '
1228 "(%s, %s, %s)"
1229 )
1230 self.adaptor.execute(sql, (bioentry_id, dbxref_id, rank))
1231 return (bioentry_id, dbxref_id)
1232
1233
1234 class DatabaseRemover:
1235 """Complement the Loader functionality by fully removing a database.
1236
1237 This probably isn't really useful for normal purposes, since you
1238 can just do a::
1239
1240 DROP DATABASE db_name
1241
1242 and then recreate the database. But, it's really useful for testing
1243 purposes.
1244 """
1245
1246 def __init__(self, adaptor, dbid):
1247 """Initialize with a database id and adaptor connection."""
1248 self.adaptor = adaptor
1249 self.dbid = dbid
1250
1251 def remove(self):
1252 """Remove everything related to the given database id."""
1253 sql = "DELETE FROM bioentry WHERE biodatabase_id = %s"
1254 self.adaptor.execute(sql, (self.dbid,))
1255 sql = "DELETE FROM biodatabase WHERE biodatabase_id = %s"
1256 self.adaptor.execute(sql, (self.dbid,))