Mercurial > repos > rliterman > csp2
view CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/libcutils.pyx @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line source
# cython: language_level=3 import types import sys import string import re import tempfile import os import io from contextlib import contextmanager from codecs import register_error from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION from cpython cimport PyBytes_Check, PyUnicode_Check from cpython cimport array as c_array from libc.errno cimport errno from libc.stdlib cimport calloc, free from libc.string cimport strncpy from libc.stdint cimport INT32_MAX, int32_t from libc.stdio cimport fprintf, stderr, fflush from libc.stdio cimport stdout as c_stdout from posix.fcntl cimport open as c_open, O_WRONLY, O_CREAT, O_TRUNC from posix.unistd cimport SEEK_SET, SEEK_CUR, SEEK_END from pysam.libcsamtools cimport samtools_dispatch, samtools_set_stdout, samtools_set_stderr, \ samtools_close_stdout, samtools_close_stderr, samtools_set_stdout_fn from pysam.libcbcftools cimport bcftools_dispatch, bcftools_set_stdout, bcftools_set_stderr, \ bcftools_close_stdout, bcftools_close_stderr, bcftools_set_stdout_fn ##################################################################### # hard-coded constants cdef int MAX_POS = (1 << 31) - 1 ################################################################# # Utility functions for quality string conversions cpdef c_array.array qualitystring_to_array(input_str, int offset=33): """convert a qualitystring to an array of quality values.""" if input_str is None: return None qs = force_bytes(input_str) cdef char i return c_array.array('B', [i - offset for i in qs]) cpdef array_to_qualitystring(c_array.array qualities, int offset=33): """convert an array of quality values to a string.""" if qualities is None: return None cdef int x cdef c_array.array result result = c_array.clone(qualities, len(qualities), zero=False) for x from 0 <= x < len(qualities): result[x] = qualities[x] + offset return force_str(result.tobytes()) cpdef qualities_to_qualitystring(qualities, int offset=33): """convert a list or array of quality scores to the string representation used in the SAM format. Parameters ---------- offset : int offset to be added to the quality scores to arrive at the characters of the quality string (default=33). Returns ------- string a quality string """ cdef char x if qualities is None: return None elif isinstance(qualities, c_array.array): return array_to_qualitystring(qualities, offset=offset) else: # tuples and lists return force_str("".join([chr(x + offset) for x in qualities])) ######################################################################## ## String encoding configuration facilities ######################################################################## # Codec error handler that just interprets each bad byte as ISO-8859-1. def latin1_replace(exception): return (chr(exception.object[exception.start]), exception.end) register_error('pysam.latin1replace', latin1_replace) cdef str ERROR_HANDLER = 'strict' cpdef get_encoding_error_handler(): return ERROR_HANDLER cpdef set_encoding_error_handler(name): global ERROR_HANDLER previous = ERROR_HANDLER ERROR_HANDLER = name return previous ######################################################################## ## Python 3 compatibility functions ######################################################################## cdef from_string_and_size(const char* s, size_t length): return s[:length].decode('utf-8', ERROR_HANDLER) # filename encoding (adapted from lxml.etree.pyx) cdef str FILENAME_ENCODING = sys.getfilesystemencoding() or sys.getdefaultencoding() or 'ascii' cdef str TEXT_ENCODING = 'utf-8' cdef bytes encode_filename(object filename): """Make sure a filename is 8-bit encoded (or None).""" if filename is None: return None return os.fsencode(filename) cdef bytes force_bytes(object s, encoding=None, errors=None): """convert string or unicode object to bytes, assuming utf8 encoding. """ if s is None: return None elif PyBytes_Check(s): return s elif PyUnicode_Check(s): return s.encode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER) else: raise TypeError("Argument must be string, bytes or unicode.") cdef charptr_to_str(const char* s, encoding=None, errors=None): if s == NULL: return None return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER) cdef charptr_to_str_w_len(const char* s, size_t n, encoding=None, errors=None): if s == NULL: return None return s[:n].decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER) cdef bytes charptr_to_bytes(const char* s, encoding=None, errors=None): if s == NULL: return None else: return s cdef force_str(object s, encoding=None, errors=None): """Return s converted to str type of current Python (bytes in Py2, unicode in Py3)""" if s is None: return None if PyBytes_Check(s): return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER) # assume unicode return s cdef decode_bytes(bytes s, encoding=None, errors=None): """Return s converted to current Python's str type, always decoding even in Python 2""" if s is None: return None else: return s.decode(encoding or TEXT_ENCODING, errors or ERROR_HANDLER) cpdef parse_region(contig=None, start=None, stop=None, region=None, reference=None, end=None): """parse alternative ways to specify a genomic region. A region can either be specified by :term:`reference`, `start` and `end`. `start` and `end` denote 0-based, half-open intervals. :term:`reference` and `end` are also accepted for backward compatibility as synonyms for :term:`contig` and `stop`, respectively. Alternatively, a samtools :term:`region` string can be supplied. If any of the coordinates are missing they will be replaced by the minimum (`start`) or maximum (`end`) coordinate. Note that region strings are 1-based, while `start` and `end` denote an interval in python coordinates. Returns ------- tuple : a tuple of `reference`, `start` and `end`. Raises ------ ValueError for invalid or out of bounds regions. """ cdef int32_t rstart cdef int32_t rstop if reference is not None: if contig is not None: raise ValueError('contig and reference should not both be specified') contig = reference if contig is not None and region is not None: raise ValueError('contig/reference and region should not both be specified') if end is not None: if stop is not None: raise ValueError('stop and end should not both be specified') stop = end if contig is None and region is None: raise ValueError("neither contig nor region are given") rstart = 0 rstop = MAX_POS if start is not None: try: rstart = start except OverflowError: raise ValueError('start out of range (%i)' % start) if stop is not None: try: rstop = stop except OverflowError: raise ValueError('stop out of range (%i)' % stop) if region: if ":" in region: contig, coord = region.split(":") parts = coord.split("-") rstart = int(parts[0]) - 1 if len(parts) >= 1: rstop = int(parts[1]) else: contig = region if rstart > rstop: raise ValueError('invalid coordinates: start (%i) > stop (%i)' % (rstart, rstop)) if not 0 <= rstart < MAX_POS: raise ValueError('start out of range (%i)' % rstart) if not 0 <= rstop <= MAX_POS: raise ValueError('stop out of range (%i)' % rstop) return contig, rstart, rstop cdef int libc_whence_from_io(int whence): # io.SEEK_SET/_CUR/_END are by definition 0/1/2 but C/POSIX's equivalents # have unspecified values. So we must translate, but checking for 0/1/2 # rather than io.SEEK_SET/etc suffices. if whence == 0: return SEEK_SET if whence == 1: return SEEK_CUR if whence == 2: return SEEK_END return whence # Otherwise likely invalid, but let HTSlib or OS report it def _pysam_dispatch(collection, method, args=None, catch_stdout=True, is_usage=False, save_stdout=None): '''call ``method`` in samtools/bcftools providing arguments in args. By default, stdout is redirected to a temporary file using the patched C sources except for a few commands that have an explicit output option (typically: -o). In these commands (such as samtools view), this explicit option is used. If *is_usage* is True, then these explicit output options will not be used. Catching of stdout can be turned off by setting *catch_stdout* to False. ''' if method == "index" and args: # We make sure that at least the first specified input file exists, # and if it doesn't we raise an IOError. ARGUMENTS = ['-m', '--min-shift', '-o', '--output', '--output-file', '-@', '--threads'] skip_next = False for arg in args: if skip_next: skip_next = False continue if arg.startswith('-'): # Skip next argument for e.g. '--min-shift' '12' or '-m' '12' but not '-m12' if arg in ARGUMENTS: skip_next = True continue if not os.path.exists(arg): raise IOError("No such file or directory: '%s'" % arg) else: break if args is None: args = [] else: args = list(args) # redirect stderr to file stderr_h, stderr_f = tempfile.mkstemp() # redirect stdout to file if save_stdout: stdout_f = save_stdout stdout_h = c_open(force_bytes(stdout_f), O_WRONLY|O_CREAT|O_TRUNC, 0666) if stdout_h == -1: raise OSError(errno, "error while opening file for writing", stdout_f) samtools_set_stdout_fn(force_bytes(stdout_f)) bcftools_set_stdout_fn(force_bytes(stdout_f)) elif catch_stdout: stdout_h, stdout_f = tempfile.mkstemp() MAP_STDOUT_OPTIONS = { "samtools": { "view": "-o {}", "mpileup": "-o {}", "depad": "-o {}", "calmd": "", # uses pysam_stdout_fn }, "bcftools": {} } stdout_option = None if collection == "bcftools": # in bcftools, most methods accept -o, the exceptions # are below: if method not in ("head", "index", "roh", "stats"): stdout_option = "-o {}" elif method in MAP_STDOUT_OPTIONS[collection]: # special case - samtools view -c outputs on stdout if not(method == "view" and "-c" in args): stdout_option = MAP_STDOUT_OPTIONS[collection][method] if stdout_option is not None and not is_usage: os.close(stdout_h) samtools_set_stdout_fn(force_bytes(stdout_f)) bcftools_set_stdout_fn(force_bytes(stdout_f)) args.extend(stdout_option.format(stdout_f).split(" ")) stdout_h = c_open(b"/dev/null", O_WRONLY) else: samtools_set_stdout_fn("-") bcftools_set_stdout_fn("-") stdout_h = c_open(b"/dev/null", O_WRONLY) # setup the function call to samtools/bcftools main cdef char ** cargs cdef int i, n, retval, l n = len(args) method = force_bytes(method) collection = force_bytes(collection) args = [force_bytes(a) for a in args] # allocate two more for first (dummy) argument (contains command) cdef int extra_args = 0 if method == b"index": extra_args = 1 # add extra arguments for commands accepting optional arguments # such as 'samtools index x.bam [out.index]' cargs = <char**>calloc(n + 2 + extra_args, sizeof(char *)) cargs[0] = collection cargs[1] = method # create copies of strings - getopt for long options permutes # arguments for i from 0 <= i < n: l = len(args[i]) cargs[i + 2] = <char *>calloc(l + 1, sizeof(char)) strncpy(cargs[i + 2], args[i], l) # call samtools/bcftools if collection == b"samtools": samtools_set_stdout(stdout_h) samtools_set_stderr(stderr_h) retval = samtools_dispatch(n + 2, cargs) samtools_close_stdout() samtools_close_stderr() elif collection == b"bcftools": bcftools_set_stdout(stdout_h) bcftools_set_stderr(stderr_h) retval = bcftools_dispatch(n + 2, cargs) bcftools_close_stdout() bcftools_close_stderr() else: # unknown -- just return a Unix shell's "command not found" exit status retval = 127 for i from 0 <= i < n: free(cargs[i + 2]) free(cargs) # get error messages def _collect(fn): out = [] try: with open(fn, "r") as inf: out = inf.read() except UnicodeDecodeError: with open(fn, "rb") as inf: # read binary output out = inf.read() finally: os.remove(fn) return out out_stderr = _collect(stderr_f) if save_stdout: out_stdout = None elif catch_stdout: out_stdout = _collect(stdout_f) else: out_stdout = None return retval, out_stderr, out_stdout __all__ = [ "qualitystring_to_array", "array_to_qualitystring", "qualities_to_qualitystring", "get_encoding_error_handler", "set_encoding_error_handler", ]