comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/BioSQL/BioSeq.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 69:33d812a61356
1 # Copyright 2002 by Andrew Dalke. All rights reserved.
2 # Revisions 2007-2016 copyright by Peter Cock. All rights reserved.
3 # Revisions 2008-2009 copyright by Cymon J. Cox. All rights reserved.
4 #
5 # This file is part of the Biopython distribution and governed by your
6 # choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
7 # Please see the LICENSE file that should have been included as part of this
8 # package.
9 #
10 # Note that BioSQL (including the database schema and scripts) is
11 # available and licensed separately. Please consult www.biosql.org
12 """Implementations of Biopython-like Seq objects on top of BioSQL.
13
14 This allows retrieval of items stored in a BioSQL database using
15 a biopython-like SeqRecord and Seq interface.
16
17 Note: Currently we do not support recording per-letter-annotations
18 (like quality scores) in BioSQL.
19 """
20
21 from typing import List, Optional
22
23 from Bio.Seq import Seq, SequenceDataAbstractBaseClass
24 from Bio.SeqRecord import SeqRecord, _RestrictedDict
25 from Bio import SeqFeature
26
27
28 class _BioSQLSequenceData(SequenceDataAbstractBaseClass):
29 """Retrieves sequence data from a BioSQL database (PRIVATE)."""
30
31 __slots__ = ("primary_id", "adaptor", "_length", "start")
32
33 def __init__(self, primary_id, adaptor, start=0, length=0):
34 """Create a new _BioSQLSequenceData object referring to a BioSQL entry.
35
36 You wouldn't normally create a _BioSQLSequenceData object yourself,
37 this is done for you when retrieving a DBSeqRecord object from the
38 database, which creates a Seq object using a _BioSQLSequenceData
39 instance as the data provider.
40 """
41 self.primary_id = primary_id
42 self.adaptor = adaptor
43 self._length = length
44 self.start = start
45 super().__init__()
46
47 def __len__(self):
48 """Return the length of the sequence."""
49 return self._length
50
51 def __getitem__(self, key):
52 """Return a subsequence as a bytes or a _BioSQLSequenceData object."""
53 if isinstance(key, slice):
54 start, end, step = key.indices(self._length)
55 size = len(range(start, end, step))
56 if size == 0:
57 return b""
58 else:
59 # Return a single letter as an integer (consistent with bytes)
60 i = key
61 if i < 0:
62 i += self._length
63 if i < 0:
64 raise IndexError(key)
65 elif i >= self._length:
66 raise IndexError(key)
67 c = self.adaptor.get_subseq_as_string(
68 self.primary_id, self.start + i, self.start + i + 1
69 )
70 return ord(c)
71
72 if step == 1:
73 if start == 0 and size == self._length:
74 # Return the full sequence as bytes
75 sequence = self.adaptor.get_subseq_as_string(
76 self.primary_id, self.start, self.start + self._length
77 )
78 return sequence.encode("ASCII")
79 else:
80 # Return a _BioSQLSequenceData with the start and end adjusted
81 return _BioSQLSequenceData(
82 self.primary_id, self.adaptor, self.start + start, size
83 )
84 else:
85 # Will have to extract the sequence because of the stride
86 full = self.adaptor.get_subseq_as_string(
87 self.primary_id, self.start + start, self.start + end
88 )
89 return full[::step].encode("ASCII")
90
91
92 def _retrieve_seq_len(adaptor, primary_id):
93 # The database schema ensures there will be only one matching row
94 seqs = adaptor.execute_and_fetchall(
95 "SELECT length FROM biosequence WHERE bioentry_id = %s", (primary_id,)
96 )
97 if not seqs:
98 return None
99 if len(seqs) != 1:
100 raise ValueError(f"Expected 1 response, got {len(seqs)}.")
101 (given_length,) = seqs[0]
102 return int(given_length)
103
104
105 def _retrieve_seq(adaptor, primary_id):
106 # The database schema ensures there will be only one matching
107 # row in the table.
108
109 # If an undefined sequence was recorded, seq will be NULL,
110 # but length will be populated. This means length(seq)
111 # will return None.
112 seqs = adaptor.execute_and_fetchall(
113 "SELECT alphabet, length, length(seq) FROM biosequence WHERE bioentry_id = %s",
114 (primary_id,),
115 )
116 if not seqs:
117 return
118 if len(seqs) != 1:
119 raise ValueError(f"Expected 1 response, got {len(seqs)}.")
120 moltype, given_length, length = seqs[0]
121
122 try:
123 length = int(length)
124 given_length = int(given_length)
125 if length != given_length:
126 raise ValueError(
127 f"'length' differs from sequence length, {given_length}, {length}"
128 )
129 have_seq = True
130 except TypeError:
131 if length is not None:
132 raise ValueError(f"Expected 'length' to be 'None', got {length}.")
133 seqs = adaptor.execute_and_fetchall(
134 "SELECT alphabet, length, seq FROM biosequence WHERE bioentry_id = %s",
135 (primary_id,),
136 )
137 if len(seqs) != 1:
138 raise ValueError(f"Expected 1 response, got {len(seqs)}.")
139 moltype, given_length, seq = seqs[0]
140 if seq:
141 raise ValueError(f"Expected 'seq' to have a falsy value, got {seq}.")
142 length = int(given_length)
143 have_seq = False
144 del seq
145 del given_length
146
147 if have_seq:
148 data = _BioSQLSequenceData(primary_id, adaptor, start=0, length=length)
149 return Seq(data)
150 else:
151 return Seq(None, length=length)
152
153
154 def _retrieve_dbxrefs(adaptor, primary_id):
155 """Retrieve the database cross references for the sequence (PRIVATE)."""
156 _dbxrefs = []
157 dbxrefs = adaptor.execute_and_fetchall(
158 "SELECT dbname, accession, version"
159 " FROM bioentry_dbxref join dbxref using (dbxref_id)"
160 " WHERE bioentry_id = %s"
161 ' ORDER BY "rank"',
162 (primary_id,),
163 )
164 for dbname, accession, version in dbxrefs:
165 if version and version != "0":
166 v = f"{accession}.{version}"
167 else:
168 v = accession
169 _dbxrefs.append(f"{dbname}:{v}")
170 return _dbxrefs
171
172
173 def _retrieve_features(adaptor, primary_id):
174 sql = (
175 'SELECT seqfeature_id, type.name, "rank"'
176 " FROM seqfeature join term type on (type_term_id = type.term_id)"
177 " WHERE bioentry_id = %s"
178 ' ORDER BY "rank"'
179 )
180 results = adaptor.execute_and_fetchall(sql, (primary_id,))
181 seq_feature_list = []
182 for seqfeature_id, seqfeature_type, seqfeature_rank in results:
183 # Get qualifiers [except for db_xref which is stored separately]
184 qvs = adaptor.execute_and_fetchall(
185 "SELECT name, value"
186 " FROM seqfeature_qualifier_value join term using (term_id)"
187 " WHERE seqfeature_id = %s"
188 ' ORDER BY "rank"',
189 (seqfeature_id,),
190 )
191 qualifiers = {}
192 for qv_name, qv_value in qvs:
193 qualifiers.setdefault(qv_name, []).append(qv_value)
194 # Get db_xrefs [special case of qualifiers]
195 qvs = adaptor.execute_and_fetchall(
196 "SELECT dbxref.dbname, dbxref.accession"
197 " FROM dbxref join seqfeature_dbxref using (dbxref_id)"
198 " WHERE seqfeature_dbxref.seqfeature_id = %s"
199 ' ORDER BY "rank"',
200 (seqfeature_id,),
201 )
202 for qv_name, qv_value in qvs:
203 value = f"{qv_name}:{qv_value}"
204 qualifiers.setdefault("db_xref", []).append(value)
205 # Get locations
206 results = adaptor.execute_and_fetchall(
207 "SELECT location_id, start_pos, end_pos, strand"
208 " FROM location"
209 " WHERE seqfeature_id = %s"
210 ' ORDER BY "rank"',
211 (seqfeature_id,),
212 )
213 locations = []
214 # convert to Python standard form
215 # Convert strand = 0 to strand = None
216 # re: comment in Loader.py:
217 # Biopython uses None when we don't know strand information but
218 # BioSQL requires something (non null) and sets this as zero
219 # So we'll use the strand or 0 if Biopython spits out None
220 for location_id, start, end, strand in results:
221 if start:
222 start -= 1
223 if strand == 0:
224 strand = None
225 if strand not in (+1, -1, None):
226 raise ValueError(
227 "Invalid strand %s found in database for "
228 "seqfeature_id %s" % (strand, seqfeature_id)
229 )
230 if start is not None and end is not None and end < start:
231 import warnings
232 from Bio import BiopythonWarning
233
234 warnings.warn(
235 "Inverted location start/end (%i and %i) for "
236 "seqfeature_id %s" % (start, end, seqfeature_id),
237 BiopythonWarning,
238 )
239
240 # For SwissProt unknown positions (?)
241 if start is None:
242 start = SeqFeature.UnknownPosition()
243 if end is None:
244 end = SeqFeature.UnknownPosition()
245
246 locations.append((location_id, start, end, strand))
247 # Get possible remote reference information
248 remote_results = adaptor.execute_and_fetchall(
249 "SELECT location_id, dbname, accession, version"
250 " FROM location join dbxref using (dbxref_id)"
251 " WHERE seqfeature_id = %s",
252 (seqfeature_id,),
253 )
254 lookup = {}
255 for location_id, dbname, accession, version in remote_results:
256 if version and version != "0":
257 v = f"{accession}.{version}"
258 else:
259 v = accession
260 # subfeature remote location db_ref are stored as a empty string
261 # when not present
262 if dbname == "":
263 dbname = None
264 lookup[location_id] = (dbname, v)
265
266 feature = SeqFeature.SeqFeature(type=seqfeature_type)
267 # Store the key as a private property
268 feature._seqfeature_id = seqfeature_id
269 feature.qualifiers = qualifiers
270 if len(locations) == 0:
271 pass
272 elif len(locations) == 1:
273 location_id, start, end, strand = locations[0]
274 # See Bug 2677, we currently don't record the location_operator
275 # For consistency with older versions Biopython, default to "".
276 feature.location_operator = _retrieve_location_qualifier_value(
277 adaptor, location_id
278 )
279 dbname, version = lookup.get(location_id, (None, None))
280 feature.location = SeqFeature.SimpleLocation(start, end)
281 feature.location.strand = strand
282 feature.location.ref_db = dbname
283 feature.location.ref = version
284 else:
285 locs = []
286 for location in locations:
287 location_id, start, end, strand = location
288 dbname, version = lookup.get(location_id, (None, None))
289 locs.append(
290 SeqFeature.SimpleLocation(
291 start, end, strand=strand, ref=version, ref_db=dbname
292 )
293 )
294 # Locations are typically in biological in order (see negative
295 # strands below), but because of remote locations for
296 # sub-features they are not necessarily in numerical order:
297 strands = {_.strand for _ in locs}
298 if len(strands) == 1 and -1 in strands:
299 # Evil hack time for backwards compatibility
300 # TODO - Check if BioPerl and (old) Biopython did the same,
301 # we may have an existing incompatibility lurking here...
302 locs = locs[::-1]
303 feature.location = SeqFeature.CompoundLocation(locs, "join")
304 # TODO - See Bug 2677 - we don't yet record location operator,
305 # so for consistency with older versions of Biopython default
306 # to assuming its a join.
307 seq_feature_list.append(feature)
308 return seq_feature_list
309
310
311 def _retrieve_location_qualifier_value(adaptor, location_id):
312 value = adaptor.execute_and_fetch_col0(
313 "SELECT value FROM location_qualifier_value WHERE location_id = %s",
314 (location_id,),
315 )
316 try:
317 return value[0]
318 except IndexError:
319 return ""
320
321
322 def _retrieve_annotations(adaptor, primary_id, taxon_id):
323 annotations = {}
324 annotations.update(_retrieve_alphabet(adaptor, primary_id))
325 annotations.update(_retrieve_qualifier_value(adaptor, primary_id))
326 annotations.update(_retrieve_reference(adaptor, primary_id))
327 annotations.update(_retrieve_taxon(adaptor, primary_id, taxon_id))
328 annotations.update(_retrieve_comment(adaptor, primary_id))
329 return annotations
330
331
332 def _retrieve_alphabet(adaptor, primary_id):
333 results = adaptor.execute_and_fetchall(
334 "SELECT alphabet FROM biosequence WHERE bioentry_id = %s", (primary_id,)
335 )
336 if len(results) != 1:
337 raise ValueError(f"Expected 1 response, got {len(results)}.")
338 alphabets = results[0]
339 if len(alphabets) != 1:
340 raise ValueError(f"Expected 1 alphabet in response, got {len(alphabets)}.")
341 alphabet = alphabets[0]
342 if alphabet == "dna":
343 molecule_type = "DNA"
344 elif alphabet == "rna":
345 molecule_type = "RNA"
346 elif alphabet == "protein":
347 molecule_type = "protein"
348 else:
349 molecule_type = None
350 if molecule_type is not None:
351 return {"molecule_type": molecule_type}
352 else:
353 return {}
354
355
356 def _retrieve_qualifier_value(adaptor, primary_id):
357 qvs = adaptor.execute_and_fetchall(
358 "SELECT name, value"
359 " FROM bioentry_qualifier_value JOIN term USING (term_id)"
360 " WHERE bioentry_id = %s"
361 ' ORDER BY "rank"',
362 (primary_id,),
363 )
364 qualifiers = {}
365 for name, value in qvs:
366 if name == "keyword":
367 name = "keywords"
368 # See handling of "date" in Loader.py
369 elif name == "date_changed":
370 name = "date"
371 elif name == "secondary_accession":
372 name = "accessions"
373 qualifiers.setdefault(name, []).append(value)
374 return qualifiers
375
376
377 def _retrieve_reference(adaptor, primary_id):
378 # XXX dbxref_qualifier_value
379
380 refs = adaptor.execute_and_fetchall(
381 "SELECT start_pos, end_pos, "
382 " location, title, authors,"
383 " dbname, accession"
384 " FROM bioentry_reference"
385 " JOIN reference USING (reference_id)"
386 " LEFT JOIN dbxref USING (dbxref_id)"
387 " WHERE bioentry_id = %s"
388 ' ORDER BY "rank"',
389 (primary_id,),
390 )
391 references = []
392 for start, end, location, title, authors, dbname, accession in refs:
393 reference = SeqFeature.Reference()
394 # If the start/end are missing, reference.location is an empty list
395 if (start is not None) or (end is not None):
396 if start is not None:
397 start -= 1 # python counting
398 reference.location = [SeqFeature.SimpleLocation(start, end)]
399 # Don't replace the default "" with None.
400 if authors:
401 reference.authors = authors
402 if title:
403 reference.title = title
404 reference.journal = location
405 if dbname == "PUBMED":
406 reference.pubmed_id = accession
407 elif dbname == "MEDLINE":
408 reference.medline_id = accession
409 references.append(reference)
410 if references:
411 return {"references": references}
412 else:
413 return {}
414
415
416 def _retrieve_taxon(adaptor, primary_id, taxon_id):
417 a = {}
418 common_names = adaptor.execute_and_fetch_col0(
419 "SELECT name FROM taxon_name WHERE taxon_id = %s"
420 " AND name_class = 'genbank common name'",
421 (taxon_id,),
422 )
423 if common_names:
424 a["source"] = common_names[0]
425 scientific_names = adaptor.execute_and_fetch_col0(
426 "SELECT name FROM taxon_name WHERE taxon_id = %s"
427 " AND name_class = 'scientific name'",
428 (taxon_id,),
429 )
430 if scientific_names:
431 a["organism"] = scientific_names[0]
432 ncbi_taxids = adaptor.execute_and_fetch_col0(
433 "SELECT ncbi_taxon_id FROM taxon WHERE taxon_id = %s", (taxon_id,)
434 )
435 if ncbi_taxids and ncbi_taxids[0] and ncbi_taxids[0] != "0":
436 a["ncbi_taxid"] = ncbi_taxids[0]
437
438 # Old code used the left/right values in the taxon table to get the
439 # taxonomy lineage in one SQL command. This was actually very slow,
440 # and would fail if the (optional) left/right values were missing.
441 #
442 # The following code is based on a contribution from Eric Gibert, and
443 # relies on the taxon table's parent_taxon_id field only (ignoring the
444 # optional left/right values). This means that it has to make a
445 # separate SQL query for each entry in the lineage, but it does still
446 # appear to be *much* faster. See Bug 2494.
447 taxonomy = []
448 while taxon_id:
449 name, rank, parent_taxon_id = adaptor.execute_one(
450 "SELECT taxon_name.name, taxon.node_rank, taxon.parent_taxon_id"
451 " FROM taxon, taxon_name"
452 " WHERE taxon.taxon_id=taxon_name.taxon_id"
453 " AND taxon_name.name_class='scientific name'"
454 " AND taxon.taxon_id = %s",
455 (taxon_id,),
456 )
457 if taxon_id == parent_taxon_id:
458 # If the taxon table has been populated by the BioSQL script
459 # load_ncbi_taxonomy.pl this is how top parent nodes are stored.
460 # Personally, I would have used a NULL parent_taxon_id here.
461 break
462
463 taxonomy.insert(0, name)
464 taxon_id = parent_taxon_id
465
466 if taxonomy:
467 a["taxonomy"] = taxonomy
468 return a
469
470
471 def _retrieve_comment(adaptor, primary_id):
472 qvs = adaptor.execute_and_fetchall(
473 'SELECT comment_text FROM comment WHERE bioentry_id=%s ORDER BY "rank"',
474 (primary_id,),
475 )
476 comments = [comm[0] for comm in qvs]
477 # Don't want to add an empty list...
478 if comments:
479 return {"comment": comments}
480 else:
481 return {}
482
483
484 class DBSeqRecord(SeqRecord):
485 """BioSQL equivalent of the Biopython SeqRecord object."""
486
487 def __init__(self, adaptor, primary_id):
488 """Create a DBSeqRecord object.
489
490 Arguments:
491 - adaptor - A BioSQL.BioSeqDatabase.Adaptor object
492 - primary_id - An internal integer ID used by BioSQL
493
494 You wouldn't normally create a DBSeqRecord object yourself,
495 this is done for you when using a BioSeqDatabase object
496 """
497 self._adaptor = adaptor
498 self._primary_id = primary_id
499
500 (
501 self._biodatabase_id,
502 self._taxon_id,
503 self.name,
504 accession,
505 version,
506 self._identifier,
507 self._division,
508 self.description,
509 ) = self._adaptor.execute_one(
510 "SELECT biodatabase_id, taxon_id, name, accession, version,"
511 " identifier, division, description"
512 " FROM bioentry"
513 " WHERE bioentry_id = %s",
514 (self._primary_id,),
515 )
516 if version and version != "0":
517 self.id = f"{accession}.{version}"
518 else:
519 self.id = accession
520 # We don't yet record any per-letter-annotations in the
521 # BioSQL database, but we should set this property up
522 # for completeness (and the __str__ method).
523 # We do NOT want to load the sequence from the DB here!
524 length = _retrieve_seq_len(adaptor, primary_id)
525 self._per_letter_annotations = _RestrictedDict(length=length)
526
527 def __get_seq(self):
528 if not hasattr(self, "_seq"):
529 self._seq = _retrieve_seq(self._adaptor, self._primary_id)
530 return self._seq
531
532 def __set_seq(self, seq):
533 # TODO - Check consistent with self._per_letter_annotations
534 self._seq = seq
535
536 def __del_seq(self):
537 del self._seq
538
539 seq = property(__get_seq, __set_seq, __del_seq, "Seq object")
540
541 @property
542 def dbxrefs(self) -> List[str]:
543 """Database cross references."""
544 if not hasattr(self, "_dbxrefs"):
545 self._dbxrefs = _retrieve_dbxrefs(self._adaptor, self._primary_id)
546 return self._dbxrefs
547
548 @dbxrefs.setter
549 def dbxrefs(self, value: List[str]) -> None:
550 self._dbxrefs = value
551
552 @dbxrefs.deleter
553 def dbxrefs(self) -> None:
554 del self._dbxrefs
555
556 def __get_features(self):
557 if not hasattr(self, "_features"):
558 self._features = _retrieve_features(self._adaptor, self._primary_id)
559 return self._features
560
561 def __set_features(self, features):
562 self._features = features
563
564 def __del_features(self):
565 del self._features
566
567 features = property(__get_features, __set_features, __del_features, "Features")
568
569 @property
570 def annotations(self) -> SeqRecord._AnnotationsDict:
571 """Annotations."""
572 if not hasattr(self, "_annotations"):
573 self._annotations = _retrieve_annotations(
574 self._adaptor, self._primary_id, self._taxon_id
575 )
576 if self._identifier:
577 self._annotations["gi"] = self._identifier
578 if self._division:
579 self._annotations["data_file_division"] = self._division
580 return self._annotations
581
582 @annotations.setter
583 def annotations(self, value: Optional[SeqRecord._AnnotationsDict]) -> None:
584 if value:
585 self._annotations = value
586 else:
587 self._annotations = {}
588
589 @annotations.deleter
590 def annotations(self) -> None:
591 del self._annotations