diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/libcutils.pyx @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/libcutils.pyx	Tue Mar 18 16:23:26 2025 -0400
@@ -0,0 +1,443 @@
+# cython: language_level=3
+import types
+import sys
+import string
+import re
+import tempfile
+import os
+import io
+from contextlib import contextmanager
+from codecs import register_error
+
+from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION
+from cpython cimport PyBytes_Check, PyUnicode_Check
+from cpython cimport array as c_array
+from libc.errno cimport errno
+from libc.stdlib cimport calloc, free
+from libc.string cimport strncpy
+from libc.stdint cimport INT32_MAX, int32_t
+from libc.stdio cimport fprintf, stderr, fflush
+from libc.stdio cimport stdout as c_stdout
+from posix.fcntl cimport open as c_open, O_WRONLY, O_CREAT, O_TRUNC
+from posix.unistd cimport SEEK_SET, SEEK_CUR, SEEK_END
+
+from pysam.libcsamtools cimport samtools_dispatch, samtools_set_stdout, samtools_set_stderr, \
+    samtools_close_stdout, samtools_close_stderr, samtools_set_stdout_fn
+
+from pysam.libcbcftools cimport bcftools_dispatch, bcftools_set_stdout, bcftools_set_stderr, \
+    bcftools_close_stdout, bcftools_close_stderr, bcftools_set_stdout_fn
+
+#####################################################################
+# hard-coded constants
+cdef int MAX_POS = (1 << 31) - 1
+
+#################################################################
+# Utility functions for quality string conversions
+cpdef c_array.array qualitystring_to_array(input_str, int offset=33):
+    """convert a qualitystring to an array of quality values."""
+    if input_str is None:
+        return None
+    qs = force_bytes(input_str)
+    cdef char i
+    return c_array.array('B', [i - offset for i in qs])
+
+
+cpdef array_to_qualitystring(c_array.array qualities, int offset=33):
+    """convert an array of quality values to a string."""
+    if qualities is None:
+        return None
+    cdef int x
+
+    cdef c_array.array result
+    result = c_array.clone(qualities, len(qualities), zero=False)
+
+    for x from 0 <= x < len(qualities):
+        result[x] = qualities[x] + offset
+    return force_str(result.tobytes())
+
+
+cpdef qualities_to_qualitystring(qualities, int offset=33):
+    """convert a list or array of quality scores to the string
+    representation used in the SAM format.
+
+    Parameters
+    ----------
+    offset : int
+        offset to be added to the quality scores to arrive at
+        the characters of the quality string (default=33).
+
+    Returns
+    -------
+    string
+         a quality string
+
+    """
+    cdef char x
+    if qualities is None:
+        return None
+    elif isinstance(qualities, c_array.array):
+        return array_to_qualitystring(qualities, offset=offset)
+    else:
+        # tuples and lists
+        return force_str("".join([chr(x + offset) for x in qualities]))
+
+
+########################################################################
+## String encoding configuration facilities
+########################################################################
+
+# Codec error handler that just interprets each bad byte as ISO-8859-1.
+def latin1_replace(exception):
+    return (chr(exception.object[exception.start]), exception.end)
+
+register_error('pysam.latin1replace', latin1_replace)
+
+
+cdef str ERROR_HANDLER = 'strict'
+
+cpdef get_encoding_error_handler():
+    return ERROR_HANDLER
+
+cpdef set_encoding_error_handler(name):
+    global ERROR_HANDLER
+    previous = ERROR_HANDLER
+    ERROR_HANDLER = name
+    return previous
+
+########################################################################
+## Python 3 compatibility functions
+########################################################################
+
+cdef from_string_and_size(const char* s, size_t length):
+    return s[:length].decode('utf-8', ERROR_HANDLER)
+
+# filename encoding (adapted from lxml.etree.pyx)
+cdef str FILENAME_ENCODING = sys.getfilesystemencoding() or sys.getdefaultencoding() or 'ascii'
+cdef str TEXT_ENCODING = 'utf-8'
+
+cdef bytes encode_filename(object filename):
+    """Make sure a filename is 8-bit encoded (or None)."""
+    if filename is None:
+        return None
+    return os.fsencode(filename)
+
+
+cdef bytes force_bytes(object s, encoding=None, errors=None):
+    """convert string or unicode object to bytes, assuming
+    utf8 encoding.
+    """
+    if s is None:
+        return None
+    elif PyBytes_Check(s):
+        return s
+    elif PyUnicode_Check(s):
+        return s.encode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)
+    else:
+        raise TypeError("Argument must be string, bytes or unicode.")
+
+
+cdef charptr_to_str(const char* s, encoding=None, errors=None):
+    if s == NULL:
+        return None
+    return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)
+
+
+cdef charptr_to_str_w_len(const char* s, size_t n, encoding=None, errors=None):
+    if s == NULL:
+        return None
+    return s[:n].decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)
+
+
+cdef bytes charptr_to_bytes(const char* s, encoding=None, errors=None):
+    if s == NULL:
+        return None
+    else:
+        return s
+
+
+cdef force_str(object s, encoding=None, errors=None):
+    """Return s converted to str type of current Python
+    (bytes in Py2, unicode in Py3)"""
+    if s is None:
+        return None
+    if PyBytes_Check(s):
+        return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)
+    # assume unicode
+    return s
+
+
+cdef decode_bytes(bytes s, encoding=None, errors=None):
+    """Return s converted to current Python's str type,
+    always decoding even in Python 2"""
+    if s is None:
+        return None
+    else:
+        return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)
+
+
+cpdef parse_region(contig=None,
+                   start=None,
+                   stop=None,
+                   region=None,
+                   reference=None,
+                   end=None):
+    """parse alternative ways to specify a genomic region. A region can
+    either be specified by :term:`reference`, `start` and
+    `end`. `start` and `end` denote 0-based, half-open intervals.
+    
+    :term:`reference` and `end` are also accepted for backward
+    compatibility as synonyms for :term:`contig` and `stop`,
+    respectively.
+
+    Alternatively, a samtools :term:`region` string can be supplied.
+
+    If any of the coordinates are missing they will be replaced by the
+    minimum (`start`) or maximum (`end`) coordinate.
+
+    Note that region strings are 1-based, while `start` and `end`
+    denote an interval in python coordinates.
+
+    Returns
+    -------
+
+    tuple :  a tuple of `reference`, `start` and `end`.
+
+    Raises
+    ------
+
+    ValueError
+       for invalid or out of bounds regions.
+
+    """
+    cdef int32_t rstart
+    cdef int32_t rstop
+
+    
+    if reference is not None:
+        if contig is not None:
+           raise ValueError('contig and reference should not both be specified')
+        contig = reference
+
+    if contig is not None and region is not None:
+        raise ValueError('contig/reference and region should not both be specified')
+        
+    if end is not None:
+        if stop is not None:
+            raise ValueError('stop and end should not both be specified')
+        stop = end
+
+    if contig is None and region is None:
+        raise ValueError("neither contig nor region are given")
+
+    rstart = 0
+    rstop = MAX_POS
+    if start is not None:
+        try:
+            rstart = start
+        except OverflowError:
+            raise ValueError('start out of range (%i)' % start)
+
+    if stop is not None:
+        try:
+            rstop = stop
+        except OverflowError:
+            raise ValueError('stop out of range (%i)' % stop)
+
+    if region:
+        if ":" in region:
+            contig, coord = region.split(":")
+            parts = coord.split("-")
+            rstart = int(parts[0]) - 1
+            if len(parts) >= 1:
+                rstop = int(parts[1])
+        else:
+            contig = region
+
+    if rstart > rstop:
+        raise ValueError('invalid coordinates: start (%i) > stop (%i)' % (rstart, rstop))
+    if not 0 <= rstart < MAX_POS:
+        raise ValueError('start out of range (%i)' % rstart)
+    if not 0 <= rstop <= MAX_POS:
+        raise ValueError('stop out of range (%i)' % rstop)
+
+    return contig, rstart, rstop
+
+
+cdef int libc_whence_from_io(int whence):
+    # io.SEEK_SET/_CUR/_END are by definition 0/1/2 but C/POSIX's equivalents
+    # have unspecified values. So we must translate, but checking for 0/1/2
+    # rather than io.SEEK_SET/etc suffices.
+    if whence == 0: return SEEK_SET
+    if whence == 1: return SEEK_CUR
+    if whence == 2: return SEEK_END
+    return whence  # Otherwise likely invalid, but let HTSlib or OS report it
+
+
+def _pysam_dispatch(collection,
+                    method,
+                    args=None,
+                    catch_stdout=True,
+                    is_usage=False,
+                    save_stdout=None):
+    '''call ``method`` in samtools/bcftools providing arguments in args.
+    
+    By default, stdout is redirected to a temporary file using the patched
+    C sources except for a few commands that have an explicit output option
+    (typically: -o). In these commands (such as samtools view), this explicit
+    option is used. If *is_usage* is True, then these explicit output options
+    will not be used.
+
+    Catching of stdout can be turned off by setting *catch_stdout* to
+    False.
+    '''
+
+    if method == "index" and args:
+        # We make sure that at least the first specified input file exists,
+        # and if it doesn't we raise an IOError.
+        ARGUMENTS = ['-m', '--min-shift', '-o', '--output', '--output-file', '-@', '--threads']
+        skip_next = False
+        for arg in args:
+            if skip_next:
+                skip_next = False
+                continue
+            if arg.startswith('-'):
+                # Skip next argument for e.g. '--min-shift' '12' or '-m' '12' but not '-m12'
+                if arg in ARGUMENTS:
+                    skip_next = True
+                continue
+            if not os.path.exists(arg):
+                raise IOError("No such file or directory: '%s'" % arg)
+            else:
+                break
+            
+    if args is None:
+        args = []
+    else:
+        args = list(args)
+
+    # redirect stderr to file
+    stderr_h, stderr_f = tempfile.mkstemp()
+        
+    # redirect stdout to file
+    if save_stdout:
+        stdout_f = save_stdout
+        stdout_h = c_open(force_bytes(stdout_f),
+                          O_WRONLY|O_CREAT|O_TRUNC, 0666)
+        if stdout_h == -1:
+            raise OSError(errno, "error while opening file for writing", stdout_f)
+
+        samtools_set_stdout_fn(force_bytes(stdout_f))
+        bcftools_set_stdout_fn(force_bytes(stdout_f))
+            
+    elif catch_stdout:
+        stdout_h, stdout_f = tempfile.mkstemp()
+        MAP_STDOUT_OPTIONS = {
+        "samtools": {
+            "view": "-o {}",
+            "mpileup": "-o {}",
+            "depad": "-o {}",
+            "calmd": "",  # uses pysam_stdout_fn
+        },
+            "bcftools": {}
+        }
+        
+        stdout_option = None
+        if collection == "bcftools":
+            # in bcftools, most methods accept -o, the exceptions
+            # are below:
+            if method not in ("head", "index", "roh", "stats"):
+                stdout_option = "-o {}"
+        elif method in MAP_STDOUT_OPTIONS[collection]:
+            # special case - samtools view -c outputs on stdout
+            if not(method == "view" and "-c" in args):
+                stdout_option = MAP_STDOUT_OPTIONS[collection][method]
+
+        if stdout_option is not None and not is_usage:
+            os.close(stdout_h)
+            samtools_set_stdout_fn(force_bytes(stdout_f))
+            bcftools_set_stdout_fn(force_bytes(stdout_f))
+            args.extend(stdout_option.format(stdout_f).split(" "))
+            stdout_h = c_open(b"/dev/null", O_WRONLY)
+    else:
+        samtools_set_stdout_fn("-")
+        bcftools_set_stdout_fn("-")
+        stdout_h = c_open(b"/dev/null", O_WRONLY)
+
+    # setup the function call to samtools/bcftools main
+    cdef char ** cargs
+    cdef int i, n, retval, l
+    n = len(args)
+    method = force_bytes(method)
+    collection = force_bytes(collection)
+    args = [force_bytes(a) for a in args]
+
+    # allocate two more for first (dummy) argument (contains command)
+    cdef int extra_args = 0
+    if method == b"index":
+        extra_args = 1
+    # add extra arguments for commands accepting optional arguments
+    # such as 'samtools index x.bam [out.index]'
+    cargs = <char**>calloc(n + 2 + extra_args, sizeof(char *))
+    cargs[0] = collection
+    cargs[1] = method
+
+    # create copies of strings - getopt for long options permutes
+    # arguments
+    for i from 0 <= i < n:
+        l = len(args[i])
+        cargs[i + 2] = <char *>calloc(l + 1, sizeof(char))
+        strncpy(cargs[i + 2], args[i], l)
+
+    # call samtools/bcftools
+    if collection == b"samtools":
+        samtools_set_stdout(stdout_h)
+        samtools_set_stderr(stderr_h)
+        retval = samtools_dispatch(n + 2, cargs)
+        samtools_close_stdout()
+        samtools_close_stderr()
+    elif collection == b"bcftools":
+        bcftools_set_stdout(stdout_h)
+        bcftools_set_stderr(stderr_h)
+        retval = bcftools_dispatch(n + 2, cargs)
+        bcftools_close_stdout()
+        bcftools_close_stderr()
+    else:
+        # unknown -- just return a Unix shell's "command not found" exit status
+        retval = 127
+
+    for i from 0 <= i < n:
+        free(cargs[i + 2])
+    free(cargs)
+
+    # get error messages
+    def _collect(fn):
+        out = []
+        try:
+            with open(fn, "r") as inf:
+                out = inf.read()
+        except UnicodeDecodeError:
+            with open(fn, "rb") as inf:
+                # read binary output
+                out = inf.read()
+        finally:
+            os.remove(fn)
+        return out
+
+    out_stderr = _collect(stderr_f)
+    if save_stdout:
+        out_stdout = None
+    elif catch_stdout:
+        out_stdout = _collect(stdout_f)
+    else:
+        out_stdout = None
+        
+    return retval, out_stderr, out_stdout
+
+
+__all__ = [
+    "qualitystring_to_array",
+    "array_to_qualitystring",
+    "qualities_to_qualitystring",
+    "get_encoding_error_handler",
+    "set_encoding_error_handler",
+]