view CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/BioSQL/BioSeq.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
line wrap: on
line source
# Copyright 2002 by Andrew Dalke.  All rights reserved.
# Revisions 2007-2016 copyright by Peter Cock.  All rights reserved.
# Revisions 2008-2009 copyright by Cymon J. Cox.  All rights reserved.
#
# This file is part of the Biopython distribution and governed by your
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
# Please see the LICENSE file that should have been included as part of this
# package.
#
# Note that BioSQL (including the database schema and scripts) is
# available and licensed separately.  Please consult www.biosql.org
"""Implementations of Biopython-like Seq objects on top of BioSQL.

This allows retrieval of items stored in a BioSQL database using
a biopython-like SeqRecord and Seq interface.

Note: Currently we do not support recording per-letter-annotations
(like quality scores) in BioSQL.
"""

from typing import List, Optional

from Bio.Seq import Seq, SequenceDataAbstractBaseClass
from Bio.SeqRecord import SeqRecord, _RestrictedDict
from Bio import SeqFeature


class _BioSQLSequenceData(SequenceDataAbstractBaseClass):
    """Retrieves sequence data from a BioSQL database (PRIVATE)."""

    __slots__ = ("primary_id", "adaptor", "_length", "start")

    def __init__(self, primary_id, adaptor, start=0, length=0):
        """Create a new _BioSQLSequenceData object referring to a BioSQL entry.

        You wouldn't normally create a _BioSQLSequenceData object yourself,
        this is done for you when retrieving a DBSeqRecord object from the
        database, which creates a Seq object using a _BioSQLSequenceData
        instance as the data provider.
        """
        self.primary_id = primary_id
        self.adaptor = adaptor
        self._length = length
        self.start = start
        super().__init__()

    def __len__(self):
        """Return the length of the sequence."""
        return self._length

    def __getitem__(self, key):
        """Return a subsequence as a bytes or a _BioSQLSequenceData object."""
        if isinstance(key, slice):
            start, end, step = key.indices(self._length)
            size = len(range(start, end, step))
            if size == 0:
                return b""
        else:
            # Return a single letter as an integer (consistent with bytes)
            i = key
            if i < 0:
                i += self._length
                if i < 0:
                    raise IndexError(key)
            elif i >= self._length:
                raise IndexError(key)
            c = self.adaptor.get_subseq_as_string(
                self.primary_id, self.start + i, self.start + i + 1
            )
            return ord(c)

        if step == 1:
            if start == 0 and size == self._length:
                # Return the full sequence as bytes
                sequence = self.adaptor.get_subseq_as_string(
                    self.primary_id, self.start, self.start + self._length
                )
                return sequence.encode("ASCII")
            else:
                # Return a _BioSQLSequenceData with the start and end adjusted
                return _BioSQLSequenceData(
                    self.primary_id, self.adaptor, self.start + start, size
                )
        else:
            # Will have to extract the sequence because of the stride
            full = self.adaptor.get_subseq_as_string(
                self.primary_id, self.start + start, self.start + end
            )
            return full[::step].encode("ASCII")


def _retrieve_seq_len(adaptor, primary_id):
    # The database schema ensures there will be only one matching row
    seqs = adaptor.execute_and_fetchall(
        "SELECT length FROM biosequence WHERE bioentry_id = %s", (primary_id,)
    )
    if not seqs:
        return None
    if len(seqs) != 1:
        raise ValueError(f"Expected 1 response, got {len(seqs)}.")
    (given_length,) = seqs[0]
    return int(given_length)


def _retrieve_seq(adaptor, primary_id):
    # The database schema ensures there will be only one matching
    # row in the table.

    # If an undefined sequence was recorded, seq will be NULL,
    # but length will be populated.  This means length(seq)
    # will return None.
    seqs = adaptor.execute_and_fetchall(
        "SELECT alphabet, length, length(seq) FROM biosequence WHERE bioentry_id = %s",
        (primary_id,),
    )
    if not seqs:
        return
    if len(seqs) != 1:
        raise ValueError(f"Expected 1 response, got {len(seqs)}.")
    moltype, given_length, length = seqs[0]

    try:
        length = int(length)
        given_length = int(given_length)
        if length != given_length:
            raise ValueError(
                f"'length' differs from sequence length, {given_length}, {length}"
            )
        have_seq = True
    except TypeError:
        if length is not None:
            raise ValueError(f"Expected 'length' to be 'None', got {length}.")
        seqs = adaptor.execute_and_fetchall(
            "SELECT alphabet, length, seq FROM biosequence WHERE bioentry_id = %s",
            (primary_id,),
        )
        if len(seqs) != 1:
            raise ValueError(f"Expected 1 response, got {len(seqs)}.")
        moltype, given_length, seq = seqs[0]
        if seq:
            raise ValueError(f"Expected 'seq' to have a falsy value, got {seq}.")
        length = int(given_length)
        have_seq = False
        del seq
    del given_length

    if have_seq:
        data = _BioSQLSequenceData(primary_id, adaptor, start=0, length=length)
        return Seq(data)
    else:
        return Seq(None, length=length)


def _retrieve_dbxrefs(adaptor, primary_id):
    """Retrieve the database cross references for the sequence (PRIVATE)."""
    _dbxrefs = []
    dbxrefs = adaptor.execute_and_fetchall(
        "SELECT dbname, accession, version"
        " FROM bioentry_dbxref join dbxref using (dbxref_id)"
        " WHERE bioentry_id = %s"
        ' ORDER BY "rank"',
        (primary_id,),
    )
    for dbname, accession, version in dbxrefs:
        if version and version != "0":
            v = f"{accession}.{version}"
        else:
            v = accession
        _dbxrefs.append(f"{dbname}:{v}")
    return _dbxrefs


def _retrieve_features(adaptor, primary_id):
    sql = (
        'SELECT seqfeature_id, type.name, "rank"'
        " FROM seqfeature join term type on (type_term_id = type.term_id)"
        " WHERE bioentry_id = %s"
        ' ORDER BY "rank"'
    )
    results = adaptor.execute_and_fetchall(sql, (primary_id,))
    seq_feature_list = []
    for seqfeature_id, seqfeature_type, seqfeature_rank in results:
        # Get qualifiers [except for db_xref which is stored separately]
        qvs = adaptor.execute_and_fetchall(
            "SELECT name, value"
            " FROM seqfeature_qualifier_value  join term using (term_id)"
            " WHERE seqfeature_id = %s"
            ' ORDER BY "rank"',
            (seqfeature_id,),
        )
        qualifiers = {}
        for qv_name, qv_value in qvs:
            qualifiers.setdefault(qv_name, []).append(qv_value)
        # Get db_xrefs [special case of qualifiers]
        qvs = adaptor.execute_and_fetchall(
            "SELECT dbxref.dbname, dbxref.accession"
            " FROM dbxref join seqfeature_dbxref using (dbxref_id)"
            " WHERE seqfeature_dbxref.seqfeature_id = %s"
            ' ORDER BY "rank"',
            (seqfeature_id,),
        )
        for qv_name, qv_value in qvs:
            value = f"{qv_name}:{qv_value}"
            qualifiers.setdefault("db_xref", []).append(value)
        # Get locations
        results = adaptor.execute_and_fetchall(
            "SELECT location_id, start_pos, end_pos, strand"
            " FROM location"
            " WHERE seqfeature_id = %s"
            ' ORDER BY "rank"',
            (seqfeature_id,),
        )
        locations = []
        # convert to Python standard form
        # Convert strand = 0 to strand = None
        # re: comment in Loader.py:
        # Biopython uses None when we don't know strand information but
        # BioSQL requires something (non null) and sets this as zero
        # So we'll use the strand or 0 if Biopython spits out None
        for location_id, start, end, strand in results:
            if start:
                start -= 1
            if strand == 0:
                strand = None
            if strand not in (+1, -1, None):
                raise ValueError(
                    "Invalid strand %s found in database for "
                    "seqfeature_id %s" % (strand, seqfeature_id)
                )
            if start is not None and end is not None and end < start:
                import warnings
                from Bio import BiopythonWarning

                warnings.warn(
                    "Inverted location start/end (%i and %i) for "
                    "seqfeature_id %s" % (start, end, seqfeature_id),
                    BiopythonWarning,
                )

            # For SwissProt unknown positions (?)
            if start is None:
                start = SeqFeature.UnknownPosition()
            if end is None:
                end = SeqFeature.UnknownPosition()

            locations.append((location_id, start, end, strand))
        # Get possible remote reference information
        remote_results = adaptor.execute_and_fetchall(
            "SELECT location_id, dbname, accession, version"
            " FROM location join dbxref using (dbxref_id)"
            " WHERE seqfeature_id = %s",
            (seqfeature_id,),
        )
        lookup = {}
        for location_id, dbname, accession, version in remote_results:
            if version and version != "0":
                v = f"{accession}.{version}"
            else:
                v = accession
            # subfeature remote location db_ref are stored as a empty string
            # when not present
            if dbname == "":
                dbname = None
            lookup[location_id] = (dbname, v)

        feature = SeqFeature.SeqFeature(type=seqfeature_type)
        # Store the key as a private property
        feature._seqfeature_id = seqfeature_id
        feature.qualifiers = qualifiers
        if len(locations) == 0:
            pass
        elif len(locations) == 1:
            location_id, start, end, strand = locations[0]
            # See Bug 2677, we currently don't record the location_operator
            # For consistency with older versions Biopython, default to "".
            feature.location_operator = _retrieve_location_qualifier_value(
                adaptor, location_id
            )
            dbname, version = lookup.get(location_id, (None, None))
            feature.location = SeqFeature.SimpleLocation(start, end)
            feature.location.strand = strand
            feature.location.ref_db = dbname
            feature.location.ref = version
        else:
            locs = []
            for location in locations:
                location_id, start, end, strand = location
                dbname, version = lookup.get(location_id, (None, None))
                locs.append(
                    SeqFeature.SimpleLocation(
                        start, end, strand=strand, ref=version, ref_db=dbname
                    )
                )
            # Locations are typically in biological in order (see negative
            # strands below), but because of remote locations for
            # sub-features they are not necessarily in numerical order:
            strands = {_.strand for _ in locs}
            if len(strands) == 1 and -1 in strands:
                # Evil hack time for backwards compatibility
                # TODO - Check if BioPerl and (old) Biopython did the same,
                # we may have an existing incompatibility lurking here...
                locs = locs[::-1]
            feature.location = SeqFeature.CompoundLocation(locs, "join")
            # TODO - See Bug 2677 - we don't yet record location operator,
            # so for consistency with older versions of Biopython default
            # to assuming its a join.
        seq_feature_list.append(feature)
    return seq_feature_list


def _retrieve_location_qualifier_value(adaptor, location_id):
    value = adaptor.execute_and_fetch_col0(
        "SELECT value FROM location_qualifier_value WHERE location_id = %s",
        (location_id,),
    )
    try:
        return value[0]
    except IndexError:
        return ""


def _retrieve_annotations(adaptor, primary_id, taxon_id):
    annotations = {}
    annotations.update(_retrieve_alphabet(adaptor, primary_id))
    annotations.update(_retrieve_qualifier_value(adaptor, primary_id))
    annotations.update(_retrieve_reference(adaptor, primary_id))
    annotations.update(_retrieve_taxon(adaptor, primary_id, taxon_id))
    annotations.update(_retrieve_comment(adaptor, primary_id))
    return annotations


def _retrieve_alphabet(adaptor, primary_id):
    results = adaptor.execute_and_fetchall(
        "SELECT alphabet FROM biosequence WHERE bioentry_id = %s", (primary_id,)
    )
    if len(results) != 1:
        raise ValueError(f"Expected 1 response, got {len(results)}.")
    alphabets = results[0]
    if len(alphabets) != 1:
        raise ValueError(f"Expected 1 alphabet in response, got {len(alphabets)}.")
    alphabet = alphabets[0]
    if alphabet == "dna":
        molecule_type = "DNA"
    elif alphabet == "rna":
        molecule_type = "RNA"
    elif alphabet == "protein":
        molecule_type = "protein"
    else:
        molecule_type = None
    if molecule_type is not None:
        return {"molecule_type": molecule_type}
    else:
        return {}


def _retrieve_qualifier_value(adaptor, primary_id):
    qvs = adaptor.execute_and_fetchall(
        "SELECT name, value"
        " FROM bioentry_qualifier_value JOIN term USING (term_id)"
        " WHERE bioentry_id = %s"
        ' ORDER BY "rank"',
        (primary_id,),
    )
    qualifiers = {}
    for name, value in qvs:
        if name == "keyword":
            name = "keywords"
        # See handling of "date" in Loader.py
        elif name == "date_changed":
            name = "date"
        elif name == "secondary_accession":
            name = "accessions"
        qualifiers.setdefault(name, []).append(value)
    return qualifiers


def _retrieve_reference(adaptor, primary_id):
    # XXX dbxref_qualifier_value

    refs = adaptor.execute_and_fetchall(
        "SELECT start_pos, end_pos, "
        " location, title, authors,"
        " dbname, accession"
        " FROM bioentry_reference"
        " JOIN reference USING (reference_id)"
        " LEFT JOIN dbxref USING (dbxref_id)"
        " WHERE bioentry_id = %s"
        ' ORDER BY "rank"',
        (primary_id,),
    )
    references = []
    for start, end, location, title, authors, dbname, accession in refs:
        reference = SeqFeature.Reference()
        # If the start/end are missing, reference.location is an empty list
        if (start is not None) or (end is not None):
            if start is not None:
                start -= 1  # python counting
            reference.location = [SeqFeature.SimpleLocation(start, end)]
        # Don't replace the default "" with None.
        if authors:
            reference.authors = authors
        if title:
            reference.title = title
        reference.journal = location
        if dbname == "PUBMED":
            reference.pubmed_id = accession
        elif dbname == "MEDLINE":
            reference.medline_id = accession
        references.append(reference)
    if references:
        return {"references": references}
    else:
        return {}


def _retrieve_taxon(adaptor, primary_id, taxon_id):
    a = {}
    common_names = adaptor.execute_and_fetch_col0(
        "SELECT name FROM taxon_name WHERE taxon_id = %s"
        " AND name_class = 'genbank common name'",
        (taxon_id,),
    )
    if common_names:
        a["source"] = common_names[0]
    scientific_names = adaptor.execute_and_fetch_col0(
        "SELECT name FROM taxon_name WHERE taxon_id = %s"
        " AND name_class = 'scientific name'",
        (taxon_id,),
    )
    if scientific_names:
        a["organism"] = scientific_names[0]
    ncbi_taxids = adaptor.execute_and_fetch_col0(
        "SELECT ncbi_taxon_id FROM taxon WHERE taxon_id = %s", (taxon_id,)
    )
    if ncbi_taxids and ncbi_taxids[0] and ncbi_taxids[0] != "0":
        a["ncbi_taxid"] = ncbi_taxids[0]

    # Old code used the left/right values in the taxon table to get the
    # taxonomy lineage in one SQL command.  This was actually very slow,
    # and would fail if the (optional) left/right values were missing.
    #
    # The following code is based on a contribution from Eric Gibert, and
    # relies on the taxon table's parent_taxon_id field only (ignoring the
    # optional left/right values).  This means that it has to make a
    # separate SQL query for each entry in the lineage, but it does still
    # appear to be *much* faster.  See Bug 2494.
    taxonomy = []
    while taxon_id:
        name, rank, parent_taxon_id = adaptor.execute_one(
            "SELECT taxon_name.name, taxon.node_rank, taxon.parent_taxon_id"
            " FROM taxon, taxon_name"
            " WHERE taxon.taxon_id=taxon_name.taxon_id"
            " AND taxon_name.name_class='scientific name'"
            " AND taxon.taxon_id = %s",
            (taxon_id,),
        )
        if taxon_id == parent_taxon_id:
            # If the taxon table has been populated by the BioSQL script
            # load_ncbi_taxonomy.pl this is how top parent nodes are stored.
            # Personally, I would have used a NULL parent_taxon_id here.
            break

        taxonomy.insert(0, name)
        taxon_id = parent_taxon_id

    if taxonomy:
        a["taxonomy"] = taxonomy
    return a


def _retrieve_comment(adaptor, primary_id):
    qvs = adaptor.execute_and_fetchall(
        'SELECT comment_text FROM comment WHERE bioentry_id=%s ORDER BY "rank"',
        (primary_id,),
    )
    comments = [comm[0] for comm in qvs]
    # Don't want to add an empty list...
    if comments:
        return {"comment": comments}
    else:
        return {}


class DBSeqRecord(SeqRecord):
    """BioSQL equivalent of the Biopython SeqRecord object."""

    def __init__(self, adaptor, primary_id):
        """Create a DBSeqRecord object.

        Arguments:
         - adaptor - A BioSQL.BioSeqDatabase.Adaptor object
         - primary_id - An internal integer ID used by BioSQL

        You wouldn't normally create a DBSeqRecord object yourself,
        this is done for you when using a BioSeqDatabase object
        """
        self._adaptor = adaptor
        self._primary_id = primary_id

        (
            self._biodatabase_id,
            self._taxon_id,
            self.name,
            accession,
            version,
            self._identifier,
            self._division,
            self.description,
        ) = self._adaptor.execute_one(
            "SELECT biodatabase_id, taxon_id, name, accession, version,"
            " identifier, division, description"
            " FROM bioentry"
            " WHERE bioentry_id = %s",
            (self._primary_id,),
        )
        if version and version != "0":
            self.id = f"{accession}.{version}"
        else:
            self.id = accession
        # We don't yet record any per-letter-annotations in the
        # BioSQL database, but we should set this property up
        # for completeness (and the __str__ method).
        # We do NOT want to load the sequence from the DB here!
        length = _retrieve_seq_len(adaptor, primary_id)
        self._per_letter_annotations = _RestrictedDict(length=length)

    def __get_seq(self):
        if not hasattr(self, "_seq"):
            self._seq = _retrieve_seq(self._adaptor, self._primary_id)
        return self._seq

    def __set_seq(self, seq):
        # TODO - Check consistent with self._per_letter_annotations
        self._seq = seq

    def __del_seq(self):
        del self._seq

    seq = property(__get_seq, __set_seq, __del_seq, "Seq object")

    @property
    def dbxrefs(self) -> List[str]:
        """Database cross references."""
        if not hasattr(self, "_dbxrefs"):
            self._dbxrefs = _retrieve_dbxrefs(self._adaptor, self._primary_id)
        return self._dbxrefs

    @dbxrefs.setter
    def dbxrefs(self, value: List[str]) -> None:
        self._dbxrefs = value

    @dbxrefs.deleter
    def dbxrefs(self) -> None:
        del self._dbxrefs

    def __get_features(self):
        if not hasattr(self, "_features"):
            self._features = _retrieve_features(self._adaptor, self._primary_id)
        return self._features

    def __set_features(self, features):
        self._features = features

    def __del_features(self):
        del self._features

    features = property(__get_features, __set_features, __del_features, "Features")

    @property
    def annotations(self) -> SeqRecord._AnnotationsDict:
        """Annotations."""
        if not hasattr(self, "_annotations"):
            self._annotations = _retrieve_annotations(
                self._adaptor, self._primary_id, self._taxon_id
            )
            if self._identifier:
                self._annotations["gi"] = self._identifier
            if self._division:
                self._annotations["data_file_division"] = self._division
        return self._annotations

    @annotations.setter
    def annotations(self, value: Optional[SeqRecord._AnnotationsDict]) -> None:
        if value:
            self._annotations = value
        else:
            self._annotations = {}

    @annotations.deleter
    def annotations(self) -> None:
        del self._annotations