Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/libctabixproxies.pyx @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/libctabixproxies.pyx Tue Mar 18 17:55:14 2025 -0400 @@ -0,0 +1,835 @@ +from cpython cimport PyBytes_FromStringAndSize + +from libc.stdio cimport printf, feof, fgets +from libc.string cimport strcpy, strlen, memcmp, memcpy, memchr, strstr, strchr +from libc.stdlib cimport free, malloc, calloc, realloc +from libc.stdlib cimport atoi, atol, atof + +from pysam.libcutils cimport force_bytes, force_str, charptr_to_str +from pysam.libcutils cimport encode_filename, from_string_and_size + +import collections +import copy + + +cdef char *StrOrEmpty(char * buffer): + if buffer == NULL: + return "" + else: return buffer + + +cdef int isNew(char * p, char * buffer, size_t nbytes): + """return True if `p` is located within `buffer` of size + `nbytes` + """ + if p == NULL: + return 0 + + return not (buffer <= p <= buffer + nbytes) + + +cdef class TupleProxy: + '''Proxy class for access to parsed row as a tuple. + + This class represents a table row for fast read-access. + + Access to individual fields is via the [] operator. + + Only read-only access is implemented. + + ''' + + def __cinit__(self, encoding="ascii"): + self.data = NULL + self.fields = NULL + self.nbytes = 0 + self.is_modified = 0 + self.nfields = 0 + # start counting at field offset + self.offset = 0 + self.encoding = encoding + + def __dealloc__(self): + cdef int x + if self.is_modified: + for x from 0 <= x < self.nfields: + if isNew(self.fields[x], self.data, self.nbytes): + free(self.fields[x]) + self.fields[x] = NULL + + if self.data != NULL: + free(self.data) + if self.fields != NULL: + free(self.fields) + + def __copy__(self): + if self.is_modified: + raise NotImplementedError( + "copying modified tuples is not implemented") + cdef TupleProxy n = type(self)() + n.copy(self.data, self.nbytes, reset=True) + return n + + def compare(self, TupleProxy other): + '''return -1,0,1, if contents in this are binary + <,=,> to *other* + + ''' + if self.is_modified or other.is_modified: + raise NotImplementedError( + 'comparison of modified TupleProxies is not implemented') + if self.data == other.data: + return 0 + + if self.nbytes < other.nbytes: + return -1 + elif self.nbytes > other.nbytes: + return 1 + return memcmp(self.data, other.data, self.nbytes) + + def __richcmp__(self, TupleProxy other, int op): + if op == 2: # == operator + return self.compare(other) == 0 + elif op == 3: # != operator + return self.compare(other) != 0 + else: + err_msg = "op {0} isn't implemented yet".format(op) + raise NotImplementedError(err_msg) + + cdef take(self, char * buffer, size_t nbytes): + '''start presenting buffer. + + Take ownership of the pointer. + ''' + self.data = buffer + self.nbytes = nbytes + self.update(buffer, nbytes) + + cdef present(self, char * buffer, size_t nbytes): + '''start presenting buffer. + + Do not take ownership of the pointer. + ''' + self.update(buffer, nbytes) + + cdef copy(self, char * buffer, size_t nbytes, bint reset=False): + '''start presenting buffer of size *nbytes*. + + Buffer is a '\0'-terminated string without the '\n'. + + Take a copy of buffer. + ''' + # +1 for '\0' + cdef int s = sizeof(char) * (nbytes + 1) + self.data = <char*>malloc(s) + if self.data == NULL: + raise ValueError("out of memory in TupleProxy.copy()") + memcpy(<char*>self.data, buffer, s) + + if reset: + for x from 0 <= x < nbytes: + if self.data[x] == b'\0': + self.data[x] = b'\t' + + self.update(self.data, nbytes) + + cpdef int getMinFields(self): + '''return minimum number of fields.''' + # 1 is not a valid tabix entry, but TupleProxy + # could be more generic. + return 1 + + cpdef int getMaxFields(self): + '''return maximum number of fields. Return + 0 for unknown length.''' + return 0 + + cdef update(self, char * buffer, size_t nbytes): + '''update internal data. + + *buffer* is a \0 terminated string. + + *nbytes* is the number of bytes in buffer (excluding + the \0) + + Update starts work in buffer, thus can be used + to collect any number of fields until nbytes + is exhausted. + + If max_fields is set, the number of fields is initialized to + max_fields. + + ''' + cdef char * pos + cdef char * old_pos + cdef int field + cdef int max_fields, min_fields, x + + assert strlen(buffer) == nbytes, \ + "length of buffer (%i) != number of bytes (%i)" % ( + strlen(buffer), nbytes) + + if buffer[nbytes] != 0: + raise ValueError("incomplete line at %s" % buffer) + + ################################# + # remove line breaks and feeds and update number of bytes + x = nbytes - 1 + while x > 0 and (buffer[x] == b'\n' or buffer[x] == b'\r'): + buffer[x] = b'\0' + x -= 1 + self.nbytes = x + 1 + + ################################# + # clear data + if self.fields != NULL: + free(self.fields) + + for field from 0 <= field < self.nfields: + if isNew(self.fields[field], self.data, self.nbytes): + free(self.fields[field]) + + self.is_modified = self.nfields = 0 + + ################################# + # allocate new + max_fields = self.getMaxFields() + # pre-count fields - better would be + # to guess or dynamically grow + if max_fields == 0: + for x from 0 <= x < nbytes: + if buffer[x] == b'\t': + max_fields += 1 + max_fields += 1 + + self.fields = <char **>calloc(max_fields, sizeof(char *)) + if self.fields == NULL: + raise ValueError("out of memory in TupleProxy.update()") + + ################################# + # start filling + field = 0 + self.fields[field] = pos = buffer + field += 1 + old_pos = pos + while 1: + + pos = <char*>memchr(pos, b'\t', nbytes) + if pos == NULL: + break + if field >= max_fields: + raise ValueError( + "parsing error: more than %i fields in line: %s" % + (max_fields, buffer)) + + pos[0] = b'\0' + pos += 1 + self.fields[field] = pos + field += 1 + nbytes -= pos - old_pos + if nbytes < 0: + break + old_pos = pos + self.nfields = field + if self.nfields < self.getMinFields(): + raise ValueError( + "parsing error: fewer than %i fields in line: %s" % + (self.getMinFields(), buffer)) + + def _getindex(self, int index): + '''return item at idx index''' + cdef int i = index + if i < 0: + i += self.nfields + if i < 0: + raise IndexError("list index out of range") + # apply offset - separating a fixed number + # of fields from a variable number such as in VCF + i += self.offset + if i >= self.nfields: + raise IndexError( + "list index out of range %i >= %i" % + (i, self.nfields)) + return force_str(self.fields[i], self.encoding) + + def __getitem__(self, key): + if type(key) == int: + return self._getindex(key) + # slice object + start, end, step = key.indices(self.nfields) + result = [] + for index in range(start, end, step): + result.append(self._getindex(index)) + return result + + def _setindex(self, index, value): + '''set item at idx index.''' + cdef int idx = index + if idx < 0: + raise IndexError("list index out of range") + if idx >= self.nfields: + raise IndexError("list index out of range") + + if isNew(self.fields[idx], self.data, self.nbytes): + free(self.fields[idx]) + + self.is_modified = 1 + + if value is None: + self.fields[idx] = NULL + return + + # conversion with error checking + value = force_bytes(value) + cdef char * tmp = <char*>value + self.fields[idx] = <char*>malloc((strlen( tmp ) + 1) * sizeof(char)) + if self.fields[idx] == NULL: + raise ValueError("out of memory" ) + strcpy(self.fields[idx], tmp) + + def __setitem__(self, index, value): + '''set item at *index* to *value*''' + cdef int i = index + if i < 0: + i += self.nfields + i += self.offset + + self._setindex(i, value) + + def __len__(self): + return self.nfields + + def __iter__(self): + return TupleProxyIterator(self) + + def __str__(self): + '''return original data''' + # copy and replace \0 bytes with \t characters + cdef char * cpy + if self.is_modified: + # todo: treat NULL values + result = [] + for x in xrange(0, self.nfields): + result.append(StrOrEmpty(self.fields[x]).decode(self.encoding)) + return "\t".join(result) + else: + cpy = <char*>calloc(sizeof(char), self.nbytes+1) + if cpy == NULL: + raise ValueError("out of memory") + memcpy(cpy, self.data, self.nbytes+1) + for x from 0 <= x < self.nbytes: + if cpy[x] == b'\0': + cpy[x] = b'\t' + result = cpy[:self.nbytes] + free(cpy) + r = result.decode(self.encoding) + return r + + +cdef class TupleProxyIterator: + def __init__(self, proxy): + self.proxy = proxy + self.index = 0 + + def __iter__(self): + return self + + def __next__(self): + if self.index >= self.proxy.nfields: + raise StopIteration + cdef char *retval = self.proxy.fields[self.index] + self.index += 1 + return force_str(retval, self.proxy.encoding) if retval != NULL else None + + +def toDot(v): + '''convert value to '.' if None''' + if v is None: + return "." + else: + return str(v) + +def quote(v): + '''return a quoted attribute.''' + if isinstance(v, str): + return '"%s"' % v + else: + return str(v) + + +cdef class NamedTupleProxy(TupleProxy): + + map_key2field = {} + + def __setattr__(self, key, value): + '''set attribute.''' + cdef int idx + idx, f = self.map_key2field[key] + if idx >= self.nfields: + raise KeyError("field %s not set" % key) + TupleProxy.__setitem__(self, idx, str(value)) + + def __getattr__(self, key): + cdef int idx + idx, f = self.map_key2field[key] + if idx >= self.nfields: + raise KeyError("field %s not set" % key) + if f == str: + return force_str(self.fields[idx], + self.encoding) + return f(self.fields[idx]) + + +cdef dot_or_float(v): + if v == "" or v == b".": + return None + else: + try: + return int(v) + except ValueError: + return float(v) + + +cdef dot_or_int(v): + if v == "" or v == b".": + return None + else: + return int(v) + + +cdef dot_or_str(v): + if v == "" or v == b".": + return None + else: + return force_str(v) + + +cdef int from1based(v): + return atoi(v) - 1 + + +cdef str to1based(int v): + return str(v + 1) + + +cdef class GTFProxy(NamedTupleProxy): + '''Proxy class for access to GTF fields. + + This class represents a GTF entry for fast read-access. + Write-access has been added as well, though some care must + be taken. If any of the string fields (contig, source, ...) + are set, the new value is tied to the lifetime of the + argument that was supplied. + + The only exception is the attributes field when set from + a dictionary - this field will manage its own memory. + + ''' + separator = "; " + + # first value is field index, the tuple contains conversion + # functions for getting (converting internal string representation + # to pythonic value) and setting (converting pythonic value to + # interval string representation) + map_key2field = { + 'contig' : (0, (str, str)), + 'source' : (1, (dot_or_str, str)), + 'feature': (2, (dot_or_str, str)), + 'start' : (3, (from1based, to1based)), + 'end' : (4, (int, int)), + 'score' : (5, (dot_or_float, toDot)), + 'strand' : (6, (dot_or_str, str)), + 'frame' : (7, (dot_or_int, toDot)), + 'attributes': (8, (str, str))} + + def __cinit__(self): + # automatically calls TupleProxy.__cinit__ + self.attribute_dict = None + + cpdef int getMinFields(self): + '''return minimum number of fields.''' + return 9 + + cpdef int getMaxFields(self): + '''return max number of fields.''' + return 9 + + def to_dict(self): + """parse attributes - return as dict + + The dictionary can be modified to update attributes. + """ + if not self.attribute_dict: + self.attribute_dict = self.attribute_string2dict( + self.attributes) + self.is_modified = True + return self.attribute_dict + + def as_dict(self): + """deprecated: use :meth:`to_dict` + """ + return self.to_dict() + + def from_dict(self, d): + '''set attributes from a dictionary.''' + self.attribute_dict = None + attribute_string = force_bytes( + self.attribute_dict2string(d), + self.encoding) + self._setindex(8, attribute_string) + + def __str__(self): + cdef char * cpy + cdef int x + + if self.is_modified: + return "\t".join( + (self.contig, + toDot(self.source), + toDot(self.feature), + str(self.start + 1), + str(self.end), + toDot(self.score), + toDot(self.strand), + toDot(self.frame), + self.attributes)) + else: + return super().__str__() + + def invert(self, int lcontig): + '''invert coordinates to negative strand coordinates + + This method will only act if the feature is on the + negative strand.''' + + if self.strand[0] == '-': + start = min(self.start, self.end) + end = max(self.start, self.end) + self.start, self.end = lcontig - end, lcontig - start + + def keys(self): + '''return a list of attributes defined in this entry.''' + if not self.attribute_dict: + self.attribute_dict = self.attribute_string2dict( + self.attributes) + return self.attribute_dict.keys() + + def __getitem__(self, key): + return self.__getattr__(key) + + def setAttribute(self, name, value): + '''convenience method to set an attribute. + ''' + if not self.attribute_dict: + self.attribute_dict = self.attribute_string2dict( + self.attributes) + self.attribute_dict[name] = value + self.is_modified = True + + def attribute_string2dict(self, s): + return collections.OrderedDict( + self.attribute_string2iterator(s)) + + def __cmp__(self, other): + return (self.contig, self.strand, self.start) < \ + (other.contig, other.strand, other.start) + + # python 3 compatibility + def __richcmp__(GTFProxy self, GTFProxy other, int op): + if op == 0: + return (self.contig, self.strand, self.start) < \ + (other.contig, other.strand, other.start) + elif op == 1: + return (self.contig, self.strand, self.start) <= \ + (other.contig, other.strand, other.start) + elif op == 2: + return self.compare(other) == 0 + elif op == 3: + return self.compare(other) != 0 + else: + err_msg = "op {0} isn't implemented yet".format(op) + raise NotImplementedError(err_msg) + + def dict2attribute_string(self, d): + """convert dictionary to attribute string in GTF format. + + """ + aa = [] + for k, v in d.items(): + if isinstance(v, str): + aa.append('{} "{}"'.format(k, v)) + else: + aa.append("{} {}".format(k, str(v))) + + return self.separator.join(aa) + ";" + + def attribute_string2iterator(self, s): + """convert attribute string in GTF format to records + and iterate over key, value pairs. + """ + + # remove comments + attributes = force_str(s, encoding=self.encoding) + + # separate into fields + # Fields might contain a ";", for example in ENSEMBL GTF file + # for mouse, v78: + # ...; transcript_name "TXNRD2;-001"; .... + # The current heuristic is to split on a semicolon followed by a + # space, see also http://mblab.wustl.edu/GTF22.html + + # Remove white space to prevent a last empty field. + fields = [x.strip() for x in attributes.strip().split("; ")] + for f in fields: + + # strip semicolon (GTF files without a space after the last semicolon) + if f.endswith(";"): + f = f[:-1] + + # split at most once in order to avoid separating + # multi-word values + d = [x.strip() for x in f.split(" ", 1)] + + n, v = d[0], d[1] + if len(d) > 2: + v = d[1:] + + if v[0] == '"' and v[-1] == '"': + v = v[1:-1] + else: + ## try to convert to a value + try: + v = float(v) + v = int(v) + except ValueError: + pass + except TypeError: + pass + + yield n, v + + def __getattr__(self, key): + """Generic lookup of attribute from GFF/GTF attributes + """ + + # Only called if there *isn't* an attribute with this name + cdef int idx + idx, f = self.map_key2field.get(key, (-1, None)) + if idx >= 0: + # deal with known attributes (fields 0-8) + if idx == 8: + # flush attributes if requested + if self.is_modified and self.attribute_dict is not None: + s = self.dict2attribute_string(self.attribute_dict) + TupleProxy._setindex(self, idx, s) + self.attribute_dict = None + return s + + if f[0] == str: + return force_str(self.fields[idx], + self.encoding) + else: + return f[0](self.fields[idx]) + else: + # deal with generic attributes (gene_id, ...) + if self.attribute_dict is None: + self.attribute_dict = self.attribute_string2dict( + self.attributes) + return self.attribute_dict[key] + + def __setattr__(self, key, value): + '''set attribute.''' + + # Note that __setattr__ is called before properties, so __setattr__ and + # properties don't mix well. This is different from __getattr__ which is + # called after any properties have been resolved. + cdef int idx + idx, f = self.map_key2field.get(key, (-1, None)) + + if idx >= 0: + if value is None: + s = "." + elif f[1] == str: + s = force_bytes(value, + self.encoding) + else: + s = str(f[1](value)) + TupleProxy._setindex(self, idx, s) + else: + if self.attribute_dict is None: + self.attribute_dict = self.attribute_string2dict( + self.attributes) + self.attribute_dict[key] = value + self.is_modified = True + + # for backwards compatibility + def asDict(self, *args, **kwargs): + return self.to_dict(*args, **kwargs) + + def fromDict(self, *args, **kwargs): + return self.from_dict(*args, **kwargs) + + +cdef class GFF3Proxy(GTFProxy): + + def dict2attribute_string(self, d): + """convert dictionary to attribute string.""" + return ";".join(["{}={}".format(k, v) for k, v in d.items()]) + + def attribute_string2iterator(self, s): + """convert attribute string in GFF3 format to records + and iterate over key, value pairs. + """ + + for f in (x.strip() for x in s.split(";")): + if not f: + continue + + key, value = f.split("=", 1) + value = value.strip() + + ## try to convert to a value + try: + value = float(value) + value = int(value) + except ValueError: + pass + except TypeError: + pass + + yield key.strip(), value + + +cdef class BedProxy(NamedTupleProxy): + '''Proxy class for access to Bed fields. + + This class represents a BED entry for fast read-access. + ''' + map_key2field = { + 'contig' : (0, str), + 'start' : (1, int), + 'end' : (2, int), + 'name' : (3, str), + 'score' : (4, float), + 'strand' : (5, str), + 'thickStart' : (6, int), + 'thickEnd' : (7, int), + 'itemRGB' : (8, str), + 'blockCount': (9, int), + 'blockSizes': (10, str), + 'blockStarts': (11, str), } + + cpdef int getMinFields(self): + '''return minimum number of fields.''' + return 3 + + cpdef int getMaxFields(self): + '''return max number of fields.''' + return 12 + + cdef update(self, char * buffer, size_t nbytes): + '''update internal data. + + nbytes does not include the terminal '\0'. + ''' + NamedTupleProxy.update(self, buffer, nbytes) + + if self.nfields < 3: + raise ValueError( + "bed format requires at least three columns") + + # determines bed format + self.bedfields = self.nfields + + # do automatic conversion + self.contig = self.fields[0] + self.start = atoi(self.fields[1]) + self.end = atoi(self.fields[2]) + + # __setattr__ in base class seems to take precedence + # hence implement setters in __setattr__ + #property start: + # def __get__( self ): return self.start + #property end: + # def __get__( self ): return self.end + + def __str__(self): + cdef int save_fields = self.nfields + # ensure fields to use correct format + self.nfields = self.bedfields + retval = super().__str__() + self.nfields = save_fields + return retval + + def __setattr__(self, key, value): + '''set attribute.''' + if key == "start": + self.start = value + elif key == "end": + self.end = value + + cdef int idx + idx, f = self.map_key2field[key] + TupleProxy._setindex(self, idx, str(value)) + + +cdef class VCFProxy(NamedTupleProxy): + '''Proxy class for access to VCF fields. + + The genotypes are accessed via a numeric index. + Sample headers are not available. + ''' + map_key2field = { + 'contig' : (0, str), + 'pos' : (1, int), + 'id' : (2, str), + 'ref' : (3, str), + 'alt' : (4, str), + 'qual' : (5, str), + 'filter' : (6, str), + 'info' : (7, str), + 'format' : (8, str) } + + def __cinit__(self): + # automatically calls TupleProxy.__cinit__ + # start indexed access at genotypes + self.offset = 9 + + cdef update(self, char * buffer, size_t nbytes): + '''update internal data. + + nbytes does not include the terminal '\0'. + ''' + NamedTupleProxy.update(self, buffer, nbytes) + + self.contig = self.fields[0] + # vcf counts from 1 - correct here + self.pos = atoi(self.fields[1]) - 1 + + def __len__(self): + '''return number of genotype fields.''' + return max(0, self.nfields - 9) + + property pos: + '''feature end (in 0-based open/closed coordinates).''' + def __get__(self): + return self.pos + + def __setattr__(self, key, value): + '''set attribute.''' + if key == "pos": + self.pos = value + value += 1 + + cdef int idx + idx, f = self.map_key2field[key] + TupleProxy._setindex(self, idx, str(value)) + + +__all__ = [ + "TupleProxy", + "NamedTupleProxy", + "GTFProxy", + "GFF3Proxy", + "BedProxy", + "VCFProxy"]