jpayne@68: # cython: language_level=3 jpayne@68: # cython: embedsignature=True jpayne@68: # cython: profile=True jpayne@68: # adds doc-strings for sphinx jpayne@68: jpayne@68: ######################################################################## jpayne@68: ######################################################################## jpayne@68: ## Cython cimports jpayne@68: ######################################################################## jpayne@68: jpayne@68: from posix.unistd cimport dup jpayne@68: from libc.errno cimport errno jpayne@68: from libc.stdint cimport INT32_MAX jpayne@68: from cpython cimport PyBytes_FromStringAndSize jpayne@68: from pysam.libchtslib cimport * jpayne@68: from pysam.libcutils cimport force_bytes, force_str, charptr_to_str, charptr_to_str_w_len jpayne@68: from pysam.libcutils cimport encode_filename, from_string_and_size, libc_whence_from_io jpayne@68: jpayne@68: jpayne@68: ######################################################################## jpayne@68: ######################################################################## jpayne@68: ## Python imports jpayne@68: ######################################################################## jpayne@68: jpayne@68: import os jpayne@68: import io jpayne@68: import re jpayne@68: from warnings import warn jpayne@68: jpayne@68: jpayne@68: ######################################################################## jpayne@68: ######################################################################## jpayne@68: ## Constants jpayne@68: ######################################################################## jpayne@68: jpayne@68: __all__ = ['get_verbosity', 'set_verbosity', 'HFile', 'HTSFile'] jpayne@68: jpayne@68: # maximum genomic coordinace jpayne@68: cdef int MAX_POS = (1 << 31) - 1 jpayne@68: jpayne@68: cdef tuple FORMAT_CATEGORIES = ('UNKNOWN', 'ALIGNMENTS', 'VARIANTS', 'INDEX', 'REGIONS') jpayne@68: cdef tuple FORMATS = ('UNKNOWN', 'BINARY_FORMAT', 'TEXT_FORMAT', 'SAM', 'BAM', 'BAI', 'CRAM', 'CRAI', jpayne@68: 'VCF', 'BCF', 'CSI', 'GZI', 'TBI', 'BED') jpayne@68: cdef tuple COMPRESSION = ('NONE', 'GZIP', 'BGZF', 'CUSTOM') jpayne@68: jpayne@68: jpayne@68: ######################################################################## jpayne@68: ######################################################################## jpayne@68: ## Verbosity functions jpayne@68: ######################################################################## jpayne@68: jpayne@68: jpayne@68: cpdef set_verbosity(int verbosity): jpayne@68: """Set htslib's hts_verbose global variable to the specified value.""" jpayne@68: return hts_set_verbosity(verbosity) jpayne@68: jpayne@68: cpdef get_verbosity(): jpayne@68: """Return the value of htslib's hts_verbose global variable.""" jpayne@68: return hts_get_verbosity() jpayne@68: jpayne@68: jpayne@68: ######################################################################## jpayne@68: ######################################################################## jpayne@68: ## HFile wrapper class jpayne@68: ######################################################################## jpayne@68: jpayne@68: cdef class HFile(object): jpayne@68: cdef hFILE *fp jpayne@68: cdef readonly object name, mode jpayne@68: jpayne@68: def __init__(self, name, mode='r', closefd=True): jpayne@68: self._open(name, mode, closefd=True) jpayne@68: jpayne@68: def __dealloc__(self): jpayne@68: self.close() jpayne@68: jpayne@68: @property jpayne@68: def closed(self): jpayne@68: return self.fp == NULL jpayne@68: jpayne@68: cdef _open(self, name, mode, closefd=True): jpayne@68: self.name = name jpayne@68: self.mode = mode jpayne@68: jpayne@68: mode = force_bytes(mode) jpayne@68: jpayne@68: if isinstance(name, int): jpayne@68: if self.fp != NULL: jpayne@68: name = dup(name) jpayne@68: self.fp = hdopen(name, mode) jpayne@68: else: jpayne@68: name = encode_filename(name) jpayne@68: self.fp = hopen(name, mode) jpayne@68: jpayne@68: if not self.fp: jpayne@68: raise IOError(errno, 'failed to open HFile', self.name) jpayne@68: jpayne@68: def close(self): jpayne@68: if self.fp == NULL: jpayne@68: return jpayne@68: jpayne@68: cdef hFILE *fp = self.fp jpayne@68: self.fp = NULL jpayne@68: jpayne@68: if hclose(fp) != 0: jpayne@68: raise IOError(errno, 'failed to close HFile', self.name) jpayne@68: jpayne@68: def fileno(self): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: if isinstance(self.name, int): jpayne@68: return self.name jpayne@68: else: jpayne@68: raise AttributeError('fileno not available') jpayne@68: jpayne@68: def __enter__(self): jpayne@68: return self jpayne@68: jpayne@68: def __exit__(self, type, value, tb): jpayne@68: self.close() jpayne@68: jpayne@68: def __iter__(self): jpayne@68: return self jpayne@68: jpayne@68: def __next__(self): jpayne@68: line = self.readline() jpayne@68: if not line: jpayne@68: raise StopIteration() jpayne@68: return line jpayne@68: jpayne@68: def flush(self): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: if hflush(self.fp) != 0: jpayne@68: raise IOError(herrno(self.fp), 'failed to flush HFile', self.name) jpayne@68: jpayne@68: def isatty(self): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: return False jpayne@68: jpayne@68: def readable(self): jpayne@68: return self.fp != NULL and 'r' in self.mode jpayne@68: jpayne@68: def read(self, Py_ssize_t size=-1): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: jpayne@68: if size == 0: jpayne@68: return b'' jpayne@68: jpayne@68: cdef list parts = [] jpayne@68: cdef bytes part jpayne@68: cdef Py_ssize_t chunk_size, ret, bytes_read = 0 jpayne@68: cdef char *cpart jpayne@68: jpayne@68: while size == -1 or bytes_read < size: jpayne@68: chunk_size = 4096 jpayne@68: if size != -1: jpayne@68: chunk_size = min(chunk_size, size - bytes_read) jpayne@68: jpayne@68: part = PyBytes_FromStringAndSize(NULL, chunk_size) jpayne@68: cpart = part jpayne@68: ret = hread(self.fp, cpart, chunk_size) jpayne@68: jpayne@68: if ret < 0: jpayne@68: IOError(herrno(self.fp), 'failed to read HFile', self.name) jpayne@68: elif not ret: jpayne@68: break jpayne@68: jpayne@68: bytes_read += ret jpayne@68: jpayne@68: if ret < chunk_size: jpayne@68: part = cpart[:ret] jpayne@68: jpayne@68: parts.append(part) jpayne@68: jpayne@68: return b''.join(parts) jpayne@68: jpayne@68: def readall(self): jpayne@68: return self.read() jpayne@68: jpayne@68: def readinto(self, buf): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: jpayne@68: size = len(buf) jpayne@68: jpayne@68: if size == 0: jpayne@68: return size jpayne@68: jpayne@68: mv = memoryview(buf) jpayne@68: ret = hread(self.fp, mv, size) jpayne@68: jpayne@68: if ret < 0: jpayne@68: IOError(herrno(self.fp), 'failed to read HFile', self.name) jpayne@68: jpayne@68: return ret jpayne@68: jpayne@68: def readline(self, Py_ssize_t size=-1): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: jpayne@68: if size == 0: jpayne@68: return b'' jpayne@68: jpayne@68: cdef list parts = [] jpayne@68: cdef bytes part jpayne@68: cdef Py_ssize_t chunk_size, ret, bytes_read = 0 jpayne@68: cdef char *cpart jpayne@68: jpayne@68: while size == -1 or bytes_read < size: jpayne@68: chunk_size = 4096 jpayne@68: if size != -1: jpayne@68: chunk_size = min(chunk_size, size - bytes_read) jpayne@68: jpayne@68: part = PyBytes_FromStringAndSize(NULL, chunk_size) jpayne@68: cpart = part jpayne@68: jpayne@68: # Python bytes objects allocate an extra byte for a null terminator jpayne@68: ret = hgetln(cpart, chunk_size+1, self.fp) jpayne@68: jpayne@68: if ret < 0: jpayne@68: IOError(herrno(self.fp), 'failed to read HFile', self.name) jpayne@68: elif not ret: jpayne@68: break jpayne@68: jpayne@68: bytes_read += ret jpayne@68: jpayne@68: if ret < chunk_size: jpayne@68: part = cpart[:ret] jpayne@68: cpart = part jpayne@68: jpayne@68: parts.append(part) jpayne@68: jpayne@68: if cpart[ret-1] == b'\n': jpayne@68: break jpayne@68: jpayne@68: return b''.join(parts) jpayne@68: jpayne@68: def readlines(self): jpayne@68: return list(self) jpayne@68: jpayne@68: def seek(self, Py_ssize_t offset, int whence=io.SEEK_SET): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: jpayne@68: cdef Py_ssize_t off = hseek(self.fp, offset, libc_whence_from_io(whence)) jpayne@68: jpayne@68: if off < 0: jpayne@68: raise IOError(herrno(self.fp), 'seek failed on HFile', self.name) jpayne@68: jpayne@68: return off jpayne@68: jpayne@68: def tell(self): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: jpayne@68: ret = htell(self.fp) jpayne@68: jpayne@68: if ret < 0: jpayne@68: raise IOError(herrno(self.fp), 'tell failed on HFile', self.name) jpayne@68: jpayne@68: return ret jpayne@68: jpayne@68: def seekable(self): jpayne@68: return self.fp != NULL jpayne@68: jpayne@68: def truncate(self, size=None): jpayne@68: raise NotImplementedError() jpayne@68: jpayne@68: def writable(self): jpayne@68: return self.fp != NULL and 'w' in self.mode jpayne@68: jpayne@68: def write(self, bytes b): jpayne@68: if self.fp == NULL: jpayne@68: raise IOError('operation on closed HFile') jpayne@68: jpayne@68: got = hwrite(self.fp, b, len(b)) jpayne@68: jpayne@68: if got < 0: jpayne@68: raise IOError(herrno(self.fp), 'write failed on HFile', self.name) jpayne@68: jpayne@68: return got jpayne@68: jpayne@68: def writelines(self, lines): jpayne@68: for line in lines: jpayne@68: self.write(line) jpayne@68: jpayne@68: jpayne@68: ######################################################################## jpayne@68: ######################################################################## jpayne@68: ## Helpers for backward compatibility to hide the difference between jpayne@68: ## boolean properties and methods jpayne@68: ######################################################################## jpayne@68: jpayne@68: class CallableValue(object): jpayne@68: def __init__(self, value): jpayne@68: self.value = value jpayne@68: def __call__(self): jpayne@68: return self.value jpayne@68: def __bool__(self): jpayne@68: return self.value jpayne@68: def __nonzero__(self): jpayne@68: return self.value jpayne@68: def __eq__(self, other): jpayne@68: return self.value == other jpayne@68: def __ne__(self, other): jpayne@68: return self.value != other jpayne@68: jpayne@68: jpayne@68: CTrue = CallableValue(True) jpayne@68: CFalse = CallableValue(False) jpayne@68: jpayne@68: jpayne@68: ######################################################################## jpayne@68: ######################################################################## jpayne@68: ## HTSFile wrapper class (base class for AlignmentFile and VariantFile) jpayne@68: ######################################################################## jpayne@68: jpayne@68: cdef class HTSFile(object): jpayne@68: """ jpayne@68: Base class for HTS file types jpayne@68: """ jpayne@68: jpayne@68: def __cinit__(self, *args, **kwargs): jpayne@68: self.htsfile = NULL jpayne@68: self.threads = 1 jpayne@68: self.duplicate_filehandle = True jpayne@68: jpayne@68: def close(self): jpayne@68: if self.htsfile: jpayne@68: hts_close(self.htsfile) jpayne@68: self.htsfile = NULL jpayne@68: jpayne@68: def __dealloc__(self): jpayne@68: if self.htsfile: jpayne@68: hts_close(self.htsfile) jpayne@68: self.htsfile = NULL jpayne@68: jpayne@68: def flush(self): jpayne@68: """Flush any buffered data to the underlying output stream.""" jpayne@68: if self.htsfile: jpayne@68: if hts_flush(self.htsfile) < 0: jpayne@68: raise OSError(errno, f'Flushing {type(self).__name__} failed', force_str(self.filename)) jpayne@68: jpayne@68: def check_truncation(self, ignore_truncation=False): jpayne@68: """Check if file is truncated.""" jpayne@68: if not self.htsfile: jpayne@68: return jpayne@68: jpayne@68: if self.htsfile.format.compression != bgzf: jpayne@68: return jpayne@68: jpayne@68: cdef BGZF *bgzfp = hts_get_bgzfp(self.htsfile) jpayne@68: if not bgzfp: jpayne@68: return jpayne@68: jpayne@68: cdef int ret = bgzf_check_EOF(bgzfp) jpayne@68: if ret < 0: jpayne@68: raise IOError(errno, 'error checking for EOF marker') jpayne@68: elif ret == 0: jpayne@68: msg = 'no BGZF EOF marker; file may be truncated'.format(self.filename) jpayne@68: if ignore_truncation: jpayne@68: warn(msg) jpayne@68: else: jpayne@68: raise IOError(msg) jpayne@68: jpayne@68: def __enter__(self): jpayne@68: return self jpayne@68: jpayne@68: def __exit__(self, exc_type, exc_value, traceback): jpayne@68: self.close() jpayne@68: return False jpayne@68: jpayne@68: @property jpayne@68: def category(self): jpayne@68: """General file format category. One of UNKNOWN, ALIGNMENTS, jpayne@68: VARIANTS, INDEX, REGIONS""" jpayne@68: if not self.htsfile: jpayne@68: raise ValueError('metadata not available on closed file') jpayne@68: return FORMAT_CATEGORIES[self.htsfile.format.category] jpayne@68: jpayne@68: @property jpayne@68: def format(self): jpayne@68: """File format. jpayne@68: jpayne@68: One of UNKNOWN, BINARY_FORMAT, TEXT_FORMAT, SAM, BAM, jpayne@68: BAI, CRAM, CRAI, VCF, BCF, CSI, GZI, TBI, BED. jpayne@68: """ jpayne@68: if not self.htsfile: jpayne@68: raise ValueError('metadata not available on closed file') jpayne@68: return FORMATS[self.htsfile.format.format] jpayne@68: jpayne@68: @property jpayne@68: def version(self): jpayne@68: """Tuple of file format version numbers (major, minor)""" jpayne@68: if not self.htsfile: jpayne@68: raise ValueError('metadata not available on closed file') jpayne@68: return self.htsfile.format.version.major, self.htsfile.format.version.minor jpayne@68: jpayne@68: @property jpayne@68: def compression(self): jpayne@68: """File compression. jpayne@68: jpayne@68: One of NONE, GZIP, BGZF, CUSTOM.""" jpayne@68: if not self.htsfile: jpayne@68: raise ValueError('metadata not available on closed file') jpayne@68: return COMPRESSION[self.htsfile.format.compression] jpayne@68: jpayne@68: @property jpayne@68: def description(self): jpayne@68: """Vaguely human readable description of the file format""" jpayne@68: if not self.htsfile: jpayne@68: raise ValueError('metadata not available on closed file') jpayne@68: cdef char *desc = hts_format_description(&self.htsfile.format) jpayne@68: try: jpayne@68: return charptr_to_str(desc) jpayne@68: finally: jpayne@68: free(desc) jpayne@68: jpayne@68: @property jpayne@68: def is_open(self): jpayne@68: """return True if HTSFile is open and in a valid state.""" jpayne@68: return CTrue if self.htsfile != NULL else CFalse jpayne@68: jpayne@68: @property jpayne@68: def is_closed(self): jpayne@68: """return True if HTSFile is closed.""" jpayne@68: return self.htsfile == NULL jpayne@68: jpayne@68: @property jpayne@68: def closed(self): jpayne@68: """return True if HTSFile is closed.""" jpayne@68: return self.htsfile == NULL jpayne@68: jpayne@68: @property jpayne@68: def is_write(self): jpayne@68: """return True if HTSFile is open for writing""" jpayne@68: return self.htsfile != NULL and self.htsfile.is_write != 0 jpayne@68: jpayne@68: @property jpayne@68: def is_read(self): jpayne@68: """return True if HTSFile is open for reading""" jpayne@68: return self.htsfile != NULL and self.htsfile.is_write == 0 jpayne@68: jpayne@68: @property jpayne@68: def is_sam(self): jpayne@68: """return True if HTSFile is reading or writing a SAM alignment file""" jpayne@68: return self.htsfile != NULL and self.htsfile.format.format == sam jpayne@68: jpayne@68: @property jpayne@68: def is_bam(self): jpayne@68: """return True if HTSFile is reading or writing a BAM alignment file""" jpayne@68: return self.htsfile != NULL and self.htsfile.format.format == bam jpayne@68: jpayne@68: @property jpayne@68: def is_cram(self): jpayne@68: """return True if HTSFile is reading or writing a BAM alignment file""" jpayne@68: return self.htsfile != NULL and self.htsfile.format.format == cram jpayne@68: jpayne@68: @property jpayne@68: def is_vcf(self): jpayne@68: """return True if HTSFile is reading or writing a VCF variant file""" jpayne@68: return self.htsfile != NULL and self.htsfile.format.format == vcf jpayne@68: jpayne@68: @property jpayne@68: def is_bcf(self): jpayne@68: """return True if HTSFile is reading or writing a BCF variant file""" jpayne@68: return self.htsfile != NULL and self.htsfile.format.format == bcf jpayne@68: jpayne@68: def reset(self): jpayne@68: """reset file position to beginning of file just after the header. jpayne@68: jpayne@68: Returns jpayne@68: ------- jpayne@68: jpayne@68: The file position after moving the file pointer. : pointer jpayne@68: jpayne@68: """ jpayne@68: return self.seek(self.start_offset) jpayne@68: jpayne@68: def seek(self, uint64_t offset, int whence=io.SEEK_SET): jpayne@68: """move file pointer to position *offset*, see :meth:`pysam.HTSFile.tell`.""" jpayne@68: if not self.is_open: jpayne@68: raise ValueError('I/O operation on closed file') jpayne@68: if self.is_stream: jpayne@68: raise IOError('seek not available in streams') jpayne@68: jpayne@68: whence = libc_whence_from_io(whence) jpayne@68: jpayne@68: cdef int64_t ret jpayne@68: if self.htsfile.format.compression == bgzf: jpayne@68: with nogil: jpayne@68: ret = bgzf_seek(hts_get_bgzfp(self.htsfile), offset, whence) jpayne@68: elif self.htsfile.format.compression == no_compression: jpayne@68: ret = 0 if (hseek(self.htsfile.fp.hfile, offset, whence) >= 0) else -1 jpayne@68: else: jpayne@68: raise NotImplementedError("seek not implemented in files compressed by method {}".format( jpayne@68: self.htsfile.format.compression)) jpayne@68: return ret jpayne@68: jpayne@68: def tell(self): jpayne@68: """return current file position, see :meth:`pysam.HTSFile.seek`.""" jpayne@68: if not self.is_open: jpayne@68: raise ValueError('I/O operation on closed file') jpayne@68: if self.is_stream: jpayne@68: raise IOError('tell not available in streams') jpayne@68: jpayne@68: cdef int64_t ret jpayne@68: if self.htsfile.format.compression == bgzf: jpayne@68: with nogil: jpayne@68: ret = bgzf_tell(hts_get_bgzfp(self.htsfile)) jpayne@68: elif self.htsfile.format.compression == no_compression: jpayne@68: ret = htell(self.htsfile.fp.hfile) jpayne@68: elif self.htsfile.format.format == cram: jpayne@68: with nogil: jpayne@68: ret = htell(cram_fd_get_fp(self.htsfile.fp.cram)) jpayne@68: else: jpayne@68: raise NotImplementedError("seek not implemented in files compressed by method {}".format( jpayne@68: self.htsfile.format.compression)) jpayne@68: jpayne@68: return ret jpayne@68: jpayne@68: cdef htsFile *_open_htsfile(self) except? NULL: jpayne@68: cdef char *cfilename jpayne@68: cdef char *cmode = self.mode jpayne@68: cdef int fd, dup_fd, threads jpayne@68: jpayne@68: threads = self.threads - 1 jpayne@68: if isinstance(self.filename, bytes): jpayne@68: cfilename = self.filename jpayne@68: with nogil: jpayne@68: htsfile = hts_open(cfilename, cmode) jpayne@68: if htsfile != NULL: jpayne@68: hts_set_threads(htsfile, threads) jpayne@68: return htsfile jpayne@68: else: jpayne@68: if isinstance(self.filename, int): jpayne@68: fd = self.filename jpayne@68: else: jpayne@68: fd = self.filename.fileno() jpayne@68: jpayne@68: if self.duplicate_filehandle: jpayne@68: dup_fd = dup(fd) jpayne@68: else: jpayne@68: dup_fd = fd jpayne@68: jpayne@68: # Replicate mode normalization done in hts_open_format jpayne@68: smode = self.mode.replace(b'b', b'').replace(b'c', b'') jpayne@68: if b'b' in self.mode: jpayne@68: smode += b'b' jpayne@68: elif b'c' in self.mode: jpayne@68: smode += b'c' jpayne@68: cmode = smode jpayne@68: jpayne@68: hfile = hdopen(dup_fd, cmode) jpayne@68: if hfile == NULL: jpayne@68: raise IOError('Cannot create hfile') jpayne@68: jpayne@68: try: jpayne@68: # filename.name can be an int jpayne@68: filename = str(self.filename.name) jpayne@68: except AttributeError: jpayne@68: filename = ''.format(fd) jpayne@68: jpayne@68: filename = encode_filename(filename) jpayne@68: cfilename = filename jpayne@68: with nogil: jpayne@68: htsfile = hts_hopen(hfile, cfilename, cmode) jpayne@68: if htsfile != NULL: jpayne@68: hts_set_threads(htsfile, threads) jpayne@68: return htsfile jpayne@68: jpayne@68: def add_hts_options(self, format_options=None): jpayne@68: """Given a list of key=value format option strings, add them to an open htsFile jpayne@68: """ jpayne@68: cdef int rval jpayne@68: cdef hts_opt *opts = NULL jpayne@68: jpayne@68: if format_options: jpayne@68: for format_option in format_options: jpayne@68: rval = hts_opt_add(&opts, format_option) jpayne@68: if rval != 0: jpayne@68: if opts != NULL: jpayne@68: hts_opt_free(opts) jpayne@68: raise RuntimeError('Invalid format option ({}) specified'.format(format_option)) jpayne@68: if opts != NULL: jpayne@68: rval = hts_opt_apply(self.htsfile, opts) jpayne@68: if rval != 0: jpayne@68: hts_opt_free(opts) jpayne@68: raise RuntimeError('An error occurred while applying the requested format options') jpayne@68: hts_opt_free(opts) jpayne@68: jpayne@68: def parse_region(self, contig=None, start=None, stop=None, jpayne@68: region=None, tid=None, jpayne@68: reference=None, end=None): jpayne@68: """parse alternative ways to specify a genomic region. A region can jpayne@68: either be specified by :term:`contig`, `start` and jpayne@68: `stop`. `start` and `stop` denote 0-based, half-open jpayne@68: intervals. :term:`reference` and `end` are also accepted for jpayne@68: backward compatibility as synonyms for :term:`contig` and jpayne@68: `stop`, respectively. jpayne@68: jpayne@68: Alternatively, a samtools :term:`region` string can be jpayne@68: supplied. jpayne@68: jpayne@68: If any of the coordinates are missing they will be replaced by jpayne@68: the minimum (`start`) or maximum (`stop`) coordinate. jpayne@68: jpayne@68: Note that region strings are 1-based inclusive, while `start` jpayne@68: and `stop` denote an interval in 0-based, half-open jpayne@68: coordinates (like BED files and Python slices). jpayne@68: jpayne@68: If `contig` or `region` or are ``*``, unmapped reads at the end jpayne@68: of a BAM file will be returned. Setting either to ``.`` will jpayne@68: iterate from the beginning of the file. jpayne@68: jpayne@68: Returns jpayne@68: ------- jpayne@68: jpayne@68: tuple : jpayne@68: a tuple of `flag`, :term:`tid`, `start` and jpayne@68: `stop`. The flag indicates whether no coordinates were jpayne@68: supplied and the genomic region is the complete genomic space. jpayne@68: jpayne@68: Raises jpayne@68: ------ jpayne@68: jpayne@68: ValueError jpayne@68: for invalid or out of bounds regions. jpayne@68: jpayne@68: """ jpayne@68: cdef int rtid jpayne@68: cdef int32_t rstart jpayne@68: cdef int32_t rstop jpayne@68: jpayne@68: if reference is not None: jpayne@68: if contig is not None: jpayne@68: raise ValueError('contig and reference should not both be specified') jpayne@68: contig = reference jpayne@68: jpayne@68: if end is not None: jpayne@68: if stop is not None: jpayne@68: raise ValueError('stop and end should not both be specified') jpayne@68: stop = end jpayne@68: jpayne@68: if contig is None and tid is None and region is None: jpayne@68: return 0, 0, 0, MAX_POS jpayne@68: jpayne@68: rtid = -1 jpayne@68: rstart = 0 jpayne@68: rstop = MAX_POS jpayne@68: if start is not None: jpayne@68: try: jpayne@68: rstart = start jpayne@68: except OverflowError: jpayne@68: raise ValueError('start out of range (%i)' % start) jpayne@68: jpayne@68: if stop is not None: jpayne@68: try: jpayne@68: rstop = stop jpayne@68: except OverflowError: jpayne@68: raise ValueError('stop out of range (%i)' % stop) jpayne@68: jpayne@68: if region: jpayne@68: region = force_str(region) jpayne@68: if ":" in region: jpayne@68: contig, coord = region.split(":") jpayne@68: parts = coord.split("-") jpayne@68: rstart = int(parts[0]) - 1 jpayne@68: if len(parts) >= 1: jpayne@68: rstop = int(parts[1]) jpayne@68: else: jpayne@68: contig = region jpayne@68: jpayne@68: if tid is not None: jpayne@68: if not self.is_valid_tid(tid): jpayne@68: raise IndexError('invalid tid') jpayne@68: rtid = tid jpayne@68: else: jpayne@68: if contig == "*": jpayne@68: rtid = HTS_IDX_NOCOOR jpayne@68: elif contig == ".": jpayne@68: rtid = HTS_IDX_START jpayne@68: else: jpayne@68: rtid = self.get_tid(contig) jpayne@68: if rtid < 0: jpayne@68: raise ValueError('invalid contig `%s`' % contig) jpayne@68: jpayne@68: if rstart > rstop: jpayne@68: raise ValueError('invalid coordinates: start (%i) > stop (%i)' % (rstart, rstop)) jpayne@68: if not 0 <= rstart < MAX_POS: jpayne@68: raise ValueError('start out of range (%i)' % rstart) jpayne@68: if not 0 <= rstop <= MAX_POS: jpayne@68: raise ValueError('stop out of range (%i)' % rstop) jpayne@68: jpayne@68: return 1, rtid, rstart, rstop jpayne@68: jpayne@68: def is_valid_tid(self, tid): jpayne@68: """ jpayne@68: return True if the numerical :term:`tid` is valid; False otherwise. jpayne@68: jpayne@68: returns -1 if contig is not known. jpayne@68: """ jpayne@68: raise NotImplementedError() jpayne@68: jpayne@68: def is_valid_reference_name(self, contig): jpayne@68: """ jpayne@68: return True if the contig name :term:`contig` is valid; False otherwise. jpayne@68: """ jpayne@68: return self.get_tid(contig) != -1 jpayne@68: jpayne@68: def get_tid(self, contig): jpayne@68: """ jpayne@68: return the numerical :term:`tid` corresponding to jpayne@68: :term:`contig` jpayne@68: jpayne@68: returns -1 if contig is not known. jpayne@68: """ jpayne@68: raise NotImplementedError() jpayne@68: jpayne@68: def get_reference_name(self, tid): jpayne@68: """ jpayne@68: return :term:`contig` name corresponding to numerical :term:`tid` jpayne@68: """ jpayne@68: raise NotImplementedError()