Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/libcutils.pyx @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line diff
# cython: language_level=3
import types
import sys
import string
import re
import tempfile
import os
import io
from contextlib import contextmanager
from codecs import register_error

from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION
from cpython cimport PyBytes_Check, PyUnicode_Check
from cpython cimport array as c_array
from libc.errno cimport errno
from libc.stdlib cimport calloc, free
from libc.string cimport strncpy
from libc.stdint cimport INT32_MAX, int32_t
from libc.stdio cimport fprintf, stderr, fflush
from libc.stdio cimport stdout as c_stdout
from posix.fcntl cimport open as c_open, O_WRONLY, O_CREAT, O_TRUNC
from posix.unistd cimport SEEK_SET, SEEK_CUR, SEEK_END

from pysam.libcsamtools cimport samtools_dispatch, samtools_set_stdout, samtools_set_stderr, \
    samtools_close_stdout, samtools_close_stderr, samtools_set_stdout_fn

from pysam.libcbcftools cimport bcftools_dispatch, bcftools_set_stdout, bcftools_set_stderr, \
    bcftools_close_stdout, bcftools_close_stderr, bcftools_set_stdout_fn

#####################################################################
# hard-coded constants
cdef int MAX_POS = (1 << 31) - 1

#################################################################
# Utility functions for quality string conversions
cpdef c_array.array qualitystring_to_array(input_str, int offset=33):
    """Decode a quality string into an array of quality values.

    `input_str` is converted to bytes and `offset` is subtracted from
    every byte; the results are returned as an unsigned-byte (``'B'``)
    array. ``None`` is passed through unchanged.
    """
    cdef char code
    if input_str is None:
        return None
    encoded = force_bytes(input_str)
    scores = [code - offset for code in encoded]
    return c_array.array('B', scores)


cpdef array_to_qualitystring(c_array.array qualities, int offset=33):
    """Encode an array of quality values as a quality string.

    `offset` is added to every element of `qualities` and the
    resulting bytes are returned as a string. ``None`` is passed
    through unchanged.
    """
    cdef int pos
    cdef c_array.array shifted

    if qualities is None:
        return None

    # work on a same-typed, same-sized copy so the input is left untouched
    shifted = c_array.clone(qualities, len(qualities), zero=False)
    for pos in range(len(qualities)):
        shifted[pos] = qualities[pos] + offset
    return force_str(shifted.tobytes())


cpdef qualities_to_qualitystring(qualities, int offset=33):
    """Convert a list, tuple or array of quality scores to the string
    representation used in the SAM format.

    Parameters
    ----------
    qualities : array.array, list or tuple
        the quality scores; ``None`` is passed through unchanged.
    offset : int
        offset to be added to the quality scores to arrive at
        the characters of the quality string (default=33).

    Returns
    -------
    string
        a quality string

    """
    cdef char score
    if qualities is None:
        return None
    if isinstance(qualities, c_array.array):
        return array_to_qualitystring(qualities, offset=offset)
    # tuples and lists
    symbols = [chr(score + offset) for score in qualities]
    return force_str("".join(symbols))


########################################################################
## String encoding configuration facilities
########################################################################

# Codec error handler that just interprets each bad byte as ISO-8859-1.
def latin1_replace(exception):
    """Codec error callback: return the byte at the error start as the
    character with the same code point, resuming at ``exception.end``."""
    return (chr(exception.object[exception.start]), exception.end)

register_error('pysam.latin1replace', latin1_replace)


cdef str ERROR_HANDLER = 'strict'

cpdef get_encoding_error_handler():
    """Return the name of the codec error handler currently in use."""
    return ERROR_HANDLER

cpdef set_encoding_error_handler(name):
    """Set the codec error handler to `name` and return the previous one."""
    global ERROR_HANDLER
    previous = ERROR_HANDLER
    ERROR_HANDLER = name
    return previous

########################################################################
## Python 3 compatibility functions
########################################################################

cdef from_string_and_size(const char* s, size_t length):
    """Decode the first `length` bytes of `s` as UTF-8 using the
    configured error handler."""
    return s[:length].decode('utf-8', ERROR_HANDLER)

# filename encoding (adapted from lxml.etree.pyx)
cdef str FILENAME_ENCODING = sys.getfilesystemencoding() or sys.getdefaultencoding() or 'ascii'
cdef str TEXT_ENCODING = 'utf-8'

cdef bytes encode_filename(object filename):
    """Make sure a filename is 8-bit encoded (or None)."""
    if filename is None:
        return None
    return os.fsencode(filename)


cdef bytes force_bytes(object s, encoding=None, errors=None):
    """Convert a str object to bytes; bytes and None are returned as is.

    `encoding` defaults to TEXT_ENCODING (utf-8) and `errors` to the
    configured error handler. Raises TypeError for any other type.
    """
    if s is None:
        return None
    elif PyBytes_Check(s):
        return s
    elif PyUnicode_Check(s):
        return s.encode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)
    else:
        raise TypeError("Argument must be string, bytes or unicode.")


cdef charptr_to_str(const char* s, encoding=None, errors=None):
    """Decode a C string to str; a NULL pointer yields None."""
    if s == NULL:
        return None
    return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)


cdef charptr_to_str_w_len(const char* s, size_t n, encoding=None, errors=None):
    """Decode the first `n` bytes of a C string to str; a NULL pointer
    yields None."""
    if s == NULL:
        return None
    return s[:n].decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)


cdef bytes charptr_to_bytes(const char* s, encoding=None, errors=None):
    """Convert a C string to bytes; a NULL pointer yields None.

    `encoding` and `errors` are accepted but not used.
    """
    if s == NULL:
        return None
    else:
        return s


cdef force_str(object s, encoding=None, errors=None):
    """Return `s` as str: bytes are decoded, None stays None and any
    other object is returned unchanged (assumed to be str already)."""
    if s is None:
        return None
    if PyBytes_Check(s):
        return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)
    # assume unicode
    return s


cdef decode_bytes(bytes s, encoding=None, errors=None):
    """Return the bytes object `s` decoded to str (None stays None)."""
    if s is None:
        return None
    else:
        return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER)


cpdef parse_region(contig=None,
                   start=None,
                   stop=None,
                   region=None,
                   reference=None,
                   end=None):
    """parse alternative ways to specify a genomic region. A region can
    either be specified by :term:`contig`, `start` and `stop`, where
    `start` and `stop` denote a 0-based, half-open interval.

    :term:`reference` and `end` are also accepted for backward
    compatibility as synonyms for :term:`contig` and `stop`,
    respectively.

    Alternatively, a samtools :term:`region` string can be supplied
    (``contig``, ``contig:start`` or ``contig:start-stop``).

    If any of the coordinates are missing they will be replaced by the
    minimum (`start`) or maximum (`stop`) coordinate.

    Note that region strings are 1-based, while `start` and `stop`
    denote an interval in python coordinates.

    Returns
    -------

    tuple : a tuple of `contig`, `start` and `stop`.

    Raises
    ------

    ValueError
       for invalid or out of bounds regions.

    """
    cdef int32_t rstart
    cdef int32_t rstop

    if reference is not None:
        if contig is not None:
            raise ValueError('contig and reference should not both be specified')
        contig = reference

    if contig is not None and region is not None:
        raise ValueError('contig/reference and region should not both be specified')

    if end is not None:
        if stop is not None:
            raise ValueError('stop and end should not both be specified')
        stop = end

    if contig is None and region is None:
        raise ValueError("neither contig nor region are given")

    rstart = 0
    rstop = MAX_POS
    if start is not None:
        try:
            rstart = start
        except OverflowError:
            raise ValueError('start out of range (%i)' % start)

    if stop is not None:
        try:
            rstop = stop
        except OverflowError:
            raise ValueError('stop out of range (%i)' % stop)

    if region:
        if ":" in region:
            contig, coord = region.split(":")
            parts = coord.split("-")
            rstart = int(parts[0]) - 1
            # A stop coordinate exists only when the coordinate part has a
            # "-" separator ("chr1:100-200"). For "chr1:100", split() gives
            # a single element, so parts[1] must not be touched and the
            # stop keeps the value set above.
            if len(parts) > 1:
                rstop = int(parts[1])
        else:
            contig = region

    if rstart > rstop:
        raise ValueError('invalid coordinates: start (%i) > stop (%i)' % (rstart, rstop))
    if not 0 <= rstart < MAX_POS:
        raise ValueError('start out of range (%i)' % rstart)
    if not 0 <= rstop <= MAX_POS:
        raise ValueError('stop out of range (%i)' % rstop)

    return contig, rstart, rstop


cdef int libc_whence_from_io(int whence):
    """Translate a Python io-style whence value (0/1/2) to the C library's
    SEEK_SET/SEEK_CUR/SEEK_END; other values are returned unchanged."""
    # io.SEEK_SET/_CUR/_END are by definition 0/1/2 but C/POSIX's equivalents
    # have unspecified values. So we must translate, but checking for 0/1/2
    # rather than io.SEEK_SET/etc suffices.
    if whence == 0: return SEEK_SET
    if whence == 1: return SEEK_CUR
    if whence == 2: return SEEK_END
    return whence  # Otherwise likely invalid, but let HTSlib or OS report it


def _pysam_dispatch(collection,
                    method,
                    args=None,
                    catch_stdout=True,
                    is_usage=False,
                    save_stdout=None):
    '''call ``method`` in samtools/bcftools providing arguments in args.

    By default, stdout is redirected to a temporary file using the patched
    C sources except for a few commands that have an explicit output option
    (typically: -o). In these commands (such as samtools view), this explicit
    option is used. If *is_usage* is True, then these explicit output options
    will not be used.

    Catching of stdout can be turned off by setting *catch_stdout* to
    False. If *save_stdout* is given, stdout is written to that file
    instead and is not returned.

    Returns
    -------

    tuple : ``(retval, out_stderr, out_stdout)``. *retval* is the value
        returned by the dispatched command, or 127 if *collection* is
        neither "samtools" nor "bcftools". *out_stdout* is None unless
        stdout was caught.

    Raises
    ------

    IOError
        if ``method`` is "index" and the first input file does not exist.
    OSError
        if the *save_stdout* file cannot be opened for writing.
    '''

    if method == "index" and args:
        # We make sure that at least the first specified input file exists,
        # and if it doesn't we raise an IOError.
        ARGUMENTS = ['-m', '--min-shift', '-o', '--output', '--output-file', '-@', '--threads']
        skip_next = False
        for arg in args:
            if skip_next:
                skip_next = False
                continue
            if arg.startswith('-'):
                # Skip next argument for e.g. '--min-shift' '12' or '-m' '12' but not '-m12'
                if arg in ARGUMENTS:
                    skip_next = True
                continue
            if not os.path.exists(arg):
                raise IOError("No such file or directory: '%s'" % arg)
            else:
                break

    if args is None:
        args = []
    else:
        args = list(args)

    # redirect stderr to file
    stderr_h, stderr_f = tempfile.mkstemp()

    # redirect stdout to file
    if save_stdout:
        stdout_f = save_stdout
        stdout_h = c_open(force_bytes(stdout_f),
                          O_WRONLY|O_CREAT|O_TRUNC, 0666)
        if stdout_h == -1:
            # Remember errno before the cleanup calls can overwrite it, then
            # release the stderr temporary file that was already created so
            # it is not left behind.
            open_errno = errno
            os.close(stderr_h)
            os.remove(stderr_f)
            raise OSError(open_errno, "error while opening file for writing", stdout_f)

        samtools_set_stdout_fn(force_bytes(stdout_f))
        bcftools_set_stdout_fn(force_bytes(stdout_f))

    elif catch_stdout:
        stdout_h, stdout_f = tempfile.mkstemp()
        MAP_STDOUT_OPTIONS = {
            "samtools": {
                "view": "-o {}",
                "mpileup": "-o {}",
                "depad": "-o {}",
                "calmd": "",  # uses pysam_stdout_fn
            },
            "bcftools": {}
        }

        stdout_option = None
        if collection == "bcftools":
            # in bcftools, most methods accept -o, the exceptions
            # are below:
            if method not in ("head", "index", "roh", "stats"):
                stdout_option = "-o {}"
        elif method in MAP_STDOUT_OPTIONS[collection]:
            # special case - samtools view -c outputs on stdout
            if not(method == "view" and "-c" in args):
                stdout_option = MAP_STDOUT_OPTIONS[collection][method]

        if stdout_option is not None and not is_usage:
            # the command writes to the temporary file itself via its
            # output option, so stdout proper is sent to /dev/null
            os.close(stdout_h)
            samtools_set_stdout_fn(force_bytes(stdout_f))
            bcftools_set_stdout_fn(force_bytes(stdout_f))
            args.extend(stdout_option.format(stdout_f).split(" "))
            stdout_h = c_open(b"/dev/null", O_WRONLY)
    else:
        samtools_set_stdout_fn("-")
        bcftools_set_stdout_fn("-")
        stdout_h = c_open(b"/dev/null", O_WRONLY)

    # setup the function call to samtools/bcftools main
    cdef char ** cargs
    cdef int i, n, retval, l
    n = len(args)
    method = force_bytes(method)
    collection = force_bytes(collection)
    args = [force_bytes(a) for a in args]

    # allocate two more for first (dummy) argument (contains command)
    cdef int extra_args = 0
    if method == b"index":
        extra_args = 1
    # add extra arguments for commands accepting optional arguments
    # such as 'samtools index x.bam [out.index]'
    cargs = <char**>calloc(n + 2 + extra_args, sizeof(char *))
    cargs[0] = collection
    cargs[1] = method

    # create copies of strings - getopt for long options permutes
    # arguments
    for i in range(n):
        l = len(args[i])
        cargs[i + 2] = <char *>calloc(l + 1, sizeof(char))
        strncpy(cargs[i + 2], args[i], l)

    # call samtools/bcftools
    if collection == b"samtools":
        samtools_set_stdout(stdout_h)
        samtools_set_stderr(stderr_h)
        retval = samtools_dispatch(n + 2, cargs)
        samtools_close_stdout()
        samtools_close_stderr()
    elif collection == b"bcftools":
        bcftools_set_stdout(stdout_h)
        bcftools_set_stderr(stderr_h)
        retval = bcftools_dispatch(n + 2, cargs)
        bcftools_close_stdout()
        bcftools_close_stderr()
    else:
        # unknown -- just return a Unix shell's "command not found" exit status
        retval = 127
        # Nothing was dispatched, so none of the close functions above ran
        # on this path; close the redirection descriptors here so they do
        # not stay open.
        for fd in (stdout_h, stderr_h):
            if fd != -1:
                os.close(fd)

    for i in range(n):
        free(cargs[i + 2])
    free(cargs)

    # get error messages
    def _collect(fn):
        """Return the contents of file `fn` (as str, or as bytes if it
        cannot be decoded as text) and delete the file."""
        out = []
        try:
            with open(fn, "r") as inf:
                out = inf.read()
        except UnicodeDecodeError:
            with open(fn, "rb") as inf:
                # read binary output
                out = inf.read()
        finally:
            os.remove(fn)
        return out

    out_stderr = _collect(stderr_f)
    if save_stdout:
        out_stdout = None
    elif catch_stdout:
        out_stdout = _collect(stdout_f)
    else:
        out_stdout = None

    return retval, out_stderr, out_stdout


__all__ = [
    "qualitystring_to_array",
    "array_to_qualitystring",
    "qualities_to_qualitystring",
    "get_encoding_error_handler",
    "set_encoding_error_handler",
]