Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/libcalignedsegment.pyx @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/libcalignedsegment.pyx Tue Mar 18 16:23:26 2025 -0400 @@ -0,0 +1,3438 @@ +# cython: language_level=3 +# cython: embedsignature=True +# cython: profile=True +############################################################################### +############################################################################### +# Cython wrapper for SAM/BAM/CRAM files based on htslib +############################################################################### +# The principal classes defined in this module are: +# +# class AlignedSegment an aligned segment (read) +# +# class PileupColumn a collection of segments (PileupRead) aligned to +# a particular genomic position. +# +# class PileupRead an AlignedSegment aligned to a particular genomic +# position. Contains additional attributes with respect +# to this. +# +# Additionally this module defines numerous additional classes that are part +# of the internal API. These are: +# +# Various iterator classes to iterate over alignments in sequential (IteratorRow) +# or in a stacked fashion (IteratorColumn): +# +# class IteratorRow +# class IteratorRowRegion +# class IteratorRowHead +# class IteratorRowAll +# class IteratorRowAllRefs +# class IteratorRowSelection +# +############################################################################### +# +# The MIT License +# +# Copyright (c) 2015 Andreas Heger +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. +# +############################################################################### +import re +import array +import json +import string +import ctypes +import struct + +cimport cython +from cpython cimport array as c_array +from cpython cimport PyBytes_FromStringAndSize +from libc.string cimport memset, strchr +from cpython cimport array as c_array +from libc.stdint cimport INT8_MIN, INT16_MIN, INT32_MIN, \ + INT8_MAX, INT16_MAX, INT32_MAX, \ + UINT8_MAX, UINT16_MAX, UINT32_MAX + +from pysam.libchtslib cimport HTS_IDX_NOCOOR +from pysam.libcutils cimport force_bytes, force_str, \ + charptr_to_str, charptr_to_bytes +from pysam.libcutils cimport qualities_to_qualitystring, qualitystring_to_array, \ + array_to_qualitystring + +# Constants for binary tag conversion +cdef char * htslib_types = 'cCsSiIf' +cdef char * parray_types = 'bBhHiIf' + +# translation tables + +# cigar code to character and vice versa +cdef char* CODE2CIGAR= "MIDNSHP=XB" +cdef int NCIGAR_CODES = 10 + +CIGAR2CODE = dict([y, x] for x, y in enumerate(CODE2CIGAR)) +CIGAR_REGEX = re.compile("(\d+)([MIDNSHP=XB])") + +# names for keys in dictionary representation of an AlignedSegment +KEY_NAMES = ["name", "flag", "ref_name", "ref_pos", "map_quality", "cigar", + "next_ref_name", "next_ref_pos", "length", "seq", "qual", "tags"] + +##################################################################### +# C multiplication with wrapping around +cdef inline uint32_t c_mul(uint32_t a, uint32_t b): + return (a * b) & 0xffffffff + + +cdef inline uint8_t tolower(uint8_t ch): + if ch >= 65 and ch <= 90: + return ch + 32 + else: + return ch + + +cdef inline uint8_t toupper(uint8_t ch): + if ch >= 97 and ch <= 122: + return ch - 32 + else: + return ch + + +cdef inline uint8_t strand_mark_char(uint8_t ch, bam1_t *b): + if ch == b'=': + if bam_is_rev(b): + return b',' + else: + return b'.' + else: + if bam_is_rev(b): + return tolower(ch) + else: + return toupper(ch) + + +cdef inline bint pileup_base_qual_skip(const bam_pileup1_t * p, uint32_t threshold): + cdef uint32_t c + if p.qpos < p.b.core.l_qseq: + c = bam_get_qual(p.b)[p.qpos] + else: + c = 0 + if c < threshold: + return True + return False + + +cdef inline char map_typecode_htslib_to_python(uint8_t s): + """map an htslib typecode to the corresponding python typecode + to be used in the struct or array modules.""" + + # map type from htslib to python array + cdef char * f = strchr(htslib_types, s) + + if f == NULL: + return 0 + return parray_types[f - htslib_types] + + +cdef inline uint8_t map_typecode_python_to_htslib(char s): + """determine value type from type code of array""" + cdef char * f = strchr(parray_types, s) + if f == NULL: + return 0 + return htslib_types[f - parray_types] + + +cdef inline void update_bin(bam1_t * src): + if src.core.flag & BAM_FUNMAP: + # treat alignment as length of 1 for unmapped reads + src.core.bin = hts_reg2bin( + src.core.pos, + src.core.pos + 1, + 14, + 5) + elif pysam_get_n_cigar(src): + src.core.bin = hts_reg2bin( + src.core.pos, + bam_endpos(src), + 14, + 5) + else: + src.core.bin = hts_reg2bin( + src.core.pos, + src.core.pos + 1, + 14, + 5) + + +# optional tag data manipulation +cdef convert_binary_tag(uint8_t * tag): + """return bytesize, number of values and array of values + in aux_data memory location pointed to by tag.""" + cdef uint8_t auxtype + cdef uint8_t byte_size + cdef int32_t nvalues + # get byte size + auxtype = tag[0] + byte_size = aux_type2size(auxtype) + tag += 1 + # get number of values in array + nvalues = (<int32_t*>tag)[0] + tag += 4 + + # define python array + cdef c_array.array c_values = array.array( + chr(map_typecode_htslib_to_python(auxtype))) + c_array.resize(c_values, nvalues) + + # copy data + memcpy(c_values.data.as_voidptr, <uint8_t*>tag, nvalues * byte_size) + + # no need to check for endian-ness as bam1_core_t fields + # and aux_data are in host endian-ness. See sam.c and calls + # to swap_data + return byte_size, nvalues, c_values + + +cdef inline uint8_t get_tag_typecode(value, value_type=None): + """guess type code for a *value*. If *value_type* is None, the type + code will be inferred based on the Python type of *value* + + """ + # 0 is unknown typecode + cdef char typecode = 0 + + if value_type is None: + if isinstance(value, int): + if value < 0: + if value >= INT8_MIN: + typecode = b'c' + elif value >= INT16_MIN: + typecode = b's' + elif value >= INT32_MIN: + typecode = b'i' + # unsigned ints + else: + if value <= UINT8_MAX: + typecode = b'C' + elif value <= UINT16_MAX: + typecode = b'S' + elif value <= UINT32_MAX: + typecode = b'I' + elif isinstance(value, float): + typecode = b'f' + elif isinstance(value, str): + typecode = b'Z' + elif isinstance(value, bytes): + typecode = b'Z' + elif isinstance(value, array.array) or \ + isinstance(value, list) or \ + isinstance(value, tuple): + typecode = b'B' + else: + if value_type in 'aAsSIcCZidfH': + typecode = force_bytes(value_type)[0] + + return typecode + + +cdef inline uint8_t get_btag_typecode(value, min_value=None, max_value=None): + '''returns the value typecode of a value. + + If max is specified, the appropriate type is returned for a range + where value is the minimum. + + Note that this method returns types from the extended BAM alphabet + of types that includes tags that are not part of the SAM + specification. + ''' + + + cdef uint8_t typecode + + t = type(value) + + if t is float: + typecode = b'f' + elif t is int: + if max_value is None: + max_value = value + if min_value is None: + min_value = value + # signed ints + if min_value < 0: + if min_value >= INT8_MIN and max_value <= INT8_MAX: + typecode = b'c' + elif min_value >= INT16_MIN and max_value <= INT16_MAX: + typecode = b's' + elif min_value >= INT32_MIN or max_value <= INT32_MAX: + typecode = b'i' + else: + raise ValueError( + "at least one signed integer out of range of " + "BAM/SAM specification") + # unsigned ints + else: + if max_value <= UINT8_MAX: + typecode = b'C' + elif max_value <= UINT16_MAX: + typecode = b'S' + elif max_value <= UINT32_MAX: + typecode = b'I' + else: + raise ValueError( + "at least one integer out of range of BAM/SAM specification") + else: + # Note: hex strings (H) are not supported yet + if t is not bytes: + value = value.encode('ascii') + if len(value) == 1: + typecode = b'A' + else: + typecode = b'Z' + + return typecode + + +# mapping python array.array and htslib typecodes to struct typecodes +DATATYPE2FORMAT = { + ord('c'): ('b', 1), + ord('C'): ('B', 1), + ord('s'): ('h', 2), + ord('S'): ('H', 2), + ord('i'): ('i', 4), + ord('I'): ('I', 4), + ord('f'): ('f', 4), + ord('d'): ('d', 8), + ord('A'): ('c', 1), + ord('a'): ('c', 1)} + + +cdef inline pack_tags(tags): + """pack a list of tags. Each tag is a tuple of (tag, tuple). + + Values are packed into the most space efficient data structure + possible unless the tag contains a third field with the typecode. + + Returns a format string and the associated list of arguments to be + used in a call to struct.pack_into. + """ + fmts, args = ["<"], [] + + # htslib typecode + cdef uint8_t typecode + for tag in tags: + + if len(tag) == 2: + pytag, value = tag + valuetype = None + elif len(tag) == 3: + pytag, value, valuetype = tag + else: + raise ValueError("malformatted tag: %s" % str(tag)) + + if valuetype is None: + typecode = 0 + else: + # only first character in valuecode matters + typecode = force_bytes(valuetype)[0] + + pytag = force_bytes(pytag) + pytype = type(value) + + if pytype is tuple or pytype is list: + # binary tags from tuples or lists + if not typecode: + # automatically determine value type - first value + # determines type. If there is a mix of types, the + # result is undefined. + typecode = get_btag_typecode(min(value), + min_value=min(value), + max_value=max(value)) + + if typecode not in DATATYPE2FORMAT: + raise ValueError("invalid value type '{}'".format(chr(typecode))) + + datafmt = "2sBBI%i%s" % (len(value), DATATYPE2FORMAT[typecode][0]) + args.extend([pytag[:2], + ord("B"), + typecode, + len(value)] + list(value)) + + elif isinstance(value, array.array): + # binary tags from arrays + if typecode == 0: + typecode = map_typecode_python_to_htslib(ord(value.typecode)) + + if typecode == 0: + raise ValueError("unsupported type code '{}'".format(value.typecode)) + + if typecode not in DATATYPE2FORMAT: + raise ValueError("invalid value type '{}'".format(chr(typecode))) + + # use array.tostring() to retrieve byte representation and + # save as bytes + datafmt = "2sBBI%is" % (len(value) * DATATYPE2FORMAT[typecode][1]) + args.extend([pytag[:2], + ord("B"), + typecode, + len(value), + value.tobytes()]) + + else: + if typecode == 0: + typecode = get_tag_typecode(value) + if typecode == 0: + raise ValueError("could not deduce typecode for value {}".format(value)) + + if typecode == b'a' or typecode == b'A' or typecode == b'Z' or typecode == b'H': + value = force_bytes(value) + + if typecode == b"a": + typecode = b'A' + + if typecode == b'Z' or typecode == b'H': + datafmt = "2sB%is" % (len(value)+1) + elif typecode in DATATYPE2FORMAT: + datafmt = "2sB%s" % DATATYPE2FORMAT[typecode][0] + else: + raise ValueError("invalid value type '{}'".format(chr(typecode))) + + args.extend([pytag[:2], + typecode, + value]) + + fmts.append(datafmt) + + return "".join(fmts), args + + +cdef inline int32_t calculateQueryLengthWithoutHardClipping(bam1_t * src): + """return query length computed from CIGAR alignment. + + Length ignores hard-clipped bases. + + Return 0 if there is no CIGAR alignment. + """ + + cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) + + if cigar_p == NULL: + return 0 + + cdef uint32_t k, qpos + cdef int op + qpos = 0 + + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + + if op == BAM_CMATCH or \ + op == BAM_CINS or \ + op == BAM_CSOFT_CLIP or \ + op == BAM_CEQUAL or \ + op == BAM_CDIFF: + qpos += cigar_p[k] >> BAM_CIGAR_SHIFT + + return qpos + + +cdef inline int32_t calculateQueryLengthWithHardClipping(bam1_t * src): + """return query length computed from CIGAR alignment. + + Length includes hard-clipped bases. + + Return 0 if there is no CIGAR alignment. + """ + + cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) + + if cigar_p == NULL: + return 0 + + cdef uint32_t k, qpos + cdef int op + qpos = 0 + + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + + if op == BAM_CMATCH or \ + op == BAM_CINS or \ + op == BAM_CSOFT_CLIP or \ + op == BAM_CHARD_CLIP or \ + op == BAM_CEQUAL or \ + op == BAM_CDIFF: + qpos += cigar_p[k] >> BAM_CIGAR_SHIFT + + return qpos + + +cdef inline int32_t getQueryStart(bam1_t *src) except -1: + cdef uint32_t * cigar_p + cdef uint32_t start_offset = 0 + cdef uint32_t k, op + + cigar_p = pysam_bam_get_cigar(src); + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + if op == BAM_CHARD_CLIP: + if start_offset != 0 and start_offset != src.core.l_qseq: + raise ValueError('Invalid clipping in CIGAR string') + elif op == BAM_CSOFT_CLIP: + start_offset += cigar_p[k] >> BAM_CIGAR_SHIFT + else: + break + + return start_offset + + +cdef inline int32_t getQueryEnd(bam1_t *src) except -1: + cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) + cdef uint32_t end_offset = src.core.l_qseq + cdef uint32_t k, op + + # if there is no sequence, compute length from cigar string + if end_offset == 0: + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + if op == BAM_CMATCH or \ + op == BAM_CINS or \ + op == BAM_CEQUAL or \ + op == BAM_CDIFF or \ + (op == BAM_CSOFT_CLIP and end_offset == 0): + end_offset += cigar_p[k] >> BAM_CIGAR_SHIFT + else: + # walk backwards in cigar string + for k from pysam_get_n_cigar(src) > k >= 1: + op = cigar_p[k] & BAM_CIGAR_MASK + if op == BAM_CHARD_CLIP: + if end_offset != src.core.l_qseq: + raise ValueError('Invalid clipping in CIGAR string') + elif op == BAM_CSOFT_CLIP: + end_offset -= cigar_p[k] >> BAM_CIGAR_SHIFT + else: + break + + return end_offset + + +cdef inline bytes getSequenceInRange(bam1_t *src, + uint32_t start, + uint32_t end): + """return python string of the sequence in a bam1_t object. + """ + + cdef uint8_t * p + cdef uint32_t k + cdef char * s + + if not src.core.l_qseq: + return None + + seq = PyBytes_FromStringAndSize(NULL, end - start) + s = <char*>seq + p = pysam_bam_get_seq(src) + + for k from start <= k < end: + # equivalent to seq_nt16_str[bam1_seqi(s, i)] (see bam.c) + # note: do not use string literal as it will be a python string + s[k-start] = seq_nt16_str[p[k//2] >> 4 * (1 - k%2) & 0xf] + + return charptr_to_bytes(seq) + + +cdef inline object getQualitiesInRange(bam1_t *src, + uint32_t start, + uint32_t end): + """return python array of quality values from a bam1_t object""" + + cdef uint8_t * p + cdef uint32_t k + + p = pysam_bam_get_qual(src) + if p[0] == 0xff: + return None + + # 'B': unsigned char + cdef c_array.array result = array.array('B', [0]) + c_array.resize(result, end - start) + + # copy data + memcpy(result.data.as_voidptr, <void*>&p[start], end - start) + + return result + + +##################################################################### +## factory methods for instantiating extension classes +cdef class AlignedSegment +cdef AlignedSegment makeAlignedSegment(bam1_t *src, + AlignmentHeader header): + '''return an AlignedSegment object constructed from `src`''' + # note that the following does not call __init__ + cdef AlignedSegment dest = AlignedSegment.__new__(AlignedSegment) + dest._delegate = bam_dup1(src) + dest.header = header + return dest + + +cdef class PileupColumn +cdef PileupColumn makePileupColumn(const bam_pileup1_t ** plp, + int tid, + int pos, + int n_pu, + uint32_t min_base_quality, + char * reference_sequence, + AlignmentHeader header): + '''return a PileupColumn object constructed from pileup in `plp` and + setting additional attributes. + + ''' + # note that the following does not call __init__ + cdef PileupColumn dest = PileupColumn.__new__(PileupColumn) + dest.header = header + dest.plp = plp + dest.tid = tid + dest.pos = pos + dest.n_pu = n_pu + dest.min_base_quality = min_base_quality + dest.reference_sequence = reference_sequence + dest.buf.l = dest.buf.m = 0 + dest.buf.s = NULL + + return dest + + +cdef class PileupRead +cdef PileupRead makePileupRead(const bam_pileup1_t *src, + AlignmentHeader header): + '''return a PileupRead object construted from a bam_pileup1_t * object.''' + # note that the following does not call __init__ + cdef PileupRead dest = PileupRead.__new__(PileupRead) + dest._alignment = makeAlignedSegment(src.b, header) + dest._qpos = src.qpos + dest._indel = src.indel + dest._level = src.level + dest._is_del = src.is_del + dest._is_head = src.is_head + dest._is_tail = src.is_tail + dest._is_refskip = src.is_refskip + return dest + + +cdef inline uint32_t get_alignment_length(bam1_t *src): + cdef uint32_t k = 0 + cdef uint32_t l = 0 + if src == NULL: + return 0 + cdef uint32_t * cigar_p = bam_get_cigar(src) + if cigar_p == NULL: + return 0 + cdef int op + cdef uint32_t n = pysam_get_n_cigar(src) + for k from 0 <= k < n: + op = cigar_p[k] & BAM_CIGAR_MASK + if op == BAM_CSOFT_CLIP or op == BAM_CHARD_CLIP: + continue + l += cigar_p[k] >> BAM_CIGAR_SHIFT + return l + + +cdef inline uint32_t get_md_reference_length(char * md_tag): + cdef int l = 0 + cdef int md_idx = 0 + cdef int nmatches = 0 + + while md_tag[md_idx] != 0: + if md_tag[md_idx] >= 48 and md_tag[md_idx] <= 57: + nmatches *= 10 + nmatches += md_tag[md_idx] - 48 + md_idx += 1 + continue + else: + l += nmatches + nmatches = 0 + if md_tag[md_idx] == b'^': + md_idx += 1 + while md_tag[md_idx] >= 65 and md_tag[md_idx] <= 90: + md_idx += 1 + l += 1 + else: + md_idx += 1 + l += 1 + + l += nmatches + return l + +# TODO: avoid string copying for getSequenceInRange, reconstituneSequenceFromMD, ... +cdef inline bytes build_alignment_sequence(bam1_t * src): + """return expanded sequence from MD tag. + + The sequence includes substitutions and both insertions in the + reference as well as deletions to the reference sequence. Combine + with the cigar string to reconstitute the query or the reference + sequence. + + Positions corresponding to `N` (skipped region from the reference) + or `P` (padding (silent deletion from padded reference)) in the CIGAR + string will not appear in the returned sequence. The MD should + correspondingly not contain these. Thus proper tags are:: + + Deletion from the reference: cigar=5M1D5M MD=5^C5 + Skipped region from reference: cigar=5M1N5M MD=10 + Padded region in the reference: cigar=5M1P5M MD=10 + + Returns + ------- + + None, if no MD tag is present. + + """ + if src == NULL: + return None + + cdef uint8_t * md_tag_ptr = bam_aux_get(src, "MD") + if md_tag_ptr == NULL: + return None + + cdef uint32_t start = getQueryStart(src) + cdef uint32_t end = getQueryEnd(src) + # get read sequence, taking into account soft-clipping + r = getSequenceInRange(src, start, end) + cdef char * read_sequence = r + cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) + if cigar_p == NULL: + return None + + cdef uint32_t r_idx = 0 + cdef int op + cdef uint32_t k, i, l, x + cdef int nmatches = 0 + cdef int s_idx = 0 + + cdef uint32_t max_len = get_alignment_length(src) + if max_len == 0: + raise ValueError("could not determine alignment length") + + cdef char * s = <char*>calloc(max_len + 1, sizeof(char)) + if s == NULL: + raise ValueError( + "could not allocate sequence of length %i" % max_len) + + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + l = cigar_p[k] >> BAM_CIGAR_SHIFT + if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: + for i from 0 <= i < l: + s[s_idx] = read_sequence[r_idx] + r_idx += 1 + s_idx += 1 + elif op == BAM_CDEL: + for i from 0 <= i < l: + s[s_idx] = b'-' + s_idx += 1 + elif op == BAM_CREF_SKIP: + pass + elif op == BAM_CINS or op == BAM_CPAD: + for i from 0 <= i < l: + # encode insertions into reference as lowercase + s[s_idx] = read_sequence[r_idx] + 32 + r_idx += 1 + s_idx += 1 + elif op == BAM_CSOFT_CLIP: + pass + elif op == BAM_CHARD_CLIP: + pass # advances neither + + cdef char md_buffer[2] + cdef char *md_tag + cdef uint8_t md_typecode = md_tag_ptr[0] + if md_typecode == b'Z': + md_tag = bam_aux2Z(md_tag_ptr) + elif md_typecode == b'A': + # Work around HTSeq bug that writes 1-character strings as MD:A:v + md_buffer[0] = bam_aux2A(md_tag_ptr) + md_buffer[1] = b'\0' + md_tag = md_buffer + else: + raise TypeError('Tagged field MD:{}:<value> does not have expected type MD:Z'.format(chr(md_typecode))) + + cdef int md_idx = 0 + cdef char c + s_idx = 0 + + # Check if MD tag is valid by matching CIGAR length to MD tag defined length + # Insertions would be in addition to what is described by MD, so we calculate + # the number of insertions separately. + cdef int insertions = 0 + + while s[s_idx] != 0: + if s[s_idx] >= b'a': + insertions += 1 + s_idx += 1 + s_idx = 0 + + cdef uint32_t md_len = get_md_reference_length(md_tag) + if md_len + insertions > max_len: + free(s) + raise AssertionError( + "Invalid MD tag: MD length {} mismatch with CIGAR length {} and {} insertions".format( + md_len, max_len, insertions)) + + while md_tag[md_idx] != 0: + # c is numerical + if md_tag[md_idx] >= 48 and md_tag[md_idx] <= 57: + nmatches *= 10 + nmatches += md_tag[md_idx] - 48 + md_idx += 1 + continue + else: + # save matches up to this point, skipping insertions + for x from 0 <= x < nmatches: + while s[s_idx] >= b'a': + s_idx += 1 + s_idx += 1 + while s[s_idx] >= b'a': + s_idx += 1 + + r_idx += nmatches + nmatches = 0 + if md_tag[md_idx] == b'^': + md_idx += 1 + while md_tag[md_idx] >= 65 and md_tag[md_idx] <= 90: + # assert s[s_idx] == '-' + s[s_idx] = md_tag[md_idx] + s_idx += 1 + md_idx += 1 + else: + # save mismatch + # enforce lower case + c = md_tag[md_idx] + if c <= 90: + c += 32 + s[s_idx] = c + s_idx += 1 + r_idx += 1 + md_idx += 1 + + # save matches up to this point, skipping insertions + for x from 0 <= x < nmatches: + while s[s_idx] >= b'a': + s_idx += 1 + s_idx += 1 + while s[s_idx] >= b'a': + s_idx += 1 + + seq = PyBytes_FromStringAndSize(s, s_idx) + free(s) + + return seq + + +cdef inline bytes build_reference_sequence(bam1_t * src): + """return the reference sequence in the region that is covered by the + alignment of the read to the reference. + + This method requires the MD tag to be set. + + """ + cdef uint32_t k, i, l + cdef int op + cdef int s_idx = 0 + ref_seq = build_alignment_sequence(src) + if ref_seq is None: + raise ValueError("MD tag not present") + + cdef char * s = <char*>calloc(len(ref_seq) + 1, sizeof(char)) + if s == NULL: + raise ValueError( + "could not allocate sequence of length %i" % len(ref_seq)) + + cdef char * cref_seq = ref_seq + cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) + cdef uint32_t r_idx = 0 + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + l = cigar_p[k] >> BAM_CIGAR_SHIFT + if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: + for i from 0 <= i < l: + s[s_idx] = cref_seq[r_idx] + r_idx += 1 + s_idx += 1 + elif op == BAM_CDEL: + for i from 0 <= i < l: + s[s_idx] = cref_seq[r_idx] + r_idx += 1 + s_idx += 1 + elif op == BAM_CREF_SKIP: + pass + elif op == BAM_CINS or op == BAM_CPAD: + r_idx += l + elif op == BAM_CSOFT_CLIP: + pass + elif op == BAM_CHARD_CLIP: + pass # advances neither + + seq = PyBytes_FromStringAndSize(s, s_idx) + free(s) + + return seq + + +cdef inline str safe_reference_name(AlignmentHeader header, int tid): + if tid == -1: return "*" + elif header is not None: return header.get_reference_name(tid) + else: return f"#{tid}" + + +# Tuple-building helper functions used by AlignedSegment.get_aligned_pairs() + +cdef _alignedpairs_positions(qpos, pos, ref_seq, uint32_t r_idx, int op): + return (qpos, pos) + + +cdef _alignedpairs_with_seq(qpos, pos, ref_seq, uint32_t r_idx, int op): + ref_base = ref_seq[r_idx] if ref_seq is not None else None + return (qpos, pos, ref_base) + + +cdef _alignedpairs_with_cigar(qpos, pos, ref_seq, uint32_t r_idx, int op): + return (qpos, pos, CIGAR_OPS(op)) + + +cdef _alignedpairs_with_seq_cigar(qpos, pos, ref_seq, uint32_t r_idx, int op): + ref_base = ref_seq[r_idx] if ref_seq is not None else None + return (qpos, pos, ref_base, CIGAR_OPS(op)) + + +cdef class AlignedSegment: + '''Class representing an aligned segment. + + This class stores a handle to the samtools C-structure representing + an aligned read. Member read access is forwarded to the C-structure + and converted into python objects. This implementation should be fast, + as only the data needed is converted. + + For write access, the C-structure is updated in-place. This is + not the most efficient way to build BAM entries, as the variable + length data is concatenated and thus needs to be resized if + a field is updated. Furthermore, the BAM entry might be + in an inconsistent state. + + One issue to look out for is that the sequence should always + be set *before* the quality scores. Setting the sequence will + also erase any quality scores that were set previously. + + Parameters + ---------- + + header: + :class:`~pysam.AlignmentHeader` object to map numerical + identifiers to chromosome names. If not given, an empty + header is created. + ''' + + # Now only called when instances are created from Python + def __init__(self, AlignmentHeader header=None): + # see bam_init1 + self._delegate = <bam1_t*>calloc(1, sizeof(bam1_t)) + if self._delegate == NULL: + raise MemoryError("could not allocated memory of {} bytes".format(sizeof(bam1_t))) + # allocate some memory. If size is 0, calloc does not return a + # pointer that can be passed to free() so allocate 40 bytes + # for a new read + self._delegate.m_data = 40 + self._delegate.data = <uint8_t *>calloc( + self._delegate.m_data, 1) + if self._delegate.data == NULL: + raise MemoryError("could not allocate memory of {} bytes".format(self._delegate.m_data)) + self._delegate.l_data = 0 + # set some data to make read approximately legit. + # Note, SAM writing fails with q_name of length 0 + self._delegate.core.l_qname = 0 + self._delegate.core.tid = -1 + self._delegate.core.pos = -1 + self._delegate.core.mtid = -1 + self._delegate.core.mpos = -1 + + # caching for selected fields + self.cache_query_qualities = None + self.cache_query_alignment_qualities = None + self.cache_query_sequence = None + self.cache_query_alignment_sequence = None + + self.header = header + + def __dealloc__(self): + bam_destroy1(self._delegate) + + def __str__(self): + """return string representation of alignment. + + The representation is an approximate :term:`SAM` format, because + an aligned read might not be associated with a :term:`AlignmentFile`. + Hence when the read does not have an associated :class:`AlignedHeader`, + :term:`tid` is shown instead of the reference name. + Similarly, the tags field is returned in its parsed state. + + To get a valid SAM record, use :meth:`to_string`. + """ + # sam-parsing is done in sam.c/bam_format1_core which + # requires a valid header. + return "\t".join(map(str, (self.query_name, + self.flag, + safe_reference_name(self.header, self.reference_id), + self.reference_start + 1, + self.mapping_quality, + self.cigarstring, + safe_reference_name(self.header, self.next_reference_id), + self.next_reference_start + 1, + self.template_length, + self.query_sequence, + self.query_qualities, + self.tags))) + + def __repr__(self): + ref = self.reference_name if self.header is not None else self.reference_id + return f'<{type(self).__name__}({self.query_name!r}, flags={self.flag}={self.flag:#x}, ref={ref!r}, zpos={self.reference_start}, mapq={self.mapping_quality}, cigar={self.cigarstring!r}, ...)>' + + def __copy__(self): + return makeAlignedSegment(self._delegate, self.header) + + def __deepcopy__(self, memo): + return makeAlignedSegment(self._delegate, self.header) + + def compare(self, AlignedSegment other): + '''return -1,0,1, if contents in this are binary + <,=,> to *other* + ''' + + # avoid segfault when other equals None + if other is None: + return -1 + + cdef int retval, x + cdef bam1_t *t + cdef bam1_t *o + + t = self._delegate + o = other._delegate + + # uncomment for debugging purposes + # cdef unsigned char * oo, * tt + # tt = <unsigned char*>(&t.core) + # oo = <unsigned char*>(&o.core) + # for x from 0 <= x < sizeof( bam1_core_t): print x, tt[x], oo[x] + # tt = <unsigned char*>(t.data) + # oo = <unsigned char*>(o.data) + # for x from 0 <= x < max(t.l_data, o.l_data): print x, tt[x], oo[x], chr(tt[x]), chr(oo[x]) + + # Fast-path test for object identity + if t == o: + return 0 + + cdef uint8_t *a = <uint8_t*>&t.core + cdef uint8_t *b = <uint8_t*>&o.core + + retval = memcmp(&t.core, &o.core, sizeof(bam1_core_t)) + if retval: + return retval + + # cmp(t.l_data, o.l_data) + retval = (t.l_data > o.l_data) - (t.l_data < o.l_data) + if retval: + return retval + return memcmp(t.data, o.data, t.l_data) + + def __richcmp__(self, AlignedSegment other, int op): + if op == 2: # == operator + return self.compare(other) == 0 + elif op == 3: # != operator + return self.compare(other) != 0 + else: + return NotImplemented + + def __hash__(self): + cdef bam1_t * src = self._delegate + cdef int x + + # see http://effbot.org/zone/python-hash.htm + cdef uint8_t * c = <uint8_t *>&src.core + cdef uint32_t hash_value = c[0] + for x from 1 <= x < sizeof(bam1_core_t): + hash_value = c_mul(hash_value, 1000003) ^ c[x] + c = <uint8_t *>src.data + for x from 0 <= x < src.l_data: + hash_value = c_mul(hash_value, 1000003) ^ c[x] + + return hash_value + + cpdef to_string(self): + """returns a string representation of the aligned segment. + + The output format is valid SAM format if a header is associated + with the AlignedSegment. + """ + cdef kstring_t line + line.l = line.m = 0 + line.s = NULL + + if self.header: + if sam_format1(self.header.ptr, self._delegate, &line) < 0: + if line.m: + free(line.s) + raise ValueError('sam_format failed') + else: + raise NotImplementedError("todo") + + ret = force_str(line.s[:line.l]) + + if line.m: + free(line.s) + + return ret + + @classmethod + def fromstring(cls, sam, AlignmentHeader header): + """parses a string representation of the aligned segment. + + The input format should be valid SAM format. + + Parameters + ---------- + sam: + :term:`SAM` formatted string + + """ + cdef AlignedSegment dest = cls.__new__(cls) + dest._delegate = <bam1_t*>calloc(1, sizeof(bam1_t)) + dest.header = header + + cdef kstring_t line + line.l = line.m = len(sam) + _sam = force_bytes(sam) + line.s = _sam + + cdef int ret + ret = sam_parse1(&line, dest.header.ptr, dest._delegate) + if ret < 0: + raise ValueError("parsing SAM record string failed (error code {})".format(ret)) + + return dest + + cpdef tostring(self, htsfile=None): + """deprecated, use :meth:`to_string()` instead. + + Parameters + ---------- + + htsfile: + (deprecated) AlignmentFile object to map numerical + identifiers to chromosome names. This parameter is present + for backwards compatibility and ignored. + """ + + return self.to_string() + + def to_dict(self): + """returns a json representation of the aligned segment. + + Field names are abbreviated versions of the class attributes. + """ + # let htslib do the string conversions, but treat optional field properly as list + vals = self.to_string().split("\t") + n = len(KEY_NAMES) - 1 + return dict(list(zip(KEY_NAMES[:-1], vals[:n])) + [(KEY_NAMES[-1], vals[n:])]) + + @classmethod + def from_dict(cls, sam_dict, AlignmentHeader header): + """parses a dictionary representation of the aligned segment. + + Parameters + ---------- + sam_dict: + dictionary of alignment values, keys corresponding to output from + :meth:`todict()`. + + """ + # let htslib do the parsing + # the tags field can be missing + return cls.fromstring( + "\t".join((sam_dict[x] for x in KEY_NAMES[:-1])) + + "\t" + + "\t".join(sam_dict.get(KEY_NAMES[-1], [])), header) + + ######################################################## + ## Basic attributes in order of appearance in SAM format + property query_name: + """the query template name (None if not present)""" + def __get__(self): + + cdef bam1_t * src = self._delegate + if src.core.l_qname == 0: + return None + + return charptr_to_str(<char *>pysam_bam_get_qname(src)) + + def __set__(self, qname): + + if qname is None or len(qname) == 0: + return + + if len(qname) > 254: + raise ValueError("query length out of range {} > 254".format( + len(qname))) + + qname = force_bytes(qname) + cdef bam1_t * src = self._delegate + # the qname is \0 terminated + cdef uint8_t l = len(qname) + 1 + + cdef char * p = pysam_bam_get_qname(src) + cdef uint8_t l_extranul = 0 + + if l % 4 != 0: + l_extranul = 4 - l % 4 + + cdef bam1_t * retval = pysam_bam_update(src, + src.core.l_qname, + l + l_extranul, + <uint8_t*>p) + if retval == NULL: + raise MemoryError("could not allocate memory") + + src.core.l_extranul = l_extranul + src.core.l_qname = l + l_extranul + + # re-acquire pointer to location in memory + # as it might have moved + p = pysam_bam_get_qname(src) + + strncpy(p, qname, l) + # x might be > 255 + cdef uint16_t x = 0 + + for x from l <= x < l + l_extranul: + p[x] = b'\0' + + property flag: + """properties flag""" + def __get__(self): + return self._delegate.core.flag + def __set__(self, flag): + self._delegate.core.flag = flag + + property reference_name: + """:term:`reference` name""" + def __get__(self): + if self._delegate.core.tid == -1: + return None + if self.header: + return self.header.get_reference_name(self._delegate.core.tid) + else: + raise ValueError("reference_name unknown if no header associated with record") + def __set__(self, reference): + cdef int tid + if reference is None or reference == "*": + self._delegate.core.tid = -1 + elif self.header: + tid = self.header.get_tid(reference) + if tid < 0: + raise ValueError("reference {} does not exist in header".format( + reference)) + self._delegate.core.tid = tid + else: + raise ValueError("reference_name can not be set if no header associated with record") + + property reference_id: + """:term:`reference` ID + + .. note:: + + This field contains the index of the reference sequence in + the sequence dictionary. To obtain the name of the + reference sequence, use :meth:`get_reference_name()` + + """ + def __get__(self): + return self._delegate.core.tid + def __set__(self, tid): + if tid != -1 and self.header and not self.header.is_valid_tid(tid): + raise ValueError("reference id {} does not exist in header".format( + tid)) + self._delegate.core.tid = tid + + property reference_start: + """0-based leftmost coordinate""" + def __get__(self): + return self._delegate.core.pos + def __set__(self, pos): + ## setting the position requires updating the "bin" attribute + cdef bam1_t * src + src = self._delegate + src.core.pos = pos + update_bin(src) + + property mapping_quality: + """mapping quality""" + def __get__(self): + return pysam_get_qual(self._delegate) + def __set__(self, qual): + pysam_set_qual(self._delegate, qual) + + property cigarstring: + '''the :term:`cigar` alignment as a string. + + The cigar string is a string of alternating integers + and characters denoting the length and the type of + an operation. + + .. note:: + The order length,operation is specified in the + SAM format. It is different from the order of + the :attr:`cigar` property. + + Returns None if not present. + + To unset the cigarstring, assign None or the + empty string. + ''' + def __get__(self): + c = self.cigartuples + if c is None: + return None + # reverse order + else: + return "".join([ "%i%c" % (y,CODE2CIGAR[x]) for x,y in c]) + + def __set__(self, cigar): + if cigar is None or len(cigar) == 0: + self.cigartuples = [] + else: + parts = CIGAR_REGEX.findall(cigar) + # reverse order + self.cigartuples = [(CIGAR2CODE[ord(y)], int(x)) for x,y in parts] + + # TODO + # property cigar: + # """the cigar alignment""" + + property next_reference_id: + """the :term:`reference` id of the mate/next read.""" + def __get__(self): + return self._delegate.core.mtid + def __set__(self, mtid): + if mtid != -1 and self.header and not self.header.is_valid_tid(mtid): + raise ValueError("reference id {} does not exist in header".format( + mtid)) + self._delegate.core.mtid = mtid + + property next_reference_name: + """:term:`reference` name of the mate/next read (None if no + AlignmentFile is associated)""" + def __get__(self): + if self._delegate.core.mtid == -1: + return None + if self.header: + return self.header.get_reference_name(self._delegate.core.mtid) + else: + raise ValueError("next_reference_name unknown if no header associated with record") + + def __set__(self, reference): + cdef int mtid + if reference is None or reference == "*": + self._delegate.core.mtid = -1 + elif reference == "=": + self._delegate.core.mtid = self._delegate.core.tid + elif self.header: + mtid = self.header.get_tid(reference) + if mtid < 0: + raise ValueError("reference {} does not exist in header".format( + reference)) + self._delegate.core.mtid = mtid + else: + raise ValueError("next_reference_name can not be set if no header associated with record") + + property next_reference_start: + """the position of the mate/next read.""" + def __get__(self): + return self._delegate.core.mpos + def __set__(self, mpos): + self._delegate.core.mpos = mpos + + property query_length: + """the length of the query/read. + + This value corresponds to the length of the sequence supplied + in the BAM/SAM file. The length of a query is 0 if there is no + sequence in the BAM/SAM file. In those cases, the read length + can be inferred from the CIGAR alignment, see + :meth:`pysam.AlignedSegment.infer_query_length`. + + The length includes soft-clipped bases and is equal to + ``len(query_sequence)``. + + This property is read-only but is updated when a new query + sequence is assigned to this AlignedSegment. + + Returns 0 if not available. + + """ + def __get__(self): + return self._delegate.core.l_qseq + + property template_length: + """the observed query template length""" + def __get__(self): + return self._delegate.core.isize + def __set__(self, isize): + self._delegate.core.isize = isize + + property query_sequence: + """read sequence bases, including :term:`soft clipped` bases + (None if not present). + + Assigning to this attribute will invalidate any quality scores. + Thus, to in-place edit the sequence and quality scores, copies of + the quality scores need to be taken. Consider trimming for example:: + + q = read.query_qualities + read.query_sequence = read.query_sequence[5:10] + read.query_qualities = q[5:10] + + The sequence is returned as it is stored in the BAM file. (This will + be the reverse complement of the original read sequence if the mapper + has aligned the read to the reverse strand.) + """ + def __get__(self): + if self.cache_query_sequence: + return self.cache_query_sequence + + cdef bam1_t * src + cdef char * s + src = self._delegate + + if src.core.l_qseq == 0: + return None + + self.cache_query_sequence = force_str(getSequenceInRange( + src, 0, src.core.l_qseq)) + return self.cache_query_sequence + + def __set__(self, seq): + # samtools manages sequence and quality length memory together + # if no quality information is present, the first byte says 0xff. + cdef bam1_t * src + cdef uint8_t * p + cdef char * s + cdef int l, k + cdef Py_ssize_t nbytes_new, nbytes_old + + if seq == None: + l = 0 + else: + l = len(seq) + seq = force_bytes(seq) + + src = self._delegate + + # as the sequence is stored in half-bytes, the total length (sequence + # plus quality scores) is (l+1)/2 + l + nbytes_new = (l + 1) // 2 + l + nbytes_old = (src.core.l_qseq + 1) // 2 + src.core.l_qseq + + # acquire pointer to location in memory + p = pysam_bam_get_seq(src) + src.core.l_qseq = l + + # change length of data field + cdef bam1_t * retval = pysam_bam_update(src, + nbytes_old, + nbytes_new, + p) + + if retval == NULL: + raise MemoryError("could not allocate memory") + + if l > 0: + # re-acquire pointer to location in memory + # as it might have moved + p = pysam_bam_get_seq(src) + for k from 0 <= k < nbytes_new: + p[k] = 0 + # convert to C string + s = seq + for k from 0 <= k < l: + p[k // 2] |= seq_nt16_table[<unsigned char>s[k]] << 4 * (1 - k % 2) + + # erase qualities + p = pysam_bam_get_qual(src) + memset(p, 0xff, l) + + self.cache_query_sequence = force_str(seq) + + # clear cached values for quality values + self.cache_query_qualities = None + self.cache_query_alignment_qualities = None + + property query_qualities: + """read sequence base qualities, including :term:`soft clipped` bases + (None if not present). + + Quality scores are returned as a python array of unsigned + chars. Note that this is not the ASCII-encoded value typically + seen in FASTQ or SAM formatted files. Thus, no offset of 33 + needs to be subtracted. + + Note that to set quality scores the sequence has to be set + beforehand as this will determine the expected length of the + quality score array. + + This method raises a ValueError if the length of the + quality scores and the sequence are not the same. + + """ + def __get__(self): + + if self.cache_query_qualities: + return self.cache_query_qualities + + cdef bam1_t * src + cdef char * q + + src = self._delegate + + if src.core.l_qseq == 0: + return None + + self.cache_query_qualities = getQualitiesInRange(src, 0, src.core.l_qseq) + return self.cache_query_qualities + + def __set__(self, qual): + + # note that memory is already allocated via setting the sequence + # hence length match of sequence and quality needs is checked. + cdef bam1_t * src + cdef uint8_t * p + cdef int l + + src = self._delegate + p = pysam_bam_get_qual(src) + if qual is None or len(qual) == 0: + # if absent and there is a sequence: set to 0xff + memset(p, 0xff, src.core.l_qseq) + return + + # check for length match + l = len(qual) + if src.core.l_qseq != l: + raise ValueError( + "quality and sequence mismatch: %i != %i" % + (l, src.core.l_qseq)) + + # create a python array object filling it + # with the quality scores + + # NB: should avoid this copying if qual is + # already of the correct type. + cdef c_array.array result = c_array.array('B', qual) + + # copy data + memcpy(p, result.data.as_voidptr, l) + + # save in cache + self.cache_query_qualities = qual + + property bin: + """properties bin""" + def __get__(self): + return self._delegate.core.bin + def __set__(self, bin): + self._delegate.core.bin = bin + + + ########################################################## + # Derived simple attributes. These are simple attributes of + # AlignedSegment getting and setting values. + ########################################################## + # 1. Flags + ########################################################## + property is_paired: + """true if read is paired in sequencing""" + def __get__(self): + return (self.flag & BAM_FPAIRED) != 0 + def __set__(self,val): + pysam_update_flag(self._delegate, val, BAM_FPAIRED) + + property is_proper_pair: + """true if read is mapped in a proper pair""" + def __get__(self): + return (self.flag & BAM_FPROPER_PAIR) != 0 + def __set__(self,val): + pysam_update_flag(self._delegate, val, BAM_FPROPER_PAIR) + + property is_unmapped: + """true if read itself is unmapped""" + def __get__(self): + return (self.flag & BAM_FUNMAP) != 0 + def __set__(self, val): + pysam_update_flag(self._delegate, val, BAM_FUNMAP) + # setting the unmapped flag requires recalculation of + # bin as alignment length is now implicitly 1 + update_bin(self._delegate) + + property is_mapped: + """true if read itself is mapped + (implemented in terms of :attr:`is_unmapped`)""" + def __get__(self): + return (self.flag & BAM_FUNMAP) == 0 + def __set__(self, val): + pysam_update_flag(self._delegate, not val, BAM_FUNMAP) + update_bin(self._delegate) + + property mate_is_unmapped: + """true if the mate is unmapped""" + def __get__(self): + return (self.flag & BAM_FMUNMAP) != 0 + def __set__(self,val): + pysam_update_flag(self._delegate, val, BAM_FMUNMAP) + + property mate_is_mapped: + """true if the mate is mapped + (implemented in terms of :attr:`mate_is_unmapped`)""" + def __get__(self): + return (self.flag & BAM_FMUNMAP) == 0 + def __set__(self,val): + pysam_update_flag(self._delegate, not val, BAM_FMUNMAP) + + property is_reverse: + """true if read is mapped to reverse strand""" + def __get__(self): + return (self.flag & BAM_FREVERSE) != 0 + def __set__(self,val): + pysam_update_flag(self._delegate, val, BAM_FREVERSE) + + property is_forward: + """true if read is mapped to forward strand + (implemented in terms of :attr:`is_reverse`)""" + def __get__(self): + return (self.flag & BAM_FREVERSE) == 0 + def __set__(self,val): + pysam_update_flag(self._delegate, not val, BAM_FREVERSE) + + property mate_is_reverse: + """true if the mate is mapped to reverse strand""" + def __get__(self): + return (self.flag & BAM_FMREVERSE) != 0 + def __set__(self,val): + pysam_update_flag(self._delegate, val, BAM_FMREVERSE) + + property mate_is_forward: + """true if the mate is mapped to forward strand + (implemented in terms of :attr:`mate_is_reverse`)""" + def __get__(self): + return (self.flag & BAM_FMREVERSE) == 0 + def __set__(self,val): + pysam_update_flag(self._delegate, not val, BAM_FMREVERSE) + + property is_read1: + """true if this is read1""" + def __get__(self): + return (self.flag & BAM_FREAD1) != 0 + def __set__(self,val): + pysam_update_flag(self._delegate, val, BAM_FREAD1) + property is_read2: + """true if this is read2""" + def __get__(self): + return (self.flag & BAM_FREAD2) != 0 + def __set__(self, val): + pysam_update_flag(self._delegate, val, BAM_FREAD2) + property is_secondary: + """true if not primary alignment""" + def __get__(self): + return (self.flag & BAM_FSECONDARY) != 0 + def __set__(self, val): + pysam_update_flag(self._delegate, val, BAM_FSECONDARY) + property is_qcfail: + """true if QC failure""" + def __get__(self): + return (self.flag & BAM_FQCFAIL) != 0 + def __set__(self, val): + pysam_update_flag(self._delegate, val, BAM_FQCFAIL) + property is_duplicate: + """true if optical or PCR duplicate""" + def __get__(self): + return (self.flag & BAM_FDUP) != 0 + def __set__(self, val): + pysam_update_flag(self._delegate, val, BAM_FDUP) + property is_supplementary: + """true if this is a supplementary alignment""" + def __get__(self): + return (self.flag & BAM_FSUPPLEMENTARY) != 0 + def __set__(self, val): + pysam_update_flag(self._delegate, val, BAM_FSUPPLEMENTARY) + + # 2. Coordinates and lengths + property reference_end: + '''aligned reference position of the read on the reference genome. + + reference_end points to one past the last aligned residue. + Returns None if not available (read is unmapped or no cigar + alignment present). + + ''' + def __get__(self): + cdef bam1_t * src + src = self._delegate + if (self.flag & BAM_FUNMAP) or pysam_get_n_cigar(src) == 0: + return None + return bam_endpos(src) + + property reference_length: + '''aligned length of the read on the reference genome. + + This is equal to `reference_end - reference_start`. + Returns None if not available. + ''' + def __get__(self): + cdef bam1_t * src + src = self._delegate + if (self.flag & BAM_FUNMAP) or pysam_get_n_cigar(src) == 0: + return None + return bam_endpos(src) - \ + self._delegate.core.pos + + property query_alignment_sequence: + """aligned portion of the read. + + This is a substring of :attr:`query_sequence` that excludes flanking + bases that were :term:`soft clipped` (None if not present). It + is equal to ``query_sequence[query_alignment_start:query_alignment_end]``. + + SAM/BAM files may include extra flanking bases that are not + part of the alignment. These bases may be the result of the + Smith-Waterman or other algorithms, which may not require + alignments that begin at the first residue or end at the last. + In addition, extra sequencing adapters, multiplex identifiers, + and low-quality bases that were not considered for alignment + may have been retained. + + """ + + def __get__(self): + if self.cache_query_alignment_sequence: + return self.cache_query_alignment_sequence + + cdef bam1_t * src + cdef uint32_t start, end + + src = self._delegate + + if src.core.l_qseq == 0: + return None + + start = getQueryStart(src) + end = getQueryEnd(src) + + self.cache_query_alignment_sequence = force_str( + getSequenceInRange(src, start, end)) + return self.cache_query_alignment_sequence + + property query_alignment_qualities: + """aligned query sequence quality values (None if not present). These + are the quality values that correspond to + :attr:`query_alignment_sequence`, that is, they exclude qualities of + :term:`soft clipped` bases. This is equal to + ``query_qualities[query_alignment_start:query_alignment_end]``. + + Quality scores are returned as a python array of unsigned + chars. Note that this is not the ASCII-encoded value typically + seen in FASTQ or SAM formatted files. Thus, no offset of 33 + needs to be subtracted. + + This property is read-only. + + """ + def __get__(self): + + if self.cache_query_alignment_qualities: + return self.cache_query_alignment_qualities + + cdef bam1_t * src + cdef uint32_t start, end + + src = self._delegate + + if src.core.l_qseq == 0: + return None + + start = getQueryStart(src) + end = getQueryEnd(src) + self.cache_query_alignment_qualities = \ + getQualitiesInRange(src, start, end) + return self.cache_query_alignment_qualities + + property query_alignment_start: + """start index of the aligned query portion of the sequence (0-based, + inclusive). + + This the index of the first base in :attr:`query_sequence` + that is not soft-clipped. + """ + def __get__(self): + return getQueryStart(self._delegate) + + property query_alignment_end: + """end index of the aligned query portion of the sequence (0-based, + exclusive) + + This the index just past the last base in :attr:`query_sequence` + that is not soft-clipped. + """ + def __get__(self): + return getQueryEnd(self._delegate) + + property modified_bases: + """Modified bases annotations from Ml/Mm tags. The output is + Dict[(canonical base, strand, modification)] -> [ (pos,qual), ...] + with qual being (256*probability), or -1 if unknown. + Strand==0 for forward and 1 for reverse strand modification + """ + def __get__(self): + cdef bam1_t * src + cdef hts_base_mod_state *m = hts_base_mod_state_alloc() + cdef hts_base_mod mods[5] + cdef int pos + + ret = {} + src = self._delegate + + if bam_parse_basemod(src, m) < 0: + return None + + n = bam_next_basemod(src, m, mods, 5, &pos) + + while n>0: + for i in range(n): + mod_code = chr(mods[i].modified_base) if mods[i].modified_base>0 else -mods[i].modified_base + mod_strand = mods[i].strand + if self.is_reverse: + mod_strand = 1 - mod_strand + key = (chr(mods[i].canonical_base), + mod_strand, + mod_code ) + ret.setdefault(key,[]).append((pos,mods[i].qual)) + + n = bam_next_basemod(src, m, mods, 5, &pos) + + if n<0: + return None + + hts_base_mod_state_free(m) + return ret + + property modified_bases_forward: + """Modified bases annotations from Ml/Mm tags. The output is + Dict[(canonical base, strand, modification)] -> [ (pos,qual), ...] + with qual being (256*probability), or -1 if unknown. + Strand==0 for forward and 1 for reverse strand modification. + The positions are with respect to the original sequence from get_forward_sequence() + """ + def __get__(self): + pmods = self.modified_bases + if pmods and self.is_reverse: + rmod = {} + + # Try to find the length of the original sequence + rlen = self.infer_read_length() + if rlen is None and self.query_sequence is None: + return rmod + else: + rlen = len(self.query_sequence) + + for k,mods in pmods.items(): + nk = k[0],1 - k[1],k[2] + for i in range(len(mods)): + + mods[i] = (rlen - 1 -mods[i][0], mods[i][1]) + rmod[nk] = mods + return rmod + + return pmods + + + property query_alignment_length: + """length of the aligned query sequence. + + This is equal to :attr:`query_alignment_end` - + :attr:`query_alignment_start` + """ + def __get__(self): + cdef bam1_t * src + src = self._delegate + return getQueryEnd(src) - getQueryStart(src) + + ##################################################### + # Computed properties + + def get_reference_positions(self, full_length=False): + """a list of reference positions that this read aligns to. + + By default, this method returns the (0-based) positions on the + reference that are within the read's alignment, leaving gaps + corresponding to deletions and other reference skips. + + When *full_length* is True, the returned list is the same length + as the read and additionally includes None values corresponding + to insertions or soft-clipping, i.e., to bases of the read that + are not aligned to a reference position. + (See also :meth:`get_aligned_pairs` which additionally returns + the corresponding positions along the read.) + """ + cdef uint32_t k, i, l, pos + cdef int op + cdef uint32_t * cigar_p + cdef bam1_t * src + cdef bint _full = full_length + + src = self._delegate + if pysam_get_n_cigar(src) == 0: + return [] + + result = [] + pos = src.core.pos + cigar_p = pysam_bam_get_cigar(src) + + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + l = cigar_p[k] >> BAM_CIGAR_SHIFT + + if op == BAM_CSOFT_CLIP or op == BAM_CINS: + if _full: + for i from 0 <= i < l: + result.append(None) + elif op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: + for i from pos <= i < pos + l: + result.append(i) + pos += l + elif op == BAM_CDEL or op == BAM_CREF_SKIP: + pos += l + + return result + + def infer_query_length(self, always=False): + """infer query length from CIGAR alignment. + + This method deduces the query length from the CIGAR alignment + but does not include hard-clipped bases. + + Returns None if CIGAR alignment is not present. + + If *always* is set to True, `infer_read_length` is used instead. + This is deprecated and only present for backward compatibility. + """ + if always is True: + return self.infer_read_length() + cdef int32_t l = calculateQueryLengthWithoutHardClipping(self._delegate) + if l > 0: + return l + else: + return None + + def infer_read_length(self): + """infer read length from CIGAR alignment. + + This method deduces the read length from the CIGAR alignment + including hard-clipped bases. + + Returns None if CIGAR alignment is not present. + """ + cdef int32_t l = calculateQueryLengthWithHardClipping(self._delegate) + if l > 0: + return l + else: + return None + + def get_reference_sequence(self): + """return the reference sequence in the region that is covered by the + alignment of the read to the reference. + + This method requires the MD tag to be set. + + """ + return force_str(build_reference_sequence(self._delegate)) + + def get_forward_sequence(self): + """return the original read sequence. + + Reads mapped to the reverse strand are stored reverse complemented in + the BAM file. This method returns such reads reverse complemented back + to their original orientation. + + Returns None if the record has no query sequence. + """ + if self.query_sequence is None: + return None + s = force_str(self.query_sequence) + if self.is_reverse: + s = s.translate(str.maketrans("ACGTacgtNnXx", "TGCAtgcaNnXx"))[::-1] + return s + + def get_forward_qualities(self): + """return the original base qualities of the read sequence, + in the same format as the :attr:`query_qualities` property. + + Reads mapped to the reverse strand have their base qualities stored + reversed in the BAM file. This method returns such reads' base qualities + reversed back to their original orientation. + """ + if self.is_reverse: + return self.query_qualities[::-1] + else: + return self.query_qualities + + def get_aligned_pairs(self, matches_only=False, with_seq=False, with_cigar=False): + """a list of aligned read (query) and reference positions. + + Each item in the returned list is a tuple consisting of + the 0-based offset from the start of the read sequence + followed by the 0-based reference position. + + For inserts, deletions, skipping either query or reference + position may be None. + + For padding in the reference, the reference position will + always be None. + + Parameters + ---------- + + matches_only : bool + If True, only matched bases are returned --- no None on either + side. + with_seq : bool + If True, return a third element in the tuple containing the + reference sequence. For CIGAR 'P' (padding in the reference) + operations, the third tuple element will be None. Substitutions + are lower-case. This option requires an MD tag to be present. + with_cigar : bool + If True, return an extra element in the tuple containing the + CIGAR operator corresponding to this position tuple. + + Returns + ------- + + aligned_pairs : list of tuples + + """ + cdef uint32_t k, i, pos, qpos, r_idx, l + cdef int op + cdef uint32_t * cigar_p + cdef bam1_t * src = self._delegate + cdef bint _matches_only = bool(matches_only) + cdef bint _with_seq = bool(with_seq) + cdef bint _with_cigar = bool(with_cigar) + cdef object (*make_tuple)(object, object, object, uint32_t, int) + + # TODO: this method performs no checking and assumes that + # read sequence, cigar and MD tag are consistent. + + if _with_seq: + # force_str required for py2/py3 compatibility + ref_seq = force_str(build_reference_sequence(src)) + if ref_seq is None: + raise ValueError("MD tag not present") + make_tuple = _alignedpairs_with_seq_cigar if _with_cigar else _alignedpairs_with_seq + else: + ref_seq = None + make_tuple = _alignedpairs_with_cigar if _with_cigar else _alignedpairs_positions + + r_idx = 0 + + if pysam_get_n_cigar(src) == 0: + return [] + + result = [] + pos = src.core.pos + qpos = 0 + cigar_p = pysam_bam_get_cigar(src) + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + l = cigar_p[k] >> BAM_CIGAR_SHIFT + + if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: + for i from pos <= i < pos + l: + result.append(make_tuple(qpos, i, ref_seq, r_idx, op)) + r_idx += 1 + qpos += 1 + pos += l + + elif op == BAM_CINS or op == BAM_CSOFT_CLIP or op == BAM_CPAD: + if not _matches_only: + for i from pos <= i < pos + l: + result.append(make_tuple(qpos, None, None, 0, op)) + qpos += 1 + else: + qpos += l + + elif op == BAM_CDEL: + if not _matches_only: + for i from pos <= i < pos + l: + result.append(make_tuple(None, i, ref_seq, r_idx, op)) + r_idx += 1 + else: + r_idx += l + pos += l + + elif op == BAM_CHARD_CLIP: + pass # advances neither + + elif op == BAM_CREF_SKIP: + if not _matches_only: + for i from pos <= i < pos + l: + result.append(make_tuple(None, i, None, 0, op)) + + pos += l + + return result + + def get_blocks(self): + """ a list of start and end positions of + aligned gapless blocks. + + The start and end positions are in genomic + coordinates. + + Blocks are not normalized, i.e. two blocks + might be directly adjacent. This happens if + the two blocks are separated by an insertion + in the read. + """ + + cdef uint32_t k, pos, l + cdef int op + cdef uint32_t * cigar_p + cdef bam1_t * src + + src = self._delegate + if pysam_get_n_cigar(src) == 0: + return [] + + result = [] + pos = src.core.pos + cigar_p = pysam_bam_get_cigar(src) + l = 0 + + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + l = cigar_p[k] >> BAM_CIGAR_SHIFT + if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: + result.append((pos, pos + l)) + pos += l + elif op == BAM_CDEL or op == BAM_CREF_SKIP: + pos += l + + return result + + def get_overlap(self, uint32_t start, uint32_t end): + """return number of aligned bases of read overlapping the interval + *start* and *end* on the reference sequence. + + Return None if cigar alignment is not available. + """ + cdef uint32_t k, i, pos, overlap + cdef int op, o + cdef uint32_t * cigar_p + cdef bam1_t * src + + overlap = 0 + + src = self._delegate + if pysam_get_n_cigar(src) == 0: + return None + pos = src.core.pos + o = 0 + + cigar_p = pysam_bam_get_cigar(src) + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + l = cigar_p[k] >> BAM_CIGAR_SHIFT + + if op == BAM_CMATCH or op == BAM_CEQUAL or op == BAM_CDIFF: + o = min( pos + l, end) - max( pos, start ) + if o > 0: overlap += o + + if op == BAM_CMATCH or op == BAM_CDEL or op == BAM_CREF_SKIP or op == BAM_CEQUAL or op == BAM_CDIFF: + pos += l + + return overlap + + def get_cigar_stats(self): + """summary of operations in cigar string. + + The output order in the array is "MIDNSHP=X" followed by a + field for the NM tag. If the NM tag is not present, this + field will always be 0. (Accessing this field via index -1 + avoids changes if more CIGAR operators are added in future.) + + +-----+----------------+--------+ + |M |pysam.CMATCH |0 | + +-----+----------------+--------+ + |I |pysam.CINS |1 | + +-----+----------------+--------+ + |D |pysam.CDEL |2 | + +-----+----------------+--------+ + |N |pysam.CREF_SKIP |3 | + +-----+----------------+--------+ + |S |pysam.CSOFT_CLIP|4 | + +-----+----------------+--------+ + |H |pysam.CHARD_CLIP|5 | + +-----+----------------+--------+ + |P |pysam.CPAD |6 | + +-----+----------------+--------+ + |= |pysam.CEQUAL |7 | + +-----+----------------+--------+ + |X |pysam.CDIFF |8 | + +-----+----------------+--------+ + |B |pysam.CBACK |9 | + +-----+----------------+--------+ + |NM |NM tag |10 or -1| + +-----+----------------+--------+ + + If no cigar string is present, empty arrays will be returned. + + Returns: + arrays : + two arrays. The first contains the nucleotide counts within + each cigar operation, the second contains the number of blocks + for each cigar operation. + + """ + + cdef int nfields = NCIGAR_CODES + 1 + + cdef c_array.array base_counts = array.array( + "I", + [0] * nfields) + cdef uint32_t [:] base_view = base_counts + cdef c_array.array block_counts = array.array( + "I", + [0] * nfields) + cdef uint32_t [:] block_view = block_counts + + cdef bam1_t * src = self._delegate + cdef int op + cdef uint32_t l + cdef int32_t k + cdef uint32_t * cigar_p = pysam_bam_get_cigar(src) + + if cigar_p == NULL: + return None + + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + l = cigar_p[k] >> BAM_CIGAR_SHIFT + base_view[op] += l + block_view[op] += 1 + + cdef uint8_t * v = bam_aux_get(src, 'NM') + if v != NULL: + base_view[nfields - 1] = <int32_t>bam_aux2i(v) + + return base_counts, block_counts + + ##################################################### + ## Unsorted as yet + # TODO: capture in CIGAR object + property cigartuples: + """the :term:`cigar` alignment. The alignment + is returned as a list of tuples of (operation, length). + + If the alignment is not present, None is returned. + + The operations are: + + +-----+----------------+-----+ + |M |pysam.CMATCH |0 | + +-----+----------------+-----+ + |I |pysam.CINS |1 | + +-----+----------------+-----+ + |D |pysam.CDEL |2 | + +-----+----------------+-----+ + |N |pysam.CREF_SKIP |3 | + +-----+----------------+-----+ + |S |pysam.CSOFT_CLIP|4 | + +-----+----------------+-----+ + |H |pysam.CHARD_CLIP|5 | + +-----+----------------+-----+ + |P |pysam.CPAD |6 | + +-----+----------------+-----+ + |= |pysam.CEQUAL |7 | + +-----+----------------+-----+ + |X |pysam.CDIFF |8 | + +-----+----------------+-----+ + |B |pysam.CBACK |9 | + +-----+----------------+-----+ + + .. note:: + The output is a list of (operation, length) tuples, such as + ``[(0, 30)]``. + This is different from the SAM specification and + the :attr:`cigarstring` property, which uses a + (length, operation) order, for example: ``30M``. + + To unset the cigar property, assign an empty list + or None. + """ + def __get__(self): + cdef uint32_t * cigar_p + cdef bam1_t * src + cdef uint32_t op, l + cdef uint32_t k + + src = self._delegate + if pysam_get_n_cigar(src) == 0: + return None + + cigar = [] + + cigar_p = pysam_bam_get_cigar(src); + for k from 0 <= k < pysam_get_n_cigar(src): + op = cigar_p[k] & BAM_CIGAR_MASK + l = cigar_p[k] >> BAM_CIGAR_SHIFT + cigar.append((op, l)) + return cigar + + def __set__(self, values): + cdef uint32_t * p + cdef bam1_t * src + cdef op, l + cdef int k + + k = 0 + + src = self._delegate + + # get location of cigar string + p = pysam_bam_get_cigar(src) + + # empty values for cigar string + if values is None: + values = [] + + cdef uint32_t ncigar = len(values) + + cdef bam1_t * retval = pysam_bam_update(src, + pysam_get_n_cigar(src) * 4, + ncigar * 4, + <uint8_t*>p) + + if retval == NULL: + raise MemoryError("could not allocate memory") + + # length is number of cigar operations, not bytes + pysam_set_n_cigar(src, ncigar) + + # re-acquire pointer to location in memory + # as it might have moved + p = pysam_bam_get_cigar(src) + + # insert cigar operations + for op, l in values: + p[k] = l << BAM_CIGAR_SHIFT | op + k += 1 + + ## setting the cigar string requires updating the bin + update_bin(src) + + cpdef set_tag(self, + tag, + value, + value_type=None, + replace=True): + """sets a particular field *tag* to *value* in the optional alignment + section. + + *value_type* describes the type of *value* that is to entered + into the alignment record. It can be set explicitly to one of + the valid one-letter type codes. If unset, an appropriate type + will be chosen automatically based on the python type of + *value*. + + An existing value of the same *tag* will be overwritten unless + *replace* is set to False. This is usually not recommended as a + tag may only appear once in the optional alignment section. + + If *value* is `None`, the tag will be deleted. + + This method accepts valid SAM specification value types, which + are:: + + A: printable char + i: signed int + f: float + Z: printable string + H: Byte array in hex format + B: Integer or numeric array + + Additionally, it will accept the integer BAM types ('cCsSI') + + For htslib compatibility, 'a' is synonymous with 'A' and the + method accepts a 'd' type code for a double precision float. + + When deducing the type code by the python type of *value*, the + following mapping is applied:: + + i: python int + f: python float + Z: python str or bytes + B: python array.array, list or tuple + + Note that a single character string will be output as 'Z' and + not 'A' as the former is the more general type. + """ + + cdef int value_size + cdef uint8_t tc + cdef uint8_t * value_ptr + cdef uint8_t *existing_ptr + cdef float float_value + cdef double double_value + cdef int32_t int32_t_value + cdef uint32_t uint32_t_value + cdef int16_t int16_t_value + cdef uint16_t uint16_t_value + cdef int8_t int8_t_value + cdef uint8_t uint8_t_value + cdef bam1_t * src = self._delegate + cdef char * _value_type + cdef c_array.array array_value + cdef object buffer + + if len(tag) != 2: + raise ValueError('Invalid tag: %s' % tag) + + tag = force_bytes(tag) + if replace: + existing_ptr = bam_aux_get(src, tag) + if existing_ptr: + bam_aux_del(src, existing_ptr) + + # setting value to None deletes a tag + if value is None: + return + + cdef uint8_t typecode = get_tag_typecode(value, value_type) + if typecode == 0: + raise ValueError("can't guess type or invalid type code specified: {} {}".format( + value, value_type)) + + # sam_format1 for typecasting + if typecode == b'Z': + value = force_bytes(value) + value_ptr = <uint8_t*><char*>value + value_size = len(value)+1 + elif typecode == b'H': + # Note that hex tags are stored the very same + # way as Z string.s + value = force_bytes(value) + value_ptr = <uint8_t*><char*>value + value_size = len(value)+1 + elif typecode == b'A' or typecode == b'a': + value = force_bytes(value) + value_ptr = <uint8_t*><char*>value + value_size = sizeof(char) + typecode = b'A' + elif typecode == b'i': + int32_t_value = value + value_ptr = <uint8_t*>&int32_t_value + value_size = sizeof(int32_t) + elif typecode == b'I': + uint32_t_value = value + value_ptr = <uint8_t*>&uint32_t_value + value_size = sizeof(uint32_t) + elif typecode == b's': + int16_t_value = value + value_ptr = <uint8_t*>&int16_t_value + value_size = sizeof(int16_t) + elif typecode == b'S': + uint16_t_value = value + value_ptr = <uint8_t*>&uint16_t_value + value_size = sizeof(uint16_t) + elif typecode == b'c': + int8_t_value = value + value_ptr = <uint8_t*>&int8_t_value + value_size = sizeof(int8_t) + elif typecode == b'C': + uint8_t_value = value + value_ptr = <uint8_t*>&uint8_t_value + value_size = sizeof(uint8_t) + elif typecode == b'd': + double_value = value + value_ptr = <uint8_t*>&double_value + value_size = sizeof(double) + elif typecode == b'f': + float_value = value + value_ptr = <uint8_t*>&float_value + value_size = sizeof(float) + elif typecode == b'B': + # the following goes through python, needs to be cleaned up + # pack array using struct + fmt, args = pack_tags([(tag, value, value_type)]) + + # remove tag and type code as set by bam_aux_append + # first four chars of format (<2sB) + fmt = '<' + fmt[4:] + # first two values to pack + args = args[2:] + value_size = struct.calcsize(fmt) + # buffer will be freed when object goes out of scope + buffer = ctypes.create_string_buffer(value_size) + struct.pack_into(fmt, buffer, 0, *args) + # bam_aux_append copies data from value_ptr + bam_aux_append(src, + tag, + typecode, + value_size, + <uint8_t*>buffer.raw) + return + else: + raise ValueError('unsupported value_type {} in set_option'.format(typecode)) + + bam_aux_append(src, + tag, + typecode, + value_size, + value_ptr) + + cpdef has_tag(self, tag): + """returns true if the optional alignment section + contains a given *tag*.""" + cdef uint8_t * v + cdef int nvalues + btag = force_bytes(tag) + v = bam_aux_get(self._delegate, btag) + return v != NULL + + cpdef get_tag(self, tag, with_value_type=False): + """ + retrieves data from the optional alignment section + given a two-letter *tag* denoting the field. + + The returned value is cast into an appropriate python type. + + This method is the fastest way to access the optional + alignment section if only few tags need to be retrieved. + + Possible value types are "AcCsSiIfZHB" (see BAM format + specification) as well as additional value type 'd' as + implemented in htslib. + + Parameters: + + tag : + data tag. + + with_value_type : Optional[bool] + if set to True, the return value is a tuple of (tag value, type + code). (default False) + + Returns: + + A python object with the value of the `tag`. The type of the + object depends on the data type in the data record. + + Raises: + + KeyError + If `tag` is not present, a KeyError is raised. + + """ + cdef uint8_t * v + cdef int nvalues + btag = force_bytes(tag) + v = bam_aux_get(self._delegate, btag) + if v == NULL: + raise KeyError("tag '%s' not present" % tag) + if chr(v[0]) == "B": + auxtype = chr(v[0]) + chr(v[1]) + else: + auxtype = chr(v[0]) + + if auxtype in "iIcCsS": + value = bam_aux2i(v) + elif auxtype == 'f' or auxtype == 'F': + value = bam_aux2f(v) + elif auxtype == 'd' or auxtype == 'D': + value = bam_aux2f(v) + elif auxtype == 'A' or auxtype == 'a': + # force A to a + v[0] = b'A' + # there might a more efficient way + # to convert a char into a string + value = '%c' % <char>bam_aux2A(v) + elif auxtype == 'Z' or auxtype == 'H': + # Z and H are treated equally as strings in htslib + value = charptr_to_str(<char*>bam_aux2Z(v)) + elif auxtype[0] == 'B': + bytesize, nvalues, values = convert_binary_tag(v + 1) + value = values + else: + raise ValueError("unknown auxiliary type '%s'" % auxtype) + + if with_value_type: + return (value, auxtype) + else: + return value + + def get_tags(self, with_value_type=False): + """the fields in the optional alignment section. + + Returns a list of all fields in the optional + alignment section. Values are converted to appropriate python + values. For example: ``[(NM, 2), (RG, "GJP00TM04")]`` + + If *with_value_type* is set, the value type as encode in + the AlignedSegment record will be returned as well: + + [(NM, 2, "i"), (RG, "GJP00TM04", "Z")] + + This method will convert all values in the optional alignment + section. When getting only one or few tags, please see + :meth:`get_tag` for a quicker way to achieve this. + + """ + + cdef char * ctag + cdef bam1_t * src + cdef uint8_t * s + cdef char auxtag[3] + cdef char auxtype + cdef uint8_t byte_size + cdef int32_t nvalues + + src = self._delegate + if src.l_data == 0: + return [] + s = pysam_bam_get_aux(src) + result = [] + auxtag[2] = 0 + while s < (src.data + src.l_data): + # get tag + auxtag[0] = s[0] + auxtag[1] = s[1] + s += 2 + auxtype = s[0] + if auxtype in (b'c', b'C'): + value = <int>bam_aux2i(s) + s += 1 + elif auxtype in (b's', b'S'): + value = <int>bam_aux2i(s) + s += 2 + elif auxtype in (b'i', b'I'): + value = <int32_t>bam_aux2i(s) + s += 4 + elif auxtype == b'f': + value = <float>bam_aux2f(s) + s += 4 + elif auxtype == b'd': + value = <double>bam_aux2f(s) + s += 8 + elif auxtype in (b'A', b'a'): + value = "%c" % <char>bam_aux2A(s) + s += 1 + elif auxtype in (b'Z', b'H'): + value = charptr_to_str(<char*>bam_aux2Z(s)) + # +1 for NULL terminated string + s += len(value) + 1 + elif auxtype == b'B': + s += 1 + byte_size, nvalues, value = convert_binary_tag(s) + # 5 for 1 char and 1 int + s += 5 + (nvalues * byte_size) - 1 + else: + raise KeyError("unknown type '%s'" % auxtype) + + s += 1 + + if with_value_type: + result.append((charptr_to_str(auxtag), value, chr(auxtype))) + else: + result.append((charptr_to_str(auxtag), value)) + + return result + + def set_tags(self, tags): + """sets the fields in the optional alignment section with + a list of (tag, value) tuples. + + The value type of the values is determined from the + python type. Optionally, a type may be given explicitly as + a third value in the tuple, For example: + + x.set_tags([(NM, 2, "i"), (RG, "GJP00TM04", "Z")] + + This method will not enforce the rule that the same tag may appear + only once in the optional alignment section. + """ + + cdef bam1_t * src + cdef uint8_t * s + cdef char * temp + cdef int new_size = 0 + cdef int old_size + src = self._delegate + + # convert and pack the data + if tags is not None and len(tags) > 0: + fmt, args = pack_tags(tags) + new_size = struct.calcsize(fmt) + buffer = ctypes.create_string_buffer(new_size) + struct.pack_into(fmt, + buffer, + 0, + *args) + + + # delete the old data and allocate new space. + # If total_size == 0, the aux field will be + # empty + old_size = pysam_bam_get_l_aux(src) + cdef bam1_t * retval = pysam_bam_update(src, + old_size, + new_size, + pysam_bam_get_aux(src)) + if retval == NULL: + raise MemoryError("could not allocate memory") + + # copy data only if there is any + if new_size > 0: + + # get location of new data + s = pysam_bam_get_aux(src) + + # check if there is direct path from buffer.raw to tmp + p = buffer.raw + # create handle to make sure buffer stays alive long + # enough for memcpy, see issue 129 + temp = p + memcpy(s, temp, new_size) + + + ######################################################## + # Compatibility Accessors + # Functions, properties for compatibility with pysam < 0.8 + # + # Several options + # change the factory functions according to API + # * requires code changes throughout, incl passing + # handles to factory functions + # subclass functions and add attributes at runtime + # e.g.: AlignedSegments.qname = AlignedSegments.query_name + # * will slow down the default interface + # explicit declaration of getters/setters + ######################################################## + property qname: + """deprecated, use :attr:`query_name` instead.""" + def __get__(self): return self.query_name + def __set__(self, v): self.query_name = v + property tid: + """deprecated, use :attr:`reference_id` instead.""" + def __get__(self): return self.reference_id + def __set__(self, v): self.reference_id = v + property pos: + """deprecated, use :attr:`reference_start` instead.""" + def __get__(self): return self.reference_start + def __set__(self, v): self.reference_start = v + property mapq: + """deprecated, use :attr:`mapping_quality` instead.""" + def __get__(self): return self.mapping_quality + def __set__(self, v): self.mapping_quality = v + property rnext: + """deprecated, use :attr:`next_reference_id` instead.""" + def __get__(self): return self.next_reference_id + def __set__(self, v): self.next_reference_id = v + property pnext: + """deprecated, use :attr:`next_reference_start` instead.""" + def __get__(self): + return self.next_reference_start + def __set__(self, v): + self.next_reference_start = v + property cigar: + """deprecated, use :attr:`cigarstring` or :attr:`cigartuples` instead.""" + def __get__(self): + r = self.cigartuples + if r is None: + r = [] + return r + def __set__(self, v): self.cigartuples = v + property tlen: + """deprecated, use :attr:`template_length` instead.""" + def __get__(self): + return self.template_length + def __set__(self, v): + self.template_length = v + property seq: + """deprecated, use :attr:`query_sequence` instead.""" + def __get__(self): + return self.query_sequence + def __set__(self, v): + self.query_sequence = v + property qual: + """deprecated, use :attr:`query_qualities` instead.""" + def __get__(self): + return array_to_qualitystring(self.query_qualities) + def __set__(self, v): + self.query_qualities = qualitystring_to_array(v) + property alen: + """deprecated, use :attr:`reference_length` instead.""" + def __get__(self): + return self.reference_length + def __set__(self, v): + self.reference_length = v + property aend: + """deprecated, use :attr:`reference_end` instead.""" + def __get__(self): + return self.reference_end + def __set__(self, v): + self.reference_end = v + property rlen: + """deprecated, use :attr:`query_length` instead.""" + def __get__(self): + return self.query_length + def __set__(self, v): + self.query_length = v + property query: + """deprecated, use :attr:`query_alignment_sequence` + instead.""" + def __get__(self): + return self.query_alignment_sequence + def __set__(self, v): + self.query_alignment_sequence = v + property qqual: + """deprecated, use :attr:`query_alignment_qualities` + instead.""" + def __get__(self): + return array_to_qualitystring(self.query_alignment_qualities) + def __set__(self, v): + self.query_alignment_qualities = qualitystring_to_array(v) + property qstart: + """deprecated, use :attr:`query_alignment_start` instead.""" + def __get__(self): + return self.query_alignment_start + def __set__(self, v): + self.query_alignment_start = v + property qend: + """deprecated, use :attr:`query_alignment_end` instead.""" + def __get__(self): + return self.query_alignment_end + def __set__(self, v): + self.query_alignment_end = v + property qlen: + """deprecated, use :attr:`query_alignment_length` + instead.""" + def __get__(self): + return self.query_alignment_length + def __set__(self, v): + self.query_alignment_length = v + property mrnm: + """deprecated, use :attr:`next_reference_id` instead.""" + def __get__(self): + return self.next_reference_id + def __set__(self, v): + self.next_reference_id = v + property mpos: + """deprecated, use :attr:`next_reference_start` + instead.""" + def __get__(self): + return self.next_reference_start + def __set__(self, v): + self.next_reference_start = v + property rname: + """deprecated, use :attr:`reference_id` instead.""" + def __get__(self): + return self.reference_id + def __set__(self, v): + self.reference_id = v + property isize: + """deprecated, use :attr:`template_length` instead.""" + def __get__(self): + return self.template_length + def __set__(self, v): + self.template_length = v + property blocks: + """deprecated, use :meth:`get_blocks()` instead.""" + def __get__(self): + return self.get_blocks() + property aligned_pairs: + """deprecated, use :meth:`get_aligned_pairs()` instead.""" + def __get__(self): + return self.get_aligned_pairs() + property inferred_length: + """deprecated, use :meth:`infer_query_length()` instead.""" + def __get__(self): + return self.infer_query_length() + property positions: + """deprecated, use :meth:`get_reference_positions()` instead.""" + def __get__(self): + return self.get_reference_positions() + property tags: + """deprecated, use :meth:`get_tags()` instead.""" + def __get__(self): + return self.get_tags() + def __set__(self, tags): + self.set_tags(tags) + def overlap(self): + """deprecated, use :meth:`get_overlap()` instead.""" + return self.get_overlap() + def opt(self, tag): + """deprecated, use :meth:`get_tag()` instead.""" + return self.get_tag(tag) + def setTag(self, tag, value, value_type=None, replace=True): + """deprecated, use :meth:`set_tag()` instead.""" + return self.set_tag(tag, value, value_type, replace) + + +cdef class PileupColumn: + '''A pileup of reads at a particular reference sequence position + (:term:`column`). A pileup column contains all the reads that map + to a certain target base. + + This class is a proxy for results returned by the samtools pileup + engine. If the underlying engine iterator advances, the results + of this column will change. + ''' + def __init__(self): + raise TypeError("this class cannot be instantiated from Python") + + def __str__(self): + return "\t".join(map(str, + (self.reference_id, + self.reference_pos, + self.nsegments))) +\ + "\n" +\ + "\n".join(map(str, self.pileups)) + + def __dealloc__(self): + free(self.buf.s) + + def set_min_base_quality(self, min_base_quality): + """set the minimum base quality for this pileup column. + """ + self.min_base_quality = min_base_quality + + def __len__(self): + """return number of reads aligned to this column. + + see :meth:`get_num_aligned` + """ + return self.get_num_aligned() + + property reference_id: + '''the reference sequence number as defined in the header''' + def __get__(self): + return self.tid + + property reference_name: + """:term:`reference` name (None if no AlignmentFile is associated)""" + def __get__(self): + if self.header is not None: + return self.header.get_reference_name(self.tid) + return None + + property nsegments: + '''number of reads mapping to this column. + + Note that this number ignores the base quality filter.''' + def __get__(self): + return self.n_pu + def __set__(self, n): + self.n_pu = n + + property reference_pos: + '''the position in the reference sequence (0-based).''' + def __get__(self): + return self.pos + + property pileups: + '''list of reads (:class:`pysam.PileupRead`) aligned to this column''' + def __get__(self): + if self.plp == NULL or self.plp[0] == NULL: + raise ValueError("PileupColumn accessed after iterator finished") + + cdef int x + cdef const bam_pileup1_t * p = NULL + pileups = [] + + # warning: there could be problems if self.n and self.buf are + # out of sync. + for x from 0 <= x < self.n_pu: + p = &(self.plp[0][x]) + if p == NULL: + raise ValueError( + "pileup buffer out of sync - most likely use of iterator " + "outside loop") + if pileup_base_qual_skip(p, self.min_base_quality): + continue + pileups.append(makePileupRead(p, self.header)) + return pileups + + ######################################################## + # Compatibility Accessors + # Functions, properties for compatibility with pysam < 0.8 + ######################################################## + property pos: + """deprecated, use :attr:`reference_pos` instead.""" + def __get__(self): + return self.reference_pos + def __set__(self, v): + self.reference_pos = v + + property tid: + """deprecated, use :attr:`reference_id` instead.""" + def __get__(self): + return self.reference_id + def __set__(self, v): + self.reference_id = v + + property n: + """deprecated, use :attr:`nsegments` instead.""" + def __get__(self): + return self.nsegments + def __set__(self, v): + self.nsegments = v + + def get_num_aligned(self): + """return number of aligned bases at pileup column position. + + This method applies a base quality filter and the number is + equal to the size of :meth:`get_query_sequences`, + :meth:`get_mapping_qualities`, etc. + + """ + cdef uint32_t x = 0 + cdef uint32_t c = 0 + cdef uint32_t cnt = 0 + cdef const bam_pileup1_t * p = NULL + if self.plp == NULL or self.plp[0] == NULL: + raise ValueError("PileupColumn accessed after iterator finished") + + for x from 0 <= x < self.n_pu: + p = &(self.plp[0][x]) + if p == NULL: + raise ValueError( + "pileup buffer out of sync - most likely use of iterator " + "outside loop") + if pileup_base_qual_skip(p, self.min_base_quality): + continue + cnt += 1 + return cnt + + def get_query_sequences(self, bint mark_matches=False, bint mark_ends=False, bint add_indels=False): + """query bases/sequences at pileup column position. + + Optionally, the bases/sequences can be annotated according to the samtools + mpileup format. This is the format description from the samtools mpileup tool:: + + Information on match, mismatch, indel, strand, mapping + quality and start and end of a read are all encoded at the + read base column. At this column, a dot stands for a match + to the reference base on the forward strand, a comma for a + match on the reverse strand, a '>' or '<' for a reference + skip, `ACGTN' for a mismatch on the forward strand and + `acgtn' for a mismatch on the reverse strand. A pattern + `\\+[0-9]+[ACGTNacgtn]+' indicates there is an insertion + between this reference position and the next reference + position. The length of the insertion is given by the + integer in the pattern, followed by the inserted + sequence. Similarly, a pattern `-[0-9]+[ACGTNacgtn]+' + represents a deletion from the reference. The deleted bases + will be presented as `*' in the following lines. Also at + the read base column, a symbol `^' marks the start of a + read. The ASCII of the character following `^' minus 33 + gives the mapping quality. A symbol `$' marks the end of a + read segment + + To reproduce samtools mpileup format, set all of mark_matches, + mark_ends and add_indels to True. + + Parameters + ---------- + + mark_matches: bool + + If True, output bases matching the reference as "." or "," + for forward and reverse strand, respectively. This mark + requires the reference sequence. If no reference is + present, this option is ignored. + + mark_ends : bool + + If True, add markers "^" and "$" for read start and end, respectively. + + add_indels : bool + + If True, add bases for bases inserted into or skipped from the + reference. The latter requires a reference sequence file to have + been given, e.g. via `pileup(fastafile = ...)`. If no reference + sequence is available, skipped bases are represented as 'N's. + + Returns + ------- + + a list of bases/sequences per read at pileup column position. : list + + """ + cdef uint32_t x = 0 + cdef uint32_t j = 0 + cdef uint32_t c = 0 + cdef uint8_t cc = 0 + cdef uint8_t rb = 0 + cdef kstring_t * buf = &self.buf + cdef const bam_pileup1_t * p = NULL + + if self.plp == NULL or self.plp[0] == NULL: + raise ValueError("PileupColumn accessed after iterator finished") + + buf.l = 0 + + # todo: reference sequence to count matches/mismatches + # todo: convert assertions to exceptions + for x from 0 <= x < self.n_pu: + p = &(self.plp[0][x]) + if p == NULL: + raise ValueError( + "pileup buffer out of sync - most likely use of iterator " + "outside loop") + if pileup_base_qual_skip(p, self.min_base_quality): + continue + # see samtools pileup_seq + if mark_ends and p.is_head: + kputc(b'^', buf) + + if p.b.core.qual > 93: + kputc(126, buf) + else: + kputc(p.b.core.qual + 33, buf) + if not p.is_del: + if p.qpos < p.b.core.l_qseq: + cc = <uint8_t>seq_nt16_str[bam_seqi(bam_get_seq(p.b), p.qpos)] + else: + cc = b'N' + + if mark_matches and self.reference_sequence != NULL: + rb = self.reference_sequence[self.reference_pos] + if seq_nt16_table[cc] == seq_nt16_table[rb]: + cc = b'=' + kputc(strand_mark_char(cc, p.b), buf) + elif add_indels: + if p.is_refskip: + if bam_is_rev(p.b): + kputc(b'<', buf) + else: + kputc(b'>', buf) + else: + kputc(b'*', buf) + if add_indels: + if p.indel > 0: + kputc(b'+', buf) + kputw(p.indel, buf) + for j from 1 <= j <= p.indel: + cc = seq_nt16_str[bam_seqi(bam_get_seq(p.b), p.qpos + j)] + kputc(strand_mark_char(cc, p.b), buf) + elif p.indel < 0: + kputc(b'-', buf) + kputw(-p.indel, buf) + for j from 1 <= j <= -p.indel: + # TODO: out-of-range check here? + if self.reference_sequence == NULL: + cc = b'N' + else: + cc = self.reference_sequence[self.reference_pos + j] + kputc(strand_mark_char(cc, p.b), buf) + if mark_ends and p.is_tail: + kputc(b'$', buf) + + kputc(b':', buf) + + if buf.l == 0: + # could be zero if all qualities are too low + return "" + else: + # quicker to ensemble all and split than to encode all separately. + # ignore last ":" + return force_str(PyBytes_FromStringAndSize(buf.s, buf.l-1)).split(":") + + def get_query_qualities(self): + """query base quality scores at pileup column position. + + Returns + ------- + + a list of quality scores : list + """ + cdef uint32_t x = 0 + cdef const bam_pileup1_t * p = NULL + cdef uint32_t c = 0 + result = [] + for x from 0 <= x < self.n_pu: + p = &(self.plp[0][x]) + if p == NULL: + raise ValueError( + "pileup buffer out of sync - most likely use of iterator " + "outside loop") + + if p.qpos < p.b.core.l_qseq: + c = bam_get_qual(p.b)[p.qpos] + else: + c = 0 + if c < self.min_base_quality: + continue + result.append(c) + return result + + def get_mapping_qualities(self): + """query mapping quality scores at pileup column position. + + Returns + ------- + + a list of quality scores : list + """ + if self.plp == NULL or self.plp[0] == NULL: + raise ValueError("PileupColumn accessed after iterator finished") + + cdef uint32_t x = 0 + cdef const bam_pileup1_t * p = NULL + result = [] + for x from 0 <= x < self.n_pu: + p = &(self.plp[0][x]) + if p == NULL: + raise ValueError( + "pileup buffer out of sync - most likely use of iterator " + "outside loop") + + if pileup_base_qual_skip(p, self.min_base_quality): + continue + result.append(p.b.core.qual) + return result + + def get_query_positions(self): + """positions in read at pileup column position. + + Returns + ------- + + a list of read positions : list + """ + if self.plp == NULL or self.plp[0] == NULL: + raise ValueError("PileupColumn accessed after iterator finished") + + cdef uint32_t x = 0 + cdef const bam_pileup1_t * p = NULL + result = [] + for x from 0 <= x < self.n_pu: + p = &(self.plp[0][x]) + if p == NULL: + raise ValueError( + "pileup buffer out of sync - most likely use of iterator " + "outside loop") + + if pileup_base_qual_skip(p, self.min_base_quality): + continue + result.append(p.qpos) + return result + + def get_query_names(self): + """query/read names aligned at pileup column position. + + Returns + ------- + + a list of query names at pileup column position. : list + """ + if self.plp == NULL or self.plp[0] == NULL: + raise ValueError("PileupColumn accessed after iterator finished") + + cdef uint32_t x = 0 + cdef const bam_pileup1_t * p = NULL + result = [] + for x from 0 <= x < self.n_pu: + p = &(self.plp[0][x]) + if p == NULL: + raise ValueError( + "pileup buffer out of sync - most likely use of iterator " + "outside loop") + + if pileup_base_qual_skip(p, self.min_base_quality): + continue + result.append(charptr_to_str(pysam_bam_get_qname(p.b))) + return result + + +cdef class PileupRead: + '''Representation of a read aligned to a particular position in the + reference sequence. + + ''' + + def __init__(self): + raise TypeError( + "this class cannot be instantiated from Python") + + def __str__(self): + return "\t".join( + map(str, + (self.alignment, self.query_position, + self.indel, self.level, + self.is_del, self.is_head, + self.is_tail, self.is_refskip))) + + property alignment: + """a :class:`pysam.AlignedSegment` object of the aligned read""" + def __get__(self): + return self._alignment + + property query_position: + """position of the read base at the pileup site, 0-based. + None if :attr:`is_del` or :attr:`is_refskip` is set. + + """ + def __get__(self): + if self.is_del or self.is_refskip: + return None + else: + return self._qpos + + property query_position_or_next: + """position of the read base at the pileup site, 0-based. + + If the current position is a deletion, returns the next + aligned base. + + """ + def __get__(self): + return self._qpos + + property indel: + """indel length for the position following the current pileup site. + + This quantity peeks ahead to the next cigar operation in this + alignment. If the next operation is an insertion, indel will + be positive. If the next operation is a deletion, it will be + negation. 0 if the next operation is not an indel. + + """ + def __get__(self): + return self._indel + + property level: + """the level of the read in the "viewer" mode. Note that this value + is currently not computed.""" + def __get__(self): + return self._level + + property is_del: + """1 iff the base on the padded read is a deletion""" + def __get__(self): + return self._is_del + + property is_head: + """1 iff the base on the padded read is the left-most base.""" + def __get__(self): + return self._is_head + + property is_tail: + """1 iff the base on the padded read is the right-most base.""" + def __get__(self): + return self._is_tail + + property is_refskip: + """1 iff the base on the padded read is part of CIGAR N op.""" + def __get__(self): + return self._is_refskip + + + +cpdef enum CIGAR_OPS: + CMATCH = 0 + CINS = 1 + CDEL = 2 + CREF_SKIP = 3 + CSOFT_CLIP = 4 + CHARD_CLIP = 5 + CPAD = 6 + CEQUAL = 7 + CDIFF = 8 + CBACK = 9 + + +cpdef enum SAM_FLAGS: + # the read is paired in sequencing, no matter whether it is mapped in a pair + FPAIRED = 1 + # the read is mapped in a proper pair + FPROPER_PAIR = 2 + # the read itself is unmapped; conflictive with FPROPER_PAIR + FUNMAP = 4 + # the mate is unmapped + FMUNMAP = 8 + # the read is mapped to the reverse strand + FREVERSE = 16 + # the mate is mapped to the reverse strand + FMREVERSE = 32 + # this is read1 + FREAD1 = 64 + # this is read2 + FREAD2 = 128 + # not primary alignment + FSECONDARY = 256 + # QC failure + FQCFAIL = 512 + # optical or PCR duplicate + FDUP = 1024 + # supplementary alignment + FSUPPLEMENTARY = 2048 + + +__all__ = [ + "AlignedSegment", + "PileupColumn", + "PileupRead", + "CMATCH", + "CINS", + "CDEL", + "CREF_SKIP", + "CSOFT_CLIP", + "CHARD_CLIP", + "CPAD", + "CEQUAL", + "CDIFF", + "CBACK", + "FPAIRED", + "FPROPER_PAIR", + "FUNMAP", + "FMUNMAP", + "FREVERSE", + "FMREVERSE", + "FREAD1", + "FREAD2", + "FSECONDARY", + "FQCFAIL", + "FDUP", + "FSUPPLEMENTARY", + "KEY_NAMES"]