jpayne@69: # cython: language_level=3 jpayne@69: # cython: embedsignature=True jpayne@69: # cython: profile=True jpayne@69: ############################################################################### jpayne@69: ############################################################################### jpayne@69: # Cython wrapper for SAM/BAM/CRAM files based on htslib jpayne@69: ############################################################################### jpayne@69: # The principal classes defined in this module are: jpayne@69: # jpayne@69: # class AlignedSegment an aligned segment (read) jpayne@69: # jpayne@69: # class PileupColumn a collection of segments (PileupRead) aligned to jpayne@69: # a particular genomic position. jpayne@69: # jpayne@69: # class PileupRead an AlignedSegment aligned to a particular genomic jpayne@69: # position. Contains additional attributes with respect jpayne@69: # to this. jpayne@69: # jpayne@69: # Additionally this module defines numerous additional classes that are part jpayne@69: # of the internal API. These are: jpayne@69: # jpayne@69: # Various iterator classes to iterate over alignments in sequential (IteratorRow) jpayne@69: # or in a stacked fashion (IteratorColumn): jpayne@69: # jpayne@69: # class IteratorRow jpayne@69: # class IteratorRowRegion jpayne@69: # class IteratorRowHead jpayne@69: # class IteratorRowAll jpayne@69: # class IteratorRowAllRefs jpayne@69: # class IteratorRowSelection jpayne@69: # jpayne@69: ############################################################################### jpayne@69: # jpayne@69: # The MIT License jpayne@69: # jpayne@69: # Copyright (c) 2015 Andreas Heger jpayne@69: # jpayne@69: # Permission is hereby granted, free of charge, to any person obtaining a jpayne@69: # copy of this software and associated documentation files (the "Software"), jpayne@69: # to deal in the Software without restriction, including without limitation jpayne@69: # the rights to use, copy, modify, merge, publish, distribute, sublicense, jpayne@69: # and/or sell copies of the Software, and to permit persons to whom the jpayne@69: # Software is furnished to do so, subject to the following conditions: jpayne@69: # jpayne@69: # The above copyright notice and this permission notice shall be included in jpayne@69: # all copies or substantial portions of the Software. jpayne@69: # jpayne@69: # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR jpayne@69: # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, jpayne@69: # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL jpayne@69: # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER jpayne@69: # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING jpayne@69: # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER jpayne@69: # DEALINGS IN THE SOFTWARE. jpayne@69: # jpayne@69: ############################################################################### jpayne@69: import re jpayne@69: import array jpayne@69: import json jpayne@69: import string jpayne@69: import ctypes jpayne@69: import struct jpayne@69: jpayne@69: cimport cython jpayne@69: from cpython cimport array as c_array jpayne@69: from cpython cimport PyBytes_FromStringAndSize jpayne@69: from libc.string cimport memset, strchr jpayne@69: from cpython cimport array as c_array jpayne@69: from libc.stdint cimport INT8_MIN, INT16_MIN, INT32_MIN, \ jpayne@69: INT8_MAX, INT16_MAX, INT32_MAX, \ jpayne@69: UINT8_MAX, UINT16_MAX, UINT32_MAX jpayne@69: jpayne@69: from pysam.libchtslib cimport HTS_IDX_NOCOOR jpayne@69: from pysam.libcutils cimport force_bytes, force_str, \ jpayne@69: charptr_to_str, charptr_to_bytes jpayne@69: from pysam.libcutils cimport qualities_to_qualitystring, qualitystring_to_array, \ jpayne@69: array_to_qualitystring jpayne@69: jpayne@69: # Constants for binary tag conversion jpayne@69: cdef char * htslib_types = 'cCsSiIf' jpayne@69: cdef char * parray_types = 'bBhHiIf' jpayne@69: jpayne@69: # translation tables jpayne@69: jpayne@69: # cigar code to character and vice versa jpayne@69: cdef char* CODE2CIGAR= "MIDNSHP=XB" jpayne@69: cdef int NCIGAR_CODES = 10 jpayne@69: jpayne@69: CIGAR2CODE = dict([y, x] for x, y in enumerate(CODE2CIGAR)) jpayne@69: CIGAR_REGEX = re.compile("(\d+)([MIDNSHP=XB])") jpayne@69: jpayne@69: # names for keys in dictionary representation of an AlignedSegment jpayne@69: KEY_NAMES = ["name", "flag", "ref_name", "ref_pos", "map_quality", "cigar", jpayne@69: "next_ref_name", "next_ref_pos", "length", "seq", "qual", "tags"] jpayne@69: jpayne@69: ##################################################################### jpayne@69: # C multiplication with wrapping around jpayne@69: cdef inline uint32_t c_mul(uint32_t a, uint32_t b): jpayne@69: return (a * b) & 0xffffffff jpayne@69: jpayne@69: jpayne@69: cdef inline uint8_t tolower(uint8_t ch): jpayne@69: if ch >= 65 and ch <= 90: jpayne@69: return ch + 32 jpayne@69: else: jpayne@69: return ch jpayne@69: jpayne@69: jpayne@69: cdef inline uint8_t toupper(uint8_t ch): jpayne@69: if ch >= 97 and ch <= 122: jpayne@69: return ch - 32 jpayne@69: else: jpayne@69: return ch jpayne@69: jpayne@69: jpayne@69: cdef inline uint8_t strand_mark_char(uint8_t ch, bam1_t *b): jpayne@69: if ch == b'=': jpayne@69: if bam_is_rev(b): jpayne@69: return b',' jpayne@69: else: jpayne@69: return b'.' jpayne@69: else: jpayne@69: if bam_is_rev(b): jpayne@69: return tolower(ch) jpayne@69: else: jpayne@69: return toupper(ch) jpayne@69: jpayne@69: jpayne@69: cdef inline bint pileup_base_qual_skip(const bam_pileup1_t * p, uint32_t threshold): jpayne@69: cdef uint32_t c jpayne@69: if p.qpos < p.b.core.l_qseq: jpayne@69: c = bam_get_qual(p.b)[p.qpos] jpayne@69: else: jpayne@69: c = 0 jpayne@69: if c < threshold: jpayne@69: return True jpayne@69: return False jpayne@69: jpayne@69: jpayne@69: cdef inline char map_typecode_htslib_to_python(uint8_t s): jpayne@69: """map an htslib typecode to the corresponding python typecode jpayne@69: to be used in the struct or array modules.""" jpayne@69: jpayne@69: # map type from htslib to python array jpayne@69: cdef char * f = strchr(htslib_types, s) jpayne@69: jpayne@69: if f == NULL: jpayne@69: return 0 jpayne@69: return parray_types[f - htslib_types] jpayne@69: jpayne@69: jpayne@69: cdef inline uint8_t map_typecode_python_to_htslib(char s): jpayne@69: """determine value type from type code of array""" jpayne@69: cdef char * f = strchr(parray_types, s) jpayne@69: if f == NULL: jpayne@69: return 0 jpayne@69: return htslib_types[f - parray_types] jpayne@69: jpayne@69: jpayne@69: cdef inline void update_bin(bam1_t * src): jpayne@69: if src.core.flag & BAM_FUNMAP: jpayne@69: # treat alignment as length of 1 for unmapped reads jpayne@69: src.core.bin = hts_reg2bin( jpayne@69: src.core.pos, jpayne@69: src.core.pos + 1, jpayne@69: 14, jpayne@69: 5) jpayne@69: elif pysam_get_n_cigar(src): jpayne@69: src.core.bin = hts_reg2bin( jpayne@69: src.core.pos, jpayne@69: bam_endpos(src), jpayne@69: 14, jpayne@69: 5) jpayne@69: else: jpayne@69: src.core.bin = hts_reg2bin( jpayne@69: src.core.pos, jpayne@69: src.core.pos + 1, jpayne@69: 14, jpayne@69: 5) jpayne@69: jpayne@69: jpayne@69: # optional tag data manipulation jpayne@69: cdef convert_binary_tag(uint8_t * tag): jpayne@69: """return bytesize, number of values and array of values jpayne@69: in aux_data memory location pointed to by tag.""" jpayne@69: cdef uint8_t auxtype jpayne@69: cdef uint8_t byte_size jpayne@69: cdef int32_t nvalues jpayne@69: # get byte size jpayne@69: auxtype = tag[0] jpayne@69: byte_size = aux_type2size(auxtype) jpayne@69: tag += 1 jpayne@69: # get number of values in array jpayne@69: nvalues = (tag)[0] jpayne@69: tag += 4 jpayne@69: jpayne@69: # define python array jpayne@69: cdef c_array.array c_values = array.array( jpayne@69: chr(map_typecode_htslib_to_python(auxtype))) jpayne@69: c_array.resize(c_values, nvalues) jpayne@69: jpayne@69: # copy data jpayne@69: memcpy(c_values.data.as_voidptr, tag, nvalues * byte_size) jpayne@69: jpayne@69: # no need to check for endian-ness as bam1_core_t fields jpayne@69: # and aux_data are in host endian-ness. See sam.c and calls jpayne@69: # to swap_data jpayne@69: return byte_size, nvalues, c_values jpayne@69: jpayne@69: jpayne@69: cdef inline uint8_t get_tag_typecode(value, value_type=None): jpayne@69: """guess type code for a *value*. If *value_type* is None, the type jpayne@69: code will be inferred based on the Python type of *value* jpayne@69: jpayne@69: """ jpayne@69: # 0 is unknown typecode jpayne@69: cdef char typecode = 0 jpayne@69: jpayne@69: if value_type is None: jpayne@69: if isinstance(value, int): jpayne@69: if value < 0: jpayne@69: if value >= INT8_MIN: jpayne@69: typecode = b'c' jpayne@69: elif value >= INT16_MIN: jpayne@69: typecode = b's' jpayne@69: elif value >= INT32_MIN: jpayne@69: typecode = b'i' jpayne@69: # unsigned ints jpayne@69: else: jpayne@69: if value <= UINT8_MAX: jpayne@69: typecode = b'C' jpayne@69: elif value <= UINT16_MAX: jpayne@69: typecode = b'S' jpayne@69: elif value <= UINT32_MAX: jpayne@69: typecode = b'I' jpayne@69: elif isinstance(value, float): jpayne@69: typecode = b'f' jpayne@69: elif isinstance(value, str): jpayne@69: typecode = b'Z' jpayne@69: elif isinstance(value, bytes): jpayne@69: typecode = b'Z' jpayne@69: elif isinstance(value, array.array) or \ jpayne@69: isinstance(value, list) or \ jpayne@69: isinstance(value, tuple): jpayne@69: typecode = b'B' jpayne@69: else: jpayne@69: if value_type in 'aAsSIcCZidfH': jpayne@69: typecode = force_bytes(value_type)[0] jpayne@69: jpayne@69: return typecode jpayne@69: jpayne@69: jpayne@69: cdef inline uint8_t get_btag_typecode(value, min_value=None, max_value=None): jpayne@69: '''returns the value typecode of a value. jpayne@69: jpayne@69: If max is specified, the appropriate type is returned for a range jpayne@69: where value is the minimum. jpayne@69: jpayne@69: Note that this method returns types from the extended BAM alphabet jpayne@69: of types that includes tags that are not part of the SAM jpayne@69: specification. jpayne@69: ''' jpayne@69: jpayne@69: jpayne@69: cdef uint8_t typecode jpayne@69: jpayne@69: t = type(value) jpayne@69: jpayne@69: if t is float: jpayne@69: typecode = b'f' jpayne@69: elif t is int: jpayne@69: if max_value is None: jpayne@69: max_value = value jpayne@69: if min_value is None: jpayne@69: min_value = value jpayne@69: # signed ints jpayne@69: if min_value < 0: jpayne@69: if min_value >= INT8_MIN and max_value <= INT8_MAX: jpayne@69: typecode = b'c' jpayne@69: elif min_value >= INT16_MIN and max_value <= INT16_MAX: jpayne@69: typecode = b's' jpayne@69: elif min_value >= INT32_MIN or max_value <= INT32_MAX: jpayne@69: typecode = b'i' jpayne@69: else: jpayne@69: raise ValueError( jpayne@69: "at least one signed integer out of range of " jpayne@69: "BAM/SAM specification") jpayne@69: # unsigned ints jpayne@69: else: jpayne@69: if max_value <= UINT8_MAX: jpayne@69: typecode = b'C' jpayne@69: elif max_value <= UINT16_MAX: jpayne@69: typecode = b'S' jpayne@69: elif max_value <= UINT32_MAX: jpayne@69: typecode = b'I' jpayne@69: else: jpayne@69: raise ValueError( jpayne@69: "at least one integer out of range of BAM/SAM specification") jpayne@69: else: jpayne@69: # Note: hex strings (H) are not supported yet jpayne@69: if t is not bytes: jpayne@69: value = value.encode('ascii') jpayne@69: if len(value) == 1: jpayne@69: typecode = b'A' jpayne@69: else: jpayne@69: typecode = b'Z' jpayne@69: jpayne@69: return typecode jpayne@69: jpayne@69: jpayne@69: # mapping python array.array and htslib typecodes to struct typecodes jpayne@69: DATATYPE2FORMAT = { jpayne@69: ord('c'): ('b', 1), jpayne@69: ord('C'): ('B', 1), jpayne@69: ord('s'): ('h', 2), jpayne@69: ord('S'): ('H', 2), jpayne@69: ord('i'): ('i', 4), jpayne@69: ord('I'): ('I', 4), jpayne@69: ord('f'): ('f', 4), jpayne@69: ord('d'): ('d', 8), jpayne@69: ord('A'): ('c', 1), jpayne@69: ord('a'): ('c', 1)} jpayne@69: jpayne@69: jpayne@69: cdef inline pack_tags(tags): jpayne@69: """pack a list of tags. Each tag is a tuple of (tag, tuple). jpayne@69: jpayne@69: Values are packed into the most space efficient data structure jpayne@69: possible unless the tag contains a third field with the typecode. jpayne@69: jpayne@69: Returns a format string and the associated list of arguments to be jpayne@69: used in a call to struct.pack_into. jpayne@69: """ jpayne@69: fmts, args = ["<"], [] jpayne@69: jpayne@69: # htslib typecode jpayne@69: cdef uint8_t typecode jpayne@69: for tag in tags: jpayne@69: jpayne@69: if len(tag) == 2: jpayne@69: pytag, value = tag jpayne@69: valuetype = None jpayne@69: elif len(tag) == 3: jpayne@69: pytag, value, valuetype = tag jpayne@69: else: jpayne@69: raise ValueError("malformatted tag: %s" % str(tag)) jpayne@69: jpayne@69: if valuetype is None: jpayne@69: typecode = 0 jpayne@69: else: jpayne@69: # only first character in valuecode matters jpayne@69: typecode = force_bytes(valuetype)[0] jpayne@69: jpayne@69: pytag = force_bytes(pytag) jpayne@69: pytype = type(value) jpayne@69: jpayne@69: if pytype is tuple or pytype is list: jpayne@69: # binary tags from tuples or lists jpayne@69: if not typecode: jpayne@69: # automatically determine value type - first value jpayne@69: # determines type. If there is a mix of types, the jpayne@69: # result is undefined. jpayne@69: typecode = get_btag_typecode(min(value), jpayne@69: min_value=min(value), jpayne@69: max_value=max(value)) jpayne@69: jpayne@69: if typecode not in DATATYPE2FORMAT: jpayne@69: raise ValueError("invalid value type '{}'".format(chr(typecode))) jpayne@69: jpayne@69: datafmt = "2sBBI%i%s" % (len(value), DATATYPE2FORMAT[typecode][0]) jpayne@69: args.extend([pytag[:2], jpayne@69: ord("B"), jpayne@69: typecode, jpayne@69: len(value)] + list(value)) jpayne@69: jpayne@69: elif isinstance(value, array.array): jpayne@69: # binary tags from arrays jpayne@69: if typecode == 0: jpayne@69: typecode = map_typecode_python_to_htslib(ord(value.typecode)) jpayne@69: jpayne@69: if typecode == 0: jpayne@69: raise ValueError("unsupported type code '{}'".format(value.typecode)) jpayne@69: jpayne@69: if typecode not in DATATYPE2FORMAT: jpayne@69: raise ValueError("invalid value type '{}'".format(chr(typecode))) jpayne@69: jpayne@69: # use array.tostring() to retrieve byte representation and jpayne@69: # save as bytes jpayne@69: datafmt = "2sBBI%is" % (len(value) * DATATYPE2FORMAT[typecode][1]) jpayne@69: args.extend([pytag[:2], jpayne@69: ord("B"), jpayne@69: typecode, jpayne@69: len(value), jpayne@69: value.tobytes()]) jpayne@69: jpayne@69: else: jpayne@69: if typecode == 0: jpayne@69: typecode = get_tag_typecode(value) jpayne@69: if typecode == 0: jpayne@69: raise ValueError("could not deduce typecode for value {}".format(value)) jpayne@69: jpayne@69: if typecode == b'a' or typecode == b'A' or typecode == b'Z' or typecode == b'H': jpayne@69: value = force_bytes(value) jpayne@69: jpayne@69: if typecode == b"a": jpayne@69: typecode = b'A' jpayne@69: jpayne@69: if typecode == b'Z' or typecode == b'H': jpayne@69: datafmt = "2sB%is" % (len(value)+1) jpayne@69: elif typecode in DATATYPE2FORMAT: jpayne@69: datafmt = "2sB%s" % DATATYPE2FORMAT[typecode][0] jpayne@69: else: jpayne@69: raise ValueError("invalid value type '{}'".format(chr(typecode))) jpayne@69: jpayne@69: args.extend([pytag[:2], jpayne@69: typecode, jpayne@69: value]) jpayne@69: jpayne@69: fmts.append(datafmt) jpayne@69: jpayne@69: return "".join(fmts), args jpayne@69: jpayne@69: jpayne@69: cdef inline int32_t calculateQueryLengthWithoutHardClipping(bam1_t * src): jpayne@69: """return query length computed from CIGAR alignment. jpayne@69: jpayne@69: Length ignores hard-clipped bases. jpayne@69: jpayne@69: Return 0 if there is no CIGAR alignment. jpayne@69: """ jpayne@69: jpayne@69: cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) jpayne@69: jpayne@69: if cigar_p == NULL: jpayne@69: return 0 jpayne@69: jpayne@69: cdef uint32_t k, qpos jpayne@69: cdef int op jpayne@69: qpos = 0 jpayne@69: jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: jpayne@69: if op == BAM_CMATCH or \ jpayne@69: op == BAM_CINS or \ jpayne@69: op == BAM_CSOFT_CLIP or \ jpayne@69: op == BAM_CEQUAL or \ jpayne@69: op == BAM_CDIFF: jpayne@69: qpos += cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: jpayne@69: return qpos jpayne@69: jpayne@69: jpayne@69: cdef inline int32_t calculateQueryLengthWithHardClipping(bam1_t * src): jpayne@69: """return query length computed from CIGAR alignment. jpayne@69: jpayne@69: Length includes hard-clipped bases. jpayne@69: jpayne@69: Return 0 if there is no CIGAR alignment. jpayne@69: """ jpayne@69: jpayne@69: cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) jpayne@69: jpayne@69: if cigar_p == NULL: jpayne@69: return 0 jpayne@69: jpayne@69: cdef uint32_t k, qpos jpayne@69: cdef int op jpayne@69: qpos = 0 jpayne@69: jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: jpayne@69: if op == BAM_CMATCH or \ jpayne@69: op == BAM_CINS or \ jpayne@69: op == BAM_CSOFT_CLIP or \ jpayne@69: op == BAM_CHARD_CLIP or \ jpayne@69: op == BAM_CEQUAL or \ jpayne@69: op == BAM_CDIFF: jpayne@69: qpos += cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: jpayne@69: return qpos jpayne@69: jpayne@69: jpayne@69: cdef inline int32_t getQueryStart(bam1_t *src) except -1: jpayne@69: cdef uint32_t * cigar_p jpayne@69: cdef uint32_t start_offset = 0 jpayne@69: cdef uint32_t k, op jpayne@69: jpayne@69: cigar_p = pysam_bam_get_cigar(src); jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: if op == BAM_CHARD_CLIP: jpayne@69: if start_offset != 0 and start_offset != src.core.l_qseq: jpayne@69: raise ValueError('Invalid clipping in CIGAR string') jpayne@69: elif op == BAM_CSOFT_CLIP: jpayne@69: start_offset += cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: else: jpayne@69: break jpayne@69: jpayne@69: return start_offset jpayne@69: jpayne@69: jpayne@69: cdef inline int32_t getQueryEnd(bam1_t *src) except -1: jpayne@69: cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) jpayne@69: cdef uint32_t end_offset = src.core.l_qseq jpayne@69: cdef uint32_t k, op jpayne@69: jpayne@69: # if there is no sequence, compute length from cigar string jpayne@69: if end_offset == 0: jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: if op == BAM_CMATCH or \ jpayne@69: op == BAM_CINS or \ jpayne@69: op == BAM_CEQUAL or \ jpayne@69: op == BAM_CDIFF or \ jpayne@69: (op == BAM_CSOFT_CLIP and end_offset == 0): jpayne@69: end_offset += cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: else: jpayne@69: # walk backwards in cigar string jpayne@69: for k from pysam_get_n_cigar(src) > k >= 1: jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: if op == BAM_CHARD_CLIP: jpayne@69: if end_offset != src.core.l_qseq: jpayne@69: raise ValueError('Invalid clipping in CIGAR string') jpayne@69: elif op == BAM_CSOFT_CLIP: jpayne@69: end_offset -= cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: else: jpayne@69: break jpayne@69: jpayne@69: return end_offset jpayne@69: jpayne@69: jpayne@69: cdef inline bytes getSequenceInRange(bam1_t *src, jpayne@69: uint32_t start, jpayne@69: uint32_t end): jpayne@69: """return python string of the sequence in a bam1_t object. jpayne@69: """ jpayne@69: jpayne@69: cdef uint8_t * p jpayne@69: cdef uint32_t k jpayne@69: cdef char * s jpayne@69: jpayne@69: if not src.core.l_qseq: jpayne@69: return None jpayne@69: jpayne@69: seq = PyBytes_FromStringAndSize(NULL, end - start) jpayne@69: s = seq jpayne@69: p = pysam_bam_get_seq(src) jpayne@69: jpayne@69: for k from start <= k < end: jpayne@69: # equivalent to seq_nt16_str[bam1_seqi(s, i)] (see bam.c) jpayne@69: # note: do not use string literal as it will be a python string jpayne@69: s[k-start] = seq_nt16_str[p[k//2] >> 4 * (1 - k%2) & 0xf] jpayne@69: jpayne@69: return charptr_to_bytes(seq) jpayne@69: jpayne@69: jpayne@69: cdef inline object getQualitiesInRange(bam1_t *src, jpayne@69: uint32_t start, jpayne@69: uint32_t end): jpayne@69: """return python array of quality values from a bam1_t object""" jpayne@69: jpayne@69: cdef uint8_t * p jpayne@69: cdef uint32_t k jpayne@69: jpayne@69: p = pysam_bam_get_qual(src) jpayne@69: if p[0] == 0xff: jpayne@69: return None jpayne@69: jpayne@69: # 'B': unsigned char jpayne@69: cdef c_array.array result = array.array('B', [0]) jpayne@69: c_array.resize(result, end - start) jpayne@69: jpayne@69: # copy data jpayne@69: memcpy(result.data.as_voidptr, &p[start], end - start) jpayne@69: jpayne@69: return result jpayne@69: jpayne@69: jpayne@69: ##################################################################### jpayne@69: ## factory methods for instantiating extension classes jpayne@69: cdef class AlignedSegment jpayne@69: cdef AlignedSegment makeAlignedSegment(bam1_t *src, jpayne@69: AlignmentHeader header): jpayne@69: '''return an AlignedSegment object constructed from `src`''' jpayne@69: # note that the following does not call __init__ jpayne@69: cdef AlignedSegment dest = AlignedSegment.__new__(AlignedSegment) jpayne@69: dest._delegate = bam_dup1(src) jpayne@69: dest.header = header jpayne@69: return dest jpayne@69: jpayne@69: jpayne@69: cdef class PileupColumn jpayne@69: cdef PileupColumn makePileupColumn(const bam_pileup1_t ** plp, jpayne@69: int tid, jpayne@69: int pos, jpayne@69: int n_pu, jpayne@69: uint32_t min_base_quality, jpayne@69: char * reference_sequence, jpayne@69: AlignmentHeader header): jpayne@69: '''return a PileupColumn object constructed from pileup in `plp` and jpayne@69: setting additional attributes. jpayne@69: jpayne@69: ''' jpayne@69: # note that the following does not call __init__ jpayne@69: cdef PileupColumn dest = PileupColumn.__new__(PileupColumn) jpayne@69: dest.header = header jpayne@69: dest.plp = plp jpayne@69: dest.tid = tid jpayne@69: dest.pos = pos jpayne@69: dest.n_pu = n_pu jpayne@69: dest.min_base_quality = min_base_quality jpayne@69: dest.reference_sequence = reference_sequence jpayne@69: dest.buf.l = dest.buf.m = 0 jpayne@69: dest.buf.s = NULL jpayne@69: jpayne@69: return dest jpayne@69: jpayne@69: jpayne@69: cdef class PileupRead jpayne@69: cdef PileupRead makePileupRead(const bam_pileup1_t *src, jpayne@69: AlignmentHeader header): jpayne@69: '''return a PileupRead object construted from a bam_pileup1_t * object.''' jpayne@69: # note that the following does not call __init__ jpayne@69: cdef PileupRead dest = PileupRead.__new__(PileupRead) jpayne@69: dest._alignment = makeAlignedSegment(src.b, header) jpayne@69: dest._qpos = src.qpos jpayne@69: dest._indel = src.indel jpayne@69: dest._level = src.level jpayne@69: dest._is_del = src.is_del jpayne@69: dest._is_head = src.is_head jpayne@69: dest._is_tail = src.is_tail jpayne@69: dest._is_refskip = src.is_refskip jpayne@69: return dest jpayne@69: jpayne@69: jpayne@69: cdef inline uint32_t get_alignment_length(bam1_t *src): jpayne@69: cdef uint32_t k = 0 jpayne@69: cdef uint32_t l = 0 jpayne@69: if src == NULL: jpayne@69: return 0 jpayne@69: cdef uint32_t * cigar_p = bam_get_cigar(src) jpayne@69: if cigar_p == NULL: jpayne@69: return 0 jpayne@69: cdef int op jpayne@69: cdef uint32_t n = pysam_get_n_cigar(src) jpayne@69: for k from 0 <= k < n: jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: if op == BAM_CSOFT_CLIP or op == BAM_CHARD_CLIP: jpayne@69: continue jpayne@69: l += cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: return l jpayne@69: jpayne@69: jpayne@69: cdef inline uint32_t get_md_reference_length(char * md_tag): jpayne@69: cdef int l = 0 jpayne@69: cdef int md_idx = 0 jpayne@69: cdef int nmatches = 0 jpayne@69: jpayne@69: while md_tag[md_idx] != 0: jpayne@69: if md_tag[md_idx] >= 48 and md_tag[md_idx] <= 57: jpayne@69: nmatches *= 10 jpayne@69: nmatches += md_tag[md_idx] - 48 jpayne@69: md_idx += 1 jpayne@69: continue jpayne@69: else: jpayne@69: l += nmatches jpayne@69: nmatches = 0 jpayne@69: if md_tag[md_idx] == b'^': jpayne@69: md_idx += 1 jpayne@69: while md_tag[md_idx] >= 65 and md_tag[md_idx] <= 90: jpayne@69: md_idx += 1 jpayne@69: l += 1 jpayne@69: else: jpayne@69: md_idx += 1 jpayne@69: l += 1 jpayne@69: jpayne@69: l += nmatches jpayne@69: return l jpayne@69: jpayne@69: # TODO: avoid string copying for getSequenceInRange, reconstituneSequenceFromMD, ... jpayne@69: cdef inline bytes build_alignment_sequence(bam1_t * src): jpayne@69: """return expanded sequence from MD tag. jpayne@69: jpayne@69: The sequence includes substitutions and both insertions in the jpayne@69: reference as well as deletions to the reference sequence. Combine jpayne@69: with the cigar string to reconstitute the query or the reference jpayne@69: sequence. jpayne@69: jpayne@69: Positions corresponding to `N` (skipped region from the reference) jpayne@69: or `P` (padding (silent deletion from padded reference)) in the CIGAR jpayne@69: string will not appear in the returned sequence. The MD should jpayne@69: correspondingly not contain these. Thus proper tags are:: jpayne@69: jpayne@69: Deletion from the reference: cigar=5M1D5M MD=5^C5 jpayne@69: Skipped region from reference: cigar=5M1N5M MD=10 jpayne@69: Padded region in the reference: cigar=5M1P5M MD=10 jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: None, if no MD tag is present. jpayne@69: jpayne@69: """ jpayne@69: if src == NULL: jpayne@69: return None jpayne@69: jpayne@69: cdef uint8_t * md_tag_ptr = bam_aux_get(src, "MD") jpayne@69: if md_tag_ptr == NULL: jpayne@69: return None jpayne@69: jpayne@69: cdef uint32_t start = getQueryStart(src) jpayne@69: cdef uint32_t end = getQueryEnd(src) jpayne@69: # get read sequence, taking into account soft-clipping jpayne@69: r = getSequenceInRange(src, start, end) jpayne@69: cdef char * read_sequence = r jpayne@69: cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) jpayne@69: if cigar_p == NULL: jpayne@69: return None jpayne@69: jpayne@69: cdef uint32_t r_idx = 0 jpayne@69: cdef int op jpayne@69: cdef uint32_t k, i, l, x jpayne@69: cdef int nmatches = 0 jpayne@69: cdef int s_idx = 0 jpayne@69: jpayne@69: cdef uint32_t max_len = get_alignment_length(src) jpayne@69: if max_len == 0: jpayne@69: raise ValueError("could not determine alignment length") jpayne@69: jpayne@69: cdef char * s = calloc(max_len + 1, sizeof(char)) jpayne@69: if s == NULL: jpayne@69: raise ValueError( jpayne@69: "could not allocate sequence of length %i" % max_len) jpayne@69: jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: l = cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: jpayne@69: for i from 0 <= i < l: jpayne@69: s[s_idx] = read_sequence[r_idx] jpayne@69: r_idx += 1 jpayne@69: s_idx += 1 jpayne@69: elif op == BAM_CDEL: jpayne@69: for i from 0 <= i < l: jpayne@69: s[s_idx] = b'-' jpayne@69: s_idx += 1 jpayne@69: elif op == BAM_CREF_SKIP: jpayne@69: pass jpayne@69: elif op == BAM_CINS or op == BAM_CPAD: jpayne@69: for i from 0 <= i < l: jpayne@69: # encode insertions into reference as lowercase jpayne@69: s[s_idx] = read_sequence[r_idx] + 32 jpayne@69: r_idx += 1 jpayne@69: s_idx += 1 jpayne@69: elif op == BAM_CSOFT_CLIP: jpayne@69: pass jpayne@69: elif op == BAM_CHARD_CLIP: jpayne@69: pass # advances neither jpayne@69: jpayne@69: cdef char md_buffer[2] jpayne@69: cdef char *md_tag jpayne@69: cdef uint8_t md_typecode = md_tag_ptr[0] jpayne@69: if md_typecode == b'Z': jpayne@69: md_tag = bam_aux2Z(md_tag_ptr) jpayne@69: elif md_typecode == b'A': jpayne@69: # Work around HTSeq bug that writes 1-character strings as MD:A:v jpayne@69: md_buffer[0] = bam_aux2A(md_tag_ptr) jpayne@69: md_buffer[1] = b'\0' jpayne@69: md_tag = md_buffer jpayne@69: else: jpayne@69: raise TypeError('Tagged field MD:{}: does not have expected type MD:Z'.format(chr(md_typecode))) jpayne@69: jpayne@69: cdef int md_idx = 0 jpayne@69: cdef char c jpayne@69: s_idx = 0 jpayne@69: jpayne@69: # Check if MD tag is valid by matching CIGAR length to MD tag defined length jpayne@69: # Insertions would be in addition to what is described by MD, so we calculate jpayne@69: # the number of insertions separately. jpayne@69: cdef int insertions = 0 jpayne@69: jpayne@69: while s[s_idx] != 0: jpayne@69: if s[s_idx] >= b'a': jpayne@69: insertions += 1 jpayne@69: s_idx += 1 jpayne@69: s_idx = 0 jpayne@69: jpayne@69: cdef uint32_t md_len = get_md_reference_length(md_tag) jpayne@69: if md_len + insertions > max_len: jpayne@69: free(s) jpayne@69: raise AssertionError( jpayne@69: "Invalid MD tag: MD length {} mismatch with CIGAR length {} and {} insertions".format( jpayne@69: md_len, max_len, insertions)) jpayne@69: jpayne@69: while md_tag[md_idx] != 0: jpayne@69: # c is numerical jpayne@69: if md_tag[md_idx] >= 48 and md_tag[md_idx] <= 57: jpayne@69: nmatches *= 10 jpayne@69: nmatches += md_tag[md_idx] - 48 jpayne@69: md_idx += 1 jpayne@69: continue jpayne@69: else: jpayne@69: # save matches up to this point, skipping insertions jpayne@69: for x from 0 <= x < nmatches: jpayne@69: while s[s_idx] >= b'a': jpayne@69: s_idx += 1 jpayne@69: s_idx += 1 jpayne@69: while s[s_idx] >= b'a': jpayne@69: s_idx += 1 jpayne@69: jpayne@69: r_idx += nmatches jpayne@69: nmatches = 0 jpayne@69: if md_tag[md_idx] == b'^': jpayne@69: md_idx += 1 jpayne@69: while md_tag[md_idx] >= 65 and md_tag[md_idx] <= 90: jpayne@69: # assert s[s_idx] == '-' jpayne@69: s[s_idx] = md_tag[md_idx] jpayne@69: s_idx += 1 jpayne@69: md_idx += 1 jpayne@69: else: jpayne@69: # save mismatch jpayne@69: # enforce lower case jpayne@69: c = md_tag[md_idx] jpayne@69: if c <= 90: jpayne@69: c += 32 jpayne@69: s[s_idx] = c jpayne@69: s_idx += 1 jpayne@69: r_idx += 1 jpayne@69: md_idx += 1 jpayne@69: jpayne@69: # save matches up to this point, skipping insertions jpayne@69: for x from 0 <= x < nmatches: jpayne@69: while s[s_idx] >= b'a': jpayne@69: s_idx += 1 jpayne@69: s_idx += 1 jpayne@69: while s[s_idx] >= b'a': jpayne@69: s_idx += 1 jpayne@69: jpayne@69: seq = PyBytes_FromStringAndSize(s, s_idx) jpayne@69: free(s) jpayne@69: jpayne@69: return seq jpayne@69: jpayne@69: jpayne@69: cdef inline bytes build_reference_sequence(bam1_t * src): jpayne@69: """return the reference sequence in the region that is covered by the jpayne@69: alignment of the read to the reference. jpayne@69: jpayne@69: This method requires the MD tag to be set. jpayne@69: jpayne@69: """ jpayne@69: cdef uint32_t k, i, l jpayne@69: cdef int op jpayne@69: cdef int s_idx = 0 jpayne@69: ref_seq = build_alignment_sequence(src) jpayne@69: if ref_seq is None: jpayne@69: raise ValueError("MD tag not present") jpayne@69: jpayne@69: cdef char * s = calloc(len(ref_seq) + 1, sizeof(char)) jpayne@69: if s == NULL: jpayne@69: raise ValueError( jpayne@69: "could not allocate sequence of length %i" % len(ref_seq)) jpayne@69: jpayne@69: cdef char * cref_seq = ref_seq jpayne@69: cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) jpayne@69: cdef uint32_t r_idx = 0 jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: l = cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: jpayne@69: for i from 0 <= i < l: jpayne@69: s[s_idx] = cref_seq[r_idx] jpayne@69: r_idx += 1 jpayne@69: s_idx += 1 jpayne@69: elif op == BAM_CDEL: jpayne@69: for i from 0 <= i < l: jpayne@69: s[s_idx] = cref_seq[r_idx] jpayne@69: r_idx += 1 jpayne@69: s_idx += 1 jpayne@69: elif op == BAM_CREF_SKIP: jpayne@69: pass jpayne@69: elif op == BAM_CINS or op == BAM_CPAD: jpayne@69: r_idx += l jpayne@69: elif op == BAM_CSOFT_CLIP: jpayne@69: pass jpayne@69: elif op == BAM_CHARD_CLIP: jpayne@69: pass # advances neither jpayne@69: jpayne@69: seq = PyBytes_FromStringAndSize(s, s_idx) jpayne@69: free(s) jpayne@69: jpayne@69: return seq jpayne@69: jpayne@69: jpayne@69: cdef inline str safe_reference_name(AlignmentHeader header, int tid): jpayne@69: if tid == -1: return "*" jpayne@69: elif header is not None: return header.get_reference_name(tid) jpayne@69: else: return f"#{tid}" jpayne@69: jpayne@69: jpayne@69: # Tuple-building helper functions used by AlignedSegment.get_aligned_pairs() jpayne@69: jpayne@69: cdef _alignedpairs_positions(qpos, pos, ref_seq, uint32_t r_idx, int op): jpayne@69: return (qpos, pos) jpayne@69: jpayne@69: jpayne@69: cdef _alignedpairs_with_seq(qpos, pos, ref_seq, uint32_t r_idx, int op): jpayne@69: ref_base = ref_seq[r_idx] if ref_seq is not None else None jpayne@69: return (qpos, pos, ref_base) jpayne@69: jpayne@69: jpayne@69: cdef _alignedpairs_with_cigar(qpos, pos, ref_seq, uint32_t r_idx, int op): jpayne@69: return (qpos, pos, CIGAR_OPS(op)) jpayne@69: jpayne@69: jpayne@69: cdef _alignedpairs_with_seq_cigar(qpos, pos, ref_seq, uint32_t r_idx, int op): jpayne@69: ref_base = ref_seq[r_idx] if ref_seq is not None else None jpayne@69: return (qpos, pos, ref_base, CIGAR_OPS(op)) jpayne@69: jpayne@69: jpayne@69: cdef class AlignedSegment: jpayne@69: '''Class representing an aligned segment. jpayne@69: jpayne@69: This class stores a handle to the samtools C-structure representing jpayne@69: an aligned read. Member read access is forwarded to the C-structure jpayne@69: and converted into python objects. This implementation should be fast, jpayne@69: as only the data needed is converted. jpayne@69: jpayne@69: For write access, the C-structure is updated in-place. This is jpayne@69: not the most efficient way to build BAM entries, as the variable jpayne@69: length data is concatenated and thus needs to be resized if jpayne@69: a field is updated. Furthermore, the BAM entry might be jpayne@69: in an inconsistent state. jpayne@69: jpayne@69: One issue to look out for is that the sequence should always jpayne@69: be set *before* the quality scores. Setting the sequence will jpayne@69: also erase any quality scores that were set previously. jpayne@69: jpayne@69: Parameters jpayne@69: ---------- jpayne@69: jpayne@69: header: jpayne@69: :class:`~pysam.AlignmentHeader` object to map numerical jpayne@69: identifiers to chromosome names. If not given, an empty jpayne@69: header is created. jpayne@69: ''' jpayne@69: jpayne@69: # Now only called when instances are created from Python jpayne@69: def __init__(self, AlignmentHeader header=None): jpayne@69: # see bam_init1 jpayne@69: self._delegate = calloc(1, sizeof(bam1_t)) jpayne@69: if self._delegate == NULL: jpayne@69: raise MemoryError("could not allocated memory of {} bytes".format(sizeof(bam1_t))) jpayne@69: # allocate some memory. If size is 0, calloc does not return a jpayne@69: # pointer that can be passed to free() so allocate 40 bytes jpayne@69: # for a new read jpayne@69: self._delegate.m_data = 40 jpayne@69: self._delegate.data = calloc( jpayne@69: self._delegate.m_data, 1) jpayne@69: if self._delegate.data == NULL: jpayne@69: raise MemoryError("could not allocate memory of {} bytes".format(self._delegate.m_data)) jpayne@69: self._delegate.l_data = 0 jpayne@69: # set some data to make read approximately legit. jpayne@69: # Note, SAM writing fails with q_name of length 0 jpayne@69: self._delegate.core.l_qname = 0 jpayne@69: self._delegate.core.tid = -1 jpayne@69: self._delegate.core.pos = -1 jpayne@69: self._delegate.core.mtid = -1 jpayne@69: self._delegate.core.mpos = -1 jpayne@69: jpayne@69: # caching for selected fields jpayne@69: self.cache_query_qualities = None jpayne@69: self.cache_query_alignment_qualities = None jpayne@69: self.cache_query_sequence = None jpayne@69: self.cache_query_alignment_sequence = None jpayne@69: jpayne@69: self.header = header jpayne@69: jpayne@69: def __dealloc__(self): jpayne@69: bam_destroy1(self._delegate) jpayne@69: jpayne@69: def __str__(self): jpayne@69: """return string representation of alignment. jpayne@69: jpayne@69: The representation is an approximate :term:`SAM` format, because jpayne@69: an aligned read might not be associated with a :term:`AlignmentFile`. jpayne@69: Hence when the read does not have an associated :class:`AlignedHeader`, jpayne@69: :term:`tid` is shown instead of the reference name. jpayne@69: Similarly, the tags field is returned in its parsed state. jpayne@69: jpayne@69: To get a valid SAM record, use :meth:`to_string`. jpayne@69: """ jpayne@69: # sam-parsing is done in sam.c/bam_format1_core which jpayne@69: # requires a valid header. jpayne@69: return "\t".join(map(str, (self.query_name, jpayne@69: self.flag, jpayne@69: safe_reference_name(self.header, self.reference_id), jpayne@69: self.reference_start + 1, jpayne@69: self.mapping_quality, jpayne@69: self.cigarstring, jpayne@69: safe_reference_name(self.header, self.next_reference_id), jpayne@69: self.next_reference_start + 1, jpayne@69: self.template_length, jpayne@69: self.query_sequence, jpayne@69: self.query_qualities, jpayne@69: self.tags))) jpayne@69: jpayne@69: def __repr__(self): jpayne@69: ref = self.reference_name if self.header is not None else self.reference_id jpayne@69: return f'<{type(self).__name__}({self.query_name!r}, flags={self.flag}={self.flag:#x}, ref={ref!r}, zpos={self.reference_start}, mapq={self.mapping_quality}, cigar={self.cigarstring!r}, ...)>' jpayne@69: jpayne@69: def __copy__(self): jpayne@69: return makeAlignedSegment(self._delegate, self.header) jpayne@69: jpayne@69: def __deepcopy__(self, memo): jpayne@69: return makeAlignedSegment(self._delegate, self.header) jpayne@69: jpayne@69: def compare(self, AlignedSegment other): jpayne@69: '''return -1,0,1, if contents in this are binary jpayne@69: <,=,> to *other* jpayne@69: ''' jpayne@69: jpayne@69: # avoid segfault when other equals None jpayne@69: if other is None: jpayne@69: return -1 jpayne@69: jpayne@69: cdef int retval, x jpayne@69: cdef bam1_t *t jpayne@69: cdef bam1_t *o jpayne@69: jpayne@69: t = self._delegate jpayne@69: o = other._delegate jpayne@69: jpayne@69: # uncomment for debugging purposes jpayne@69: # cdef unsigned char * oo, * tt jpayne@69: # tt = (&t.core) jpayne@69: # oo = (&o.core) jpayne@69: # for x from 0 <= x < sizeof( bam1_core_t): print x, tt[x], oo[x] jpayne@69: # tt = (t.data) jpayne@69: # oo = (o.data) jpayne@69: # for x from 0 <= x < max(t.l_data, o.l_data): print x, tt[x], oo[x], chr(tt[x]), chr(oo[x]) jpayne@69: jpayne@69: # Fast-path test for object identity jpayne@69: if t == o: jpayne@69: return 0 jpayne@69: jpayne@69: cdef uint8_t *a = &t.core jpayne@69: cdef uint8_t *b = &o.core jpayne@69: jpayne@69: retval = memcmp(&t.core, &o.core, sizeof(bam1_core_t)) jpayne@69: if retval: jpayne@69: return retval jpayne@69: jpayne@69: # cmp(t.l_data, o.l_data) jpayne@69: retval = (t.l_data > o.l_data) - (t.l_data < o.l_data) jpayne@69: if retval: jpayne@69: return retval jpayne@69: return memcmp(t.data, o.data, t.l_data) jpayne@69: jpayne@69: def __richcmp__(self, AlignedSegment other, int op): jpayne@69: if op == 2: # == operator jpayne@69: return self.compare(other) == 0 jpayne@69: elif op == 3: # != operator jpayne@69: return self.compare(other) != 0 jpayne@69: else: jpayne@69: return NotImplemented jpayne@69: jpayne@69: def __hash__(self): jpayne@69: cdef bam1_t * src = self._delegate jpayne@69: cdef int x jpayne@69: jpayne@69: # see http://effbot.org/zone/python-hash.htm jpayne@69: cdef uint8_t * c = &src.core jpayne@69: cdef uint32_t hash_value = c[0] jpayne@69: for x from 1 <= x < sizeof(bam1_core_t): jpayne@69: hash_value = c_mul(hash_value, 1000003) ^ c[x] jpayne@69: c = src.data jpayne@69: for x from 0 <= x < src.l_data: jpayne@69: hash_value = c_mul(hash_value, 1000003) ^ c[x] jpayne@69: jpayne@69: return hash_value jpayne@69: jpayne@69: cpdef to_string(self): jpayne@69: """returns a string representation of the aligned segment. jpayne@69: jpayne@69: The output format is valid SAM format if a header is associated jpayne@69: with the AlignedSegment. jpayne@69: """ jpayne@69: cdef kstring_t line jpayne@69: line.l = line.m = 0 jpayne@69: line.s = NULL jpayne@69: jpayne@69: if self.header: jpayne@69: if sam_format1(self.header.ptr, self._delegate, &line) < 0: jpayne@69: if line.m: jpayne@69: free(line.s) jpayne@69: raise ValueError('sam_format failed') jpayne@69: else: jpayne@69: raise NotImplementedError("todo") jpayne@69: jpayne@69: ret = force_str(line.s[:line.l]) jpayne@69: jpayne@69: if line.m: jpayne@69: free(line.s) jpayne@69: jpayne@69: return ret jpayne@69: jpayne@69: @classmethod jpayne@69: def fromstring(cls, sam, AlignmentHeader header): jpayne@69: """parses a string representation of the aligned segment. jpayne@69: jpayne@69: The input format should be valid SAM format. jpayne@69: jpayne@69: Parameters jpayne@69: ---------- jpayne@69: sam: jpayne@69: :term:`SAM` formatted string jpayne@69: jpayne@69: """ jpayne@69: cdef AlignedSegment dest = cls.__new__(cls) jpayne@69: dest._delegate = calloc(1, sizeof(bam1_t)) jpayne@69: dest.header = header jpayne@69: jpayne@69: cdef kstring_t line jpayne@69: line.l = line.m = len(sam) jpayne@69: _sam = force_bytes(sam) jpayne@69: line.s = _sam jpayne@69: jpayne@69: cdef int ret jpayne@69: ret = sam_parse1(&line, dest.header.ptr, dest._delegate) jpayne@69: if ret < 0: jpayne@69: raise ValueError("parsing SAM record string failed (error code {})".format(ret)) jpayne@69: jpayne@69: return dest jpayne@69: jpayne@69: cpdef tostring(self, htsfile=None): jpayne@69: """deprecated, use :meth:`to_string()` instead. jpayne@69: jpayne@69: Parameters jpayne@69: ---------- jpayne@69: jpayne@69: htsfile: jpayne@69: (deprecated) AlignmentFile object to map numerical jpayne@69: identifiers to chromosome names. This parameter is present jpayne@69: for backwards compatibility and ignored. jpayne@69: """ jpayne@69: jpayne@69: return self.to_string() jpayne@69: jpayne@69: def to_dict(self): jpayne@69: """returns a json representation of the aligned segment. jpayne@69: jpayne@69: Field names are abbreviated versions of the class attributes. jpayne@69: """ jpayne@69: # let htslib do the string conversions, but treat optional field properly as list jpayne@69: vals = self.to_string().split("\t") jpayne@69: n = len(KEY_NAMES) - 1 jpayne@69: return dict(list(zip(KEY_NAMES[:-1], vals[:n])) + [(KEY_NAMES[-1], vals[n:])]) jpayne@69: jpayne@69: @classmethod jpayne@69: def from_dict(cls, sam_dict, AlignmentHeader header): jpayne@69: """parses a dictionary representation of the aligned segment. jpayne@69: jpayne@69: Parameters jpayne@69: ---------- jpayne@69: sam_dict: jpayne@69: dictionary of alignment values, keys corresponding to output from jpayne@69: :meth:`todict()`. jpayne@69: jpayne@69: """ jpayne@69: # let htslib do the parsing jpayne@69: # the tags field can be missing jpayne@69: return cls.fromstring( jpayne@69: "\t".join((sam_dict[x] for x in KEY_NAMES[:-1])) + jpayne@69: "\t" + jpayne@69: "\t".join(sam_dict.get(KEY_NAMES[-1], [])), header) jpayne@69: jpayne@69: ######################################################## jpayne@69: ## Basic attributes in order of appearance in SAM format jpayne@69: property query_name: jpayne@69: """the query template name (None if not present)""" jpayne@69: def __get__(self): jpayne@69: jpayne@69: cdef bam1_t * src = self._delegate jpayne@69: if src.core.l_qname == 0: jpayne@69: return None jpayne@69: jpayne@69: return charptr_to_str(pysam_bam_get_qname(src)) jpayne@69: jpayne@69: def __set__(self, qname): jpayne@69: jpayne@69: if qname is None or len(qname) == 0: jpayne@69: return jpayne@69: jpayne@69: if len(qname) > 254: jpayne@69: raise ValueError("query length out of range {} > 254".format( jpayne@69: len(qname))) jpayne@69: jpayne@69: qname = force_bytes(qname) jpayne@69: cdef bam1_t * src = self._delegate jpayne@69: # the qname is \0 terminated jpayne@69: cdef uint8_t l = len(qname) + 1 jpayne@69: jpayne@69: cdef char * p = pysam_bam_get_qname(src) jpayne@69: cdef uint8_t l_extranul = 0 jpayne@69: jpayne@69: if l % 4 != 0: jpayne@69: l_extranul = 4 - l % 4 jpayne@69: jpayne@69: cdef bam1_t * retval = pysam_bam_update(src, jpayne@69: src.core.l_qname, jpayne@69: l + l_extranul, jpayne@69: p) jpayne@69: if retval == NULL: jpayne@69: raise MemoryError("could not allocate memory") jpayne@69: jpayne@69: src.core.l_extranul = l_extranul jpayne@69: src.core.l_qname = l + l_extranul jpayne@69: jpayne@69: # re-acquire pointer to location in memory jpayne@69: # as it might have moved jpayne@69: p = pysam_bam_get_qname(src) jpayne@69: jpayne@69: strncpy(p, qname, l) jpayne@69: # x might be > 255 jpayne@69: cdef uint16_t x = 0 jpayne@69: jpayne@69: for x from l <= x < l + l_extranul: jpayne@69: p[x] = b'\0' jpayne@69: jpayne@69: property flag: jpayne@69: """properties flag""" jpayne@69: def __get__(self): jpayne@69: return self._delegate.core.flag jpayne@69: def __set__(self, flag): jpayne@69: self._delegate.core.flag = flag jpayne@69: jpayne@69: property reference_name: jpayne@69: """:term:`reference` name""" jpayne@69: def __get__(self): jpayne@69: if self._delegate.core.tid == -1: jpayne@69: return None jpayne@69: if self.header: jpayne@69: return self.header.get_reference_name(self._delegate.core.tid) jpayne@69: else: jpayne@69: raise ValueError("reference_name unknown if no header associated with record") jpayne@69: def __set__(self, reference): jpayne@69: cdef int tid jpayne@69: if reference is None or reference == "*": jpayne@69: self._delegate.core.tid = -1 jpayne@69: elif self.header: jpayne@69: tid = self.header.get_tid(reference) jpayne@69: if tid < 0: jpayne@69: raise ValueError("reference {} does not exist in header".format( jpayne@69: reference)) jpayne@69: self._delegate.core.tid = tid jpayne@69: else: jpayne@69: raise ValueError("reference_name can not be set if no header associated with record") jpayne@69: jpayne@69: property reference_id: jpayne@69: """:term:`reference` ID jpayne@69: jpayne@69: .. note:: jpayne@69: jpayne@69: This field contains the index of the reference sequence in jpayne@69: the sequence dictionary. To obtain the name of the jpayne@69: reference sequence, use :meth:`get_reference_name()` jpayne@69: jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: return self._delegate.core.tid jpayne@69: def __set__(self, tid): jpayne@69: if tid != -1 and self.header and not self.header.is_valid_tid(tid): jpayne@69: raise ValueError("reference id {} does not exist in header".format( jpayne@69: tid)) jpayne@69: self._delegate.core.tid = tid jpayne@69: jpayne@69: property reference_start: jpayne@69: """0-based leftmost coordinate""" jpayne@69: def __get__(self): jpayne@69: return self._delegate.core.pos jpayne@69: def __set__(self, pos): jpayne@69: ## setting the position requires updating the "bin" attribute jpayne@69: cdef bam1_t * src jpayne@69: src = self._delegate jpayne@69: src.core.pos = pos jpayne@69: update_bin(src) jpayne@69: jpayne@69: property mapping_quality: jpayne@69: """mapping quality""" jpayne@69: def __get__(self): jpayne@69: return pysam_get_qual(self._delegate) jpayne@69: def __set__(self, qual): jpayne@69: pysam_set_qual(self._delegate, qual) jpayne@69: jpayne@69: property cigarstring: jpayne@69: '''the :term:`cigar` alignment as a string. jpayne@69: jpayne@69: The cigar string is a string of alternating integers jpayne@69: and characters denoting the length and the type of jpayne@69: an operation. jpayne@69: jpayne@69: .. note:: jpayne@69: The order length,operation is specified in the jpayne@69: SAM format. It is different from the order of jpayne@69: the :attr:`cigar` property. jpayne@69: jpayne@69: Returns None if not present. jpayne@69: jpayne@69: To unset the cigarstring, assign None or the jpayne@69: empty string. jpayne@69: ''' jpayne@69: def __get__(self): jpayne@69: c = self.cigartuples jpayne@69: if c is None: jpayne@69: return None jpayne@69: # reverse order jpayne@69: else: jpayne@69: return "".join([ "%i%c" % (y,CODE2CIGAR[x]) for x,y in c]) jpayne@69: jpayne@69: def __set__(self, cigar): jpayne@69: if cigar is None or len(cigar) == 0: jpayne@69: self.cigartuples = [] jpayne@69: else: jpayne@69: parts = CIGAR_REGEX.findall(cigar) jpayne@69: # reverse order jpayne@69: self.cigartuples = [(CIGAR2CODE[ord(y)], int(x)) for x,y in parts] jpayne@69: jpayne@69: # TODO jpayne@69: # property cigar: jpayne@69: # """the cigar alignment""" jpayne@69: jpayne@69: property next_reference_id: jpayne@69: """the :term:`reference` id of the mate/next read.""" jpayne@69: def __get__(self): jpayne@69: return self._delegate.core.mtid jpayne@69: def __set__(self, mtid): jpayne@69: if mtid != -1 and self.header and not self.header.is_valid_tid(mtid): jpayne@69: raise ValueError("reference id {} does not exist in header".format( jpayne@69: mtid)) jpayne@69: self._delegate.core.mtid = mtid jpayne@69: jpayne@69: property next_reference_name: jpayne@69: """:term:`reference` name of the mate/next read (None if no jpayne@69: AlignmentFile is associated)""" jpayne@69: def __get__(self): jpayne@69: if self._delegate.core.mtid == -1: jpayne@69: return None jpayne@69: if self.header: jpayne@69: return self.header.get_reference_name(self._delegate.core.mtid) jpayne@69: else: jpayne@69: raise ValueError("next_reference_name unknown if no header associated with record") jpayne@69: jpayne@69: def __set__(self, reference): jpayne@69: cdef int mtid jpayne@69: if reference is None or reference == "*": jpayne@69: self._delegate.core.mtid = -1 jpayne@69: elif reference == "=": jpayne@69: self._delegate.core.mtid = self._delegate.core.tid jpayne@69: elif self.header: jpayne@69: mtid = self.header.get_tid(reference) jpayne@69: if mtid < 0: jpayne@69: raise ValueError("reference {} does not exist in header".format( jpayne@69: reference)) jpayne@69: self._delegate.core.mtid = mtid jpayne@69: else: jpayne@69: raise ValueError("next_reference_name can not be set if no header associated with record") jpayne@69: jpayne@69: property next_reference_start: jpayne@69: """the position of the mate/next read.""" jpayne@69: def __get__(self): jpayne@69: return self._delegate.core.mpos jpayne@69: def __set__(self, mpos): jpayne@69: self._delegate.core.mpos = mpos jpayne@69: jpayne@69: property query_length: jpayne@69: """the length of the query/read. jpayne@69: jpayne@69: This value corresponds to the length of the sequence supplied jpayne@69: in the BAM/SAM file. The length of a query is 0 if there is no jpayne@69: sequence in the BAM/SAM file. In those cases, the read length jpayne@69: can be inferred from the CIGAR alignment, see jpayne@69: :meth:`pysam.AlignedSegment.infer_query_length`. jpayne@69: jpayne@69: The length includes soft-clipped bases and is equal to jpayne@69: ``len(query_sequence)``. jpayne@69: jpayne@69: This property is read-only but is updated when a new query jpayne@69: sequence is assigned to this AlignedSegment. jpayne@69: jpayne@69: Returns 0 if not available. jpayne@69: jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: return self._delegate.core.l_qseq jpayne@69: jpayne@69: property template_length: jpayne@69: """the observed query template length""" jpayne@69: def __get__(self): jpayne@69: return self._delegate.core.isize jpayne@69: def __set__(self, isize): jpayne@69: self._delegate.core.isize = isize jpayne@69: jpayne@69: property query_sequence: jpayne@69: """read sequence bases, including :term:`soft clipped` bases jpayne@69: (None if not present). jpayne@69: jpayne@69: Assigning to this attribute will invalidate any quality scores. jpayne@69: Thus, to in-place edit the sequence and quality scores, copies of jpayne@69: the quality scores need to be taken. Consider trimming for example:: jpayne@69: jpayne@69: q = read.query_qualities jpayne@69: read.query_sequence = read.query_sequence[5:10] jpayne@69: read.query_qualities = q[5:10] jpayne@69: jpayne@69: The sequence is returned as it is stored in the BAM file. (This will jpayne@69: be the reverse complement of the original read sequence if the mapper jpayne@69: has aligned the read to the reverse strand.) jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: if self.cache_query_sequence: jpayne@69: return self.cache_query_sequence jpayne@69: jpayne@69: cdef bam1_t * src jpayne@69: cdef char * s jpayne@69: src = self._delegate jpayne@69: jpayne@69: if src.core.l_qseq == 0: jpayne@69: return None jpayne@69: jpayne@69: self.cache_query_sequence = force_str(getSequenceInRange( jpayne@69: src, 0, src.core.l_qseq)) jpayne@69: return self.cache_query_sequence jpayne@69: jpayne@69: def __set__(self, seq): jpayne@69: # samtools manages sequence and quality length memory together jpayne@69: # if no quality information is present, the first byte says 0xff. jpayne@69: cdef bam1_t * src jpayne@69: cdef uint8_t * p jpayne@69: cdef char * s jpayne@69: cdef int l, k jpayne@69: cdef Py_ssize_t nbytes_new, nbytes_old jpayne@69: jpayne@69: if seq == None: jpayne@69: l = 0 jpayne@69: else: jpayne@69: l = len(seq) jpayne@69: seq = force_bytes(seq) jpayne@69: jpayne@69: src = self._delegate jpayne@69: jpayne@69: # as the sequence is stored in half-bytes, the total length (sequence jpayne@69: # plus quality scores) is (l+1)/2 + l jpayne@69: nbytes_new = (l + 1) // 2 + l jpayne@69: nbytes_old = (src.core.l_qseq + 1) // 2 + src.core.l_qseq jpayne@69: jpayne@69: # acquire pointer to location in memory jpayne@69: p = pysam_bam_get_seq(src) jpayne@69: src.core.l_qseq = l jpayne@69: jpayne@69: # change length of data field jpayne@69: cdef bam1_t * retval = pysam_bam_update(src, jpayne@69: nbytes_old, jpayne@69: nbytes_new, jpayne@69: p) jpayne@69: jpayne@69: if retval == NULL: jpayne@69: raise MemoryError("could not allocate memory") jpayne@69: jpayne@69: if l > 0: jpayne@69: # re-acquire pointer to location in memory jpayne@69: # as it might have moved jpayne@69: p = pysam_bam_get_seq(src) jpayne@69: for k from 0 <= k < nbytes_new: jpayne@69: p[k] = 0 jpayne@69: # convert to C string jpayne@69: s = seq jpayne@69: for k from 0 <= k < l: jpayne@69: p[k // 2] |= seq_nt16_table[s[k]] << 4 * (1 - k % 2) jpayne@69: jpayne@69: # erase qualities jpayne@69: p = pysam_bam_get_qual(src) jpayne@69: memset(p, 0xff, l) jpayne@69: jpayne@69: self.cache_query_sequence = force_str(seq) jpayne@69: jpayne@69: # clear cached values for quality values jpayne@69: self.cache_query_qualities = None jpayne@69: self.cache_query_alignment_qualities = None jpayne@69: jpayne@69: property query_qualities: jpayne@69: """read sequence base qualities, including :term:`soft clipped` bases jpayne@69: (None if not present). jpayne@69: jpayne@69: Quality scores are returned as a python array of unsigned jpayne@69: chars. Note that this is not the ASCII-encoded value typically jpayne@69: seen in FASTQ or SAM formatted files. Thus, no offset of 33 jpayne@69: needs to be subtracted. jpayne@69: jpayne@69: Note that to set quality scores the sequence has to be set jpayne@69: beforehand as this will determine the expected length of the jpayne@69: quality score array. jpayne@69: jpayne@69: This method raises a ValueError if the length of the jpayne@69: quality scores and the sequence are not the same. jpayne@69: jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: jpayne@69: if self.cache_query_qualities: jpayne@69: return self.cache_query_qualities jpayne@69: jpayne@69: cdef bam1_t * src jpayne@69: cdef char * q jpayne@69: jpayne@69: src = self._delegate jpayne@69: jpayne@69: if src.core.l_qseq == 0: jpayne@69: return None jpayne@69: jpayne@69: self.cache_query_qualities = getQualitiesInRange(src, 0, src.core.l_qseq) jpayne@69: return self.cache_query_qualities jpayne@69: jpayne@69: def __set__(self, qual): jpayne@69: jpayne@69: # note that memory is already allocated via setting the sequence jpayne@69: # hence length match of sequence and quality needs is checked. jpayne@69: cdef bam1_t * src jpayne@69: cdef uint8_t * p jpayne@69: cdef int l jpayne@69: jpayne@69: src = self._delegate jpayne@69: p = pysam_bam_get_qual(src) jpayne@69: if qual is None or len(qual) == 0: jpayne@69: # if absent and there is a sequence: set to 0xff jpayne@69: memset(p, 0xff, src.core.l_qseq) jpayne@69: return jpayne@69: jpayne@69: # check for length match jpayne@69: l = len(qual) jpayne@69: if src.core.l_qseq != l: jpayne@69: raise ValueError( jpayne@69: "quality and sequence mismatch: %i != %i" % jpayne@69: (l, src.core.l_qseq)) jpayne@69: jpayne@69: # create a python array object filling it jpayne@69: # with the quality scores jpayne@69: jpayne@69: # NB: should avoid this copying if qual is jpayne@69: # already of the correct type. jpayne@69: cdef c_array.array result = c_array.array('B', qual) jpayne@69: jpayne@69: # copy data jpayne@69: memcpy(p, result.data.as_voidptr, l) jpayne@69: jpayne@69: # save in cache jpayne@69: self.cache_query_qualities = qual jpayne@69: jpayne@69: property bin: jpayne@69: """properties bin""" jpayne@69: def __get__(self): jpayne@69: return self._delegate.core.bin jpayne@69: def __set__(self, bin): jpayne@69: self._delegate.core.bin = bin jpayne@69: jpayne@69: jpayne@69: ########################################################## jpayne@69: # Derived simple attributes. These are simple attributes of jpayne@69: # AlignedSegment getting and setting values. jpayne@69: ########################################################## jpayne@69: # 1. Flags jpayne@69: ########################################################## jpayne@69: property is_paired: jpayne@69: """true if read is paired in sequencing""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FPAIRED) != 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FPAIRED) jpayne@69: jpayne@69: property is_proper_pair: jpayne@69: """true if read is mapped in a proper pair""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FPROPER_PAIR) != 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FPROPER_PAIR) jpayne@69: jpayne@69: property is_unmapped: jpayne@69: """true if read itself is unmapped""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FUNMAP) != 0 jpayne@69: def __set__(self, val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FUNMAP) jpayne@69: # setting the unmapped flag requires recalculation of jpayne@69: # bin as alignment length is now implicitly 1 jpayne@69: update_bin(self._delegate) jpayne@69: jpayne@69: property is_mapped: jpayne@69: """true if read itself is mapped jpayne@69: (implemented in terms of :attr:`is_unmapped`)""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FUNMAP) == 0 jpayne@69: def __set__(self, val): jpayne@69: pysam_update_flag(self._delegate, not val, BAM_FUNMAP) jpayne@69: update_bin(self._delegate) jpayne@69: jpayne@69: property mate_is_unmapped: jpayne@69: """true if the mate is unmapped""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FMUNMAP) != 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FMUNMAP) jpayne@69: jpayne@69: property mate_is_mapped: jpayne@69: """true if the mate is mapped jpayne@69: (implemented in terms of :attr:`mate_is_unmapped`)""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FMUNMAP) == 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, not val, BAM_FMUNMAP) jpayne@69: jpayne@69: property is_reverse: jpayne@69: """true if read is mapped to reverse strand""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FREVERSE) != 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FREVERSE) jpayne@69: jpayne@69: property is_forward: jpayne@69: """true if read is mapped to forward strand jpayne@69: (implemented in terms of :attr:`is_reverse`)""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FREVERSE) == 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, not val, BAM_FREVERSE) jpayne@69: jpayne@69: property mate_is_reverse: jpayne@69: """true if the mate is mapped to reverse strand""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FMREVERSE) != 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FMREVERSE) jpayne@69: jpayne@69: property mate_is_forward: jpayne@69: """true if the mate is mapped to forward strand jpayne@69: (implemented in terms of :attr:`mate_is_reverse`)""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FMREVERSE) == 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, not val, BAM_FMREVERSE) jpayne@69: jpayne@69: property is_read1: jpayne@69: """true if this is read1""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FREAD1) != 0 jpayne@69: def __set__(self,val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FREAD1) jpayne@69: property is_read2: jpayne@69: """true if this is read2""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FREAD2) != 0 jpayne@69: def __set__(self, val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FREAD2) jpayne@69: property is_secondary: jpayne@69: """true if not primary alignment""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FSECONDARY) != 0 jpayne@69: def __set__(self, val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FSECONDARY) jpayne@69: property is_qcfail: jpayne@69: """true if QC failure""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FQCFAIL) != 0 jpayne@69: def __set__(self, val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FQCFAIL) jpayne@69: property is_duplicate: jpayne@69: """true if optical or PCR duplicate""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FDUP) != 0 jpayne@69: def __set__(self, val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FDUP) jpayne@69: property is_supplementary: jpayne@69: """true if this is a supplementary alignment""" jpayne@69: def __get__(self): jpayne@69: return (self.flag & BAM_FSUPPLEMENTARY) != 0 jpayne@69: def __set__(self, val): jpayne@69: pysam_update_flag(self._delegate, val, BAM_FSUPPLEMENTARY) jpayne@69: jpayne@69: # 2. Coordinates and lengths jpayne@69: property reference_end: jpayne@69: '''aligned reference position of the read on the reference genome. jpayne@69: jpayne@69: reference_end points to one past the last aligned residue. jpayne@69: Returns None if not available (read is unmapped or no cigar jpayne@69: alignment present). jpayne@69: jpayne@69: ''' jpayne@69: def __get__(self): jpayne@69: cdef bam1_t * src jpayne@69: src = self._delegate jpayne@69: if (self.flag & BAM_FUNMAP) or pysam_get_n_cigar(src) == 0: jpayne@69: return None jpayne@69: return bam_endpos(src) jpayne@69: jpayne@69: property reference_length: jpayne@69: '''aligned length of the read on the reference genome. jpayne@69: jpayne@69: This is equal to `reference_end - reference_start`. jpayne@69: Returns None if not available. jpayne@69: ''' jpayne@69: def __get__(self): jpayne@69: cdef bam1_t * src jpayne@69: src = self._delegate jpayne@69: if (self.flag & BAM_FUNMAP) or pysam_get_n_cigar(src) == 0: jpayne@69: return None jpayne@69: return bam_endpos(src) - \ jpayne@69: self._delegate.core.pos jpayne@69: jpayne@69: property query_alignment_sequence: jpayne@69: """aligned portion of the read. jpayne@69: jpayne@69: This is a substring of :attr:`query_sequence` that excludes flanking jpayne@69: bases that were :term:`soft clipped` (None if not present). It jpayne@69: is equal to ``query_sequence[query_alignment_start:query_alignment_end]``. jpayne@69: jpayne@69: SAM/BAM files may include extra flanking bases that are not jpayne@69: part of the alignment. These bases may be the result of the jpayne@69: Smith-Waterman or other algorithms, which may not require jpayne@69: alignments that begin at the first residue or end at the last. jpayne@69: In addition, extra sequencing adapters, multiplex identifiers, jpayne@69: and low-quality bases that were not considered for alignment jpayne@69: may have been retained. jpayne@69: jpayne@69: """ jpayne@69: jpayne@69: def __get__(self): jpayne@69: if self.cache_query_alignment_sequence: jpayne@69: return self.cache_query_alignment_sequence jpayne@69: jpayne@69: cdef bam1_t * src jpayne@69: cdef uint32_t start, end jpayne@69: jpayne@69: src = self._delegate jpayne@69: jpayne@69: if src.core.l_qseq == 0: jpayne@69: return None jpayne@69: jpayne@69: start = getQueryStart(src) jpayne@69: end = getQueryEnd(src) jpayne@69: jpayne@69: self.cache_query_alignment_sequence = force_str( jpayne@69: getSequenceInRange(src, start, end)) jpayne@69: return self.cache_query_alignment_sequence jpayne@69: jpayne@69: property query_alignment_qualities: jpayne@69: """aligned query sequence quality values (None if not present). These jpayne@69: are the quality values that correspond to jpayne@69: :attr:`query_alignment_sequence`, that is, they exclude qualities of jpayne@69: :term:`soft clipped` bases. This is equal to jpayne@69: ``query_qualities[query_alignment_start:query_alignment_end]``. jpayne@69: jpayne@69: Quality scores are returned as a python array of unsigned jpayne@69: chars. Note that this is not the ASCII-encoded value typically jpayne@69: seen in FASTQ or SAM formatted files. Thus, no offset of 33 jpayne@69: needs to be subtracted. jpayne@69: jpayne@69: This property is read-only. jpayne@69: jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: jpayne@69: if self.cache_query_alignment_qualities: jpayne@69: return self.cache_query_alignment_qualities jpayne@69: jpayne@69: cdef bam1_t * src jpayne@69: cdef uint32_t start, end jpayne@69: jpayne@69: src = self._delegate jpayne@69: jpayne@69: if src.core.l_qseq == 0: jpayne@69: return None jpayne@69: jpayne@69: start = getQueryStart(src) jpayne@69: end = getQueryEnd(src) jpayne@69: self.cache_query_alignment_qualities = \ jpayne@69: getQualitiesInRange(src, start, end) jpayne@69: return self.cache_query_alignment_qualities jpayne@69: jpayne@69: property query_alignment_start: jpayne@69: """start index of the aligned query portion of the sequence (0-based, jpayne@69: inclusive). jpayne@69: jpayne@69: This the index of the first base in :attr:`query_sequence` jpayne@69: that is not soft-clipped. jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: return getQueryStart(self._delegate) jpayne@69: jpayne@69: property query_alignment_end: jpayne@69: """end index of the aligned query portion of the sequence (0-based, jpayne@69: exclusive) jpayne@69: jpayne@69: This the index just past the last base in :attr:`query_sequence` jpayne@69: that is not soft-clipped. jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: return getQueryEnd(self._delegate) jpayne@69: jpayne@69: property modified_bases: jpayne@69: """Modified bases annotations from Ml/Mm tags. The output is jpayne@69: Dict[(canonical base, strand, modification)] -> [ (pos,qual), ...] jpayne@69: with qual being (256*probability), or -1 if unknown. jpayne@69: Strand==0 for forward and 1 for reverse strand modification jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: cdef bam1_t * src jpayne@69: cdef hts_base_mod_state *m = hts_base_mod_state_alloc() jpayne@69: cdef hts_base_mod mods[5] jpayne@69: cdef int pos jpayne@69: jpayne@69: ret = {} jpayne@69: src = self._delegate jpayne@69: jpayne@69: if bam_parse_basemod(src, m) < 0: jpayne@69: return None jpayne@69: jpayne@69: n = bam_next_basemod(src, m, mods, 5, &pos) jpayne@69: jpayne@69: while n>0: jpayne@69: for i in range(n): jpayne@69: mod_code = chr(mods[i].modified_base) if mods[i].modified_base>0 else -mods[i].modified_base jpayne@69: mod_strand = mods[i].strand jpayne@69: if self.is_reverse: jpayne@69: mod_strand = 1 - mod_strand jpayne@69: key = (chr(mods[i].canonical_base), jpayne@69: mod_strand, jpayne@69: mod_code ) jpayne@69: ret.setdefault(key,[]).append((pos,mods[i].qual)) jpayne@69: jpayne@69: n = bam_next_basemod(src, m, mods, 5, &pos) jpayne@69: jpayne@69: if n<0: jpayne@69: return None jpayne@69: jpayne@69: hts_base_mod_state_free(m) jpayne@69: return ret jpayne@69: jpayne@69: property modified_bases_forward: jpayne@69: """Modified bases annotations from Ml/Mm tags. The output is jpayne@69: Dict[(canonical base, strand, modification)] -> [ (pos,qual), ...] jpayne@69: with qual being (256*probability), or -1 if unknown. jpayne@69: Strand==0 for forward and 1 for reverse strand modification. jpayne@69: The positions are with respect to the original sequence from get_forward_sequence() jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: pmods = self.modified_bases jpayne@69: if pmods and self.is_reverse: jpayne@69: rmod = {} jpayne@69: jpayne@69: # Try to find the length of the original sequence jpayne@69: rlen = self.infer_read_length() jpayne@69: if rlen is None and self.query_sequence is None: jpayne@69: return rmod jpayne@69: else: jpayne@69: rlen = len(self.query_sequence) jpayne@69: jpayne@69: for k,mods in pmods.items(): jpayne@69: nk = k[0],1 - k[1],k[2] jpayne@69: for i in range(len(mods)): jpayne@69: jpayne@69: mods[i] = (rlen - 1 -mods[i][0], mods[i][1]) jpayne@69: rmod[nk] = mods jpayne@69: return rmod jpayne@69: jpayne@69: return pmods jpayne@69: jpayne@69: jpayne@69: property query_alignment_length: jpayne@69: """length of the aligned query sequence. jpayne@69: jpayne@69: This is equal to :attr:`query_alignment_end` - jpayne@69: :attr:`query_alignment_start` jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: cdef bam1_t * src jpayne@69: src = self._delegate jpayne@69: return getQueryEnd(src) - getQueryStart(src) jpayne@69: jpayne@69: ##################################################### jpayne@69: # Computed properties jpayne@69: jpayne@69: def get_reference_positions(self, full_length=False): jpayne@69: """a list of reference positions that this read aligns to. jpayne@69: jpayne@69: By default, this method returns the (0-based) positions on the jpayne@69: reference that are within the read's alignment, leaving gaps jpayne@69: corresponding to deletions and other reference skips. jpayne@69: jpayne@69: When *full_length* is True, the returned list is the same length jpayne@69: as the read and additionally includes None values corresponding jpayne@69: to insertions or soft-clipping, i.e., to bases of the read that jpayne@69: are not aligned to a reference position. jpayne@69: (See also :meth:`get_aligned_pairs` which additionally returns jpayne@69: the corresponding positions along the read.) jpayne@69: """ jpayne@69: cdef uint32_t k, i, l, pos jpayne@69: cdef int op jpayne@69: cdef uint32_t * cigar_p jpayne@69: cdef bam1_t * src jpayne@69: cdef bint _full = full_length jpayne@69: jpayne@69: src = self._delegate jpayne@69: if pysam_get_n_cigar(src) == 0: jpayne@69: return [] jpayne@69: jpayne@69: result = [] jpayne@69: pos = src.core.pos jpayne@69: cigar_p = pysam_bam_get_cigar(src) jpayne@69: jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: l = cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: jpayne@69: if op == BAM_CSOFT_CLIP or op == BAM_CINS: jpayne@69: if _full: jpayne@69: for i from 0 <= i < l: jpayne@69: result.append(None) jpayne@69: elif op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: jpayne@69: for i from pos <= i < pos + l: jpayne@69: result.append(i) jpayne@69: pos += l jpayne@69: elif op == BAM_CDEL or op == BAM_CREF_SKIP: jpayne@69: pos += l jpayne@69: jpayne@69: return result jpayne@69: jpayne@69: def infer_query_length(self, always=False): jpayne@69: """infer query length from CIGAR alignment. jpayne@69: jpayne@69: This method deduces the query length from the CIGAR alignment jpayne@69: but does not include hard-clipped bases. jpayne@69: jpayne@69: Returns None if CIGAR alignment is not present. jpayne@69: jpayne@69: If *always* is set to True, `infer_read_length` is used instead. jpayne@69: This is deprecated and only present for backward compatibility. jpayne@69: """ jpayne@69: if always is True: jpayne@69: return self.infer_read_length() jpayne@69: cdef int32_t l = calculateQueryLengthWithoutHardClipping(self._delegate) jpayne@69: if l > 0: jpayne@69: return l jpayne@69: else: jpayne@69: return None jpayne@69: jpayne@69: def infer_read_length(self): jpayne@69: """infer read length from CIGAR alignment. jpayne@69: jpayne@69: This method deduces the read length from the CIGAR alignment jpayne@69: including hard-clipped bases. jpayne@69: jpayne@69: Returns None if CIGAR alignment is not present. jpayne@69: """ jpayne@69: cdef int32_t l = calculateQueryLengthWithHardClipping(self._delegate) jpayne@69: if l > 0: jpayne@69: return l jpayne@69: else: jpayne@69: return None jpayne@69: jpayne@69: def get_reference_sequence(self): jpayne@69: """return the reference sequence in the region that is covered by the jpayne@69: alignment of the read to the reference. jpayne@69: jpayne@69: This method requires the MD tag to be set. jpayne@69: jpayne@69: """ jpayne@69: return force_str(build_reference_sequence(self._delegate)) jpayne@69: jpayne@69: def get_forward_sequence(self): jpayne@69: """return the original read sequence. jpayne@69: jpayne@69: Reads mapped to the reverse strand are stored reverse complemented in jpayne@69: the BAM file. This method returns such reads reverse complemented back jpayne@69: to their original orientation. jpayne@69: jpayne@69: Returns None if the record has no query sequence. jpayne@69: """ jpayne@69: if self.query_sequence is None: jpayne@69: return None jpayne@69: s = force_str(self.query_sequence) jpayne@69: if self.is_reverse: jpayne@69: s = s.translate(str.maketrans("ACGTacgtNnXx", "TGCAtgcaNnXx"))[::-1] jpayne@69: return s jpayne@69: jpayne@69: def get_forward_qualities(self): jpayne@69: """return the original base qualities of the read sequence, jpayne@69: in the same format as the :attr:`query_qualities` property. jpayne@69: jpayne@69: Reads mapped to the reverse strand have their base qualities stored jpayne@69: reversed in the BAM file. This method returns such reads' base qualities jpayne@69: reversed back to their original orientation. jpayne@69: """ jpayne@69: if self.is_reverse: jpayne@69: return self.query_qualities[::-1] jpayne@69: else: jpayne@69: return self.query_qualities jpayne@69: jpayne@69: def get_aligned_pairs(self, matches_only=False, with_seq=False, with_cigar=False): jpayne@69: """a list of aligned read (query) and reference positions. jpayne@69: jpayne@69: Each item in the returned list is a tuple consisting of jpayne@69: the 0-based offset from the start of the read sequence jpayne@69: followed by the 0-based reference position. jpayne@69: jpayne@69: For inserts, deletions, skipping either query or reference jpayne@69: position may be None. jpayne@69: jpayne@69: For padding in the reference, the reference position will jpayne@69: always be None. jpayne@69: jpayne@69: Parameters jpayne@69: ---------- jpayne@69: jpayne@69: matches_only : bool jpayne@69: If True, only matched bases are returned --- no None on either jpayne@69: side. jpayne@69: with_seq : bool jpayne@69: If True, return a third element in the tuple containing the jpayne@69: reference sequence. For CIGAR 'P' (padding in the reference) jpayne@69: operations, the third tuple element will be None. Substitutions jpayne@69: are lower-case. This option requires an MD tag to be present. jpayne@69: with_cigar : bool jpayne@69: If True, return an extra element in the tuple containing the jpayne@69: CIGAR operator corresponding to this position tuple. jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: aligned_pairs : list of tuples jpayne@69: jpayne@69: """ jpayne@69: cdef uint32_t k, i, pos, qpos, r_idx, l jpayne@69: cdef int op jpayne@69: cdef uint32_t * cigar_p jpayne@69: cdef bam1_t * src = self._delegate jpayne@69: cdef bint _matches_only = bool(matches_only) jpayne@69: cdef bint _with_seq = bool(with_seq) jpayne@69: cdef bint _with_cigar = bool(with_cigar) jpayne@69: cdef object (*make_tuple)(object, object, object, uint32_t, int) jpayne@69: jpayne@69: # TODO: this method performs no checking and assumes that jpayne@69: # read sequence, cigar and MD tag are consistent. jpayne@69: jpayne@69: if _with_seq: jpayne@69: # force_str required for py2/py3 compatibility jpayne@69: ref_seq = force_str(build_reference_sequence(src)) jpayne@69: if ref_seq is None: jpayne@69: raise ValueError("MD tag not present") jpayne@69: make_tuple = _alignedpairs_with_seq_cigar if _with_cigar else _alignedpairs_with_seq jpayne@69: else: jpayne@69: ref_seq = None jpayne@69: make_tuple = _alignedpairs_with_cigar if _with_cigar else _alignedpairs_positions jpayne@69: jpayne@69: r_idx = 0 jpayne@69: jpayne@69: if pysam_get_n_cigar(src) == 0: jpayne@69: return [] jpayne@69: jpayne@69: result = [] jpayne@69: pos = src.core.pos jpayne@69: qpos = 0 jpayne@69: cigar_p = pysam_bam_get_cigar(src) jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: l = cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: jpayne@69: if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: jpayne@69: for i from pos <= i < pos + l: jpayne@69: result.append(make_tuple(qpos, i, ref_seq, r_idx, op)) jpayne@69: r_idx += 1 jpayne@69: qpos += 1 jpayne@69: pos += l jpayne@69: jpayne@69: elif op == BAM_CINS or op == BAM_CSOFT_CLIP or op == BAM_CPAD: jpayne@69: if not _matches_only: jpayne@69: for i from pos <= i < pos + l: jpayne@69: result.append(make_tuple(qpos, None, None, 0, op)) jpayne@69: qpos += 1 jpayne@69: else: jpayne@69: qpos += l jpayne@69: jpayne@69: elif op == BAM_CDEL: jpayne@69: if not _matches_only: jpayne@69: for i from pos <= i < pos + l: jpayne@69: result.append(make_tuple(None, i, ref_seq, r_idx, op)) jpayne@69: r_idx += 1 jpayne@69: else: jpayne@69: r_idx += l jpayne@69: pos += l jpayne@69: jpayne@69: elif op == BAM_CHARD_CLIP: jpayne@69: pass # advances neither jpayne@69: jpayne@69: elif op == BAM_CREF_SKIP: jpayne@69: if not _matches_only: jpayne@69: for i from pos <= i < pos + l: jpayne@69: result.append(make_tuple(None, i, None, 0, op)) jpayne@69: jpayne@69: pos += l jpayne@69: jpayne@69: return result jpayne@69: jpayne@69: def get_blocks(self): jpayne@69: """ a list of start and end positions of jpayne@69: aligned gapless blocks. jpayne@69: jpayne@69: The start and end positions are in genomic jpayne@69: coordinates. jpayne@69: jpayne@69: Blocks are not normalized, i.e. two blocks jpayne@69: might be directly adjacent. This happens if jpayne@69: the two blocks are separated by an insertion jpayne@69: in the read. jpayne@69: """ jpayne@69: jpayne@69: cdef uint32_t k, pos, l jpayne@69: cdef int op jpayne@69: cdef uint32_t * cigar_p jpayne@69: cdef bam1_t * src jpayne@69: jpayne@69: src = self._delegate jpayne@69: if pysam_get_n_cigar(src) == 0: jpayne@69: return [] jpayne@69: jpayne@69: result = [] jpayne@69: pos = src.core.pos jpayne@69: cigar_p = pysam_bam_get_cigar(src) jpayne@69: l = 0 jpayne@69: jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: l = cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: jpayne@69: result.append((pos, pos + l)) jpayne@69: pos += l jpayne@69: elif op == BAM_CDEL or op == BAM_CREF_SKIP: jpayne@69: pos += l jpayne@69: jpayne@69: return result jpayne@69: jpayne@69: def get_overlap(self, uint32_t start, uint32_t end): jpayne@69: """return number of aligned bases of read overlapping the interval jpayne@69: *start* and *end* on the reference sequence. jpayne@69: jpayne@69: Return None if cigar alignment is not available. jpayne@69: """ jpayne@69: cdef uint32_t k, i, pos, overlap jpayne@69: cdef int op, o jpayne@69: cdef uint32_t * cigar_p jpayne@69: cdef bam1_t * src jpayne@69: jpayne@69: overlap = 0 jpayne@69: jpayne@69: src = self._delegate jpayne@69: if pysam_get_n_cigar(src) == 0: jpayne@69: return None jpayne@69: pos = src.core.pos jpayne@69: o = 0 jpayne@69: jpayne@69: cigar_p = pysam_bam_get_cigar(src) jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: l = cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: jpayne@69: if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: jpayne@69: o = min( pos + l, end) - max( pos, start ) jpayne@69: if o > 0: overlap += o jpayne@69: jpayne@69: if op == BAM_CMATCH or op == BAM_CDEL or op == BAM_CREF_SKIP or op == BAM_CEQUAL or op == BAM_CDIFF: jpayne@69: pos += l jpayne@69: jpayne@69: return overlap jpayne@69: jpayne@69: def get_cigar_stats(self): jpayne@69: """summary of operations in cigar string. jpayne@69: jpayne@69: The output order in the array is "MIDNSHP=X" followed by a jpayne@69: field for the NM tag. If the NM tag is not present, this jpayne@69: field will always be 0. (Accessing this field via index -1 jpayne@69: avoids changes if more CIGAR operators are added in future.) jpayne@69: jpayne@69: +-----+----------------+--------+ jpayne@69: |M |pysam.CMATCH |0 | jpayne@69: +-----+----------------+--------+ jpayne@69: |I |pysam.CINS |1 | jpayne@69: +-----+----------------+--------+ jpayne@69: |D |pysam.CDEL |2 | jpayne@69: +-----+----------------+--------+ jpayne@69: |N |pysam.CREF_SKIP |3 | jpayne@69: +-----+----------------+--------+ jpayne@69: |S |pysam.CSOFT_CLIP|4 | jpayne@69: +-----+----------------+--------+ jpayne@69: |H |pysam.CHARD_CLIP|5 | jpayne@69: +-----+----------------+--------+ jpayne@69: |P |pysam.CPAD |6 | jpayne@69: +-----+----------------+--------+ jpayne@69: |= |pysam.CEQUAL |7 | jpayne@69: +-----+----------------+--------+ jpayne@69: |X |pysam.CDIFF |8 | jpayne@69: +-----+----------------+--------+ jpayne@69: |B |pysam.CBACK |9 | jpayne@69: +-----+----------------+--------+ jpayne@69: |NM |NM tag |10 or -1| jpayne@69: +-----+----------------+--------+ jpayne@69: jpayne@69: If no cigar string is present, empty arrays will be returned. jpayne@69: jpayne@69: Returns: jpayne@69: arrays : jpayne@69: two arrays. The first contains the nucleotide counts within jpayne@69: each cigar operation, the second contains the number of blocks jpayne@69: for each cigar operation. jpayne@69: jpayne@69: """ jpayne@69: jpayne@69: cdef int nfields = NCIGAR_CODES + 1 jpayne@69: jpayne@69: cdef c_array.array base_counts = array.array( jpayne@69: "I", jpayne@69: [0] * nfields) jpayne@69: cdef uint32_t [:] base_view = base_counts jpayne@69: cdef c_array.array block_counts = array.array( jpayne@69: "I", jpayne@69: [0] * nfields) jpayne@69: cdef uint32_t [:] block_view = block_counts jpayne@69: jpayne@69: cdef bam1_t * src = self._delegate jpayne@69: cdef int op jpayne@69: cdef uint32_t l jpayne@69: cdef int32_t k jpayne@69: cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) jpayne@69: jpayne@69: if cigar_p == NULL: jpayne@69: return None jpayne@69: jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: l = cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: base_view[op] += l jpayne@69: block_view[op] += 1 jpayne@69: jpayne@69: cdef uint8_t * v = bam_aux_get(src, 'NM') jpayne@69: if v != NULL: jpayne@69: base_view[nfields - 1] = bam_aux2i(v) jpayne@69: jpayne@69: return base_counts, block_counts jpayne@69: jpayne@69: ##################################################### jpayne@69: ## Unsorted as yet jpayne@69: # TODO: capture in CIGAR object jpayne@69: property cigartuples: jpayne@69: """the :term:`cigar` alignment. The alignment jpayne@69: is returned as a list of tuples of (operation, length). jpayne@69: jpayne@69: If the alignment is not present, None is returned. jpayne@69: jpayne@69: The operations are: jpayne@69: jpayne@69: +-----+----------------+-----+ jpayne@69: |M |pysam.CMATCH |0 | jpayne@69: +-----+----------------+-----+ jpayne@69: |I |pysam.CINS |1 | jpayne@69: +-----+----------------+-----+ jpayne@69: |D |pysam.CDEL |2 | jpayne@69: +-----+----------------+-----+ jpayne@69: |N |pysam.CREF_SKIP |3 | jpayne@69: +-----+----------------+-----+ jpayne@69: |S |pysam.CSOFT_CLIP|4 | jpayne@69: +-----+----------------+-----+ jpayne@69: |H |pysam.CHARD_CLIP|5 | jpayne@69: +-----+----------------+-----+ jpayne@69: |P |pysam.CPAD |6 | jpayne@69: +-----+----------------+-----+ jpayne@69: |= |pysam.CEQUAL |7 | jpayne@69: +-----+----------------+-----+ jpayne@69: |X |pysam.CDIFF |8 | jpayne@69: +-----+----------------+-----+ jpayne@69: |B |pysam.CBACK |9 | jpayne@69: +-----+----------------+-----+ jpayne@69: jpayne@69: .. note:: jpayne@69: The output is a list of (operation, length) tuples, such as jpayne@69: ``[(0, 30)]``. jpayne@69: This is different from the SAM specification and jpayne@69: the :attr:`cigarstring` property, which uses a jpayne@69: (length, operation) order, for example: ``30M``. jpayne@69: jpayne@69: To unset the cigar property, assign an empty list jpayne@69: or None. jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: cdef uint32_t * cigar_p jpayne@69: cdef bam1_t * src jpayne@69: cdef uint32_t op, l jpayne@69: cdef uint32_t k jpayne@69: jpayne@69: src = self._delegate jpayne@69: if pysam_get_n_cigar(src) == 0: jpayne@69: return None jpayne@69: jpayne@69: cigar = [] jpayne@69: jpayne@69: cigar_p = pysam_bam_get_cigar(src); jpayne@69: for k from 0 <= k < pysam_get_n_cigar(src): jpayne@69: op = cigar_p[k] & BAM_CIGAR_MASK jpayne@69: l = cigar_p[k] >> BAM_CIGAR_SHIFT jpayne@69: cigar.append((op, l)) jpayne@69: return cigar jpayne@69: jpayne@69: def __set__(self, values): jpayne@69: cdef uint32_t * p jpayne@69: cdef bam1_t * src jpayne@69: cdef op, l jpayne@69: cdef int k jpayne@69: jpayne@69: k = 0 jpayne@69: jpayne@69: src = self._delegate jpayne@69: jpayne@69: # get location of cigar string jpayne@69: p = pysam_bam_get_cigar(src) jpayne@69: jpayne@69: # empty values for cigar string jpayne@69: if values is None: jpayne@69: values = [] jpayne@69: jpayne@69: cdef uint32_t ncigar = len(values) jpayne@69: jpayne@69: cdef bam1_t * retval = pysam_bam_update(src, jpayne@69: pysam_get_n_cigar(src) * 4, jpayne@69: ncigar * 4, jpayne@69: p) jpayne@69: jpayne@69: if retval == NULL: jpayne@69: raise MemoryError("could not allocate memory") jpayne@69: jpayne@69: # length is number of cigar operations, not bytes jpayne@69: pysam_set_n_cigar(src, ncigar) jpayne@69: jpayne@69: # re-acquire pointer to location in memory jpayne@69: # as it might have moved jpayne@69: p = pysam_bam_get_cigar(src) jpayne@69: jpayne@69: # insert cigar operations jpayne@69: for op, l in values: jpayne@69: p[k] = l << BAM_CIGAR_SHIFT | op jpayne@69: k += 1 jpayne@69: jpayne@69: ## setting the cigar string requires updating the bin jpayne@69: update_bin(src) jpayne@69: jpayne@69: cpdef set_tag(self, jpayne@69: tag, jpayne@69: value, jpayne@69: value_type=None, jpayne@69: replace=True): jpayne@69: """sets a particular field *tag* to *value* in the optional alignment jpayne@69: section. jpayne@69: jpayne@69: *value_type* describes the type of *value* that is to entered jpayne@69: into the alignment record. It can be set explicitly to one of jpayne@69: the valid one-letter type codes. If unset, an appropriate type jpayne@69: will be chosen automatically based on the python type of jpayne@69: *value*. jpayne@69: jpayne@69: An existing value of the same *tag* will be overwritten unless jpayne@69: *replace* is set to False. This is usually not recommended as a jpayne@69: tag may only appear once in the optional alignment section. jpayne@69: jpayne@69: If *value* is `None`, the tag will be deleted. jpayne@69: jpayne@69: This method accepts valid SAM specification value types, which jpayne@69: are:: jpayne@69: jpayne@69: A: printable char jpayne@69: i: signed int jpayne@69: f: float jpayne@69: Z: printable string jpayne@69: H: Byte array in hex format jpayne@69: B: Integer or numeric array jpayne@69: jpayne@69: Additionally, it will accept the integer BAM types ('cCsSI') jpayne@69: jpayne@69: For htslib compatibility, 'a' is synonymous with 'A' and the jpayne@69: method accepts a 'd' type code for a double precision float. jpayne@69: jpayne@69: When deducing the type code by the python type of *value*, the jpayne@69: following mapping is applied:: jpayne@69: jpayne@69: i: python int jpayne@69: f: python float jpayne@69: Z: python str or bytes jpayne@69: B: python array.array, list or tuple jpayne@69: jpayne@69: Note that a single character string will be output as 'Z' and jpayne@69: not 'A' as the former is the more general type. jpayne@69: """ jpayne@69: jpayne@69: cdef int value_size jpayne@69: cdef uint8_t tc jpayne@69: cdef uint8_t * value_ptr jpayne@69: cdef uint8_t *existing_ptr jpayne@69: cdef float float_value jpayne@69: cdef double double_value jpayne@69: cdef int32_t int32_t_value jpayne@69: cdef uint32_t uint32_t_value jpayne@69: cdef int16_t int16_t_value jpayne@69: cdef uint16_t uint16_t_value jpayne@69: cdef int8_t int8_t_value jpayne@69: cdef uint8_t uint8_t_value jpayne@69: cdef bam1_t * src = self._delegate jpayne@69: cdef char * _value_type jpayne@69: cdef c_array.array array_value jpayne@69: cdef object buffer jpayne@69: jpayne@69: if len(tag) != 2: jpayne@69: raise ValueError('Invalid tag: %s' % tag) jpayne@69: jpayne@69: tag = force_bytes(tag) jpayne@69: if replace: jpayne@69: existing_ptr = bam_aux_get(src, tag) jpayne@69: if existing_ptr: jpayne@69: bam_aux_del(src, existing_ptr) jpayne@69: jpayne@69: # setting value to None deletes a tag jpayne@69: if value is None: jpayne@69: return jpayne@69: jpayne@69: cdef uint8_t typecode = get_tag_typecode(value, value_type) jpayne@69: if typecode == 0: jpayne@69: raise ValueError("can't guess type or invalid type code specified: {} {}".format( jpayne@69: value, value_type)) jpayne@69: jpayne@69: # sam_format1 for typecasting jpayne@69: if typecode == b'Z': jpayne@69: value = force_bytes(value) jpayne@69: value_ptr = value jpayne@69: value_size = len(value)+1 jpayne@69: elif typecode == b'H': jpayne@69: # Note that hex tags are stored the very same jpayne@69: # way as Z string.s jpayne@69: value = force_bytes(value) jpayne@69: value_ptr = value jpayne@69: value_size = len(value)+1 jpayne@69: elif typecode == b'A' or typecode == b'a': jpayne@69: value = force_bytes(value) jpayne@69: value_ptr = value jpayne@69: value_size = sizeof(char) jpayne@69: typecode = b'A' jpayne@69: elif typecode == b'i': jpayne@69: int32_t_value = value jpayne@69: value_ptr = &int32_t_value jpayne@69: value_size = sizeof(int32_t) jpayne@69: elif typecode == b'I': jpayne@69: uint32_t_value = value jpayne@69: value_ptr = &uint32_t_value jpayne@69: value_size = sizeof(uint32_t) jpayne@69: elif typecode == b's': jpayne@69: int16_t_value = value jpayne@69: value_ptr = &int16_t_value jpayne@69: value_size = sizeof(int16_t) jpayne@69: elif typecode == b'S': jpayne@69: uint16_t_value = value jpayne@69: value_ptr = &uint16_t_value jpayne@69: value_size = sizeof(uint16_t) jpayne@69: elif typecode == b'c': jpayne@69: int8_t_value = value jpayne@69: value_ptr = &int8_t_value jpayne@69: value_size = sizeof(int8_t) jpayne@69: elif typecode == b'C': jpayne@69: uint8_t_value = value jpayne@69: value_ptr = &uint8_t_value jpayne@69: value_size = sizeof(uint8_t) jpayne@69: elif typecode == b'd': jpayne@69: double_value = value jpayne@69: value_ptr = &double_value jpayne@69: value_size = sizeof(double) jpayne@69: elif typecode == b'f': jpayne@69: float_value = value jpayne@69: value_ptr = &float_value jpayne@69: value_size = sizeof(float) jpayne@69: elif typecode == b'B': jpayne@69: # the following goes through python, needs to be cleaned up jpayne@69: # pack array using struct jpayne@69: fmt, args = pack_tags([(tag, value, value_type)]) jpayne@69: jpayne@69: # remove tag and type code as set by bam_aux_append jpayne@69: # first four chars of format (<2sB) jpayne@69: fmt = '<' + fmt[4:] jpayne@69: # first two values to pack jpayne@69: args = args[2:] jpayne@69: value_size = struct.calcsize(fmt) jpayne@69: # buffer will be freed when object goes out of scope jpayne@69: buffer = ctypes.create_string_buffer(value_size) jpayne@69: struct.pack_into(fmt, buffer, 0, *args) jpayne@69: # bam_aux_append copies data from value_ptr jpayne@69: bam_aux_append(src, jpayne@69: tag, jpayne@69: typecode, jpayne@69: value_size, jpayne@69: buffer.raw) jpayne@69: return jpayne@69: else: jpayne@69: raise ValueError('unsupported value_type {} in set_option'.format(typecode)) jpayne@69: jpayne@69: bam_aux_append(src, jpayne@69: tag, jpayne@69: typecode, jpayne@69: value_size, jpayne@69: value_ptr) jpayne@69: jpayne@69: cpdef has_tag(self, tag): jpayne@69: """returns true if the optional alignment section jpayne@69: contains a given *tag*.""" jpayne@69: cdef uint8_t * v jpayne@69: cdef int nvalues jpayne@69: btag = force_bytes(tag) jpayne@69: v = bam_aux_get(self._delegate, btag) jpayne@69: return v != NULL jpayne@69: jpayne@69: cpdef get_tag(self, tag, with_value_type=False): jpayne@69: """ jpayne@69: retrieves data from the optional alignment section jpayne@69: given a two-letter *tag* denoting the field. jpayne@69: jpayne@69: The returned value is cast into an appropriate python type. jpayne@69: jpayne@69: This method is the fastest way to access the optional jpayne@69: alignment section if only few tags need to be retrieved. jpayne@69: jpayne@69: Possible value types are "AcCsSiIfZHB" (see BAM format jpayne@69: specification) as well as additional value type 'd' as jpayne@69: implemented in htslib. jpayne@69: jpayne@69: Parameters: jpayne@69: jpayne@69: tag : jpayne@69: data tag. jpayne@69: jpayne@69: with_value_type : Optional[bool] jpayne@69: if set to True, the return value is a tuple of (tag value, type jpayne@69: code). (default False) jpayne@69: jpayne@69: Returns: jpayne@69: jpayne@69: A python object with the value of the `tag`. The type of the jpayne@69: object depends on the data type in the data record. jpayne@69: jpayne@69: Raises: jpayne@69: jpayne@69: KeyError jpayne@69: If `tag` is not present, a KeyError is raised. jpayne@69: jpayne@69: """ jpayne@69: cdef uint8_t * v jpayne@69: cdef int nvalues jpayne@69: btag = force_bytes(tag) jpayne@69: v = bam_aux_get(self._delegate, btag) jpayne@69: if v == NULL: jpayne@69: raise KeyError("tag '%s' not present" % tag) jpayne@69: if chr(v[0]) == "B": jpayne@69: auxtype = chr(v[0]) + chr(v[1]) jpayne@69: else: jpayne@69: auxtype = chr(v[0]) jpayne@69: jpayne@69: if auxtype in "iIcCsS": jpayne@69: value = bam_aux2i(v) jpayne@69: elif auxtype == 'f' or auxtype == 'F': jpayne@69: value = bam_aux2f(v) jpayne@69: elif auxtype == 'd' or auxtype == 'D': jpayne@69: value = bam_aux2f(v) jpayne@69: elif auxtype == 'A' or auxtype == 'a': jpayne@69: # force A to a jpayne@69: v[0] = b'A' jpayne@69: # there might a more efficient way jpayne@69: # to convert a char into a string jpayne@69: value = '%c' % bam_aux2A(v) jpayne@69: elif auxtype == 'Z' or auxtype == 'H': jpayne@69: # Z and H are treated equally as strings in htslib jpayne@69: value = charptr_to_str(bam_aux2Z(v)) jpayne@69: elif auxtype[0] == 'B': jpayne@69: bytesize, nvalues, values = convert_binary_tag(v + 1) jpayne@69: value = values jpayne@69: else: jpayne@69: raise ValueError("unknown auxiliary type '%s'" % auxtype) jpayne@69: jpayne@69: if with_value_type: jpayne@69: return (value, auxtype) jpayne@69: else: jpayne@69: return value jpayne@69: jpayne@69: def get_tags(self, with_value_type=False): jpayne@69: """the fields in the optional alignment section. jpayne@69: jpayne@69: Returns a list of all fields in the optional jpayne@69: alignment section. Values are converted to appropriate python jpayne@69: values. For example: ``[(NM, 2), (RG, "GJP00TM04")]`` jpayne@69: jpayne@69: If *with_value_type* is set, the value type as encode in jpayne@69: the AlignedSegment record will be returned as well: jpayne@69: jpayne@69: [(NM, 2, "i"), (RG, "GJP00TM04", "Z")] jpayne@69: jpayne@69: This method will convert all values in the optional alignment jpayne@69: section. When getting only one or few tags, please see jpayne@69: :meth:`get_tag` for a quicker way to achieve this. jpayne@69: jpayne@69: """ jpayne@69: jpayne@69: cdef char * ctag jpayne@69: cdef bam1_t * src jpayne@69: cdef uint8_t * s jpayne@69: cdef char auxtag[3] jpayne@69: cdef char auxtype jpayne@69: cdef uint8_t byte_size jpayne@69: cdef int32_t nvalues jpayne@69: jpayne@69: src = self._delegate jpayne@69: if src.l_data == 0: jpayne@69: return [] jpayne@69: s = pysam_bam_get_aux(src) jpayne@69: result = [] jpayne@69: auxtag[2] = 0 jpayne@69: while s < (src.data + src.l_data): jpayne@69: # get tag jpayne@69: auxtag[0] = s[0] jpayne@69: auxtag[1] = s[1] jpayne@69: s += 2 jpayne@69: auxtype = s[0] jpayne@69: if auxtype in (b'c', b'C'): jpayne@69: value = bam_aux2i(s) jpayne@69: s += 1 jpayne@69: elif auxtype in (b's', b'S'): jpayne@69: value = bam_aux2i(s) jpayne@69: s += 2 jpayne@69: elif auxtype in (b'i', b'I'): jpayne@69: value = bam_aux2i(s) jpayne@69: s += 4 jpayne@69: elif auxtype == b'f': jpayne@69: value = bam_aux2f(s) jpayne@69: s += 4 jpayne@69: elif auxtype == b'd': jpayne@69: value = bam_aux2f(s) jpayne@69: s += 8 jpayne@69: elif auxtype in (b'A', b'a'): jpayne@69: value = "%c" % bam_aux2A(s) jpayne@69: s += 1 jpayne@69: elif auxtype in (b'Z', b'H'): jpayne@69: value = charptr_to_str(bam_aux2Z(s)) jpayne@69: # +1 for NULL terminated string jpayne@69: s += len(value) + 1 jpayne@69: elif auxtype == b'B': jpayne@69: s += 1 jpayne@69: byte_size, nvalues, value = convert_binary_tag(s) jpayne@69: # 5 for 1 char and 1 int jpayne@69: s += 5 + (nvalues * byte_size) - 1 jpayne@69: else: jpayne@69: raise KeyError("unknown type '%s'" % auxtype) jpayne@69: jpayne@69: s += 1 jpayne@69: jpayne@69: if with_value_type: jpayne@69: result.append((charptr_to_str(auxtag), value, chr(auxtype))) jpayne@69: else: jpayne@69: result.append((charptr_to_str(auxtag), value)) jpayne@69: jpayne@69: return result jpayne@69: jpayne@69: def set_tags(self, tags): jpayne@69: """sets the fields in the optional alignment section with jpayne@69: a list of (tag, value) tuples. jpayne@69: jpayne@69: The value type of the values is determined from the jpayne@69: python type. Optionally, a type may be given explicitly as jpayne@69: a third value in the tuple, For example: jpayne@69: jpayne@69: x.set_tags([(NM, 2, "i"), (RG, "GJP00TM04", "Z")] jpayne@69: jpayne@69: This method will not enforce the rule that the same tag may appear jpayne@69: only once in the optional alignment section. jpayne@69: """ jpayne@69: jpayne@69: cdef bam1_t * src jpayne@69: cdef uint8_t * s jpayne@69: cdef char * temp jpayne@69: cdef int new_size = 0 jpayne@69: cdef int old_size jpayne@69: src = self._delegate jpayne@69: jpayne@69: # convert and pack the data jpayne@69: if tags is not None and len(tags) > 0: jpayne@69: fmt, args = pack_tags(tags) jpayne@69: new_size = struct.calcsize(fmt) jpayne@69: buffer = ctypes.create_string_buffer(new_size) jpayne@69: struct.pack_into(fmt, jpayne@69: buffer, jpayne@69: 0, jpayne@69: *args) jpayne@69: jpayne@69: jpayne@69: # delete the old data and allocate new space. jpayne@69: # If total_size == 0, the aux field will be jpayne@69: # empty jpayne@69: old_size = pysam_bam_get_l_aux(src) jpayne@69: cdef bam1_t * retval = pysam_bam_update(src, jpayne@69: old_size, jpayne@69: new_size, jpayne@69: pysam_bam_get_aux(src)) jpayne@69: if retval == NULL: jpayne@69: raise MemoryError("could not allocate memory") jpayne@69: jpayne@69: # copy data only if there is any jpayne@69: if new_size > 0: jpayne@69: jpayne@69: # get location of new data jpayne@69: s = pysam_bam_get_aux(src) jpayne@69: jpayne@69: # check if there is direct path from buffer.raw to tmp jpayne@69: p = buffer.raw jpayne@69: # create handle to make sure buffer stays alive long jpayne@69: # enough for memcpy, see issue 129 jpayne@69: temp = p jpayne@69: memcpy(s, temp, new_size) jpayne@69: jpayne@69: jpayne@69: ######################################################## jpayne@69: # Compatibility Accessors jpayne@69: # Functions, properties for compatibility with pysam < 0.8 jpayne@69: # jpayne@69: # Several options jpayne@69: # change the factory functions according to API jpayne@69: # * requires code changes throughout, incl passing jpayne@69: # handles to factory functions jpayne@69: # subclass functions and add attributes at runtime jpayne@69: # e.g.: AlignedSegments.qname = AlignedSegments.query_name jpayne@69: # * will slow down the default interface jpayne@69: # explicit declaration of getters/setters jpayne@69: ######################################################## jpayne@69: property qname: jpayne@69: """deprecated, use :attr:`query_name` instead.""" jpayne@69: def __get__(self): return self.query_name jpayne@69: def __set__(self, v): self.query_name = v jpayne@69: property tid: jpayne@69: """deprecated, use :attr:`reference_id` instead.""" jpayne@69: def __get__(self): return self.reference_id jpayne@69: def __set__(self, v): self.reference_id = v jpayne@69: property pos: jpayne@69: """deprecated, use :attr:`reference_start` instead.""" jpayne@69: def __get__(self): return self.reference_start jpayne@69: def __set__(self, v): self.reference_start = v jpayne@69: property mapq: jpayne@69: """deprecated, use :attr:`mapping_quality` instead.""" jpayne@69: def __get__(self): return self.mapping_quality jpayne@69: def __set__(self, v): self.mapping_quality = v jpayne@69: property rnext: jpayne@69: """deprecated, use :attr:`next_reference_id` instead.""" jpayne@69: def __get__(self): return self.next_reference_id jpayne@69: def __set__(self, v): self.next_reference_id = v jpayne@69: property pnext: jpayne@69: """deprecated, use :attr:`next_reference_start` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.next_reference_start jpayne@69: def __set__(self, v): jpayne@69: self.next_reference_start = v jpayne@69: property cigar: jpayne@69: """deprecated, use :attr:`cigarstring` or :attr:`cigartuples` instead.""" jpayne@69: def __get__(self): jpayne@69: r = self.cigartuples jpayne@69: if r is None: jpayne@69: r = [] jpayne@69: return r jpayne@69: def __set__(self, v): self.cigartuples = v jpayne@69: property tlen: jpayne@69: """deprecated, use :attr:`template_length` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.template_length jpayne@69: def __set__(self, v): jpayne@69: self.template_length = v jpayne@69: property seq: jpayne@69: """deprecated, use :attr:`query_sequence` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.query_sequence jpayne@69: def __set__(self, v): jpayne@69: self.query_sequence = v jpayne@69: property qual: jpayne@69: """deprecated, use :attr:`query_qualities` instead.""" jpayne@69: def __get__(self): jpayne@69: return array_to_qualitystring(self.query_qualities) jpayne@69: def __set__(self, v): jpayne@69: self.query_qualities = qualitystring_to_array(v) jpayne@69: property alen: jpayne@69: """deprecated, use :attr:`reference_length` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.reference_length jpayne@69: def __set__(self, v): jpayne@69: self.reference_length = v jpayne@69: property aend: jpayne@69: """deprecated, use :attr:`reference_end` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.reference_end jpayne@69: def __set__(self, v): jpayne@69: self.reference_end = v jpayne@69: property rlen: jpayne@69: """deprecated, use :attr:`query_length` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.query_length jpayne@69: def __set__(self, v): jpayne@69: self.query_length = v jpayne@69: property query: jpayne@69: """deprecated, use :attr:`query_alignment_sequence` jpayne@69: instead.""" jpayne@69: def __get__(self): jpayne@69: return self.query_alignment_sequence jpayne@69: def __set__(self, v): jpayne@69: self.query_alignment_sequence = v jpayne@69: property qqual: jpayne@69: """deprecated, use :attr:`query_alignment_qualities` jpayne@69: instead.""" jpayne@69: def __get__(self): jpayne@69: return array_to_qualitystring(self.query_alignment_qualities) jpayne@69: def __set__(self, v): jpayne@69: self.query_alignment_qualities = qualitystring_to_array(v) jpayne@69: property qstart: jpayne@69: """deprecated, use :attr:`query_alignment_start` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.query_alignment_start jpayne@69: def __set__(self, v): jpayne@69: self.query_alignment_start = v jpayne@69: property qend: jpayne@69: """deprecated, use :attr:`query_alignment_end` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.query_alignment_end jpayne@69: def __set__(self, v): jpayne@69: self.query_alignment_end = v jpayne@69: property qlen: jpayne@69: """deprecated, use :attr:`query_alignment_length` jpayne@69: instead.""" jpayne@69: def __get__(self): jpayne@69: return self.query_alignment_length jpayne@69: def __set__(self, v): jpayne@69: self.query_alignment_length = v jpayne@69: property mrnm: jpayne@69: """deprecated, use :attr:`next_reference_id` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.next_reference_id jpayne@69: def __set__(self, v): jpayne@69: self.next_reference_id = v jpayne@69: property mpos: jpayne@69: """deprecated, use :attr:`next_reference_start` jpayne@69: instead.""" jpayne@69: def __get__(self): jpayne@69: return self.next_reference_start jpayne@69: def __set__(self, v): jpayne@69: self.next_reference_start = v jpayne@69: property rname: jpayne@69: """deprecated, use :attr:`reference_id` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.reference_id jpayne@69: def __set__(self, v): jpayne@69: self.reference_id = v jpayne@69: property isize: jpayne@69: """deprecated, use :attr:`template_length` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.template_length jpayne@69: def __set__(self, v): jpayne@69: self.template_length = v jpayne@69: property blocks: jpayne@69: """deprecated, use :meth:`get_blocks()` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.get_blocks() jpayne@69: property aligned_pairs: jpayne@69: """deprecated, use :meth:`get_aligned_pairs()` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.get_aligned_pairs() jpayne@69: property inferred_length: jpayne@69: """deprecated, use :meth:`infer_query_length()` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.infer_query_length() jpayne@69: property positions: jpayne@69: """deprecated, use :meth:`get_reference_positions()` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.get_reference_positions() jpayne@69: property tags: jpayne@69: """deprecated, use :meth:`get_tags()` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.get_tags() jpayne@69: def __set__(self, tags): jpayne@69: self.set_tags(tags) jpayne@69: def overlap(self): jpayne@69: """deprecated, use :meth:`get_overlap()` instead.""" jpayne@69: return self.get_overlap() jpayne@69: def opt(self, tag): jpayne@69: """deprecated, use :meth:`get_tag()` instead.""" jpayne@69: return self.get_tag(tag) jpayne@69: def setTag(self, tag, value, value_type=None, replace=True): jpayne@69: """deprecated, use :meth:`set_tag()` instead.""" jpayne@69: return self.set_tag(tag, value, value_type, replace) jpayne@69: jpayne@69: jpayne@69: cdef class PileupColumn: jpayne@69: '''A pileup of reads at a particular reference sequence position jpayne@69: (:term:`column`). A pileup column contains all the reads that map jpayne@69: to a certain target base. jpayne@69: jpayne@69: This class is a proxy for results returned by the samtools pileup jpayne@69: engine. If the underlying engine iterator advances, the results jpayne@69: of this column will change. jpayne@69: ''' jpayne@69: def __init__(self): jpayne@69: raise TypeError("this class cannot be instantiated from Python") jpayne@69: jpayne@69: def __str__(self): jpayne@69: return "\t".join(map(str, jpayne@69: (self.reference_id, jpayne@69: self.reference_pos, jpayne@69: self.nsegments))) +\ jpayne@69: "\n" +\ jpayne@69: "\n".join(map(str, self.pileups)) jpayne@69: jpayne@69: def __dealloc__(self): jpayne@69: free(self.buf.s) jpayne@69: jpayne@69: def set_min_base_quality(self, min_base_quality): jpayne@69: """set the minimum base quality for this pileup column. jpayne@69: """ jpayne@69: self.min_base_quality = min_base_quality jpayne@69: jpayne@69: def __len__(self): jpayne@69: """return number of reads aligned to this column. jpayne@69: jpayne@69: see :meth:`get_num_aligned` jpayne@69: """ jpayne@69: return self.get_num_aligned() jpayne@69: jpayne@69: property reference_id: jpayne@69: '''the reference sequence number as defined in the header''' jpayne@69: def __get__(self): jpayne@69: return self.tid jpayne@69: jpayne@69: property reference_name: jpayne@69: """:term:`reference` name (None if no AlignmentFile is associated)""" jpayne@69: def __get__(self): jpayne@69: if self.header is not None: jpayne@69: return self.header.get_reference_name(self.tid) jpayne@69: return None jpayne@69: jpayne@69: property nsegments: jpayne@69: '''number of reads mapping to this column. jpayne@69: jpayne@69: Note that this number ignores the base quality filter.''' jpayne@69: def __get__(self): jpayne@69: return self.n_pu jpayne@69: def __set__(self, n): jpayne@69: self.n_pu = n jpayne@69: jpayne@69: property reference_pos: jpayne@69: '''the position in the reference sequence (0-based).''' jpayne@69: def __get__(self): jpayne@69: return self.pos jpayne@69: jpayne@69: property pileups: jpayne@69: '''list of reads (:class:`pysam.PileupRead`) aligned to this column''' jpayne@69: def __get__(self): jpayne@69: if self.plp == NULL or self.plp[0] == NULL: jpayne@69: raise ValueError("PileupColumn accessed after iterator finished") jpayne@69: jpayne@69: cdef int x jpayne@69: cdef const bam_pileup1_t * p = NULL jpayne@69: pileups = [] jpayne@69: jpayne@69: # warning: there could be problems if self.n and self.buf are jpayne@69: # out of sync. jpayne@69: for x from 0 <= x < self.n_pu: jpayne@69: p = &(self.plp[0][x]) jpayne@69: if p == NULL: jpayne@69: raise ValueError( jpayne@69: "pileup buffer out of sync - most likely use of iterator " jpayne@69: "outside loop") jpayne@69: if pileup_base_qual_skip(p, self.min_base_quality): jpayne@69: continue jpayne@69: pileups.append(makePileupRead(p, self.header)) jpayne@69: return pileups jpayne@69: jpayne@69: ######################################################## jpayne@69: # Compatibility Accessors jpayne@69: # Functions, properties for compatibility with pysam < 0.8 jpayne@69: ######################################################## jpayne@69: property pos: jpayne@69: """deprecated, use :attr:`reference_pos` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.reference_pos jpayne@69: def __set__(self, v): jpayne@69: self.reference_pos = v jpayne@69: jpayne@69: property tid: jpayne@69: """deprecated, use :attr:`reference_id` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.reference_id jpayne@69: def __set__(self, v): jpayne@69: self.reference_id = v jpayne@69: jpayne@69: property n: jpayne@69: """deprecated, use :attr:`nsegments` instead.""" jpayne@69: def __get__(self): jpayne@69: return self.nsegments jpayne@69: def __set__(self, v): jpayne@69: self.nsegments = v jpayne@69: jpayne@69: def get_num_aligned(self): jpayne@69: """return number of aligned bases at pileup column position. jpayne@69: jpayne@69: This method applies a base quality filter and the number is jpayne@69: equal to the size of :meth:`get_query_sequences`, jpayne@69: :meth:`get_mapping_qualities`, etc. jpayne@69: jpayne@69: """ jpayne@69: cdef uint32_t x = 0 jpayne@69: cdef uint32_t c = 0 jpayne@69: cdef uint32_t cnt = 0 jpayne@69: cdef const bam_pileup1_t * p = NULL jpayne@69: if self.plp == NULL or self.plp[0] == NULL: jpayne@69: raise ValueError("PileupColumn accessed after iterator finished") jpayne@69: jpayne@69: for x from 0 <= x < self.n_pu: jpayne@69: p = &(self.plp[0][x]) jpayne@69: if p == NULL: jpayne@69: raise ValueError( jpayne@69: "pileup buffer out of sync - most likely use of iterator " jpayne@69: "outside loop") jpayne@69: if pileup_base_qual_skip(p, self.min_base_quality): jpayne@69: continue jpayne@69: cnt += 1 jpayne@69: return cnt jpayne@69: jpayne@69: def get_query_sequences(self, bint mark_matches=False, bint mark_ends=False, bint add_indels=False): jpayne@69: """query bases/sequences at pileup column position. jpayne@69: jpayne@69: Optionally, the bases/sequences can be annotated according to the samtools jpayne@69: mpileup format. This is the format description from the samtools mpileup tool:: jpayne@69: jpayne@69: Information on match, mismatch, indel, strand, mapping jpayne@69: quality and start and end of a read are all encoded at the jpayne@69: read base column. At this column, a dot stands for a match jpayne@69: to the reference base on the forward strand, a comma for a jpayne@69: match on the reverse strand, a '>' or '<' for a reference jpayne@69: skip, `ACGTN' for a mismatch on the forward strand and jpayne@69: `acgtn' for a mismatch on the reverse strand. A pattern jpayne@69: `\\+[0-9]+[ACGTNacgtn]+' indicates there is an insertion jpayne@69: between this reference position and the next reference jpayne@69: position. The length of the insertion is given by the jpayne@69: integer in the pattern, followed by the inserted jpayne@69: sequence. Similarly, a pattern `-[0-9]+[ACGTNacgtn]+' jpayne@69: represents a deletion from the reference. The deleted bases jpayne@69: will be presented as `*' in the following lines. Also at jpayne@69: the read base column, a symbol `^' marks the start of a jpayne@69: read. The ASCII of the character following `^' minus 33 jpayne@69: gives the mapping quality. A symbol `$' marks the end of a jpayne@69: read segment jpayne@69: jpayne@69: To reproduce samtools mpileup format, set all of mark_matches, jpayne@69: mark_ends and add_indels to True. jpayne@69: jpayne@69: Parameters jpayne@69: ---------- jpayne@69: jpayne@69: mark_matches: bool jpayne@69: jpayne@69: If True, output bases matching the reference as "." or "," jpayne@69: for forward and reverse strand, respectively. This mark jpayne@69: requires the reference sequence. If no reference is jpayne@69: present, this option is ignored. jpayne@69: jpayne@69: mark_ends : bool jpayne@69: jpayne@69: If True, add markers "^" and "$" for read start and end, respectively. jpayne@69: jpayne@69: add_indels : bool jpayne@69: jpayne@69: If True, add bases for bases inserted into or skipped from the jpayne@69: reference. The latter requires a reference sequence file to have jpayne@69: been given, e.g. via `pileup(fastafile = ...)`. If no reference jpayne@69: sequence is available, skipped bases are represented as 'N's. jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: a list of bases/sequences per read at pileup column position. : list jpayne@69: jpayne@69: """ jpayne@69: cdef uint32_t x = 0 jpayne@69: cdef uint32_t j = 0 jpayne@69: cdef uint32_t c = 0 jpayne@69: cdef uint8_t cc = 0 jpayne@69: cdef uint8_t rb = 0 jpayne@69: cdef kstring_t * buf = &self.buf jpayne@69: cdef const bam_pileup1_t * p = NULL jpayne@69: jpayne@69: if self.plp == NULL or self.plp[0] == NULL: jpayne@69: raise ValueError("PileupColumn accessed after iterator finished") jpayne@69: jpayne@69: buf.l = 0 jpayne@69: jpayne@69: # todo: reference sequence to count matches/mismatches jpayne@69: # todo: convert assertions to exceptions jpayne@69: for x from 0 <= x < self.n_pu: jpayne@69: p = &(self.plp[0][x]) jpayne@69: if p == NULL: jpayne@69: raise ValueError( jpayne@69: "pileup buffer out of sync - most likely use of iterator " jpayne@69: "outside loop") jpayne@69: if pileup_base_qual_skip(p, self.min_base_quality): jpayne@69: continue jpayne@69: # see samtools pileup_seq jpayne@69: if mark_ends and p.is_head: jpayne@69: kputc(b'^', buf) jpayne@69: jpayne@69: if p.b.core.qual > 93: jpayne@69: kputc(126, buf) jpayne@69: else: jpayne@69: kputc(p.b.core.qual + 33, buf) jpayne@69: if not p.is_del: jpayne@69: if p.qpos < p.b.core.l_qseq: jpayne@69: cc = seq_nt16_str[bam_seqi(bam_get_seq(p.b), p.qpos)] jpayne@69: else: jpayne@69: cc = b'N' jpayne@69: jpayne@69: if mark_matches and self.reference_sequence != NULL: jpayne@69: rb = self.reference_sequence[self.reference_pos] jpayne@69: if seq_nt16_table[cc] == seq_nt16_table[rb]: jpayne@69: cc = b'=' jpayne@69: kputc(strand_mark_char(cc, p.b), buf) jpayne@69: elif add_indels: jpayne@69: if p.is_refskip: jpayne@69: if bam_is_rev(p.b): jpayne@69: kputc(b'<', buf) jpayne@69: else: jpayne@69: kputc(b'>', buf) jpayne@69: else: jpayne@69: kputc(b'*', buf) jpayne@69: if add_indels: jpayne@69: if p.indel > 0: jpayne@69: kputc(b'+', buf) jpayne@69: kputw(p.indel, buf) jpayne@69: for j from 1 <= j <= p.indel: jpayne@69: cc = seq_nt16_str[bam_seqi(bam_get_seq(p.b), p.qpos + j)] jpayne@69: kputc(strand_mark_char(cc, p.b), buf) jpayne@69: elif p.indel < 0: jpayne@69: kputc(b'-', buf) jpayne@69: kputw(-p.indel, buf) jpayne@69: for j from 1 <= j <= -p.indel: jpayne@69: # TODO: out-of-range check here? jpayne@69: if self.reference_sequence == NULL: jpayne@69: cc = b'N' jpayne@69: else: jpayne@69: cc = self.reference_sequence[self.reference_pos + j] jpayne@69: kputc(strand_mark_char(cc, p.b), buf) jpayne@69: if mark_ends and p.is_tail: jpayne@69: kputc(b'$', buf) jpayne@69: jpayne@69: kputc(b':', buf) jpayne@69: jpayne@69: if buf.l == 0: jpayne@69: # could be zero if all qualities are too low jpayne@69: return "" jpayne@69: else: jpayne@69: # quicker to ensemble all and split than to encode all separately. jpayne@69: # ignore last ":" jpayne@69: return force_str(PyBytes_FromStringAndSize(buf.s, buf.l-1)).split(":") jpayne@69: jpayne@69: def get_query_qualities(self): jpayne@69: """query base quality scores at pileup column position. jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: a list of quality scores : list jpayne@69: """ jpayne@69: cdef uint32_t x = 0 jpayne@69: cdef const bam_pileup1_t * p = NULL jpayne@69: cdef uint32_t c = 0 jpayne@69: result = [] jpayne@69: for x from 0 <= x < self.n_pu: jpayne@69: p = &(self.plp[0][x]) jpayne@69: if p == NULL: jpayne@69: raise ValueError( jpayne@69: "pileup buffer out of sync - most likely use of iterator " jpayne@69: "outside loop") jpayne@69: jpayne@69: if p.qpos < p.b.core.l_qseq: jpayne@69: c = bam_get_qual(p.b)[p.qpos] jpayne@69: else: jpayne@69: c = 0 jpayne@69: if c < self.min_base_quality: jpayne@69: continue jpayne@69: result.append(c) jpayne@69: return result jpayne@69: jpayne@69: def get_mapping_qualities(self): jpayne@69: """query mapping quality scores at pileup column position. jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: a list of quality scores : list jpayne@69: """ jpayne@69: if self.plp == NULL or self.plp[0] == NULL: jpayne@69: raise ValueError("PileupColumn accessed after iterator finished") jpayne@69: jpayne@69: cdef uint32_t x = 0 jpayne@69: cdef const bam_pileup1_t * p = NULL jpayne@69: result = [] jpayne@69: for x from 0 <= x < self.n_pu: jpayne@69: p = &(self.plp[0][x]) jpayne@69: if p == NULL: jpayne@69: raise ValueError( jpayne@69: "pileup buffer out of sync - most likely use of iterator " jpayne@69: "outside loop") jpayne@69: jpayne@69: if pileup_base_qual_skip(p, self.min_base_quality): jpayne@69: continue jpayne@69: result.append(p.b.core.qual) jpayne@69: return result jpayne@69: jpayne@69: def get_query_positions(self): jpayne@69: """positions in read at pileup column position. jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: a list of read positions : list jpayne@69: """ jpayne@69: if self.plp == NULL or self.plp[0] == NULL: jpayne@69: raise ValueError("PileupColumn accessed after iterator finished") jpayne@69: jpayne@69: cdef uint32_t x = 0 jpayne@69: cdef const bam_pileup1_t * p = NULL jpayne@69: result = [] jpayne@69: for x from 0 <= x < self.n_pu: jpayne@69: p = &(self.plp[0][x]) jpayne@69: if p == NULL: jpayne@69: raise ValueError( jpayne@69: "pileup buffer out of sync - most likely use of iterator " jpayne@69: "outside loop") jpayne@69: jpayne@69: if pileup_base_qual_skip(p, self.min_base_quality): jpayne@69: continue jpayne@69: result.append(p.qpos) jpayne@69: return result jpayne@69: jpayne@69: def get_query_names(self): jpayne@69: """query/read names aligned at pileup column position. jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: a list of query names at pileup column position. : list jpayne@69: """ jpayne@69: if self.plp == NULL or self.plp[0] == NULL: jpayne@69: raise ValueError("PileupColumn accessed after iterator finished") jpayne@69: jpayne@69: cdef uint32_t x = 0 jpayne@69: cdef const bam_pileup1_t * p = NULL jpayne@69: result = [] jpayne@69: for x from 0 <= x < self.n_pu: jpayne@69: p = &(self.plp[0][x]) jpayne@69: if p == NULL: jpayne@69: raise ValueError( jpayne@69: "pileup buffer out of sync - most likely use of iterator " jpayne@69: "outside loop") jpayne@69: jpayne@69: if pileup_base_qual_skip(p, self.min_base_quality): jpayne@69: continue jpayne@69: result.append(charptr_to_str(pysam_bam_get_qname(p.b))) jpayne@69: return result jpayne@69: jpayne@69: jpayne@69: cdef class PileupRead: jpayne@69: '''Representation of a read aligned to a particular position in the jpayne@69: reference sequence. jpayne@69: jpayne@69: ''' jpayne@69: jpayne@69: def __init__(self): jpayne@69: raise TypeError( jpayne@69: "this class cannot be instantiated from Python") jpayne@69: jpayne@69: def __str__(self): jpayne@69: return "\t".join( jpayne@69: map(str, jpayne@69: (self.alignment, self.query_position, jpayne@69: self.indel, self.level, jpayne@69: self.is_del, self.is_head, jpayne@69: self.is_tail, self.is_refskip))) jpayne@69: jpayne@69: property alignment: jpayne@69: """a :class:`pysam.AlignedSegment` object of the aligned read""" jpayne@69: def __get__(self): jpayne@69: return self._alignment jpayne@69: jpayne@69: property query_position: jpayne@69: """position of the read base at the pileup site, 0-based. jpayne@69: None if :attr:`is_del` or :attr:`is_refskip` is set. jpayne@69: jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: if self.is_del or self.is_refskip: jpayne@69: return None jpayne@69: else: jpayne@69: return self._qpos jpayne@69: jpayne@69: property query_position_or_next: jpayne@69: """position of the read base at the pileup site, 0-based. jpayne@69: jpayne@69: If the current position is a deletion, returns the next jpayne@69: aligned base. jpayne@69: jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: return self._qpos jpayne@69: jpayne@69: property indel: jpayne@69: """indel length for the position following the current pileup site. jpayne@69: jpayne@69: This quantity peeks ahead to the next cigar operation in this jpayne@69: alignment. If the next operation is an insertion, indel will jpayne@69: be positive. If the next operation is a deletion, it will be jpayne@69: negation. 0 if the next operation is not an indel. jpayne@69: jpayne@69: """ jpayne@69: def __get__(self): jpayne@69: return self._indel jpayne@69: jpayne@69: property level: jpayne@69: """the level of the read in the "viewer" mode. Note that this value jpayne@69: is currently not computed.""" jpayne@69: def __get__(self): jpayne@69: return self._level jpayne@69: jpayne@69: property is_del: jpayne@69: """1 iff the base on the padded read is a deletion""" jpayne@69: def __get__(self): jpayne@69: return self._is_del jpayne@69: jpayne@69: property is_head: jpayne@69: """1 iff the base on the padded read is the left-most base.""" jpayne@69: def __get__(self): jpayne@69: return self._is_head jpayne@69: jpayne@69: property is_tail: jpayne@69: """1 iff the base on the padded read is the right-most base.""" jpayne@69: def __get__(self): jpayne@69: return self._is_tail jpayne@69: jpayne@69: property is_refskip: jpayne@69: """1 iff the base on the padded read is part of CIGAR N op.""" jpayne@69: def __get__(self): jpayne@69: return self._is_refskip jpayne@69: jpayne@69: jpayne@69: jpayne@69: cpdef enum CIGAR_OPS: jpayne@69: CMATCH = 0 jpayne@69: CINS = 1 jpayne@69: CDEL = 2 jpayne@69: CREF_SKIP = 3 jpayne@69: CSOFT_CLIP = 4 jpayne@69: CHARD_CLIP = 5 jpayne@69: CPAD = 6 jpayne@69: CEQUAL = 7 jpayne@69: CDIFF = 8 jpayne@69: CBACK = 9 jpayne@69: jpayne@69: jpayne@69: cpdef enum SAM_FLAGS: jpayne@69: # the read is paired in sequencing, no matter whether it is mapped in a pair jpayne@69: FPAIRED = 1 jpayne@69: # the read is mapped in a proper pair jpayne@69: FPROPER_PAIR = 2 jpayne@69: # the read itself is unmapped; conflictive with FPROPER_PAIR jpayne@69: FUNMAP = 4 jpayne@69: # the mate is unmapped jpayne@69: FMUNMAP = 8 jpayne@69: # the read is mapped to the reverse strand jpayne@69: FREVERSE = 16 jpayne@69: # the mate is mapped to the reverse strand jpayne@69: FMREVERSE = 32 jpayne@69: # this is read1 jpayne@69: FREAD1 = 64 jpayne@69: # this is read2 jpayne@69: FREAD2 = 128 jpayne@69: # not primary alignment jpayne@69: FSECONDARY = 256 jpayne@69: # QC failure jpayne@69: FQCFAIL = 512 jpayne@69: # optical or PCR duplicate jpayne@69: FDUP = 1024 jpayne@69: # supplementary alignment jpayne@69: FSUPPLEMENTARY = 2048 jpayne@69: jpayne@69: jpayne@69: __all__ = [ jpayne@69: "AlignedSegment", jpayne@69: "PileupColumn", jpayne@69: "PileupRead", jpayne@69: "CMATCH", jpayne@69: "CINS", jpayne@69: "CDEL", jpayne@69: "CREF_SKIP", jpayne@69: "CSOFT_CLIP", jpayne@69: "CHARD_CLIP", jpayne@69: "CPAD", jpayne@69: "CEQUAL", jpayne@69: "CDIFF", jpayne@69: "CBACK", jpayne@69: "FPAIRED", jpayne@69: "FPROPER_PAIR", jpayne@69: "FUNMAP", jpayne@69: "FMUNMAP", jpayne@69: "FREVERSE", jpayne@69: "FMREVERSE", jpayne@69: "FREAD1", jpayne@69: "FREAD2", jpayne@69: "FSECONDARY", jpayne@69: "FQCFAIL", jpayne@69: "FDUP", jpayne@69: "FSUPPLEMENTARY", jpayne@69: "KEY_NAMES"]