jpayne@69: # cython: language_level=3 jpayne@69: # cython: embedsignature=True jpayne@69: # cython: profile=True jpayne@69: # adds doc-strings for sphinx jpayne@69: jpayne@69: ######################################################################## jpayne@69: ######################################################################## jpayne@69: ## Cython cimports jpayne@69: ######################################################################## jpayne@69: jpayne@69: from posix.unistd cimport dup jpayne@69: from libc.errno cimport errno jpayne@69: from libc.stdint cimport INT32_MAX jpayne@69: from cpython cimport PyBytes_FromStringAndSize jpayne@69: from pysam.libchtslib cimport * jpayne@69: from pysam.libcutils cimport force_bytes, force_str, charptr_to_str, charptr_to_str_w_len jpayne@69: from pysam.libcutils cimport encode_filename, from_string_and_size, libc_whence_from_io jpayne@69: jpayne@69: jpayne@69: ######################################################################## jpayne@69: ######################################################################## jpayne@69: ## Python imports jpayne@69: ######################################################################## jpayne@69: jpayne@69: import os jpayne@69: import io jpayne@69: import re jpayne@69: from warnings import warn jpayne@69: jpayne@69: jpayne@69: ######################################################################## jpayne@69: ######################################################################## jpayne@69: ## Constants jpayne@69: ######################################################################## jpayne@69: jpayne@69: __all__ = ['get_verbosity', 'set_verbosity', 'HFile', 'HTSFile'] jpayne@69: jpayne@69: # maximum genomic coordinace jpayne@69: cdef int MAX_POS = (1 << 31) - 1 jpayne@69: jpayne@69: cdef tuple FORMAT_CATEGORIES = ('UNKNOWN', 'ALIGNMENTS', 'VARIANTS', 'INDEX', 'REGIONS') jpayne@69: cdef tuple FORMATS = ('UNKNOWN', 'BINARY_FORMAT', 'TEXT_FORMAT', 'SAM', 'BAM', 'BAI', 'CRAM', 'CRAI', jpayne@69: 'VCF', 'BCF', 'CSI', 'GZI', 'TBI', 'BED') jpayne@69: cdef tuple COMPRESSION = ('NONE', 'GZIP', 'BGZF', 'CUSTOM') jpayne@69: jpayne@69: jpayne@69: ######################################################################## jpayne@69: ######################################################################## jpayne@69: ## Verbosity functions jpayne@69: ######################################################################## jpayne@69: jpayne@69: jpayne@69: cpdef set_verbosity(int verbosity): jpayne@69: """Set htslib's hts_verbose global variable to the specified value.""" jpayne@69: return hts_set_verbosity(verbosity) jpayne@69: jpayne@69: cpdef get_verbosity(): jpayne@69: """Return the value of htslib's hts_verbose global variable.""" jpayne@69: return hts_get_verbosity() jpayne@69: jpayne@69: jpayne@69: ######################################################################## jpayne@69: ######################################################################## jpayne@69: ## HFile wrapper class jpayne@69: ######################################################################## jpayne@69: jpayne@69: cdef class HFile(object): jpayne@69: cdef hFILE *fp jpayne@69: cdef readonly object name, mode jpayne@69: jpayne@69: def __init__(self, name, mode='r', closefd=True): jpayne@69: self._open(name, mode, closefd=True) jpayne@69: jpayne@69: def __dealloc__(self): jpayne@69: self.close() jpayne@69: jpayne@69: @property jpayne@69: def closed(self): jpayne@69: return self.fp == NULL jpayne@69: jpayne@69: cdef _open(self, name, mode, closefd=True): jpayne@69: self.name = name jpayne@69: self.mode = mode jpayne@69: jpayne@69: mode = force_bytes(mode) jpayne@69: jpayne@69: if isinstance(name, int): jpayne@69: if self.fp != NULL: jpayne@69: name = dup(name) jpayne@69: self.fp = hdopen(name, mode) jpayne@69: else: jpayne@69: name = encode_filename(name) jpayne@69: self.fp = hopen(name, mode) jpayne@69: jpayne@69: if not self.fp: jpayne@69: raise IOError(errno, 'failed to open HFile', self.name) jpayne@69: jpayne@69: def close(self): jpayne@69: if self.fp == NULL: jpayne@69: return jpayne@69: jpayne@69: cdef hFILE *fp = self.fp jpayne@69: self.fp = NULL jpayne@69: jpayne@69: if hclose(fp) != 0: jpayne@69: raise IOError(errno, 'failed to close HFile', self.name) jpayne@69: jpayne@69: def fileno(self): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: if isinstance(self.name, int): jpayne@69: return self.name jpayne@69: else: jpayne@69: raise AttributeError('fileno not available') jpayne@69: jpayne@69: def __enter__(self): jpayne@69: return self jpayne@69: jpayne@69: def __exit__(self, type, value, tb): jpayne@69: self.close() jpayne@69: jpayne@69: def __iter__(self): jpayne@69: return self jpayne@69: jpayne@69: def __next__(self): jpayne@69: line = self.readline() jpayne@69: if not line: jpayne@69: raise StopIteration() jpayne@69: return line jpayne@69: jpayne@69: def flush(self): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: if hflush(self.fp) != 0: jpayne@69: raise IOError(herrno(self.fp), 'failed to flush HFile', self.name) jpayne@69: jpayne@69: def isatty(self): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: return False jpayne@69: jpayne@69: def readable(self): jpayne@69: return self.fp != NULL and 'r' in self.mode jpayne@69: jpayne@69: def read(self, Py_ssize_t size=-1): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: jpayne@69: if size == 0: jpayne@69: return b'' jpayne@69: jpayne@69: cdef list parts = [] jpayne@69: cdef bytes part jpayne@69: cdef Py_ssize_t chunk_size, ret, bytes_read = 0 jpayne@69: cdef char *cpart jpayne@69: jpayne@69: while size == -1 or bytes_read < size: jpayne@69: chunk_size = 4096 jpayne@69: if size != -1: jpayne@69: chunk_size = min(chunk_size, size - bytes_read) jpayne@69: jpayne@69: part = PyBytes_FromStringAndSize(NULL, chunk_size) jpayne@69: cpart = part jpayne@69: ret = hread(self.fp, cpart, chunk_size) jpayne@69: jpayne@69: if ret < 0: jpayne@69: IOError(herrno(self.fp), 'failed to read HFile', self.name) jpayne@69: elif not ret: jpayne@69: break jpayne@69: jpayne@69: bytes_read += ret jpayne@69: jpayne@69: if ret < chunk_size: jpayne@69: part = cpart[:ret] jpayne@69: jpayne@69: parts.append(part) jpayne@69: jpayne@69: return b''.join(parts) jpayne@69: jpayne@69: def readall(self): jpayne@69: return self.read() jpayne@69: jpayne@69: def readinto(self, buf): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: jpayne@69: size = len(buf) jpayne@69: jpayne@69: if size == 0: jpayne@69: return size jpayne@69: jpayne@69: mv = memoryview(buf) jpayne@69: ret = hread(self.fp, mv, size) jpayne@69: jpayne@69: if ret < 0: jpayne@69: IOError(herrno(self.fp), 'failed to read HFile', self.name) jpayne@69: jpayne@69: return ret jpayne@69: jpayne@69: def readline(self, Py_ssize_t size=-1): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: jpayne@69: if size == 0: jpayne@69: return b'' jpayne@69: jpayne@69: cdef list parts = [] jpayne@69: cdef bytes part jpayne@69: cdef Py_ssize_t chunk_size, ret, bytes_read = 0 jpayne@69: cdef char *cpart jpayne@69: jpayne@69: while size == -1 or bytes_read < size: jpayne@69: chunk_size = 4096 jpayne@69: if size != -1: jpayne@69: chunk_size = min(chunk_size, size - bytes_read) jpayne@69: jpayne@69: part = PyBytes_FromStringAndSize(NULL, chunk_size) jpayne@69: cpart = part jpayne@69: jpayne@69: # Python bytes objects allocate an extra byte for a null terminator jpayne@69: ret = hgetln(cpart, chunk_size+1, self.fp) jpayne@69: jpayne@69: if ret < 0: jpayne@69: IOError(herrno(self.fp), 'failed to read HFile', self.name) jpayne@69: elif not ret: jpayne@69: break jpayne@69: jpayne@69: bytes_read += ret jpayne@69: jpayne@69: if ret < chunk_size: jpayne@69: part = cpart[:ret] jpayne@69: cpart = part jpayne@69: jpayne@69: parts.append(part) jpayne@69: jpayne@69: if cpart[ret-1] == b'\n': jpayne@69: break jpayne@69: jpayne@69: return b''.join(parts) jpayne@69: jpayne@69: def readlines(self): jpayne@69: return list(self) jpayne@69: jpayne@69: def seek(self, Py_ssize_t offset, int whence=io.SEEK_SET): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: jpayne@69: cdef Py_ssize_t off = hseek(self.fp, offset, libc_whence_from_io(whence)) jpayne@69: jpayne@69: if off < 0: jpayne@69: raise IOError(herrno(self.fp), 'seek failed on HFile', self.name) jpayne@69: jpayne@69: return off jpayne@69: jpayne@69: def tell(self): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: jpayne@69: ret = htell(self.fp) jpayne@69: jpayne@69: if ret < 0: jpayne@69: raise IOError(herrno(self.fp), 'tell failed on HFile', self.name) jpayne@69: jpayne@69: return ret jpayne@69: jpayne@69: def seekable(self): jpayne@69: return self.fp != NULL jpayne@69: jpayne@69: def truncate(self, size=None): jpayne@69: raise NotImplementedError() jpayne@69: jpayne@69: def writable(self): jpayne@69: return self.fp != NULL and 'w' in self.mode jpayne@69: jpayne@69: def write(self, bytes b): jpayne@69: if self.fp == NULL: jpayne@69: raise IOError('operation on closed HFile') jpayne@69: jpayne@69: got = hwrite(self.fp, b, len(b)) jpayne@69: jpayne@69: if got < 0: jpayne@69: raise IOError(herrno(self.fp), 'write failed on HFile', self.name) jpayne@69: jpayne@69: return got jpayne@69: jpayne@69: def writelines(self, lines): jpayne@69: for line in lines: jpayne@69: self.write(line) jpayne@69: jpayne@69: jpayne@69: ######################################################################## jpayne@69: ######################################################################## jpayne@69: ## Helpers for backward compatibility to hide the difference between jpayne@69: ## boolean properties and methods jpayne@69: ######################################################################## jpayne@69: jpayne@69: class CallableValue(object): jpayne@69: def __init__(self, value): jpayne@69: self.value = value jpayne@69: def __call__(self): jpayne@69: return self.value jpayne@69: def __bool__(self): jpayne@69: return self.value jpayne@69: def __nonzero__(self): jpayne@69: return self.value jpayne@69: def __eq__(self, other): jpayne@69: return self.value == other jpayne@69: def __ne__(self, other): jpayne@69: return self.value != other jpayne@69: jpayne@69: jpayne@69: CTrue = CallableValue(True) jpayne@69: CFalse = CallableValue(False) jpayne@69: jpayne@69: jpayne@69: ######################################################################## jpayne@69: ######################################################################## jpayne@69: ## HTSFile wrapper class (base class for AlignmentFile and VariantFile) jpayne@69: ######################################################################## jpayne@69: jpayne@69: cdef class HTSFile(object): jpayne@69: """ jpayne@69: Base class for HTS file types jpayne@69: """ jpayne@69: jpayne@69: def __cinit__(self, *args, **kwargs): jpayne@69: self.htsfile = NULL jpayne@69: self.threads = 1 jpayne@69: self.duplicate_filehandle = True jpayne@69: jpayne@69: def close(self): jpayne@69: if self.htsfile: jpayne@69: hts_close(self.htsfile) jpayne@69: self.htsfile = NULL jpayne@69: jpayne@69: def __dealloc__(self): jpayne@69: if self.htsfile: jpayne@69: hts_close(self.htsfile) jpayne@69: self.htsfile = NULL jpayne@69: jpayne@69: def flush(self): jpayne@69: """Flush any buffered data to the underlying output stream.""" jpayne@69: if self.htsfile: jpayne@69: if hts_flush(self.htsfile) < 0: jpayne@69: raise OSError(errno, f'Flushing {type(self).__name__} failed', force_str(self.filename)) jpayne@69: jpayne@69: def check_truncation(self, ignore_truncation=False): jpayne@69: """Check if file is truncated.""" jpayne@69: if not self.htsfile: jpayne@69: return jpayne@69: jpayne@69: if self.htsfile.format.compression != bgzf: jpayne@69: return jpayne@69: jpayne@69: cdef BGZF *bgzfp = hts_get_bgzfp(self.htsfile) jpayne@69: if not bgzfp: jpayne@69: return jpayne@69: jpayne@69: cdef int ret = bgzf_check_EOF(bgzfp) jpayne@69: if ret < 0: jpayne@69: raise IOError(errno, 'error checking for EOF marker') jpayne@69: elif ret == 0: jpayne@69: msg = 'no BGZF EOF marker; file may be truncated'.format(self.filename) jpayne@69: if ignore_truncation: jpayne@69: warn(msg) jpayne@69: else: jpayne@69: raise IOError(msg) jpayne@69: jpayne@69: def __enter__(self): jpayne@69: return self jpayne@69: jpayne@69: def __exit__(self, exc_type, exc_value, traceback): jpayne@69: self.close() jpayne@69: return False jpayne@69: jpayne@69: @property jpayne@69: def category(self): jpayne@69: """General file format category. One of UNKNOWN, ALIGNMENTS, jpayne@69: VARIANTS, INDEX, REGIONS""" jpayne@69: if not self.htsfile: jpayne@69: raise ValueError('metadata not available on closed file') jpayne@69: return FORMAT_CATEGORIES[self.htsfile.format.category] jpayne@69: jpayne@69: @property jpayne@69: def format(self): jpayne@69: """File format. jpayne@69: jpayne@69: One of UNKNOWN, BINARY_FORMAT, TEXT_FORMAT, SAM, BAM, jpayne@69: BAI, CRAM, CRAI, VCF, BCF, CSI, GZI, TBI, BED. jpayne@69: """ jpayne@69: if not self.htsfile: jpayne@69: raise ValueError('metadata not available on closed file') jpayne@69: return FORMATS[self.htsfile.format.format] jpayne@69: jpayne@69: @property jpayne@69: def version(self): jpayne@69: """Tuple of file format version numbers (major, minor)""" jpayne@69: if not self.htsfile: jpayne@69: raise ValueError('metadata not available on closed file') jpayne@69: return self.htsfile.format.version.major, self.htsfile.format.version.minor jpayne@69: jpayne@69: @property jpayne@69: def compression(self): jpayne@69: """File compression. jpayne@69: jpayne@69: One of NONE, GZIP, BGZF, CUSTOM.""" jpayne@69: if not self.htsfile: jpayne@69: raise ValueError('metadata not available on closed file') jpayne@69: return COMPRESSION[self.htsfile.format.compression] jpayne@69: jpayne@69: @property jpayne@69: def description(self): jpayne@69: """Vaguely human readable description of the file format""" jpayne@69: if not self.htsfile: jpayne@69: raise ValueError('metadata not available on closed file') jpayne@69: cdef char *desc = hts_format_description(&self.htsfile.format) jpayne@69: try: jpayne@69: return charptr_to_str(desc) jpayne@69: finally: jpayne@69: free(desc) jpayne@69: jpayne@69: @property jpayne@69: def is_open(self): jpayne@69: """return True if HTSFile is open and in a valid state.""" jpayne@69: return CTrue if self.htsfile != NULL else CFalse jpayne@69: jpayne@69: @property jpayne@69: def is_closed(self): jpayne@69: """return True if HTSFile is closed.""" jpayne@69: return self.htsfile == NULL jpayne@69: jpayne@69: @property jpayne@69: def closed(self): jpayne@69: """return True if HTSFile is closed.""" jpayne@69: return self.htsfile == NULL jpayne@69: jpayne@69: @property jpayne@69: def is_write(self): jpayne@69: """return True if HTSFile is open for writing""" jpayne@69: return self.htsfile != NULL and self.htsfile.is_write != 0 jpayne@69: jpayne@69: @property jpayne@69: def is_read(self): jpayne@69: """return True if HTSFile is open for reading""" jpayne@69: return self.htsfile != NULL and self.htsfile.is_write == 0 jpayne@69: jpayne@69: @property jpayne@69: def is_sam(self): jpayne@69: """return True if HTSFile is reading or writing a SAM alignment file""" jpayne@69: return self.htsfile != NULL and self.htsfile.format.format == sam jpayne@69: jpayne@69: @property jpayne@69: def is_bam(self): jpayne@69: """return True if HTSFile is reading or writing a BAM alignment file""" jpayne@69: return self.htsfile != NULL and self.htsfile.format.format == bam jpayne@69: jpayne@69: @property jpayne@69: def is_cram(self): jpayne@69: """return True if HTSFile is reading or writing a BAM alignment file""" jpayne@69: return self.htsfile != NULL and self.htsfile.format.format == cram jpayne@69: jpayne@69: @property jpayne@69: def is_vcf(self): jpayne@69: """return True if HTSFile is reading or writing a VCF variant file""" jpayne@69: return self.htsfile != NULL and self.htsfile.format.format == vcf jpayne@69: jpayne@69: @property jpayne@69: def is_bcf(self): jpayne@69: """return True if HTSFile is reading or writing a BCF variant file""" jpayne@69: return self.htsfile != NULL and self.htsfile.format.format == bcf jpayne@69: jpayne@69: def reset(self): jpayne@69: """reset file position to beginning of file just after the header. jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: The file position after moving the file pointer. : pointer jpayne@69: jpayne@69: """ jpayne@69: return self.seek(self.start_offset) jpayne@69: jpayne@69: def seek(self, uint64_t offset, int whence=io.SEEK_SET): jpayne@69: """move file pointer to position *offset*, see :meth:`pysam.HTSFile.tell`.""" jpayne@69: if not self.is_open: jpayne@69: raise ValueError('I/O operation on closed file') jpayne@69: if self.is_stream: jpayne@69: raise IOError('seek not available in streams') jpayne@69: jpayne@69: whence = libc_whence_from_io(whence) jpayne@69: jpayne@69: cdef int64_t ret jpayne@69: if self.htsfile.format.compression == bgzf: jpayne@69: with nogil: jpayne@69: ret = bgzf_seek(hts_get_bgzfp(self.htsfile), offset, whence) jpayne@69: elif self.htsfile.format.compression == no_compression: jpayne@69: ret = 0 if (hseek(self.htsfile.fp.hfile, offset, whence) >= 0) else -1 jpayne@69: else: jpayne@69: raise NotImplementedError("seek not implemented in files compressed by method {}".format( jpayne@69: self.htsfile.format.compression)) jpayne@69: return ret jpayne@69: jpayne@69: def tell(self): jpayne@69: """return current file position, see :meth:`pysam.HTSFile.seek`.""" jpayne@69: if not self.is_open: jpayne@69: raise ValueError('I/O operation on closed file') jpayne@69: if self.is_stream: jpayne@69: raise IOError('tell not available in streams') jpayne@69: jpayne@69: cdef int64_t ret jpayne@69: if self.htsfile.format.compression == bgzf: jpayne@69: with nogil: jpayne@69: ret = bgzf_tell(hts_get_bgzfp(self.htsfile)) jpayne@69: elif self.htsfile.format.compression == no_compression: jpayne@69: ret = htell(self.htsfile.fp.hfile) jpayne@69: elif self.htsfile.format.format == cram: jpayne@69: with nogil: jpayne@69: ret = htell(cram_fd_get_fp(self.htsfile.fp.cram)) jpayne@69: else: jpayne@69: raise NotImplementedError("seek not implemented in files compressed by method {}".format( jpayne@69: self.htsfile.format.compression)) jpayne@69: jpayne@69: return ret jpayne@69: jpayne@69: cdef htsFile *_open_htsfile(self) except? NULL: jpayne@69: cdef char *cfilename jpayne@69: cdef char *cmode = self.mode jpayne@69: cdef int fd, dup_fd, threads jpayne@69: jpayne@69: threads = self.threads - 1 jpayne@69: if isinstance(self.filename, bytes): jpayne@69: cfilename = self.filename jpayne@69: with nogil: jpayne@69: htsfile = hts_open(cfilename, cmode) jpayne@69: if htsfile != NULL: jpayne@69: hts_set_threads(htsfile, threads) jpayne@69: return htsfile jpayne@69: else: jpayne@69: if isinstance(self.filename, int): jpayne@69: fd = self.filename jpayne@69: else: jpayne@69: fd = self.filename.fileno() jpayne@69: jpayne@69: if self.duplicate_filehandle: jpayne@69: dup_fd = dup(fd) jpayne@69: else: jpayne@69: dup_fd = fd jpayne@69: jpayne@69: # Replicate mode normalization done in hts_open_format jpayne@69: smode = self.mode.replace(b'b', b'').replace(b'c', b'') jpayne@69: if b'b' in self.mode: jpayne@69: smode += b'b' jpayne@69: elif b'c' in self.mode: jpayne@69: smode += b'c' jpayne@69: cmode = smode jpayne@69: jpayne@69: hfile = hdopen(dup_fd, cmode) jpayne@69: if hfile == NULL: jpayne@69: raise IOError('Cannot create hfile') jpayne@69: jpayne@69: try: jpayne@69: # filename.name can be an int jpayne@69: filename = str(self.filename.name) jpayne@69: except AttributeError: jpayne@69: filename = ''.format(fd) jpayne@69: jpayne@69: filename = encode_filename(filename) jpayne@69: cfilename = filename jpayne@69: with nogil: jpayne@69: htsfile = hts_hopen(hfile, cfilename, cmode) jpayne@69: if htsfile != NULL: jpayne@69: hts_set_threads(htsfile, threads) jpayne@69: return htsfile jpayne@69: jpayne@69: def add_hts_options(self, format_options=None): jpayne@69: """Given a list of key=value format option strings, add them to an open htsFile jpayne@69: """ jpayne@69: cdef int rval jpayne@69: cdef hts_opt *opts = NULL jpayne@69: jpayne@69: if format_options: jpayne@69: for format_option in format_options: jpayne@69: rval = hts_opt_add(&opts, format_option) jpayne@69: if rval != 0: jpayne@69: if opts != NULL: jpayne@69: hts_opt_free(opts) jpayne@69: raise RuntimeError('Invalid format option ({}) specified'.format(format_option)) jpayne@69: if opts != NULL: jpayne@69: rval = hts_opt_apply(self.htsfile, opts) jpayne@69: if rval != 0: jpayne@69: hts_opt_free(opts) jpayne@69: raise RuntimeError('An error occurred while applying the requested format options') jpayne@69: hts_opt_free(opts) jpayne@69: jpayne@69: def parse_region(self, contig=None, start=None, stop=None, jpayne@69: region=None, tid=None, jpayne@69: reference=None, end=None): jpayne@69: """parse alternative ways to specify a genomic region. A region can jpayne@69: either be specified by :term:`contig`, `start` and jpayne@69: `stop`. `start` and `stop` denote 0-based, half-open jpayne@69: intervals. :term:`reference` and `end` are also accepted for jpayne@69: backward compatibility as synonyms for :term:`contig` and jpayne@69: `stop`, respectively. jpayne@69: jpayne@69: Alternatively, a samtools :term:`region` string can be jpayne@69: supplied. jpayne@69: jpayne@69: If any of the coordinates are missing they will be replaced by jpayne@69: the minimum (`start`) or maximum (`stop`) coordinate. jpayne@69: jpayne@69: Note that region strings are 1-based inclusive, while `start` jpayne@69: and `stop` denote an interval in 0-based, half-open jpayne@69: coordinates (like BED files and Python slices). jpayne@69: jpayne@69: If `contig` or `region` or are ``*``, unmapped reads at the end jpayne@69: of a BAM file will be returned. Setting either to ``.`` will jpayne@69: iterate from the beginning of the file. jpayne@69: jpayne@69: Returns jpayne@69: ------- jpayne@69: jpayne@69: tuple : jpayne@69: a tuple of `flag`, :term:`tid`, `start` and jpayne@69: `stop`. The flag indicates whether no coordinates were jpayne@69: supplied and the genomic region is the complete genomic space. jpayne@69: jpayne@69: Raises jpayne@69: ------ jpayne@69: jpayne@69: ValueError jpayne@69: for invalid or out of bounds regions. jpayne@69: jpayne@69: """ jpayne@69: cdef int rtid jpayne@69: cdef int32_t rstart jpayne@69: cdef int32_t rstop jpayne@69: jpayne@69: if reference is not None: jpayne@69: if contig is not None: jpayne@69: raise ValueError('contig and reference should not both be specified') jpayne@69: contig = reference jpayne@69: jpayne@69: if end is not None: jpayne@69: if stop is not None: jpayne@69: raise ValueError('stop and end should not both be specified') jpayne@69: stop = end jpayne@69: jpayne@69: if contig is None and tid is None and region is None: jpayne@69: return 0, 0, 0, MAX_POS jpayne@69: jpayne@69: rtid = -1 jpayne@69: rstart = 0 jpayne@69: rstop = MAX_POS jpayne@69: if start is not None: jpayne@69: try: jpayne@69: rstart = start jpayne@69: except OverflowError: jpayne@69: raise ValueError('start out of range (%i)' % start) jpayne@69: jpayne@69: if stop is not None: jpayne@69: try: jpayne@69: rstop = stop jpayne@69: except OverflowError: jpayne@69: raise ValueError('stop out of range (%i)' % stop) jpayne@69: jpayne@69: if region: jpayne@69: region = force_str(region) jpayne@69: if ":" in region: jpayne@69: contig, coord = region.split(":") jpayne@69: parts = coord.split("-") jpayne@69: rstart = int(parts[0]) - 1 jpayne@69: if len(parts) >= 1: jpayne@69: rstop = int(parts[1]) jpayne@69: else: jpayne@69: contig = region jpayne@69: jpayne@69: if tid is not None: jpayne@69: if not self.is_valid_tid(tid): jpayne@69: raise IndexError('invalid tid') jpayne@69: rtid = tid jpayne@69: else: jpayne@69: if contig == "*": jpayne@69: rtid = HTS_IDX_NOCOOR jpayne@69: elif contig == ".": jpayne@69: rtid = HTS_IDX_START jpayne@69: else: jpayne@69: rtid = self.get_tid(contig) jpayne@69: if rtid < 0: jpayne@69: raise ValueError('invalid contig `%s`' % contig) jpayne@69: jpayne@69: if rstart > rstop: jpayne@69: raise ValueError('invalid coordinates: start (%i) > stop (%i)' % (rstart, rstop)) jpayne@69: if not 0 <= rstart < MAX_POS: jpayne@69: raise ValueError('start out of range (%i)' % rstart) jpayne@69: if not 0 <= rstop <= MAX_POS: jpayne@69: raise ValueError('stop out of range (%i)' % rstop) jpayne@69: jpayne@69: return 1, rtid, rstart, rstop jpayne@69: jpayne@69: def is_valid_tid(self, tid): jpayne@69: """ jpayne@69: return True if the numerical :term:`tid` is valid; False otherwise. jpayne@69: jpayne@69: returns -1 if contig is not known. jpayne@69: """ jpayne@69: raise NotImplementedError() jpayne@69: jpayne@69: def is_valid_reference_name(self, contig): jpayne@69: """ jpayne@69: return True if the contig name :term:`contig` is valid; False otherwise. jpayne@69: """ jpayne@69: return self.get_tid(contig) != -1 jpayne@69: jpayne@69: def get_tid(self, contig): jpayne@69: """ jpayne@69: return the numerical :term:`tid` corresponding to jpayne@69: :term:`contig` jpayne@69: jpayne@69: returns -1 if contig is not known. jpayne@69: """ jpayne@69: raise NotImplementedError() jpayne@69: jpayne@69: def get_reference_name(self, tid): jpayne@69: """ jpayne@69: return :term:`contig` name corresponding to numerical :term:`tid` jpayne@69: """ jpayne@69: raise NotImplementedError()