diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/BioSQL/BioSeq.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/BioSQL/BioSeq.py	Tue Mar 18 17:55:14 2025 -0400
@@ -0,0 +1,591 @@
+# Copyright 2002 by Andrew Dalke.  All rights reserved.
+# Revisions 2007-2016 copyright by Peter Cock.  All rights reserved.
+# Revisions 2008-2009 copyright by Cymon J. Cox.  All rights reserved.
+#
+# This file is part of the Biopython distribution and governed by your
+# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
+# Please see the LICENSE file that should have been included as part of this
+# package.
+#
+# Note that BioSQL (including the database schema and scripts) is
+# available and licensed separately.  Please consult www.biosql.org
+"""Implementations of Biopython-like Seq objects on top of BioSQL.
+
+This allows retrieval of items stored in a BioSQL database using
+a biopython-like SeqRecord and Seq interface.
+
+Note: Currently we do not support recording per-letter-annotations
+(like quality scores) in BioSQL.
+"""
+
+from typing import List, Optional
+
+from Bio.Seq import Seq, SequenceDataAbstractBaseClass
+from Bio.SeqRecord import SeqRecord, _RestrictedDict
+from Bio import SeqFeature
+
+
+class _BioSQLSequenceData(SequenceDataAbstractBaseClass):
+    """Retrieves sequence data from a BioSQL database (PRIVATE)."""
+
+    __slots__ = ("primary_id", "adaptor", "_length", "start")
+
+    def __init__(self, primary_id, adaptor, start=0, length=0):
+        """Create a new _BioSQLSequenceData object referring to a BioSQL entry.
+
+        You wouldn't normally create a _BioSQLSequenceData object yourself,
+        this is done for you when retrieving a DBSeqRecord object from the
+        database, which creates a Seq object using a _BioSQLSequenceData
+        instance as the data provider.
+        """
+        self.primary_id = primary_id
+        self.adaptor = adaptor
+        self._length = length
+        self.start = start
+        super().__init__()
+
+    def __len__(self):
+        """Return the length of the sequence."""
+        return self._length
+
+    def __getitem__(self, key):
+        """Return a subsequence as a bytes or a _BioSQLSequenceData object."""
+        if isinstance(key, slice):
+            start, end, step = key.indices(self._length)
+            size = len(range(start, end, step))
+            if size == 0:
+                return b""
+        else:
+            # Return a single letter as an integer (consistent with bytes)
+            i = key
+            if i < 0:
+                i += self._length
+                if i < 0:
+                    raise IndexError(key)
+            elif i >= self._length:
+                raise IndexError(key)
+            c = self.adaptor.get_subseq_as_string(
+                self.primary_id, self.start + i, self.start + i + 1
+            )
+            return ord(c)
+
+        if step == 1:
+            if start == 0 and size == self._length:
+                # Return the full sequence as bytes
+                sequence = self.adaptor.get_subseq_as_string(
+                    self.primary_id, self.start, self.start + self._length
+                )
+                return sequence.encode("ASCII")
+            else:
+                # Return a _BioSQLSequenceData with the start and end adjusted
+                return _BioSQLSequenceData(
+                    self.primary_id, self.adaptor, self.start + start, size
+                )
+        else:
+            # Will have to extract the sequence because of the stride
+            full = self.adaptor.get_subseq_as_string(
+                self.primary_id, self.start + start, self.start + end
+            )
+            return full[::step].encode("ASCII")
+
+
+def _retrieve_seq_len(adaptor, primary_id):
+    # The database schema ensures there will be only one matching row
+    seqs = adaptor.execute_and_fetchall(
+        "SELECT length FROM biosequence WHERE bioentry_id = %s", (primary_id,)
+    )
+    if not seqs:
+        return None
+    if len(seqs) != 1:
+        raise ValueError(f"Expected 1 response, got {len(seqs)}.")
+    (given_length,) = seqs[0]
+    return int(given_length)
+
+
+def _retrieve_seq(adaptor, primary_id):
+    # The database schema ensures there will be only one matching
+    # row in the table.
+
+    # If an undefined sequence was recorded, seq will be NULL,
+    # but length will be populated.  This means length(seq)
+    # will return None.
+    seqs = adaptor.execute_and_fetchall(
+        "SELECT alphabet, length, length(seq) FROM biosequence WHERE bioentry_id = %s",
+        (primary_id,),
+    )
+    if not seqs:
+        return
+    if len(seqs) != 1:
+        raise ValueError(f"Expected 1 response, got {len(seqs)}.")
+    moltype, given_length, length = seqs[0]
+
+    try:
+        length = int(length)
+        given_length = int(given_length)
+        if length != given_length:
+            raise ValueError(
+                f"'length' differs from sequence length, {given_length}, {length}"
+            )
+        have_seq = True
+    except TypeError:
+        if length is not None:
+            raise ValueError(f"Expected 'length' to be 'None', got {length}.")
+        seqs = adaptor.execute_and_fetchall(
+            "SELECT alphabet, length, seq FROM biosequence WHERE bioentry_id = %s",
+            (primary_id,),
+        )
+        if len(seqs) != 1:
+            raise ValueError(f"Expected 1 response, got {len(seqs)}.")
+        moltype, given_length, seq = seqs[0]
+        if seq:
+            raise ValueError(f"Expected 'seq' to have a falsy value, got {seq}.")
+        length = int(given_length)
+        have_seq = False
+        del seq
+    del given_length
+
+    if have_seq:
+        data = _BioSQLSequenceData(primary_id, adaptor, start=0, length=length)
+        return Seq(data)
+    else:
+        return Seq(None, length=length)
+
+
+def _retrieve_dbxrefs(adaptor, primary_id):
+    """Retrieve the database cross references for the sequence (PRIVATE)."""
+    _dbxrefs = []
+    dbxrefs = adaptor.execute_and_fetchall(
+        "SELECT dbname, accession, version"
+        " FROM bioentry_dbxref join dbxref using (dbxref_id)"
+        " WHERE bioentry_id = %s"
+        ' ORDER BY "rank"',
+        (primary_id,),
+    )
+    for dbname, accession, version in dbxrefs:
+        if version and version != "0":
+            v = f"{accession}.{version}"
+        else:
+            v = accession
+        _dbxrefs.append(f"{dbname}:{v}")
+    return _dbxrefs
+
+
+def _retrieve_features(adaptor, primary_id):
+    sql = (
+        'SELECT seqfeature_id, type.name, "rank"'
+        " FROM seqfeature join term type on (type_term_id = type.term_id)"
+        " WHERE bioentry_id = %s"
+        ' ORDER BY "rank"'
+    )
+    results = adaptor.execute_and_fetchall(sql, (primary_id,))
+    seq_feature_list = []
+    for seqfeature_id, seqfeature_type, seqfeature_rank in results:
+        # Get qualifiers [except for db_xref which is stored separately]
+        qvs = adaptor.execute_and_fetchall(
+            "SELECT name, value"
+            " FROM seqfeature_qualifier_value  join term using (term_id)"
+            " WHERE seqfeature_id = %s"
+            ' ORDER BY "rank"',
+            (seqfeature_id,),
+        )
+        qualifiers = {}
+        for qv_name, qv_value in qvs:
+            qualifiers.setdefault(qv_name, []).append(qv_value)
+        # Get db_xrefs [special case of qualifiers]
+        qvs = adaptor.execute_and_fetchall(
+            "SELECT dbxref.dbname, dbxref.accession"
+            " FROM dbxref join seqfeature_dbxref using (dbxref_id)"
+            " WHERE seqfeature_dbxref.seqfeature_id = %s"
+            ' ORDER BY "rank"',
+            (seqfeature_id,),
+        )
+        for qv_name, qv_value in qvs:
+            value = f"{qv_name}:{qv_value}"
+            qualifiers.setdefault("db_xref", []).append(value)
+        # Get locations
+        results = adaptor.execute_and_fetchall(
+            "SELECT location_id, start_pos, end_pos, strand"
+            " FROM location"
+            " WHERE seqfeature_id = %s"
+            ' ORDER BY "rank"',
+            (seqfeature_id,),
+        )
+        locations = []
+        # convert to Python standard form
+        # Convert strand = 0 to strand = None
+        # re: comment in Loader.py:
+        # Biopython uses None when we don't know strand information but
+        # BioSQL requires something (non null) and sets this as zero
+        # So we'll use the strand or 0 if Biopython spits out None
+        for location_id, start, end, strand in results:
+            if start:
+                start -= 1
+            if strand == 0:
+                strand = None
+            if strand not in (+1, -1, None):
+                raise ValueError(
+                    "Invalid strand %s found in database for "
+                    "seqfeature_id %s" % (strand, seqfeature_id)
+                )
+            if start is not None and end is not None and end < start:
+                import warnings
+                from Bio import BiopythonWarning
+
+                warnings.warn(
+                    "Inverted location start/end (%i and %i) for "
+                    "seqfeature_id %s" % (start, end, seqfeature_id),
+                    BiopythonWarning,
+                )
+
+            # For SwissProt unknown positions (?)
+            if start is None:
+                start = SeqFeature.UnknownPosition()
+            if end is None:
+                end = SeqFeature.UnknownPosition()
+
+            locations.append((location_id, start, end, strand))
+        # Get possible remote reference information
+        remote_results = adaptor.execute_and_fetchall(
+            "SELECT location_id, dbname, accession, version"
+            " FROM location join dbxref using (dbxref_id)"
+            " WHERE seqfeature_id = %s",
+            (seqfeature_id,),
+        )
+        lookup = {}
+        for location_id, dbname, accession, version in remote_results:
+            if version and version != "0":
+                v = f"{accession}.{version}"
+            else:
+                v = accession
+            # subfeature remote location db_ref are stored as a empty string
+            # when not present
+            if dbname == "":
+                dbname = None
+            lookup[location_id] = (dbname, v)
+
+        feature = SeqFeature.SeqFeature(type=seqfeature_type)
+        # Store the key as a private property
+        feature._seqfeature_id = seqfeature_id
+        feature.qualifiers = qualifiers
+        if len(locations) == 0:
+            pass
+        elif len(locations) == 1:
+            location_id, start, end, strand = locations[0]
+            # See Bug 2677, we currently don't record the location_operator
+            # For consistency with older versions Biopython, default to "".
+            feature.location_operator = _retrieve_location_qualifier_value(
+                adaptor, location_id
+            )
+            dbname, version = lookup.get(location_id, (None, None))
+            feature.location = SeqFeature.SimpleLocation(start, end)
+            feature.location.strand = strand
+            feature.location.ref_db = dbname
+            feature.location.ref = version
+        else:
+            locs = []
+            for location in locations:
+                location_id, start, end, strand = location
+                dbname, version = lookup.get(location_id, (None, None))
+                locs.append(
+                    SeqFeature.SimpleLocation(
+                        start, end, strand=strand, ref=version, ref_db=dbname
+                    )
+                )
+            # Locations are typically in biological in order (see negative
+            # strands below), but because of remote locations for
+            # sub-features they are not necessarily in numerical order:
+            strands = {_.strand for _ in locs}
+            if len(strands) == 1 and -1 in strands:
+                # Evil hack time for backwards compatibility
+                # TODO - Check if BioPerl and (old) Biopython did the same,
+                # we may have an existing incompatibility lurking here...
+                locs = locs[::-1]
+            feature.location = SeqFeature.CompoundLocation(locs, "join")
+            # TODO - See Bug 2677 - we don't yet record location operator,
+            # so for consistency with older versions of Biopython default
+            # to assuming its a join.
+        seq_feature_list.append(feature)
+    return seq_feature_list
+
+
+def _retrieve_location_qualifier_value(adaptor, location_id):
+    value = adaptor.execute_and_fetch_col0(
+        "SELECT value FROM location_qualifier_value WHERE location_id = %s",
+        (location_id,),
+    )
+    try:
+        return value[0]
+    except IndexError:
+        return ""
+
+
+def _retrieve_annotations(adaptor, primary_id, taxon_id):
+    annotations = {}
+    annotations.update(_retrieve_alphabet(adaptor, primary_id))
+    annotations.update(_retrieve_qualifier_value(adaptor, primary_id))
+    annotations.update(_retrieve_reference(adaptor, primary_id))
+    annotations.update(_retrieve_taxon(adaptor, primary_id, taxon_id))
+    annotations.update(_retrieve_comment(adaptor, primary_id))
+    return annotations
+
+
+def _retrieve_alphabet(adaptor, primary_id):
+    results = adaptor.execute_and_fetchall(
+        "SELECT alphabet FROM biosequence WHERE bioentry_id = %s", (primary_id,)
+    )
+    if len(results) != 1:
+        raise ValueError(f"Expected 1 response, got {len(results)}.")
+    alphabets = results[0]
+    if len(alphabets) != 1:
+        raise ValueError(f"Expected 1 alphabet in response, got {len(alphabets)}.")
+    alphabet = alphabets[0]
+    if alphabet == "dna":
+        molecule_type = "DNA"
+    elif alphabet == "rna":
+        molecule_type = "RNA"
+    elif alphabet == "protein":
+        molecule_type = "protein"
+    else:
+        molecule_type = None
+    if molecule_type is not None:
+        return {"molecule_type": molecule_type}
+    else:
+        return {}
+
+
+def _retrieve_qualifier_value(adaptor, primary_id):
+    qvs = adaptor.execute_and_fetchall(
+        "SELECT name, value"
+        " FROM bioentry_qualifier_value JOIN term USING (term_id)"
+        " WHERE bioentry_id = %s"
+        ' ORDER BY "rank"',
+        (primary_id,),
+    )
+    qualifiers = {}
+    for name, value in qvs:
+        if name == "keyword":
+            name = "keywords"
+        # See handling of "date" in Loader.py
+        elif name == "date_changed":
+            name = "date"
+        elif name == "secondary_accession":
+            name = "accessions"
+        qualifiers.setdefault(name, []).append(value)
+    return qualifiers
+
+
+def _retrieve_reference(adaptor, primary_id):
+    # XXX dbxref_qualifier_value
+
+    refs = adaptor.execute_and_fetchall(
+        "SELECT start_pos, end_pos, "
+        " location, title, authors,"
+        " dbname, accession"
+        " FROM bioentry_reference"
+        " JOIN reference USING (reference_id)"
+        " LEFT JOIN dbxref USING (dbxref_id)"
+        " WHERE bioentry_id = %s"
+        ' ORDER BY "rank"',
+        (primary_id,),
+    )
+    references = []
+    for start, end, location, title, authors, dbname, accession in refs:
+        reference = SeqFeature.Reference()
+        # If the start/end are missing, reference.location is an empty list
+        if (start is not None) or (end is not None):
+            if start is not None:
+                start -= 1  # python counting
+            reference.location = [SeqFeature.SimpleLocation(start, end)]
+        # Don't replace the default "" with None.
+        if authors:
+            reference.authors = authors
+        if title:
+            reference.title = title
+        reference.journal = location
+        if dbname == "PUBMED":
+            reference.pubmed_id = accession
+        elif dbname == "MEDLINE":
+            reference.medline_id = accession
+        references.append(reference)
+    if references:
+        return {"references": references}
+    else:
+        return {}
+
+
+def _retrieve_taxon(adaptor, primary_id, taxon_id):
+    a = {}
+    common_names = adaptor.execute_and_fetch_col0(
+        "SELECT name FROM taxon_name WHERE taxon_id = %s"
+        " AND name_class = 'genbank common name'",
+        (taxon_id,),
+    )
+    if common_names:
+        a["source"] = common_names[0]
+    scientific_names = adaptor.execute_and_fetch_col0(
+        "SELECT name FROM taxon_name WHERE taxon_id = %s"
+        " AND name_class = 'scientific name'",
+        (taxon_id,),
+    )
+    if scientific_names:
+        a["organism"] = scientific_names[0]
+    ncbi_taxids = adaptor.execute_and_fetch_col0(
+        "SELECT ncbi_taxon_id FROM taxon WHERE taxon_id = %s", (taxon_id,)
+    )
+    if ncbi_taxids and ncbi_taxids[0] and ncbi_taxids[0] != "0":
+        a["ncbi_taxid"] = ncbi_taxids[0]
+
+    # Old code used the left/right values in the taxon table to get the
+    # taxonomy lineage in one SQL command.  This was actually very slow,
+    # and would fail if the (optional) left/right values were missing.
+    #
+    # The following code is based on a contribution from Eric Gibert, and
+    # relies on the taxon table's parent_taxon_id field only (ignoring the
+    # optional left/right values).  This means that it has to make a
+    # separate SQL query for each entry in the lineage, but it does still
+    # appear to be *much* faster.  See Bug 2494.
+    taxonomy = []
+    while taxon_id:
+        name, rank, parent_taxon_id = adaptor.execute_one(
+            "SELECT taxon_name.name, taxon.node_rank, taxon.parent_taxon_id"
+            " FROM taxon, taxon_name"
+            " WHERE taxon.taxon_id=taxon_name.taxon_id"
+            " AND taxon_name.name_class='scientific name'"
+            " AND taxon.taxon_id = %s",
+            (taxon_id,),
+        )
+        if taxon_id == parent_taxon_id:
+            # If the taxon table has been populated by the BioSQL script
+            # load_ncbi_taxonomy.pl this is how top parent nodes are stored.
+            # Personally, I would have used a NULL parent_taxon_id here.
+            break
+
+        taxonomy.insert(0, name)
+        taxon_id = parent_taxon_id
+
+    if taxonomy:
+        a["taxonomy"] = taxonomy
+    return a
+
+
+def _retrieve_comment(adaptor, primary_id):
+    qvs = adaptor.execute_and_fetchall(
+        'SELECT comment_text FROM comment WHERE bioentry_id=%s ORDER BY "rank"',
+        (primary_id,),
+    )
+    comments = [comm[0] for comm in qvs]
+    # Don't want to add an empty list...
+    if comments:
+        return {"comment": comments}
+    else:
+        return {}
+
+
+class DBSeqRecord(SeqRecord):
+    """BioSQL equivalent of the Biopython SeqRecord object."""
+
+    def __init__(self, adaptor, primary_id):
+        """Create a DBSeqRecord object.
+
+        Arguments:
+         - adaptor - A BioSQL.BioSeqDatabase.Adaptor object
+         - primary_id - An internal integer ID used by BioSQL
+
+        You wouldn't normally create a DBSeqRecord object yourself,
+        this is done for you when using a BioSeqDatabase object
+        """
+        self._adaptor = adaptor
+        self._primary_id = primary_id
+
+        (
+            self._biodatabase_id,
+            self._taxon_id,
+            self.name,
+            accession,
+            version,
+            self._identifier,
+            self._division,
+            self.description,
+        ) = self._adaptor.execute_one(
+            "SELECT biodatabase_id, taxon_id, name, accession, version,"
+            " identifier, division, description"
+            " FROM bioentry"
+            " WHERE bioentry_id = %s",
+            (self._primary_id,),
+        )
+        if version and version != "0":
+            self.id = f"{accession}.{version}"
+        else:
+            self.id = accession
+        # We don't yet record any per-letter-annotations in the
+        # BioSQL database, but we should set this property up
+        # for completeness (and the __str__ method).
+        # We do NOT want to load the sequence from the DB here!
+        length = _retrieve_seq_len(adaptor, primary_id)
+        self._per_letter_annotations = _RestrictedDict(length=length)
+
+    def __get_seq(self):
+        if not hasattr(self, "_seq"):
+            self._seq = _retrieve_seq(self._adaptor, self._primary_id)
+        return self._seq
+
+    def __set_seq(self, seq):
+        # TODO - Check consistent with self._per_letter_annotations
+        self._seq = seq
+
+    def __del_seq(self):
+        del self._seq
+
+    seq = property(__get_seq, __set_seq, __del_seq, "Seq object")
+
+    @property
+    def dbxrefs(self) -> List[str]:
+        """Database cross references."""
+        if not hasattr(self, "_dbxrefs"):
+            self._dbxrefs = _retrieve_dbxrefs(self._adaptor, self._primary_id)
+        return self._dbxrefs
+
+    @dbxrefs.setter
+    def dbxrefs(self, value: List[str]) -> None:
+        self._dbxrefs = value
+
+    @dbxrefs.deleter
+    def dbxrefs(self) -> None:
+        del self._dbxrefs
+
+    def __get_features(self):
+        if not hasattr(self, "_features"):
+            self._features = _retrieve_features(self._adaptor, self._primary_id)
+        return self._features
+
+    def __set_features(self, features):
+        self._features = features
+
+    def __del_features(self):
+        del self._features
+
+    features = property(__get_features, __set_features, __del_features, "Features")
+
+    @property
+    def annotations(self) -> SeqRecord._AnnotationsDict:
+        """Annotations."""
+        if not hasattr(self, "_annotations"):
+            self._annotations = _retrieve_annotations(
+                self._adaptor, self._primary_id, self._taxon_id
+            )
+            if self._identifier:
+                self._annotations["gi"] = self._identifier
+            if self._division:
+                self._annotations["data_file_division"] = self._division
+        return self._annotations
+
+    @annotations.setter
+    def annotations(self, value: Optional[SeqRecord._AnnotationsDict]) -> None:
+        if value:
+            self._annotations = value
+        else:
+            self._annotations = {}
+
+    @annotations.deleter
+    def annotations(self) -> None:
+        del self._annotations