Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/BioSQL/BioSeq.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 # Copyright 2002 by Andrew Dalke. All rights reserved. | |
2 # Revisions 2007-2016 copyright by Peter Cock. All rights reserved. | |
3 # Revisions 2008-2009 copyright by Cymon J. Cox. All rights reserved. | |
4 # | |
5 # This file is part of the Biopython distribution and governed by your | |
6 # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". | |
7 # Please see the LICENSE file that should have been included as part of this | |
8 # package. | |
9 # | |
10 # Note that BioSQL (including the database schema and scripts) is | |
11 # available and licensed separately. Please consult www.biosql.org | |
12 """Implementations of Biopython-like Seq objects on top of BioSQL. | |
13 | |
14 This allows retrieval of items stored in a BioSQL database using | |
15 a biopython-like SeqRecord and Seq interface. | |
16 | |
17 Note: Currently we do not support recording per-letter-annotations | |
18 (like quality scores) in BioSQL. | |
19 """ | |
20 | |
21 from typing import List, Optional | |
22 | |
23 from Bio.Seq import Seq, SequenceDataAbstractBaseClass | |
24 from Bio.SeqRecord import SeqRecord, _RestrictedDict | |
25 from Bio import SeqFeature | |
26 | |
27 | |
28 class _BioSQLSequenceData(SequenceDataAbstractBaseClass): | |
29 """Retrieves sequence data from a BioSQL database (PRIVATE).""" | |
30 | |
31 __slots__ = ("primary_id", "adaptor", "_length", "start") | |
32 | |
33 def __init__(self, primary_id, adaptor, start=0, length=0): | |
34 """Create a new _BioSQLSequenceData object referring to a BioSQL entry. | |
35 | |
36 You wouldn't normally create a _BioSQLSequenceData object yourself, | |
37 this is done for you when retrieving a DBSeqRecord object from the | |
38 database, which creates a Seq object using a _BioSQLSequenceData | |
39 instance as the data provider. | |
40 """ | |
41 self.primary_id = primary_id | |
42 self.adaptor = adaptor | |
43 self._length = length | |
44 self.start = start | |
45 super().__init__() | |
46 | |
47 def __len__(self): | |
48 """Return the length of the sequence.""" | |
49 return self._length | |
50 | |
51 def __getitem__(self, key): | |
52 """Return a subsequence as a bytes or a _BioSQLSequenceData object.""" | |
53 if isinstance(key, slice): | |
54 start, end, step = key.indices(self._length) | |
55 size = len(range(start, end, step)) | |
56 if size == 0: | |
57 return b"" | |
58 else: | |
59 # Return a single letter as an integer (consistent with bytes) | |
60 i = key | |
61 if i < 0: | |
62 i += self._length | |
63 if i < 0: | |
64 raise IndexError(key) | |
65 elif i >= self._length: | |
66 raise IndexError(key) | |
67 c = self.adaptor.get_subseq_as_string( | |
68 self.primary_id, self.start + i, self.start + i + 1 | |
69 ) | |
70 return ord(c) | |
71 | |
72 if step == 1: | |
73 if start == 0 and size == self._length: | |
74 # Return the full sequence as bytes | |
75 sequence = self.adaptor.get_subseq_as_string( | |
76 self.primary_id, self.start, self.start + self._length | |
77 ) | |
78 return sequence.encode("ASCII") | |
79 else: | |
80 # Return a _BioSQLSequenceData with the start and end adjusted | |
81 return _BioSQLSequenceData( | |
82 self.primary_id, self.adaptor, self.start + start, size | |
83 ) | |
84 else: | |
85 # Will have to extract the sequence because of the stride | |
86 full = self.adaptor.get_subseq_as_string( | |
87 self.primary_id, self.start + start, self.start + end | |
88 ) | |
89 return full[::step].encode("ASCII") | |
90 | |
91 | |
92 def _retrieve_seq_len(adaptor, primary_id): | |
93 # The database schema ensures there will be only one matching row | |
94 seqs = adaptor.execute_and_fetchall( | |
95 "SELECT length FROM biosequence WHERE bioentry_id = %s", (primary_id,) | |
96 ) | |
97 if not seqs: | |
98 return None | |
99 if len(seqs) != 1: | |
100 raise ValueError(f"Expected 1 response, got {len(seqs)}.") | |
101 (given_length,) = seqs[0] | |
102 return int(given_length) | |
103 | |
104 | |
105 def _retrieve_seq(adaptor, primary_id): | |
106 # The database schema ensures there will be only one matching | |
107 # row in the table. | |
108 | |
109 # If an undefined sequence was recorded, seq will be NULL, | |
110 # but length will be populated. This means length(seq) | |
111 # will return None. | |
112 seqs = adaptor.execute_and_fetchall( | |
113 "SELECT alphabet, length, length(seq) FROM biosequence WHERE bioentry_id = %s", | |
114 (primary_id,), | |
115 ) | |
116 if not seqs: | |
117 return | |
118 if len(seqs) != 1: | |
119 raise ValueError(f"Expected 1 response, got {len(seqs)}.") | |
120 moltype, given_length, length = seqs[0] | |
121 | |
122 try: | |
123 length = int(length) | |
124 given_length = int(given_length) | |
125 if length != given_length: | |
126 raise ValueError( | |
127 f"'length' differs from sequence length, {given_length}, {length}" | |
128 ) | |
129 have_seq = True | |
130 except TypeError: | |
131 if length is not None: | |
132 raise ValueError(f"Expected 'length' to be 'None', got {length}.") | |
133 seqs = adaptor.execute_and_fetchall( | |
134 "SELECT alphabet, length, seq FROM biosequence WHERE bioentry_id = %s", | |
135 (primary_id,), | |
136 ) | |
137 if len(seqs) != 1: | |
138 raise ValueError(f"Expected 1 response, got {len(seqs)}.") | |
139 moltype, given_length, seq = seqs[0] | |
140 if seq: | |
141 raise ValueError(f"Expected 'seq' to have a falsy value, got {seq}.") | |
142 length = int(given_length) | |
143 have_seq = False | |
144 del seq | |
145 del given_length | |
146 | |
147 if have_seq: | |
148 data = _BioSQLSequenceData(primary_id, adaptor, start=0, length=length) | |
149 return Seq(data) | |
150 else: | |
151 return Seq(None, length=length) | |
152 | |
153 | |
154 def _retrieve_dbxrefs(adaptor, primary_id): | |
155 """Retrieve the database cross references for the sequence (PRIVATE).""" | |
156 _dbxrefs = [] | |
157 dbxrefs = adaptor.execute_and_fetchall( | |
158 "SELECT dbname, accession, version" | |
159 " FROM bioentry_dbxref join dbxref using (dbxref_id)" | |
160 " WHERE bioentry_id = %s" | |
161 ' ORDER BY "rank"', | |
162 (primary_id,), | |
163 ) | |
164 for dbname, accession, version in dbxrefs: | |
165 if version and version != "0": | |
166 v = f"{accession}.{version}" | |
167 else: | |
168 v = accession | |
169 _dbxrefs.append(f"{dbname}:{v}") | |
170 return _dbxrefs | |
171 | |
172 | |
173 def _retrieve_features(adaptor, primary_id): | |
174 sql = ( | |
175 'SELECT seqfeature_id, type.name, "rank"' | |
176 " FROM seqfeature join term type on (type_term_id = type.term_id)" | |
177 " WHERE bioentry_id = %s" | |
178 ' ORDER BY "rank"' | |
179 ) | |
180 results = adaptor.execute_and_fetchall(sql, (primary_id,)) | |
181 seq_feature_list = [] | |
182 for seqfeature_id, seqfeature_type, seqfeature_rank in results: | |
183 # Get qualifiers [except for db_xref which is stored separately] | |
184 qvs = adaptor.execute_and_fetchall( | |
185 "SELECT name, value" | |
186 " FROM seqfeature_qualifier_value join term using (term_id)" | |
187 " WHERE seqfeature_id = %s" | |
188 ' ORDER BY "rank"', | |
189 (seqfeature_id,), | |
190 ) | |
191 qualifiers = {} | |
192 for qv_name, qv_value in qvs: | |
193 qualifiers.setdefault(qv_name, []).append(qv_value) | |
194 # Get db_xrefs [special case of qualifiers] | |
195 qvs = adaptor.execute_and_fetchall( | |
196 "SELECT dbxref.dbname, dbxref.accession" | |
197 " FROM dbxref join seqfeature_dbxref using (dbxref_id)" | |
198 " WHERE seqfeature_dbxref.seqfeature_id = %s" | |
199 ' ORDER BY "rank"', | |
200 (seqfeature_id,), | |
201 ) | |
202 for qv_name, qv_value in qvs: | |
203 value = f"{qv_name}:{qv_value}" | |
204 qualifiers.setdefault("db_xref", []).append(value) | |
205 # Get locations | |
206 results = adaptor.execute_and_fetchall( | |
207 "SELECT location_id, start_pos, end_pos, strand" | |
208 " FROM location" | |
209 " WHERE seqfeature_id = %s" | |
210 ' ORDER BY "rank"', | |
211 (seqfeature_id,), | |
212 ) | |
213 locations = [] | |
214 # convert to Python standard form | |
215 # Convert strand = 0 to strand = None | |
216 # re: comment in Loader.py: | |
217 # Biopython uses None when we don't know strand information but | |
218 # BioSQL requires something (non null) and sets this as zero | |
219 # So we'll use the strand or 0 if Biopython spits out None | |
220 for location_id, start, end, strand in results: | |
221 if start: | |
222 start -= 1 | |
223 if strand == 0: | |
224 strand = None | |
225 if strand not in (+1, -1, None): | |
226 raise ValueError( | |
227 "Invalid strand %s found in database for " | |
228 "seqfeature_id %s" % (strand, seqfeature_id) | |
229 ) | |
230 if start is not None and end is not None and end < start: | |
231 import warnings | |
232 from Bio import BiopythonWarning | |
233 | |
234 warnings.warn( | |
235 "Inverted location start/end (%i and %i) for " | |
236 "seqfeature_id %s" % (start, end, seqfeature_id), | |
237 BiopythonWarning, | |
238 ) | |
239 | |
240 # For SwissProt unknown positions (?) | |
241 if start is None: | |
242 start = SeqFeature.UnknownPosition() | |
243 if end is None: | |
244 end = SeqFeature.UnknownPosition() | |
245 | |
246 locations.append((location_id, start, end, strand)) | |
247 # Get possible remote reference information | |
248 remote_results = adaptor.execute_and_fetchall( | |
249 "SELECT location_id, dbname, accession, version" | |
250 " FROM location join dbxref using (dbxref_id)" | |
251 " WHERE seqfeature_id = %s", | |
252 (seqfeature_id,), | |
253 ) | |
254 lookup = {} | |
255 for location_id, dbname, accession, version in remote_results: | |
256 if version and version != "0": | |
257 v = f"{accession}.{version}" | |
258 else: | |
259 v = accession | |
260 # subfeature remote location db_ref are stored as a empty string | |
261 # when not present | |
262 if dbname == "": | |
263 dbname = None | |
264 lookup[location_id] = (dbname, v) | |
265 | |
266 feature = SeqFeature.SeqFeature(type=seqfeature_type) | |
267 # Store the key as a private property | |
268 feature._seqfeature_id = seqfeature_id | |
269 feature.qualifiers = qualifiers | |
270 if len(locations) == 0: | |
271 pass | |
272 elif len(locations) == 1: | |
273 location_id, start, end, strand = locations[0] | |
274 # See Bug 2677, we currently don't record the location_operator | |
275 # For consistency with older versions Biopython, default to "". | |
276 feature.location_operator = _retrieve_location_qualifier_value( | |
277 adaptor, location_id | |
278 ) | |
279 dbname, version = lookup.get(location_id, (None, None)) | |
280 feature.location = SeqFeature.SimpleLocation(start, end) | |
281 feature.location.strand = strand | |
282 feature.location.ref_db = dbname | |
283 feature.location.ref = version | |
284 else: | |
285 locs = [] | |
286 for location in locations: | |
287 location_id, start, end, strand = location | |
288 dbname, version = lookup.get(location_id, (None, None)) | |
289 locs.append( | |
290 SeqFeature.SimpleLocation( | |
291 start, end, strand=strand, ref=version, ref_db=dbname | |
292 ) | |
293 ) | |
294 # Locations are typically in biological in order (see negative | |
295 # strands below), but because of remote locations for | |
296 # sub-features they are not necessarily in numerical order: | |
297 strands = {_.strand for _ in locs} | |
298 if len(strands) == 1 and -1 in strands: | |
299 # Evil hack time for backwards compatibility | |
300 # TODO - Check if BioPerl and (old) Biopython did the same, | |
301 # we may have an existing incompatibility lurking here... | |
302 locs = locs[::-1] | |
303 feature.location = SeqFeature.CompoundLocation(locs, "join") | |
304 # TODO - See Bug 2677 - we don't yet record location operator, | |
305 # so for consistency with older versions of Biopython default | |
306 # to assuming its a join. | |
307 seq_feature_list.append(feature) | |
308 return seq_feature_list | |
309 | |
310 | |
311 def _retrieve_location_qualifier_value(adaptor, location_id): | |
312 value = adaptor.execute_and_fetch_col0( | |
313 "SELECT value FROM location_qualifier_value WHERE location_id = %s", | |
314 (location_id,), | |
315 ) | |
316 try: | |
317 return value[0] | |
318 except IndexError: | |
319 return "" | |
320 | |
321 | |
322 def _retrieve_annotations(adaptor, primary_id, taxon_id): | |
323 annotations = {} | |
324 annotations.update(_retrieve_alphabet(adaptor, primary_id)) | |
325 annotations.update(_retrieve_qualifier_value(adaptor, primary_id)) | |
326 annotations.update(_retrieve_reference(adaptor, primary_id)) | |
327 annotations.update(_retrieve_taxon(adaptor, primary_id, taxon_id)) | |
328 annotations.update(_retrieve_comment(adaptor, primary_id)) | |
329 return annotations | |
330 | |
331 | |
332 def _retrieve_alphabet(adaptor, primary_id): | |
333 results = adaptor.execute_and_fetchall( | |
334 "SELECT alphabet FROM biosequence WHERE bioentry_id = %s", (primary_id,) | |
335 ) | |
336 if len(results) != 1: | |
337 raise ValueError(f"Expected 1 response, got {len(results)}.") | |
338 alphabets = results[0] | |
339 if len(alphabets) != 1: | |
340 raise ValueError(f"Expected 1 alphabet in response, got {len(alphabets)}.") | |
341 alphabet = alphabets[0] | |
342 if alphabet == "dna": | |
343 molecule_type = "DNA" | |
344 elif alphabet == "rna": | |
345 molecule_type = "RNA" | |
346 elif alphabet == "protein": | |
347 molecule_type = "protein" | |
348 else: | |
349 molecule_type = None | |
350 if molecule_type is not None: | |
351 return {"molecule_type": molecule_type} | |
352 else: | |
353 return {} | |
354 | |
355 | |
356 def _retrieve_qualifier_value(adaptor, primary_id): | |
357 qvs = adaptor.execute_and_fetchall( | |
358 "SELECT name, value" | |
359 " FROM bioentry_qualifier_value JOIN term USING (term_id)" | |
360 " WHERE bioentry_id = %s" | |
361 ' ORDER BY "rank"', | |
362 (primary_id,), | |
363 ) | |
364 qualifiers = {} | |
365 for name, value in qvs: | |
366 if name == "keyword": | |
367 name = "keywords" | |
368 # See handling of "date" in Loader.py | |
369 elif name == "date_changed": | |
370 name = "date" | |
371 elif name == "secondary_accession": | |
372 name = "accessions" | |
373 qualifiers.setdefault(name, []).append(value) | |
374 return qualifiers | |
375 | |
376 | |
377 def _retrieve_reference(adaptor, primary_id): | |
378 # XXX dbxref_qualifier_value | |
379 | |
380 refs = adaptor.execute_and_fetchall( | |
381 "SELECT start_pos, end_pos, " | |
382 " location, title, authors," | |
383 " dbname, accession" | |
384 " FROM bioentry_reference" | |
385 " JOIN reference USING (reference_id)" | |
386 " LEFT JOIN dbxref USING (dbxref_id)" | |
387 " WHERE bioentry_id = %s" | |
388 ' ORDER BY "rank"', | |
389 (primary_id,), | |
390 ) | |
391 references = [] | |
392 for start, end, location, title, authors, dbname, accession in refs: | |
393 reference = SeqFeature.Reference() | |
394 # If the start/end are missing, reference.location is an empty list | |
395 if (start is not None) or (end is not None): | |
396 if start is not None: | |
397 start -= 1 # python counting | |
398 reference.location = [SeqFeature.SimpleLocation(start, end)] | |
399 # Don't replace the default "" with None. | |
400 if authors: | |
401 reference.authors = authors | |
402 if title: | |
403 reference.title = title | |
404 reference.journal = location | |
405 if dbname == "PUBMED": | |
406 reference.pubmed_id = accession | |
407 elif dbname == "MEDLINE": | |
408 reference.medline_id = accession | |
409 references.append(reference) | |
410 if references: | |
411 return {"references": references} | |
412 else: | |
413 return {} | |
414 | |
415 | |
416 def _retrieve_taxon(adaptor, primary_id, taxon_id): | |
417 a = {} | |
418 common_names = adaptor.execute_and_fetch_col0( | |
419 "SELECT name FROM taxon_name WHERE taxon_id = %s" | |
420 " AND name_class = 'genbank common name'", | |
421 (taxon_id,), | |
422 ) | |
423 if common_names: | |
424 a["source"] = common_names[0] | |
425 scientific_names = adaptor.execute_and_fetch_col0( | |
426 "SELECT name FROM taxon_name WHERE taxon_id = %s" | |
427 " AND name_class = 'scientific name'", | |
428 (taxon_id,), | |
429 ) | |
430 if scientific_names: | |
431 a["organism"] = scientific_names[0] | |
432 ncbi_taxids = adaptor.execute_and_fetch_col0( | |
433 "SELECT ncbi_taxon_id FROM taxon WHERE taxon_id = %s", (taxon_id,) | |
434 ) | |
435 if ncbi_taxids and ncbi_taxids[0] and ncbi_taxids[0] != "0": | |
436 a["ncbi_taxid"] = ncbi_taxids[0] | |
437 | |
438 # Old code used the left/right values in the taxon table to get the | |
439 # taxonomy lineage in one SQL command. This was actually very slow, | |
440 # and would fail if the (optional) left/right values were missing. | |
441 # | |
442 # The following code is based on a contribution from Eric Gibert, and | |
443 # relies on the taxon table's parent_taxon_id field only (ignoring the | |
444 # optional left/right values). This means that it has to make a | |
445 # separate SQL query for each entry in the lineage, but it does still | |
446 # appear to be *much* faster. See Bug 2494. | |
447 taxonomy = [] | |
448 while taxon_id: | |
449 name, rank, parent_taxon_id = adaptor.execute_one( | |
450 "SELECT taxon_name.name, taxon.node_rank, taxon.parent_taxon_id" | |
451 " FROM taxon, taxon_name" | |
452 " WHERE taxon.taxon_id=taxon_name.taxon_id" | |
453 " AND taxon_name.name_class='scientific name'" | |
454 " AND taxon.taxon_id = %s", | |
455 (taxon_id,), | |
456 ) | |
457 if taxon_id == parent_taxon_id: | |
458 # If the taxon table has been populated by the BioSQL script | |
459 # load_ncbi_taxonomy.pl this is how top parent nodes are stored. | |
460 # Personally, I would have used a NULL parent_taxon_id here. | |
461 break | |
462 | |
463 taxonomy.insert(0, name) | |
464 taxon_id = parent_taxon_id | |
465 | |
466 if taxonomy: | |
467 a["taxonomy"] = taxonomy | |
468 return a | |
469 | |
470 | |
471 def _retrieve_comment(adaptor, primary_id): | |
472 qvs = adaptor.execute_and_fetchall( | |
473 'SELECT comment_text FROM comment WHERE bioentry_id=%s ORDER BY "rank"', | |
474 (primary_id,), | |
475 ) | |
476 comments = [comm[0] for comm in qvs] | |
477 # Don't want to add an empty list... | |
478 if comments: | |
479 return {"comment": comments} | |
480 else: | |
481 return {} | |
482 | |
483 | |
484 class DBSeqRecord(SeqRecord): | |
485 """BioSQL equivalent of the Biopython SeqRecord object.""" | |
486 | |
487 def __init__(self, adaptor, primary_id): | |
488 """Create a DBSeqRecord object. | |
489 | |
490 Arguments: | |
491 - adaptor - A BioSQL.BioSeqDatabase.Adaptor object | |
492 - primary_id - An internal integer ID used by BioSQL | |
493 | |
494 You wouldn't normally create a DBSeqRecord object yourself, | |
495 this is done for you when using a BioSeqDatabase object | |
496 """ | |
497 self._adaptor = adaptor | |
498 self._primary_id = primary_id | |
499 | |
500 ( | |
501 self._biodatabase_id, | |
502 self._taxon_id, | |
503 self.name, | |
504 accession, | |
505 version, | |
506 self._identifier, | |
507 self._division, | |
508 self.description, | |
509 ) = self._adaptor.execute_one( | |
510 "SELECT biodatabase_id, taxon_id, name, accession, version," | |
511 " identifier, division, description" | |
512 " FROM bioentry" | |
513 " WHERE bioentry_id = %s", | |
514 (self._primary_id,), | |
515 ) | |
516 if version and version != "0": | |
517 self.id = f"{accession}.{version}" | |
518 else: | |
519 self.id = accession | |
520 # We don't yet record any per-letter-annotations in the | |
521 # BioSQL database, but we should set this property up | |
522 # for completeness (and the __str__ method). | |
523 # We do NOT want to load the sequence from the DB here! | |
524 length = _retrieve_seq_len(adaptor, primary_id) | |
525 self._per_letter_annotations = _RestrictedDict(length=length) | |
526 | |
527 def __get_seq(self): | |
528 if not hasattr(self, "_seq"): | |
529 self._seq = _retrieve_seq(self._adaptor, self._primary_id) | |
530 return self._seq | |
531 | |
532 def __set_seq(self, seq): | |
533 # TODO - Check consistent with self._per_letter_annotations | |
534 self._seq = seq | |
535 | |
536 def __del_seq(self): | |
537 del self._seq | |
538 | |
539 seq = property(__get_seq, __set_seq, __del_seq, "Seq object") | |
540 | |
541 @property | |
542 def dbxrefs(self) -> List[str]: | |
543 """Database cross references.""" | |
544 if not hasattr(self, "_dbxrefs"): | |
545 self._dbxrefs = _retrieve_dbxrefs(self._adaptor, self._primary_id) | |
546 return self._dbxrefs | |
547 | |
548 @dbxrefs.setter | |
549 def dbxrefs(self, value: List[str]) -> None: | |
550 self._dbxrefs = value | |
551 | |
552 @dbxrefs.deleter | |
553 def dbxrefs(self) -> None: | |
554 del self._dbxrefs | |
555 | |
556 def __get_features(self): | |
557 if not hasattr(self, "_features"): | |
558 self._features = _retrieve_features(self._adaptor, self._primary_id) | |
559 return self._features | |
560 | |
561 def __set_features(self, features): | |
562 self._features = features | |
563 | |
564 def __del_features(self): | |
565 del self._features | |
566 | |
567 features = property(__get_features, __set_features, __del_features, "Features") | |
568 | |
569 @property | |
570 def annotations(self) -> SeqRecord._AnnotationsDict: | |
571 """Annotations.""" | |
572 if not hasattr(self, "_annotations"): | |
573 self._annotations = _retrieve_annotations( | |
574 self._adaptor, self._primary_id, self._taxon_id | |
575 ) | |
576 if self._identifier: | |
577 self._annotations["gi"] = self._identifier | |
578 if self._division: | |
579 self._annotations["data_file_division"] = self._division | |
580 return self._annotations | |
581 | |
582 @annotations.setter | |
583 def annotations(self, value: Optional[SeqRecord._AnnotationsDict]) -> None: | |
584 if value: | |
585 self._annotations = value | |
586 else: | |
587 self._annotations = {} | |
588 | |
589 @annotations.deleter | |
590 def annotations(self) -> None: | |
591 del self._annotations |