jpayne@69
|
1 # Copyright 2002 by Andrew Dalke. All rights reserved.
|
jpayne@69
|
2 # Revisions 2007-2016 copyright by Peter Cock. All rights reserved.
|
jpayne@69
|
3 # Revisions 2008-2009 copyright by Cymon J. Cox. All rights reserved.
|
jpayne@69
|
4 #
|
jpayne@69
|
5 # This file is part of the Biopython distribution and governed by your
|
jpayne@69
|
6 # choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
|
jpayne@69
|
7 # Please see the LICENSE file that should have been included as part of this
|
jpayne@69
|
8 # package.
|
jpayne@69
|
9 #
|
jpayne@69
|
10 # Note that BioSQL (including the database schema and scripts) is
|
jpayne@69
|
11 # available and licensed separately. Please consult www.biosql.org
|
jpayne@69
|
12 """Implementations of Biopython-like Seq objects on top of BioSQL.
|
jpayne@69
|
13
|
jpayne@69
|
14 This allows retrieval of items stored in a BioSQL database using
|
jpayne@69
|
15 a biopython-like SeqRecord and Seq interface.
|
jpayne@69
|
16
|
jpayne@69
|
17 Note: Currently we do not support recording per-letter-annotations
|
jpayne@69
|
18 (like quality scores) in BioSQL.
|
jpayne@69
|
19 """
|
jpayne@69
|
20
|
jpayne@69
|
21 from typing import List, Optional
|
jpayne@69
|
22
|
jpayne@69
|
23 from Bio.Seq import Seq, SequenceDataAbstractBaseClass
|
jpayne@69
|
24 from Bio.SeqRecord import SeqRecord, _RestrictedDict
|
jpayne@69
|
25 from Bio import SeqFeature
|
jpayne@69
|
26
|
jpayne@69
|
27
|
jpayne@69
|
28 class _BioSQLSequenceData(SequenceDataAbstractBaseClass):
|
jpayne@69
|
29 """Retrieves sequence data from a BioSQL database (PRIVATE)."""
|
jpayne@69
|
30
|
jpayne@69
|
31 __slots__ = ("primary_id", "adaptor", "_length", "start")
|
jpayne@69
|
32
|
jpayne@69
|
33 def __init__(self, primary_id, adaptor, start=0, length=0):
|
jpayne@69
|
34 """Create a new _BioSQLSequenceData object referring to a BioSQL entry.
|
jpayne@69
|
35
|
jpayne@69
|
36 You wouldn't normally create a _BioSQLSequenceData object yourself,
|
jpayne@69
|
37 this is done for you when retrieving a DBSeqRecord object from the
|
jpayne@69
|
38 database, which creates a Seq object using a _BioSQLSequenceData
|
jpayne@69
|
39 instance as the data provider.
|
jpayne@69
|
40 """
|
jpayne@69
|
41 self.primary_id = primary_id
|
jpayne@69
|
42 self.adaptor = adaptor
|
jpayne@69
|
43 self._length = length
|
jpayne@69
|
44 self.start = start
|
jpayne@69
|
45 super().__init__()
|
jpayne@69
|
46
|
jpayne@69
|
47 def __len__(self):
|
jpayne@69
|
48 """Return the length of the sequence."""
|
jpayne@69
|
49 return self._length
|
jpayne@69
|
50
|
jpayne@69
|
51 def __getitem__(self, key):
|
jpayne@69
|
52 """Return a subsequence as a bytes or a _BioSQLSequenceData object."""
|
jpayne@69
|
53 if isinstance(key, slice):
|
jpayne@69
|
54 start, end, step = key.indices(self._length)
|
jpayne@69
|
55 size = len(range(start, end, step))
|
jpayne@69
|
56 if size == 0:
|
jpayne@69
|
57 return b""
|
jpayne@69
|
58 else:
|
jpayne@69
|
59 # Return a single letter as an integer (consistent with bytes)
|
jpayne@69
|
60 i = key
|
jpayne@69
|
61 if i < 0:
|
jpayne@69
|
62 i += self._length
|
jpayne@69
|
63 if i < 0:
|
jpayne@69
|
64 raise IndexError(key)
|
jpayne@69
|
65 elif i >= self._length:
|
jpayne@69
|
66 raise IndexError(key)
|
jpayne@69
|
67 c = self.adaptor.get_subseq_as_string(
|
jpayne@69
|
68 self.primary_id, self.start + i, self.start + i + 1
|
jpayne@69
|
69 )
|
jpayne@69
|
70 return ord(c)
|
jpayne@69
|
71
|
jpayne@69
|
72 if step == 1:
|
jpayne@69
|
73 if start == 0 and size == self._length:
|
jpayne@69
|
74 # Return the full sequence as bytes
|
jpayne@69
|
75 sequence = self.adaptor.get_subseq_as_string(
|
jpayne@69
|
76 self.primary_id, self.start, self.start + self._length
|
jpayne@69
|
77 )
|
jpayne@69
|
78 return sequence.encode("ASCII")
|
jpayne@69
|
79 else:
|
jpayne@69
|
80 # Return a _BioSQLSequenceData with the start and end adjusted
|
jpayne@69
|
81 return _BioSQLSequenceData(
|
jpayne@69
|
82 self.primary_id, self.adaptor, self.start + start, size
|
jpayne@69
|
83 )
|
jpayne@69
|
84 else:
|
jpayne@69
|
85 # Will have to extract the sequence because of the stride
|
jpayne@69
|
86 full = self.adaptor.get_subseq_as_string(
|
jpayne@69
|
87 self.primary_id, self.start + start, self.start + end
|
jpayne@69
|
88 )
|
jpayne@69
|
89 return full[::step].encode("ASCII")
|
jpayne@69
|
90
|
jpayne@69
|
91
|
jpayne@69
|
92 def _retrieve_seq_len(adaptor, primary_id):
|
jpayne@69
|
93 # The database schema ensures there will be only one matching row
|
jpayne@69
|
94 seqs = adaptor.execute_and_fetchall(
|
jpayne@69
|
95 "SELECT length FROM biosequence WHERE bioentry_id = %s", (primary_id,)
|
jpayne@69
|
96 )
|
jpayne@69
|
97 if not seqs:
|
jpayne@69
|
98 return None
|
jpayne@69
|
99 if len(seqs) != 1:
|
jpayne@69
|
100 raise ValueError(f"Expected 1 response, got {len(seqs)}.")
|
jpayne@69
|
101 (given_length,) = seqs[0]
|
jpayne@69
|
102 return int(given_length)
|
jpayne@69
|
103
|
jpayne@69
|
104
|
jpayne@69
|
105 def _retrieve_seq(adaptor, primary_id):
|
jpayne@69
|
106 # The database schema ensures there will be only one matching
|
jpayne@69
|
107 # row in the table.
|
jpayne@69
|
108
|
jpayne@69
|
109 # If an undefined sequence was recorded, seq will be NULL,
|
jpayne@69
|
110 # but length will be populated. This means length(seq)
|
jpayne@69
|
111 # will return None.
|
jpayne@69
|
112 seqs = adaptor.execute_and_fetchall(
|
jpayne@69
|
113 "SELECT alphabet, length, length(seq) FROM biosequence WHERE bioentry_id = %s",
|
jpayne@69
|
114 (primary_id,),
|
jpayne@69
|
115 )
|
jpayne@69
|
116 if not seqs:
|
jpayne@69
|
117 return
|
jpayne@69
|
118 if len(seqs) != 1:
|
jpayne@69
|
119 raise ValueError(f"Expected 1 response, got {len(seqs)}.")
|
jpayne@69
|
120 moltype, given_length, length = seqs[0]
|
jpayne@69
|
121
|
jpayne@69
|
122 try:
|
jpayne@69
|
123 length = int(length)
|
jpayne@69
|
124 given_length = int(given_length)
|
jpayne@69
|
125 if length != given_length:
|
jpayne@69
|
126 raise ValueError(
|
jpayne@69
|
127 f"'length' differs from sequence length, {given_length}, {length}"
|
jpayne@69
|
128 )
|
jpayne@69
|
129 have_seq = True
|
jpayne@69
|
130 except TypeError:
|
jpayne@69
|
131 if length is not None:
|
jpayne@69
|
132 raise ValueError(f"Expected 'length' to be 'None', got {length}.")
|
jpayne@69
|
133 seqs = adaptor.execute_and_fetchall(
|
jpayne@69
|
134 "SELECT alphabet, length, seq FROM biosequence WHERE bioentry_id = %s",
|
jpayne@69
|
135 (primary_id,),
|
jpayne@69
|
136 )
|
jpayne@69
|
137 if len(seqs) != 1:
|
jpayne@69
|
138 raise ValueError(f"Expected 1 response, got {len(seqs)}.")
|
jpayne@69
|
139 moltype, given_length, seq = seqs[0]
|
jpayne@69
|
140 if seq:
|
jpayne@69
|
141 raise ValueError(f"Expected 'seq' to have a falsy value, got {seq}.")
|
jpayne@69
|
142 length = int(given_length)
|
jpayne@69
|
143 have_seq = False
|
jpayne@69
|
144 del seq
|
jpayne@69
|
145 del given_length
|
jpayne@69
|
146
|
jpayne@69
|
147 if have_seq:
|
jpayne@69
|
148 data = _BioSQLSequenceData(primary_id, adaptor, start=0, length=length)
|
jpayne@69
|
149 return Seq(data)
|
jpayne@69
|
150 else:
|
jpayne@69
|
151 return Seq(None, length=length)
|
jpayne@69
|
152
|
jpayne@69
|
153
|
jpayne@69
|
154 def _retrieve_dbxrefs(adaptor, primary_id):
|
jpayne@69
|
155 """Retrieve the database cross references for the sequence (PRIVATE)."""
|
jpayne@69
|
156 _dbxrefs = []
|
jpayne@69
|
157 dbxrefs = adaptor.execute_and_fetchall(
|
jpayne@69
|
158 "SELECT dbname, accession, version"
|
jpayne@69
|
159 " FROM bioentry_dbxref join dbxref using (dbxref_id)"
|
jpayne@69
|
160 " WHERE bioentry_id = %s"
|
jpayne@69
|
161 ' ORDER BY "rank"',
|
jpayne@69
|
162 (primary_id,),
|
jpayne@69
|
163 )
|
jpayne@69
|
164 for dbname, accession, version in dbxrefs:
|
jpayne@69
|
165 if version and version != "0":
|
jpayne@69
|
166 v = f"{accession}.{version}"
|
jpayne@69
|
167 else:
|
jpayne@69
|
168 v = accession
|
jpayne@69
|
169 _dbxrefs.append(f"{dbname}:{v}")
|
jpayne@69
|
170 return _dbxrefs
|
jpayne@69
|
171
|
jpayne@69
|
172
|
jpayne@69
|
173 def _retrieve_features(adaptor, primary_id):
|
jpayne@69
|
174 sql = (
|
jpayne@69
|
175 'SELECT seqfeature_id, type.name, "rank"'
|
jpayne@69
|
176 " FROM seqfeature join term type on (type_term_id = type.term_id)"
|
jpayne@69
|
177 " WHERE bioentry_id = %s"
|
jpayne@69
|
178 ' ORDER BY "rank"'
|
jpayne@69
|
179 )
|
jpayne@69
|
180 results = adaptor.execute_and_fetchall(sql, (primary_id,))
|
jpayne@69
|
181 seq_feature_list = []
|
jpayne@69
|
182 for seqfeature_id, seqfeature_type, seqfeature_rank in results:
|
jpayne@69
|
183 # Get qualifiers [except for db_xref which is stored separately]
|
jpayne@69
|
184 qvs = adaptor.execute_and_fetchall(
|
jpayne@69
|
185 "SELECT name, value"
|
jpayne@69
|
186 " FROM seqfeature_qualifier_value join term using (term_id)"
|
jpayne@69
|
187 " WHERE seqfeature_id = %s"
|
jpayne@69
|
188 ' ORDER BY "rank"',
|
jpayne@69
|
189 (seqfeature_id,),
|
jpayne@69
|
190 )
|
jpayne@69
|
191 qualifiers = {}
|
jpayne@69
|
192 for qv_name, qv_value in qvs:
|
jpayne@69
|
193 qualifiers.setdefault(qv_name, []).append(qv_value)
|
jpayne@69
|
194 # Get db_xrefs [special case of qualifiers]
|
jpayne@69
|
195 qvs = adaptor.execute_and_fetchall(
|
jpayne@69
|
196 "SELECT dbxref.dbname, dbxref.accession"
|
jpayne@69
|
197 " FROM dbxref join seqfeature_dbxref using (dbxref_id)"
|
jpayne@69
|
198 " WHERE seqfeature_dbxref.seqfeature_id = %s"
|
jpayne@69
|
199 ' ORDER BY "rank"',
|
jpayne@69
|
200 (seqfeature_id,),
|
jpayne@69
|
201 )
|
jpayne@69
|
202 for qv_name, qv_value in qvs:
|
jpayne@69
|
203 value = f"{qv_name}:{qv_value}"
|
jpayne@69
|
204 qualifiers.setdefault("db_xref", []).append(value)
|
jpayne@69
|
205 # Get locations
|
jpayne@69
|
206 results = adaptor.execute_and_fetchall(
|
jpayne@69
|
207 "SELECT location_id, start_pos, end_pos, strand"
|
jpayne@69
|
208 " FROM location"
|
jpayne@69
|
209 " WHERE seqfeature_id = %s"
|
jpayne@69
|
210 ' ORDER BY "rank"',
|
jpayne@69
|
211 (seqfeature_id,),
|
jpayne@69
|
212 )
|
jpayne@69
|
213 locations = []
|
jpayne@69
|
214 # convert to Python standard form
|
jpayne@69
|
215 # Convert strand = 0 to strand = None
|
jpayne@69
|
216 # re: comment in Loader.py:
|
jpayne@69
|
217 # Biopython uses None when we don't know strand information but
|
jpayne@69
|
218 # BioSQL requires something (non null) and sets this as zero
|
jpayne@69
|
219 # So we'll use the strand or 0 if Biopython spits out None
|
jpayne@69
|
220 for location_id, start, end, strand in results:
|
jpayne@69
|
221 if start:
|
jpayne@69
|
222 start -= 1
|
jpayne@69
|
223 if strand == 0:
|
jpayne@69
|
224 strand = None
|
jpayne@69
|
225 if strand not in (+1, -1, None):
|
jpayne@69
|
226 raise ValueError(
|
jpayne@69
|
227 "Invalid strand %s found in database for "
|
jpayne@69
|
228 "seqfeature_id %s" % (strand, seqfeature_id)
|
jpayne@69
|
229 )
|
jpayne@69
|
230 if start is not None and end is not None and end < start:
|
jpayne@69
|
231 import warnings
|
jpayne@69
|
232 from Bio import BiopythonWarning
|
jpayne@69
|
233
|
jpayne@69
|
234 warnings.warn(
|
jpayne@69
|
235 "Inverted location start/end (%i and %i) for "
|
jpayne@69
|
236 "seqfeature_id %s" % (start, end, seqfeature_id),
|
jpayne@69
|
237 BiopythonWarning,
|
jpayne@69
|
238 )
|
jpayne@69
|
239
|
jpayne@69
|
240 # For SwissProt unknown positions (?)
|
jpayne@69
|
241 if start is None:
|
jpayne@69
|
242 start = SeqFeature.UnknownPosition()
|
jpayne@69
|
243 if end is None:
|
jpayne@69
|
244 end = SeqFeature.UnknownPosition()
|
jpayne@69
|
245
|
jpayne@69
|
246 locations.append((location_id, start, end, strand))
|
jpayne@69
|
247 # Get possible remote reference information
|
jpayne@69
|
248 remote_results = adaptor.execute_and_fetchall(
|
jpayne@69
|
249 "SELECT location_id, dbname, accession, version"
|
jpayne@69
|
250 " FROM location join dbxref using (dbxref_id)"
|
jpayne@69
|
251 " WHERE seqfeature_id = %s",
|
jpayne@69
|
252 (seqfeature_id,),
|
jpayne@69
|
253 )
|
jpayne@69
|
254 lookup = {}
|
jpayne@69
|
255 for location_id, dbname, accession, version in remote_results:
|
jpayne@69
|
256 if version and version != "0":
|
jpayne@69
|
257 v = f"{accession}.{version}"
|
jpayne@69
|
258 else:
|
jpayne@69
|
259 v = accession
|
jpayne@69
|
260 # subfeature remote location db_ref are stored as a empty string
|
jpayne@69
|
261 # when not present
|
jpayne@69
|
262 if dbname == "":
|
jpayne@69
|
263 dbname = None
|
jpayne@69
|
264 lookup[location_id] = (dbname, v)
|
jpayne@69
|
265
|
jpayne@69
|
266 feature = SeqFeature.SeqFeature(type=seqfeature_type)
|
jpayne@69
|
267 # Store the key as a private property
|
jpayne@69
|
268 feature._seqfeature_id = seqfeature_id
|
jpayne@69
|
269 feature.qualifiers = qualifiers
|
jpayne@69
|
270 if len(locations) == 0:
|
jpayne@69
|
271 pass
|
jpayne@69
|
272 elif len(locations) == 1:
|
jpayne@69
|
273 location_id, start, end, strand = locations[0]
|
jpayne@69
|
274 # See Bug 2677, we currently don't record the location_operator
|
jpayne@69
|
275 # For consistency with older versions Biopython, default to "".
|
jpayne@69
|
276 feature.location_operator = _retrieve_location_qualifier_value(
|
jpayne@69
|
277 adaptor, location_id
|
jpayne@69
|
278 )
|
jpayne@69
|
279 dbname, version = lookup.get(location_id, (None, None))
|
jpayne@69
|
280 feature.location = SeqFeature.SimpleLocation(start, end)
|
jpayne@69
|
281 feature.location.strand = strand
|
jpayne@69
|
282 feature.location.ref_db = dbname
|
jpayne@69
|
283 feature.location.ref = version
|
jpayne@69
|
284 else:
|
jpayne@69
|
285 locs = []
|
jpayne@69
|
286 for location in locations:
|
jpayne@69
|
287 location_id, start, end, strand = location
|
jpayne@69
|
288 dbname, version = lookup.get(location_id, (None, None))
|
jpayne@69
|
289 locs.append(
|
jpayne@69
|
290 SeqFeature.SimpleLocation(
|
jpayne@69
|
291 start, end, strand=strand, ref=version, ref_db=dbname
|
jpayne@69
|
292 )
|
jpayne@69
|
293 )
|
jpayne@69
|
294 # Locations are typically in biological in order (see negative
|
jpayne@69
|
295 # strands below), but because of remote locations for
|
jpayne@69
|
296 # sub-features they are not necessarily in numerical order:
|
jpayne@69
|
297 strands = {_.strand for _ in locs}
|
jpayne@69
|
298 if len(strands) == 1 and -1 in strands:
|
jpayne@69
|
299 # Evil hack time for backwards compatibility
|
jpayne@69
|
300 # TODO - Check if BioPerl and (old) Biopython did the same,
|
jpayne@69
|
301 # we may have an existing incompatibility lurking here...
|
jpayne@69
|
302 locs = locs[::-1]
|
jpayne@69
|
303 feature.location = SeqFeature.CompoundLocation(locs, "join")
|
jpayne@69
|
304 # TODO - See Bug 2677 - we don't yet record location operator,
|
jpayne@69
|
305 # so for consistency with older versions of Biopython default
|
jpayne@69
|
306 # to assuming its a join.
|
jpayne@69
|
307 seq_feature_list.append(feature)
|
jpayne@69
|
308 return seq_feature_list
|
jpayne@69
|
309
|
jpayne@69
|
310
|
jpayne@69
|
311 def _retrieve_location_qualifier_value(adaptor, location_id):
|
jpayne@69
|
312 value = adaptor.execute_and_fetch_col0(
|
jpayne@69
|
313 "SELECT value FROM location_qualifier_value WHERE location_id = %s",
|
jpayne@69
|
314 (location_id,),
|
jpayne@69
|
315 )
|
jpayne@69
|
316 try:
|
jpayne@69
|
317 return value[0]
|
jpayne@69
|
318 except IndexError:
|
jpayne@69
|
319 return ""
|
jpayne@69
|
320
|
jpayne@69
|
321
|
jpayne@69
|
322 def _retrieve_annotations(adaptor, primary_id, taxon_id):
|
jpayne@69
|
323 annotations = {}
|
jpayne@69
|
324 annotations.update(_retrieve_alphabet(adaptor, primary_id))
|
jpayne@69
|
325 annotations.update(_retrieve_qualifier_value(adaptor, primary_id))
|
jpayne@69
|
326 annotations.update(_retrieve_reference(adaptor, primary_id))
|
jpayne@69
|
327 annotations.update(_retrieve_taxon(adaptor, primary_id, taxon_id))
|
jpayne@69
|
328 annotations.update(_retrieve_comment(adaptor, primary_id))
|
jpayne@69
|
329 return annotations
|
jpayne@69
|
330
|
jpayne@69
|
331
|
jpayne@69
|
332 def _retrieve_alphabet(adaptor, primary_id):
|
jpayne@69
|
333 results = adaptor.execute_and_fetchall(
|
jpayne@69
|
334 "SELECT alphabet FROM biosequence WHERE bioentry_id = %s", (primary_id,)
|
jpayne@69
|
335 )
|
jpayne@69
|
336 if len(results) != 1:
|
jpayne@69
|
337 raise ValueError(f"Expected 1 response, got {len(results)}.")
|
jpayne@69
|
338 alphabets = results[0]
|
jpayne@69
|
339 if len(alphabets) != 1:
|
jpayne@69
|
340 raise ValueError(f"Expected 1 alphabet in response, got {len(alphabets)}.")
|
jpayne@69
|
341 alphabet = alphabets[0]
|
jpayne@69
|
342 if alphabet == "dna":
|
jpayne@69
|
343 molecule_type = "DNA"
|
jpayne@69
|
344 elif alphabet == "rna":
|
jpayne@69
|
345 molecule_type = "RNA"
|
jpayne@69
|
346 elif alphabet == "protein":
|
jpayne@69
|
347 molecule_type = "protein"
|
jpayne@69
|
348 else:
|
jpayne@69
|
349 molecule_type = None
|
jpayne@69
|
350 if molecule_type is not None:
|
jpayne@69
|
351 return {"molecule_type": molecule_type}
|
jpayne@69
|
352 else:
|
jpayne@69
|
353 return {}
|
jpayne@69
|
354
|
jpayne@69
|
355
|
jpayne@69
|
356 def _retrieve_qualifier_value(adaptor, primary_id):
|
jpayne@69
|
357 qvs = adaptor.execute_and_fetchall(
|
jpayne@69
|
358 "SELECT name, value"
|
jpayne@69
|
359 " FROM bioentry_qualifier_value JOIN term USING (term_id)"
|
jpayne@69
|
360 " WHERE bioentry_id = %s"
|
jpayne@69
|
361 ' ORDER BY "rank"',
|
jpayne@69
|
362 (primary_id,),
|
jpayne@69
|
363 )
|
jpayne@69
|
364 qualifiers = {}
|
jpayne@69
|
365 for name, value in qvs:
|
jpayne@69
|
366 if name == "keyword":
|
jpayne@69
|
367 name = "keywords"
|
jpayne@69
|
368 # See handling of "date" in Loader.py
|
jpayne@69
|
369 elif name == "date_changed":
|
jpayne@69
|
370 name = "date"
|
jpayne@69
|
371 elif name == "secondary_accession":
|
jpayne@69
|
372 name = "accessions"
|
jpayne@69
|
373 qualifiers.setdefault(name, []).append(value)
|
jpayne@69
|
374 return qualifiers
|
jpayne@69
|
375
|
jpayne@69
|
376
|
jpayne@69
|
377 def _retrieve_reference(adaptor, primary_id):
|
jpayne@69
|
378 # XXX dbxref_qualifier_value
|
jpayne@69
|
379
|
jpayne@69
|
380 refs = adaptor.execute_and_fetchall(
|
jpayne@69
|
381 "SELECT start_pos, end_pos, "
|
jpayne@69
|
382 " location, title, authors,"
|
jpayne@69
|
383 " dbname, accession"
|
jpayne@69
|
384 " FROM bioentry_reference"
|
jpayne@69
|
385 " JOIN reference USING (reference_id)"
|
jpayne@69
|
386 " LEFT JOIN dbxref USING (dbxref_id)"
|
jpayne@69
|
387 " WHERE bioentry_id = %s"
|
jpayne@69
|
388 ' ORDER BY "rank"',
|
jpayne@69
|
389 (primary_id,),
|
jpayne@69
|
390 )
|
jpayne@69
|
391 references = []
|
jpayne@69
|
392 for start, end, location, title, authors, dbname, accession in refs:
|
jpayne@69
|
393 reference = SeqFeature.Reference()
|
jpayne@69
|
394 # If the start/end are missing, reference.location is an empty list
|
jpayne@69
|
395 if (start is not None) or (end is not None):
|
jpayne@69
|
396 if start is not None:
|
jpayne@69
|
397 start -= 1 # python counting
|
jpayne@69
|
398 reference.location = [SeqFeature.SimpleLocation(start, end)]
|
jpayne@69
|
399 # Don't replace the default "" with None.
|
jpayne@69
|
400 if authors:
|
jpayne@69
|
401 reference.authors = authors
|
jpayne@69
|
402 if title:
|
jpayne@69
|
403 reference.title = title
|
jpayne@69
|
404 reference.journal = location
|
jpayne@69
|
405 if dbname == "PUBMED":
|
jpayne@69
|
406 reference.pubmed_id = accession
|
jpayne@69
|
407 elif dbname == "MEDLINE":
|
jpayne@69
|
408 reference.medline_id = accession
|
jpayne@69
|
409 references.append(reference)
|
jpayne@69
|
410 if references:
|
jpayne@69
|
411 return {"references": references}
|
jpayne@69
|
412 else:
|
jpayne@69
|
413 return {}
|
jpayne@69
|
414
|
jpayne@69
|
415
|
jpayne@69
|
416 def _retrieve_taxon(adaptor, primary_id, taxon_id):
|
jpayne@69
|
417 a = {}
|
jpayne@69
|
418 common_names = adaptor.execute_and_fetch_col0(
|
jpayne@69
|
419 "SELECT name FROM taxon_name WHERE taxon_id = %s"
|
jpayne@69
|
420 " AND name_class = 'genbank common name'",
|
jpayne@69
|
421 (taxon_id,),
|
jpayne@69
|
422 )
|
jpayne@69
|
423 if common_names:
|
jpayne@69
|
424 a["source"] = common_names[0]
|
jpayne@69
|
425 scientific_names = adaptor.execute_and_fetch_col0(
|
jpayne@69
|
426 "SELECT name FROM taxon_name WHERE taxon_id = %s"
|
jpayne@69
|
427 " AND name_class = 'scientific name'",
|
jpayne@69
|
428 (taxon_id,),
|
jpayne@69
|
429 )
|
jpayne@69
|
430 if scientific_names:
|
jpayne@69
|
431 a["organism"] = scientific_names[0]
|
jpayne@69
|
432 ncbi_taxids = adaptor.execute_and_fetch_col0(
|
jpayne@69
|
433 "SELECT ncbi_taxon_id FROM taxon WHERE taxon_id = %s", (taxon_id,)
|
jpayne@69
|
434 )
|
jpayne@69
|
435 if ncbi_taxids and ncbi_taxids[0] and ncbi_taxids[0] != "0":
|
jpayne@69
|
436 a["ncbi_taxid"] = ncbi_taxids[0]
|
jpayne@69
|
437
|
jpayne@69
|
438 # Old code used the left/right values in the taxon table to get the
|
jpayne@69
|
439 # taxonomy lineage in one SQL command. This was actually very slow,
|
jpayne@69
|
440 # and would fail if the (optional) left/right values were missing.
|
jpayne@69
|
441 #
|
jpayne@69
|
442 # The following code is based on a contribution from Eric Gibert, and
|
jpayne@69
|
443 # relies on the taxon table's parent_taxon_id field only (ignoring the
|
jpayne@69
|
444 # optional left/right values). This means that it has to make a
|
jpayne@69
|
445 # separate SQL query for each entry in the lineage, but it does still
|
jpayne@69
|
446 # appear to be *much* faster. See Bug 2494.
|
jpayne@69
|
447 taxonomy = []
|
jpayne@69
|
448 while taxon_id:
|
jpayne@69
|
449 name, rank, parent_taxon_id = adaptor.execute_one(
|
jpayne@69
|
450 "SELECT taxon_name.name, taxon.node_rank, taxon.parent_taxon_id"
|
jpayne@69
|
451 " FROM taxon, taxon_name"
|
jpayne@69
|
452 " WHERE taxon.taxon_id=taxon_name.taxon_id"
|
jpayne@69
|
453 " AND taxon_name.name_class='scientific name'"
|
jpayne@69
|
454 " AND taxon.taxon_id = %s",
|
jpayne@69
|
455 (taxon_id,),
|
jpayne@69
|
456 )
|
jpayne@69
|
457 if taxon_id == parent_taxon_id:
|
jpayne@69
|
458 # If the taxon table has been populated by the BioSQL script
|
jpayne@69
|
459 # load_ncbi_taxonomy.pl this is how top parent nodes are stored.
|
jpayne@69
|
460 # Personally, I would have used a NULL parent_taxon_id here.
|
jpayne@69
|
461 break
|
jpayne@69
|
462
|
jpayne@69
|
463 taxonomy.insert(0, name)
|
jpayne@69
|
464 taxon_id = parent_taxon_id
|
jpayne@69
|
465
|
jpayne@69
|
466 if taxonomy:
|
jpayne@69
|
467 a["taxonomy"] = taxonomy
|
jpayne@69
|
468 return a
|
jpayne@69
|
469
|
jpayne@69
|
470
|
jpayne@69
|
471 def _retrieve_comment(adaptor, primary_id):
|
jpayne@69
|
472 qvs = adaptor.execute_and_fetchall(
|
jpayne@69
|
473 'SELECT comment_text FROM comment WHERE bioentry_id=%s ORDER BY "rank"',
|
jpayne@69
|
474 (primary_id,),
|
jpayne@69
|
475 )
|
jpayne@69
|
476 comments = [comm[0] for comm in qvs]
|
jpayne@69
|
477 # Don't want to add an empty list...
|
jpayne@69
|
478 if comments:
|
jpayne@69
|
479 return {"comment": comments}
|
jpayne@69
|
480 else:
|
jpayne@69
|
481 return {}
|
jpayne@69
|
482
|
jpayne@69
|
483
|
jpayne@69
|
484 class DBSeqRecord(SeqRecord):
|
jpayne@69
|
485 """BioSQL equivalent of the Biopython SeqRecord object."""
|
jpayne@69
|
486
|
jpayne@69
|
487 def __init__(self, adaptor, primary_id):
|
jpayne@69
|
488 """Create a DBSeqRecord object.
|
jpayne@69
|
489
|
jpayne@69
|
490 Arguments:
|
jpayne@69
|
491 - adaptor - A BioSQL.BioSeqDatabase.Adaptor object
|
jpayne@69
|
492 - primary_id - An internal integer ID used by BioSQL
|
jpayne@69
|
493
|
jpayne@69
|
494 You wouldn't normally create a DBSeqRecord object yourself,
|
jpayne@69
|
495 this is done for you when using a BioSeqDatabase object
|
jpayne@69
|
496 """
|
jpayne@69
|
497 self._adaptor = adaptor
|
jpayne@69
|
498 self._primary_id = primary_id
|
jpayne@69
|
499
|
jpayne@69
|
500 (
|
jpayne@69
|
501 self._biodatabase_id,
|
jpayne@69
|
502 self._taxon_id,
|
jpayne@69
|
503 self.name,
|
jpayne@69
|
504 accession,
|
jpayne@69
|
505 version,
|
jpayne@69
|
506 self._identifier,
|
jpayne@69
|
507 self._division,
|
jpayne@69
|
508 self.description,
|
jpayne@69
|
509 ) = self._adaptor.execute_one(
|
jpayne@69
|
510 "SELECT biodatabase_id, taxon_id, name, accession, version,"
|
jpayne@69
|
511 " identifier, division, description"
|
jpayne@69
|
512 " FROM bioentry"
|
jpayne@69
|
513 " WHERE bioentry_id = %s",
|
jpayne@69
|
514 (self._primary_id,),
|
jpayne@69
|
515 )
|
jpayne@69
|
516 if version and version != "0":
|
jpayne@69
|
517 self.id = f"{accession}.{version}"
|
jpayne@69
|
518 else:
|
jpayne@69
|
519 self.id = accession
|
jpayne@69
|
520 # We don't yet record any per-letter-annotations in the
|
jpayne@69
|
521 # BioSQL database, but we should set this property up
|
jpayne@69
|
522 # for completeness (and the __str__ method).
|
jpayne@69
|
523 # We do NOT want to load the sequence from the DB here!
|
jpayne@69
|
524 length = _retrieve_seq_len(adaptor, primary_id)
|
jpayne@69
|
525 self._per_letter_annotations = _RestrictedDict(length=length)
|
jpayne@69
|
526
|
jpayne@69
|
527 def __get_seq(self):
|
jpayne@69
|
528 if not hasattr(self, "_seq"):
|
jpayne@69
|
529 self._seq = _retrieve_seq(self._adaptor, self._primary_id)
|
jpayne@69
|
530 return self._seq
|
jpayne@69
|
531
|
jpayne@69
|
532 def __set_seq(self, seq):
|
jpayne@69
|
533 # TODO - Check consistent with self._per_letter_annotations
|
jpayne@69
|
534 self._seq = seq
|
jpayne@69
|
535
|
jpayne@69
|
536 def __del_seq(self):
|
jpayne@69
|
537 del self._seq
|
jpayne@69
|
538
|
jpayne@69
|
539 seq = property(__get_seq, __set_seq, __del_seq, "Seq object")
|
jpayne@69
|
540
|
jpayne@69
|
541 @property
|
jpayne@69
|
542 def dbxrefs(self) -> List[str]:
|
jpayne@69
|
543 """Database cross references."""
|
jpayne@69
|
544 if not hasattr(self, "_dbxrefs"):
|
jpayne@69
|
545 self._dbxrefs = _retrieve_dbxrefs(self._adaptor, self._primary_id)
|
jpayne@69
|
546 return self._dbxrefs
|
jpayne@69
|
547
|
jpayne@69
|
548 @dbxrefs.setter
|
jpayne@69
|
549 def dbxrefs(self, value: List[str]) -> None:
|
jpayne@69
|
550 self._dbxrefs = value
|
jpayne@69
|
551
|
jpayne@69
|
552 @dbxrefs.deleter
|
jpayne@69
|
553 def dbxrefs(self) -> None:
|
jpayne@69
|
554 del self._dbxrefs
|
jpayne@69
|
555
|
jpayne@69
|
556 def __get_features(self):
|
jpayne@69
|
557 if not hasattr(self, "_features"):
|
jpayne@69
|
558 self._features = _retrieve_features(self._adaptor, self._primary_id)
|
jpayne@69
|
559 return self._features
|
jpayne@69
|
560
|
jpayne@69
|
561 def __set_features(self, features):
|
jpayne@69
|
562 self._features = features
|
jpayne@69
|
563
|
jpayne@69
|
564 def __del_features(self):
|
jpayne@69
|
565 del self._features
|
jpayne@69
|
566
|
jpayne@69
|
567 features = property(__get_features, __set_features, __del_features, "Features")
|
jpayne@69
|
568
|
jpayne@69
|
569 @property
|
jpayne@69
|
570 def annotations(self) -> SeqRecord._AnnotationsDict:
|
jpayne@69
|
571 """Annotations."""
|
jpayne@69
|
572 if not hasattr(self, "_annotations"):
|
jpayne@69
|
573 self._annotations = _retrieve_annotations(
|
jpayne@69
|
574 self._adaptor, self._primary_id, self._taxon_id
|
jpayne@69
|
575 )
|
jpayne@69
|
576 if self._identifier:
|
jpayne@69
|
577 self._annotations["gi"] = self._identifier
|
jpayne@69
|
578 if self._division:
|
jpayne@69
|
579 self._annotations["data_file_division"] = self._division
|
jpayne@69
|
580 return self._annotations
|
jpayne@69
|
581
|
jpayne@69
|
582 @annotations.setter
|
jpayne@69
|
583 def annotations(self, value: Optional[SeqRecord._AnnotationsDict]) -> None:
|
jpayne@69
|
584 if value:
|
jpayne@69
|
585 self._annotations = value
|
jpayne@69
|
586 else:
|
jpayne@69
|
587 self._annotations = {}
|
jpayne@69
|
588
|
jpayne@69
|
589 @annotations.deleter
|
jpayne@69
|
590 def annotations(self) -> None:
|
jpayne@69
|
591 del self._annotations
|