jpayne@69: # cython: embedsignature=True jpayne@69: # jpayne@69: # Code to read, write and edit VCF files jpayne@69: # jpayne@69: # VCF lines are encoded as a dictionary with these keys (note: all lowercase): jpayne@69: # 'chrom': string jpayne@69: # 'pos': integer jpayne@69: # 'id': string jpayne@69: # 'ref': string jpayne@69: # 'alt': list of strings jpayne@69: # 'qual': integer jpayne@69: # 'filter': None (missing value), or list of keys (strings); empty list parsed as ["PASS"] jpayne@69: # 'info': dictionary of values (see below) jpayne@69: # 'format': list of keys (strings) jpayne@69: # sample keys: dictionary of values (see below) jpayne@69: # jpayne@69: # The sample keys are accessible through vcf.getsamples() jpayne@69: # jpayne@69: # A dictionary of values contains value keys (defined in ##INFO or jpayne@69: # ##FORMAT lines) which map to a list, containing integers, floats, jpayne@69: # strings, or characters. Missing values are replaced by a particular jpayne@69: # value, often -1 or . jpayne@69: # jpayne@69: # Genotypes are not stored as a string, but as a list of 1 or 3 jpayne@69: # elements (for haploid and diploid samples), the first (and last) the jpayne@69: # integer representing an allele, and the second the separation jpayne@69: # character. Note that there is just one genotype per sample, but for jpayne@69: # consistency the single element is stored in a list. jpayne@69: # jpayne@69: # Header lines other than ##INFO, ##FORMAT and ##FILTER are stored as jpayne@69: # (key, value) pairs and are accessible through getheader() jpayne@69: # jpayne@69: # The VCF class can be instantiated with a 'regions' variable jpayne@69: # consisting of tuples (chrom,start,end) encoding 0-based half-open jpayne@69: # segments. Only variants with a position inside the segment will be jpayne@69: # parsed. A regions parser is available under parse_regions. jpayne@69: # jpayne@69: # When instantiated, a reference can be passed to the VCF class. This jpayne@69: # may be any class that supports a fetch(chrom, start, end) method. jpayne@69: # jpayne@69: # NOTE: the position that is returned to Python is 0-based, NOT jpayne@69: # 1-based as in the VCF file. jpayne@69: # NOTE: There is also preliminary VCF functionality in the VariantFile class. jpayne@69: # jpayne@69: # TODO: jpayne@69: # only v4.0 writing is complete; alleles are not converted to v3.3 format jpayne@69: # jpayne@69: jpayne@69: from collections import namedtuple, defaultdict jpayne@69: from operator import itemgetter jpayne@69: import sys, re, copy, bisect jpayne@69: jpayne@69: from libc.stdlib cimport atoi jpayne@69: from libc.stdint cimport int8_t, int16_t, int32_t, int64_t jpayne@69: from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t jpayne@69: jpayne@69: cimport pysam.libctabix as libctabix jpayne@69: cimport pysam.libctabixproxies as libctabixproxies jpayne@69: jpayne@69: from pysam.libcutils cimport force_str jpayne@69: jpayne@69: import pysam jpayne@69: jpayne@69: gtsRegEx = re.compile("[|/\\\\]") jpayne@69: alleleRegEx = re.compile('^[ACGTN]+$') jpayne@69: jpayne@69: # Utility function. Uses 0-based coordinates jpayne@69: def get_sequence(chrom, start, end, fa): jpayne@69: # obtain sequence from .fa file, without truncation jpayne@69: if end<=start: return "" jpayne@69: if not fa: return "N"*(end-start) jpayne@69: if start<0: return "N"*(-start) + get_sequence(chrom, 0, end, fa).upper() jpayne@69: sequence = fa.fetch(chrom, start, end).upper() jpayne@69: if len(sequence) < end-start: sequence += "N"*(end-start-len(sequence)) jpayne@69: return sequence jpayne@69: jpayne@69: # Utility function. Parses a region string jpayne@69: def parse_regions( string ): jpayne@69: result = [] jpayne@69: for r in string.split(','): jpayne@69: elts = r.split(':') jpayne@69: chrom, start, end = elts[0], 0, 3000000000 jpayne@69: if len(elts)==1: pass jpayne@69: elif len(elts)==2: jpayne@69: if len(elts[1])>0: jpayne@69: ielts = elts[1].split('-') jpayne@69: if len(ielts) != 2: ValueError("Don't understand region string '%s'" % r) jpayne@69: try: start, end = int(ielts[0])-1, int(ielts[1]) jpayne@69: except: raise ValueError("Don't understand region string '%s'" % r) jpayne@69: else: jpayne@69: raise ValueError("Don't understand region string '%s'" % r) jpayne@69: result.append( (chrom,start,end) ) jpayne@69: return result jpayne@69: jpayne@69: jpayne@69: FORMAT = namedtuple('FORMAT','id numbertype number type description missingvalue') jpayne@69: jpayne@69: ########################################################################################################### jpayne@69: # jpayne@69: # New class jpayne@69: # jpayne@69: ########################################################################################################### jpayne@69: jpayne@69: cdef class VCFRecord(libctabixproxies.TupleProxy): jpayne@69: '''vcf record. jpayne@69: jpayne@69: initialized from data and vcf meta jpayne@69: ''' jpayne@69: jpayne@69: cdef vcf jpayne@69: cdef char * contig jpayne@69: cdef uint32_t pos jpayne@69: jpayne@69: def __init__(self, vcf): jpayne@69: self.vcf = vcf jpayne@69: self.encoding = vcf.encoding jpayne@69: jpayne@69: # if len(data) != len(self.vcf._samples): jpayne@69: # self.vcf.error(str(data), jpayne@69: # self.BAD_NUMBER_OF_COLUMNS, jpayne@69: # "expected %s for %s samples (%s), got %s" % \ jpayne@69: # (len(self.vcf._samples), jpayne@69: # len(self.vcf._samples), jpayne@69: # self.vcf._samples, jpayne@69: # len(data))) jpayne@69: jpayne@69: def __cinit__(self, vcf): jpayne@69: # start indexed access at genotypes jpayne@69: self.offset = 9 jpayne@69: jpayne@69: self.vcf = vcf jpayne@69: self.encoding = vcf.encoding jpayne@69: jpayne@69: def error(self, line, error, opt=None): jpayne@69: '''raise error.''' jpayne@69: # pass to vcf file for error handling jpayne@69: return self.vcf.error(line, error, opt) jpayne@69: jpayne@69: cdef update(self, char * buffer, size_t nbytes): jpayne@69: '''update internal data. jpayne@69: jpayne@69: nbytes does not include the terminal '\0'. jpayne@69: ''' jpayne@69: libctabixproxies.TupleProxy.update(self, buffer, nbytes) jpayne@69: jpayne@69: self.contig = self.fields[0] jpayne@69: # vcf counts from 1 - correct here jpayne@69: self.pos = atoi(self.fields[1]) - 1 jpayne@69: jpayne@69: def __len__(self): jpayne@69: return max(0, self.nfields - 9) jpayne@69: jpayne@69: property contig: jpayne@69: def __get__(self): return self.contig jpayne@69: jpayne@69: property pos: jpayne@69: def __get__(self): return self.pos jpayne@69: jpayne@69: property id: jpayne@69: def __get__(self): return self.fields[2] jpayne@69: jpayne@69: property ref: jpayne@69: def __get__(self): jpayne@69: return self.fields[3] jpayne@69: jpayne@69: property alt: jpayne@69: def __get__(self): jpayne@69: # convert v3.3 to v4.0 alleles below jpayne@69: alt = self.fields[4] jpayne@69: if alt == ".": alt = [] jpayne@69: else: alt = alt.upper().split(',') jpayne@69: return alt jpayne@69: jpayne@69: property qual: jpayne@69: def __get__(self): jpayne@69: qual = self.fields[5] jpayne@69: if qual == b".": qual = -1 jpayne@69: else: jpayne@69: try: qual = float(qual) jpayne@69: except: self.vcf.error(str(self),self.QUAL_NOT_NUMERICAL) jpayne@69: return qual jpayne@69: jpayne@69: property filter: jpayne@69: def __get__(self): jpayne@69: f = self.fields[6] jpayne@69: # postpone checking that filters exist. Encode missing filter or no filtering as empty list jpayne@69: if f == b"." or f == b"PASS" or f == b"0": return [] jpayne@69: else: return f.split(';') jpayne@69: jpayne@69: property info: jpayne@69: def __get__(self): jpayne@69: col = self.fields[7] jpayne@69: # dictionary of keys, and list of values jpayne@69: info = {} jpayne@69: if col != b".": jpayne@69: for blurp in col.split(';'): jpayne@69: elts = blurp.split('=') jpayne@69: if len(elts) == 1: v = None jpayne@69: elif len(elts) == 2: v = elts[1] jpayne@69: else: self.vcf.error(str(self),self.ERROR_INFO_STRING) jpayne@69: info[elts[0]] = self.vcf.parse_formatdata(elts[0], v, self.vcf._info, str(self.vcf)) jpayne@69: return info jpayne@69: jpayne@69: property format: jpayne@69: def __get__(self): jpayne@69: return self.fields[8].split(':') jpayne@69: jpayne@69: property samples: jpayne@69: def __get__(self): jpayne@69: return self.vcf._samples jpayne@69: jpayne@69: def __getitem__(self, key): jpayne@69: jpayne@69: # parse sample columns jpayne@69: values = self.fields[self.vcf._sample2column[key]].split(':') jpayne@69: alt = self.alt jpayne@69: format = self.format jpayne@69: jpayne@69: if len(values) > len(format): jpayne@69: self.vcf.error(str(self.line),self.BAD_NUMBER_OF_VALUES,"(found %s values in element %s; expected %s)" %\ jpayne@69: (len(values),key,len(format))) jpayne@69: jpayne@69: result = {} jpayne@69: for idx in range(len(format)): jpayne@69: expected = self.vcf.get_expected(format[idx], self.vcf._format, alt) jpayne@69: if idx < len(values): value = values[idx] jpayne@69: else: jpayne@69: if expected == -1: value = "." jpayne@69: else: value = ",".join(["."]*expected) jpayne@69: jpayne@69: result[format[idx]] = self.vcf.parse_formatdata(format[idx], value, self.vcf._format, str(self.data)) jpayne@69: if expected != -1 and len(result[format[idx]]) != expected: jpayne@69: self.vcf.error(str(self.data),self.BAD_NUMBER_OF_PARAMETERS, jpayne@69: "id=%s, expected %s parameters, got %s" % (format[idx],expected,result[format[idx]])) jpayne@69: if len(result[format[idx]] ) < expected: result[format[idx]] += [result[format[idx]][-1]]*(expected-len(result[format[idx]])) jpayne@69: result[format[idx]] = result[format[idx]][:expected] jpayne@69: jpayne@69: return result jpayne@69: jpayne@69: jpayne@69: cdef class asVCFRecord(libctabix.Parser): jpayne@69: '''converts a :term:`tabix row` into a VCF record.''' jpayne@69: cdef vcffile jpayne@69: def __init__(self, vcffile): jpayne@69: self.vcffile = vcffile jpayne@69: jpayne@69: cdef parse(self, char * buffer, int len): jpayne@69: cdef VCFRecord r jpayne@69: r = VCFRecord(self.vcffile) jpayne@69: r.copy(buffer, len) jpayne@69: return r jpayne@69: jpayne@69: class VCF(object): jpayne@69: jpayne@69: # types jpayne@69: NT_UNKNOWN = 0 jpayne@69: NT_NUMBER = 1 jpayne@69: NT_ALLELES = 2 jpayne@69: NT_NR_ALLELES = 3 jpayne@69: NT_GENOTYPES = 4 jpayne@69: NT_PHASED_GENOTYPES = 5 jpayne@69: jpayne@69: _errors = { 0:"UNKNOWN_FORMAT_STRING:Unknown file format identifier", jpayne@69: 1:"BADLY_FORMATTED_FORMAT_STRING:Formatting error in the format string", jpayne@69: 2:"BADLY_FORMATTED_HEADING:Did not find 9 required headings (CHROM, POS, ..., FORMAT) %s", jpayne@69: 3:"BAD_NUMBER_OF_COLUMNS:Wrong number of columns found (%s)", jpayne@69: 4:"POS_NOT_NUMERICAL:Position column is not numerical", jpayne@69: 5:"UNKNOWN_CHAR_IN_REF:Unknown character in reference field", jpayne@69: 6:"V33_BAD_REF:Reference should be single-character in v3.3 VCF", jpayne@69: 7:"V33_BAD_ALLELE:Cannot interpret allele for v3.3 VCF", jpayne@69: 8:"POS_NOT_POSITIVE:Position field must be >0", jpayne@69: 9:"QUAL_NOT_NUMERICAL:Quality field must be numerical, or '.'", jpayne@69: 10:"ERROR_INFO_STRING:Error while parsing info field", jpayne@69: 11:"ERROR_UNKNOWN_KEY:Unknown key (%s) found in formatted field (info; format; or filter)", jpayne@69: 12:"ERROR_FORMAT_NOT_NUMERICAL:Expected integer or float in formatted field; got %s", jpayne@69: 13:"ERROR_FORMAT_NOT_CHAR:Eexpected character in formatted field; got string", jpayne@69: 14:"FILTER_NOT_DEFINED:Identifier (%s) in filter found which was not defined in header", jpayne@69: 15:"FORMAT_NOT_DEFINED:Identifier (%s) in format found which was not defined in header", jpayne@69: 16:"BAD_NUMBER_OF_VALUES:Found too many of values in sample column (%s)", jpayne@69: 17:"BAD_NUMBER_OF_PARAMETERS:Found unexpected number of parameters (%s)", jpayne@69: 18:"BAD_GENOTYPE:Cannot parse genotype (%s)", jpayne@69: 19:"V40_BAD_ALLELE:Bad allele found for v4.0 VCF (%s)", jpayne@69: 20:"MISSING_REF:Reference allele missing", jpayne@69: 21:"V33_UNMATCHED_DELETION:Deleted sequence does not match reference (%s)", jpayne@69: 22:"V40_MISSING_ANGLE_BRACKETS:Format definition is not deliminted by angular brackets", jpayne@69: 23:"FORMAT_MISSING_QUOTES:Description field in format definition is not surrounded by quotes", jpayne@69: 24:"V40_FORMAT_MUST_HAVE_NAMED_FIELDS:Fields in v4.0 VCF format definition must have named fields", jpayne@69: 25:"HEADING_NOT_SEPARATED_BY_TABS:Heading line appears separated by spaces, not tabs", jpayne@69: 26:"WRONG_REF:Wrong reference %s", jpayne@69: 27:"ERROR_TRAILING_DATA:Numerical field ('%s') has semicolon-separated trailing data", jpayne@69: 28:"BAD_CHR_TAG:Error calculating chr tag for %s", jpayne@69: 29:"ZERO_LENGTH_ALLELE:Found zero-length allele", jpayne@69: 30:"MISSING_INDEL_ALLELE_REF_BASE:Indel alleles must begin with single reference base", jpayne@69: 31:"ZERO_FOR_NON_FLAG_FIELD: number set to 0, but type is not 'FLAG'", jpayne@69: 32:"ERROR_FORMAT_NOT_INTEGER:Expected integer in formatted field; got %s", jpayne@69: 33:"ERROR_FLAG_HAS_VALUE:Flag fields should not have a value", jpayne@69: } jpayne@69: jpayne@69: # tag-value pairs; tags are not unique; does not include fileformat, INFO, FILTER or FORMAT fields jpayne@69: _header = [] jpayne@69: jpayne@69: # version number; 33=v3.3; 40=v4.0 jpayne@69: _version = 40 jpayne@69: jpayne@69: # info, filter and format data jpayne@69: _info = {} jpayne@69: _filter = {} jpayne@69: _format = {} jpayne@69: jpayne@69: # header; and required columns jpayne@69: _required = ["CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"] jpayne@69: _samples = [] jpayne@69: jpayne@69: # control behaviour jpayne@69: _ignored_errors = set([11,31]) # ERROR_UNKNOWN_KEY, ERROR_ZERO_FOR_NON_FLAG_FIELD jpayne@69: _warn_errors = set([]) jpayne@69: _leftalign = False jpayne@69: jpayne@69: # reference sequence jpayne@69: _reference = None jpayne@69: jpayne@69: # regions to include; None includes everything jpayne@69: _regions = None jpayne@69: jpayne@69: # statefull stuff jpayne@69: _lineno = -1 jpayne@69: _line = None jpayne@69: _lines = None jpayne@69: jpayne@69: def __init__(self, _copy=None, reference=None, regions=None, jpayne@69: lines=None, leftalign=False): jpayne@69: # make error identifiers accessible by name jpayne@69: for id in self._errors.keys(): jpayne@69: self.__dict__[self._errors[id].split(':')[0]] = id jpayne@69: if _copy != None: jpayne@69: self._leftalign = _copy._leftalign jpayne@69: self._header = _copy._header[:] jpayne@69: self._version = _copy._version jpayne@69: self._info = copy.deepcopy(_copy._info) jpayne@69: self._filter = copy.deepcopy(_copy._filter) jpayne@69: self._format = copy.deepcopy(_copy._format) jpayne@69: self._samples = _copy._samples[:] jpayne@69: self._sample2column = copy.deepcopy(_copy._sample2column) jpayne@69: self._ignored_errors = copy.deepcopy(_copy._ignored_errors) jpayne@69: self._warn_errors = copy.deepcopy(_copy._warn_errors) jpayne@69: self._reference = _copy._reference jpayne@69: self._regions = _copy._regions jpayne@69: if reference: self._reference = reference jpayne@69: if regions: self._regions = regions jpayne@69: if leftalign: self._leftalign = leftalign jpayne@69: self._lines = lines jpayne@69: self.encoding = "ascii" jpayne@69: self.tabixfile = None jpayne@69: jpayne@69: def error(self,line,error,opt=None): jpayne@69: if error in self._ignored_errors: return jpayne@69: errorlabel, errorstring = self._errors[error].split(':') jpayne@69: if opt: errorstring = errorstring % opt jpayne@69: errwarn = ["Error","Warning"][error in self._warn_errors] jpayne@69: errorstring += " in line %s: '%s'\n%s %s: %s\n" % (self._lineno,line,errwarn,errorlabel,errorstring) jpayne@69: if error in self._warn_errors: return jpayne@69: raise ValueError(errorstring) jpayne@69: jpayne@69: def parse_format(self,line,format,filter=False): jpayne@69: if self._version == 40: jpayne@69: if not format.startswith('<'): jpayne@69: self.error(line,self.V40_MISSING_ANGLE_BRACKETS) jpayne@69: format = "<"+format jpayne@69: if not format.endswith('>'): jpayne@69: self.error(line,self.V40_MISSING_ANGLE_BRACKETS) jpayne@69: format += ">" jpayne@69: format = format[1:-1] jpayne@69: data = {'id':None,'number':None,'type':None,'descr':None} jpayne@69: idx = 0 jpayne@69: while len(format.strip())>0: jpayne@69: elts = format.strip().split(',') jpayne@69: first, rest = elts[0], ','.join(elts[1:]) jpayne@69: if first.find('=') == -1 or (first.find('"')>=0 and first.find('=') > first.find('"')): jpayne@69: if self._version == 40: self.error(line,self.V40_FORMAT_MUST_HAVE_NAMED_FIELDS) jpayne@69: if idx == 4: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) jpayne@69: first = ["ID=","Number=","Type=","Description="][idx] + first jpayne@69: if first.startswith('ID='): data['id'] = first.split('=')[1] jpayne@69: elif first.startswith('Number='): data['number'] = first.split('=')[1] jpayne@69: elif first.startswith('Type='): data['type'] = first.split('=')[1] jpayne@69: elif first.startswith('Description='): jpayne@69: elts = format.split('"') jpayne@69: if len(elts)<3: jpayne@69: self.error(line,self.FORMAT_MISSING_QUOTES) jpayne@69: elts = first.split('=') + [rest] jpayne@69: data['descr'] = elts[1] jpayne@69: rest = '"'.join(elts[2:]) jpayne@69: if rest.startswith(','): rest = rest[1:] jpayne@69: else: jpayne@69: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) jpayne@69: format = rest jpayne@69: idx += 1 jpayne@69: if filter and idx==1: idx=3 # skip number and type fields for FILTER format strings jpayne@69: if not data['id']: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) jpayne@69: if 'descr' not in data: jpayne@69: # missing description jpayne@69: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) jpayne@69: data['descr'] = "" jpayne@69: if not data['type'] and not data['number']: jpayne@69: # fine, ##filter format jpayne@69: return FORMAT(data['id'],self.NT_NUMBER,0,"Flag",data['descr'],'.') jpayne@69: if not data['type'] in ["Integer","Float","Character","String","Flag"]: jpayne@69: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) jpayne@69: # I would like a missing-value field, but it isn't there jpayne@69: if data['type'] in ['Integer','Float']: data['missing'] = None # Do NOT use arbitrary int/float as missing value jpayne@69: else: data['missing'] = '.' jpayne@69: if not data['number']: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) jpayne@69: try: jpayne@69: n = int(data['number']) jpayne@69: t = self.NT_NUMBER jpayne@69: except ValueError: jpayne@69: n = -1 jpayne@69: if data['number'] == '.': t = self.NT_UNKNOWN jpayne@69: elif data['number'] == '#alleles': t = self.NT_ALLELES jpayne@69: elif data['number'] == '#nonref_alleles': t = self.NT_NR_ALLELES jpayne@69: elif data['number'] == '#genotypes': t = self.NT_GENOTYPES jpayne@69: elif data['number'] == '#phased_genotypes': t = self.NT_PHASED_GENOTYPES jpayne@69: elif data['number'] == '#phased_genotypes': t = self.NT_PHASED_GENOTYPES jpayne@69: # abbreviations added in VCF version v4.1 jpayne@69: elif data['number'] == 'A': t = self.NT_ALLELES jpayne@69: elif data['number'] == 'G': t = self.NT_GENOTYPES jpayne@69: else: jpayne@69: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) jpayne@69: # if number is 0 - type must be Flag jpayne@69: if n == 0 and data['type'] != 'Flag': jpayne@69: self.error( line, self.ZERO_FOR_NON_FLAG_FIELD) jpayne@69: # force type 'Flag' if no number jpayne@69: data['type'] = 'Flag' jpayne@69: jpayne@69: return FORMAT(data['id'],t,n,data['type'],data['descr'],data['missing']) jpayne@69: jpayne@69: def format_format( self, fmt, filter=False ): jpayne@69: values = [('ID',fmt.id)] jpayne@69: if fmt.number != None and not filter: jpayne@69: if fmt.numbertype == self.NT_UNKNOWN: nmb = "." jpayne@69: elif fmt.numbertype == self.NT_NUMBER: nmb = str(fmt.number) jpayne@69: elif fmt.numbertype == self.NT_ALLELES: nmb = "#alleles" jpayne@69: elif fmt.numbertype == self.NT_NR_ALLELES: nmb = "#nonref_alleles" jpayne@69: elif fmt.numbertype == self.NT_GENOTYPES: nmb = "#genotypes" jpayne@69: elif fmt.numbertype == self.NT_PHASED_GENOTYPES: nmb = "#phased_genotypes" jpayne@69: else: jpayne@69: raise ValueError("Unknown number type encountered: %s" % fmt.numbertype) jpayne@69: values.append( ('Number',nmb) ) jpayne@69: values.append( ('Type', fmt.type) ) jpayne@69: values.append( ('Description', '"' + fmt.description + '"') ) jpayne@69: if self._version == 33: jpayne@69: format = ",".join([v for k,v in values]) jpayne@69: else: jpayne@69: format = "<" + (",".join( ["%s=%s" % (k,v) for (k,v) in values] )) + ">" jpayne@69: return format jpayne@69: jpayne@69: def get_expected(self, format, formatdict, alt): jpayne@69: fmt = formatdict[format] jpayne@69: if fmt.numbertype == self.NT_UNKNOWN: return -1 jpayne@69: if fmt.numbertype == self.NT_NUMBER: return fmt.number jpayne@69: if fmt.numbertype == self.NT_ALLELES: return len(alt)+1 jpayne@69: if fmt.numbertype == self.NT_NR_ALLELES: return len(alt) jpayne@69: if fmt.numbertype == self.NT_GENOTYPES: return ((len(alt)+1)*(len(alt)+2)) // 2 jpayne@69: if fmt.numbertype == self.NT_PHASED_GENOTYPES: return (len(alt)+1)*(len(alt)+1) jpayne@69: return 0 jpayne@69: jpayne@69: jpayne@69: def _add_definition(self, formatdict, key, data, line ): jpayne@69: if key in formatdict: return jpayne@69: self.error(line,self.ERROR_UNKNOWN_KEY,key) jpayne@69: if data == None: jpayne@69: formatdict[key] = FORMAT(key,self.NT_NUMBER,0,"Flag","(Undefined tag)",".") jpayne@69: return jpayne@69: if data == []: data = [""] # unsure what type -- say string jpayne@69: if type(data[0]) == type(0.0): jpayne@69: formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"Float","(Undefined tag)",None) jpayne@69: return jpayne@69: if type(data[0]) == type(0): jpayne@69: formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"Integer","(Undefined tag)",None) jpayne@69: return jpayne@69: formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"String","(Undefined tag)",".") jpayne@69: jpayne@69: jpayne@69: # todo: trim trailing missing values jpayne@69: def format_formatdata( self, data, format, key=True, value=True, separator=":" ): jpayne@69: output, sdata = [], [] jpayne@69: if type(data) == type([]): # for FORMAT field, make data with dummy values jpayne@69: d = {} jpayne@69: for k in data: d[k] = [] jpayne@69: data = d jpayne@69: # convert missing values; and silently add definitions if required jpayne@69: for k in data: jpayne@69: self._add_definition( format, k, data[k], "(output)" ) jpayne@69: for idx,v in enumerate(data[k]): jpayne@69: if v == format[k].missingvalue: data[k][idx] = "." jpayne@69: # make sure GT comes first; and ensure fixed ordering; also convert GT data back to string jpayne@69: for k in data: jpayne@69: if k != 'GT': sdata.append( (k,data[k]) ) jpayne@69: sdata.sort() jpayne@69: if 'GT' in data: jpayne@69: sdata = [('GT',map(self.convertGTback,data['GT']))] + sdata jpayne@69: for k,v in sdata: jpayne@69: if v == []: v = None jpayne@69: if key and value: jpayne@69: if v != None: output.append( k+"="+','.join(map(str,v)) ) jpayne@69: else: output.append( k ) jpayne@69: elif key: output.append(k) jpayne@69: elif value: jpayne@69: if v != None: output.append( ','.join(map(str,v)) ) jpayne@69: else: output.append( "." ) # should not happen jpayne@69: # snip off trailing missing data jpayne@69: while len(output) > 1: jpayne@69: last = output[-1].replace(',','').replace('.','') jpayne@69: if len(last)>0: break jpayne@69: output = output[:-1] jpayne@69: return separator.join(output) jpayne@69: jpayne@69: jpayne@69: def enter_default_format(self): jpayne@69: for f in [FORMAT('GT',self.NT_NUMBER,1,'String','Genotype','.'), jpayne@69: FORMAT('DP',self.NT_NUMBER,1,'Integer','Read depth at this position for this sample',-1), jpayne@69: FORMAT('FT',self.NT_NUMBER,1,'String','Sample Genotype Filter','.'), jpayne@69: FORMAT('GL',self.NT_UNKNOWN,-1,'Float','Genotype likelihoods','.'), jpayne@69: FORMAT('GLE',self.NT_UNKNOWN,-1,'Float','Genotype likelihoods','.'), jpayne@69: FORMAT('GQ',self.NT_NUMBER,1,'Integer','Genotype Quality',-1), jpayne@69: FORMAT('PL',self.NT_GENOTYPES,-1,'Integer','Phred-scaled genotype likelihoods', '.'), jpayne@69: FORMAT('GP',self.NT_GENOTYPES,-1,'Float','Genotype posterior probabilities','.'), jpayne@69: FORMAT('GQ',self.NT_GENOTYPES,-1,'Integer','Conditional genotype quality','.'), jpayne@69: FORMAT('HQ',self.NT_UNKNOWN,-1,'Integer','Haplotype Quality',-1), # unknown number, since may be haploid jpayne@69: FORMAT('PS',self.NT_UNKNOWN,-1,'Integer','Phase set','.'), jpayne@69: FORMAT('PQ',self.NT_NUMBER,1,'Integer','Phasing quality',-1), jpayne@69: FORMAT('EC',self.NT_ALLELES,1,'Integer','Expected alternate allel counts',-1), jpayne@69: FORMAT('MQ',self.NT_NUMBER,1,'Integer','RMS mapping quality',-1), jpayne@69: ]: jpayne@69: if f.id not in self._format: jpayne@69: self._format[f.id] = f jpayne@69: jpayne@69: def parse_header(self, line): jpayne@69: jpayne@69: assert line.startswith('##') jpayne@69: elts = line[2:].split('=') jpayne@69: key = elts[0].strip() jpayne@69: value = '='.join(elts[1:]).strip() jpayne@69: if key == "fileformat": jpayne@69: if value == "VCFv3.3": jpayne@69: self._version = 33 jpayne@69: elif value == "VCFv4.0": jpayne@69: self._version = 40 jpayne@69: elif value == "VCFv4.1": jpayne@69: # AH - for testing jpayne@69: self._version = 40 jpayne@69: elif value == "VCFv4.2": jpayne@69: # AH - for testing jpayne@69: self._version = 40 jpayne@69: else: jpayne@69: self.error(line,self.UNKNOWN_FORMAT_STRING) jpayne@69: elif key == "INFO": jpayne@69: f = self.parse_format(line, value) jpayne@69: self._info[ f.id ] = f jpayne@69: elif key == "FILTER": jpayne@69: f = self.parse_format(line, value, filter=True) jpayne@69: self._filter[ f.id ] = f jpayne@69: elif key == "FORMAT": jpayne@69: f = self.parse_format(line, value) jpayne@69: self._format[ f.id ] = f jpayne@69: else: jpayne@69: # keep other keys in the header field jpayne@69: self._header.append( (key,value) ) jpayne@69: jpayne@69: jpayne@69: def write_header( self, stream ): jpayne@69: stream.write("##fileformat=VCFv%s.%s\n" % (self._version // 10, self._version % 10)) jpayne@69: for key,value in self._header: stream.write("##%s=%s\n" % (key,value)) jpayne@69: for var,label in [(self._info,"INFO"),(self._filter,"FILTER"),(self._format,"FORMAT")]: jpayne@69: for f in var.itervalues(): stream.write("##%s=%s\n" % (label,self.format_format(f,filter=(label=="FILTER")))) jpayne@69: jpayne@69: jpayne@69: def parse_heading( self, line ): jpayne@69: assert line.startswith('#') jpayne@69: assert not line.startswith('##') jpayne@69: headings = line[1:].split('\t') jpayne@69: # test for 8, as FORMAT field might be missing jpayne@69: if len(headings)==1 and len(line[1:].split()) >= 8: jpayne@69: self.error(line,self.HEADING_NOT_SEPARATED_BY_TABS) jpayne@69: headings = line[1:].split() jpayne@69: jpayne@69: for i,s in enumerate(self._required): jpayne@69: jpayne@69: if len(headings)<=i or headings[i] != s: jpayne@69: jpayne@69: if len(headings) <= i: jpayne@69: err = "(%sth entry not found)" % (i+1) jpayne@69: else: jpayne@69: err = "(found %s, expected %s)" % (headings[i],s) jpayne@69: jpayne@69: #self.error(line,self.BADLY_FORMATTED_HEADING,err) jpayne@69: # allow FORMAT column to be absent jpayne@69: if len(headings) == 8: jpayne@69: headings.append("FORMAT") jpayne@69: else: jpayne@69: self.error(line,self.BADLY_FORMATTED_HEADING,err) jpayne@69: jpayne@69: self._samples = headings[9:] jpayne@69: self._sample2column = dict( [(y,x+9) for x,y in enumerate( self._samples ) ] ) jpayne@69: jpayne@69: def write_heading( self, stream ): jpayne@69: stream.write("#" + "\t".join(self._required + self._samples) + "\n") jpayne@69: jpayne@69: def convertGT(self, GTstring): jpayne@69: if GTstring == ".": return ["."] jpayne@69: try: jpayne@69: gts = gtsRegEx.split(GTstring) jpayne@69: if len(gts) == 1: return [int(gts[0])] jpayne@69: if len(gts) != 2: raise ValueError() jpayne@69: if gts[0] == "." and gts[1] == ".": return [gts[0],GTstring[len(gts[0]):-len(gts[1])],gts[1]] jpayne@69: return [int(gts[0]),GTstring[len(gts[0]):-len(gts[1])],int(gts[1])] jpayne@69: except ValueError: jpayne@69: self.error(self._line,self.BAD_GENOTYPE,GTstring) jpayne@69: return [".","|","."] jpayne@69: jpayne@69: def convertGTback(self, GTdata): jpayne@69: return ''.join(map(str,GTdata)) jpayne@69: jpayne@69: def parse_formatdata( self, key, value, formatdict, line ): jpayne@69: # To do: check that the right number of values is present jpayne@69: f = formatdict.get(key,None) jpayne@69: if f == None: jpayne@69: self._add_definition(formatdict, key, value, line ) jpayne@69: f = formatdict[key] jpayne@69: if f.type == "Flag": jpayne@69: if value is not None: self.error(line,self.ERROR_FLAG_HAS_VALUE) jpayne@69: return [] jpayne@69: values = value.split(',') jpayne@69: # deal with trailing data in some early VCF files jpayne@69: if f.type in ["Float","Integer"] and len(values)>0 and values[-1].find(';') > -1: jpayne@69: self.error(line,self.ERROR_TRAILING_DATA,values[-1]) jpayne@69: values[-1] = values[-1].split(';')[0] jpayne@69: if f.type == "Integer": jpayne@69: for idx,v in enumerate(values): jpayne@69: try: jpayne@69: if v == ".": values[idx] = f.missingvalue jpayne@69: else: values[idx] = int(v) jpayne@69: except: jpayne@69: self.error(line,self.ERROR_FORMAT_NOT_INTEGER,"%s=%s" % (key, str(values))) jpayne@69: return [0] * len(values) jpayne@69: return values jpayne@69: elif f.type == "String": jpayne@69: self._line = line jpayne@69: if f.id == "GT": values = list(map( self.convertGT, values )) jpayne@69: return values jpayne@69: elif f.type == "Character": jpayne@69: for v in values: jpayne@69: if len(v) != 1: self.error(line,self.ERROR_FORMAT_NOT_CHAR) jpayne@69: return values jpayne@69: elif f.type == "Float": jpayne@69: for idx,v in enumerate(values): jpayne@69: if v == ".": values[idx] = f.missingvalue jpayne@69: try: return list(map(float,values)) jpayne@69: except: jpayne@69: self.error(line,self.ERROR_FORMAT_NOT_NUMERICAL,"%s=%s" % (key, str(values))) jpayne@69: return [0.0] * len(values) jpayne@69: else: jpayne@69: # can't happen jpayne@69: self.error(line,self.ERROR_INFO_STRING) jpayne@69: jpayne@69: def inregion(self, chrom, pos): jpayne@69: if not self._regions: return True jpayne@69: for r in self._regions: jpayne@69: if r[0] == chrom and r[1] <= pos < r[2]: return True jpayne@69: return False jpayne@69: jpayne@69: def parse_data( self, line, lineparse=False ): jpayne@69: cols = line.split('\t') jpayne@69: if len(cols) != len(self._samples)+9: jpayne@69: # gracefully deal with absent FORMAT column jpayne@69: # and those missing samples jpayne@69: if len(cols) == 8: jpayne@69: cols.append("") jpayne@69: else: jpayne@69: self.error(line, jpayne@69: self.BAD_NUMBER_OF_COLUMNS, jpayne@69: "expected %s for %s samples (%s), got %s" % (len(self._samples)+9, len(self._samples), self._samples, len(cols))) jpayne@69: jpayne@69: chrom = cols[0] jpayne@69: jpayne@69: # get 0-based position jpayne@69: try: pos = int(cols[1])-1 jpayne@69: except: self.error(line,self.POS_NOT_NUMERICAL) jpayne@69: if pos < 0: self.error(line,self.POS_NOT_POSITIVE) jpayne@69: jpayne@69: # implement filtering jpayne@69: if not self.inregion(chrom,pos): return None jpayne@69: jpayne@69: # end of first-pass parse for sortedVCF jpayne@69: if lineparse: return chrom, pos, line jpayne@69: jpayne@69: id = cols[2] jpayne@69: jpayne@69: ref = cols[3].upper() jpayne@69: if ref == ".": jpayne@69: self.error(line,self.MISSING_REF) jpayne@69: if self._version == 33: ref = get_sequence(chrom,pos,pos+1,self._reference) jpayne@69: else: ref = "" jpayne@69: else: jpayne@69: for c in ref: jpayne@69: if c not in "ACGTN": self.error(line,self.UNKNOWN_CHAR_IN_REF) jpayne@69: if "N" in ref: ref = get_sequence(chrom,pos,pos+len(ref),self._reference) jpayne@69: jpayne@69: # make sure reference is sane jpayne@69: if self._reference: jpayne@69: left = max(0,pos-100) jpayne@69: faref_leftflank = get_sequence(chrom,left,pos+len(ref),self._reference) jpayne@69: faref = faref_leftflank[pos-left:] jpayne@69: if faref != ref: self.error(line,self.WRONG_REF,"(reference is %s, VCF says %s)" % (faref,ref)) jpayne@69: ref = faref jpayne@69: jpayne@69: # convert v3.3 to v4.0 alleles below jpayne@69: if cols[4] == ".": alt = [] jpayne@69: else: alt = cols[4].upper().split(',') jpayne@69: jpayne@69: if cols[5] == ".": qual = -1 jpayne@69: else: jpayne@69: try: qual = float(cols[5]) jpayne@69: except: self.error(line,self.QUAL_NOT_NUMERICAL) jpayne@69: jpayne@69: # postpone checking that filters exist. Encode missing filter or no filtering as empty list jpayne@69: if cols[6] == "." or cols[6] == "PASS" or cols[6] == "0": filter = [] jpayne@69: else: filter = cols[6].split(';') jpayne@69: jpayne@69: # dictionary of keys, and list of values jpayne@69: info = {} jpayne@69: if cols[7] != ".": jpayne@69: for blurp in cols[7].split(';'): jpayne@69: elts = blurp.split('=') jpayne@69: if len(elts) == 1: v = None jpayne@69: elif len(elts) == 2: v = elts[1] jpayne@69: else: self.error(line,self.ERROR_INFO_STRING) jpayne@69: info[elts[0]] = self.parse_formatdata(elts[0], jpayne@69: v, jpayne@69: self._info, jpayne@69: line) jpayne@69: jpayne@69: # Gracefully deal with absent FORMAT column jpayne@69: if cols[8] == "": format = [] jpayne@69: else: format = cols[8].split(':') jpayne@69: jpayne@69: # check: all filters are defined jpayne@69: for f in filter: jpayne@69: if f not in self._filter: self.error(line,self.FILTER_NOT_DEFINED, f) jpayne@69: jpayne@69: # check: format fields are defined jpayne@69: if self._format: jpayne@69: for f in format: jpayne@69: if f not in self._format: self.error(line,self.FORMAT_NOT_DEFINED, f) jpayne@69: jpayne@69: # convert v3.3 alleles jpayne@69: if self._version == 33: jpayne@69: if len(ref) != 1: self.error(line,self.V33_BAD_REF) jpayne@69: newalts = [] jpayne@69: have_deletions = False jpayne@69: for a in alt: jpayne@69: if len(a) == 1: a = a + ref[1:] # SNP; add trailing reference jpayne@69: elif a.startswith('I'): a = ref[0] + a[1:] + ref[1:] # insertion just beyond pos; add first and trailing reference jpayne@69: elif a.startswith('D'): # allow D and D jpayne@69: have_deletions = True jpayne@69: try: jpayne@69: l = int(a[1:]) # throws ValueError if sequence jpayne@69: if len(ref) < l: # add to reference if necessary jpayne@69: addns = get_sequence(chrom,pos+len(ref),pos+l,self._reference) jpayne@69: ref += addns jpayne@69: for i,na in enumerate(newalts): newalts[i] = na+addns jpayne@69: a = ref[l:] # new deletion, deleting pos...pos+l jpayne@69: except ValueError: jpayne@69: s = a[1:] jpayne@69: if len(ref) < len(s): # add Ns to reference if necessary jpayne@69: addns = get_sequence(chrom,pos+len(ref),pos+len(s),self._reference) jpayne@69: if not s.endswith(addns) and addns != 'N'*len(addns): jpayne@69: self.error(line,self.V33_UNMATCHED_DELETION, jpayne@69: "(deletion is %s, reference is %s)" % (a,get_sequence(chrom,pos,pos+len(s),self._reference))) jpayne@69: ref += addns jpayne@69: for i,na in enumerate(newalts): newalts[i] = na+addns jpayne@69: a = ref[len(s):] # new deletion, deleting from pos jpayne@69: else: jpayne@69: self.error(line,self.V33_BAD_ALLELE) jpayne@69: newalts.append(a) jpayne@69: alt = newalts jpayne@69: # deletion alleles exist, add dummy 1st reference allele, and account for leading base jpayne@69: if have_deletions: jpayne@69: if pos == 0: jpayne@69: # Petr Danacek's: we can't have a leading nucleotide at (1-based) position 1 jpayne@69: addn = get_sequence(chrom,pos+len(ref),pos+len(ref)+1,self._reference) jpayne@69: ref += addn jpayne@69: alt = [allele+addn for allele in alt] jpayne@69: else: jpayne@69: addn = get_sequence(chrom,pos-1,pos,self._reference) jpayne@69: ref = addn + ref jpayne@69: alt = [addn + allele for allele in alt] jpayne@69: pos -= 1 jpayne@69: else: jpayne@69: # format v4.0 -- just check for nucleotides jpayne@69: for allele in alt: jpayne@69: if not alleleRegEx.match(allele): jpayne@69: self.error(line,self.V40_BAD_ALLELE,allele) jpayne@69: jpayne@69: # check for leading nucleotide in indel calls jpayne@69: for allele in alt: jpayne@69: if len(allele) != len(ref): jpayne@69: if len(allele) == 0: self.error(line,self.ZERO_LENGTH_ALLELE) jpayne@69: if ref[0].upper() != allele[0].upper() and "N" not in (ref[0]+allele[0]).upper(): jpayne@69: self.error(line,self.MISSING_INDEL_ALLELE_REF_BASE) jpayne@69: jpayne@69: # trim trailing bases in alleles jpayne@69: # AH: not certain why trimming this needs to be added jpayne@69: # disabled now for unit testing jpayne@69: # if alt: jpayne@69: # for i in range(1,min(len(ref),min(map(len,alt)))): jpayne@69: # if len(set(allele[-1].upper() for allele in alt)) > 1 or ref[-1].upper() != alt[0][-1].upper(): jpayne@69: # break jpayne@69: # ref, alt = ref[:-1], [allele[:-1] for allele in alt] jpayne@69: jpayne@69: # left-align alleles, if a reference is available jpayne@69: if self._leftalign and self._reference: jpayne@69: while left < pos: jpayne@69: movable = True jpayne@69: for allele in alt: jpayne@69: if len(allele) > len(ref): jpayne@69: longest, shortest = allele, ref jpayne@69: else: jpayne@69: longest, shortest = ref, allele jpayne@69: if len(longest) == len(shortest) or longest[:len(shortest)].upper() != shortest.upper(): jpayne@69: movable = False jpayne@69: if longest[-1].upper() != longest[len(shortest)-1].upper(): jpayne@69: movable = False jpayne@69: if not movable: jpayne@69: break jpayne@69: ref = ref[:-1] jpayne@69: alt = [allele[:-1] for allele in alt] jpayne@69: if min([len(allele) for allele in alt]) == 0 or len(ref) == 0: jpayne@69: ref = faref_leftflank[pos-left-1] + ref jpayne@69: alt = [faref_leftflank[pos-left-1] + allele for allele in alt] jpayne@69: pos -= 1 jpayne@69: jpayne@69: # parse sample columns jpayne@69: samples = [] jpayne@69: for sample in cols[9:]: jpayne@69: dict = {} jpayne@69: values = sample.split(':') jpayne@69: if len(values) > len(format): jpayne@69: self.error(line,self.BAD_NUMBER_OF_VALUES,"(found %s values in element %s; expected %s)" % (len(values),sample,len(format))) jpayne@69: for idx in range(len(format)): jpayne@69: expected = self.get_expected(format[idx], self._format, alt) jpayne@69: if idx < len(values): value = values[idx] jpayne@69: else: jpayne@69: if expected == -1: value = "." jpayne@69: else: value = ",".join(["."]*expected) jpayne@69: jpayne@69: dict[format[idx]] = self.parse_formatdata(format[idx], jpayne@69: value, jpayne@69: self._format, jpayne@69: line) jpayne@69: if expected != -1 and len(dict[format[idx]]) != expected: jpayne@69: self.error(line,self.BAD_NUMBER_OF_PARAMETERS, jpayne@69: "id=%s, expected %s parameters, got %s" % (format[idx],expected,dict[format[idx]])) jpayne@69: if len(dict[format[idx]] ) < expected: dict[format[idx]] += [dict[format[idx]][-1]]*(expected-len(dict[format[idx]])) jpayne@69: dict[format[idx]] = dict[format[idx]][:expected] jpayne@69: samples.append( dict ) jpayne@69: jpayne@69: # done jpayne@69: d = {'chrom':chrom, jpayne@69: 'pos':pos, # return 0-based position jpayne@69: 'id':id, jpayne@69: 'ref':ref, jpayne@69: 'alt':alt, jpayne@69: 'qual':qual, jpayne@69: 'filter':filter, jpayne@69: 'info':info, jpayne@69: 'format':format} jpayne@69: for key,value in zip(self._samples,samples): jpayne@69: d[key] = value jpayne@69: jpayne@69: return d jpayne@69: jpayne@69: jpayne@69: def write_data(self, stream, data): jpayne@69: required = ['chrom','pos','id','ref','alt','qual','filter','info','format'] + self._samples jpayne@69: for k in required: jpayne@69: if k not in data: raise ValueError("Required key %s not found in data" % str(k)) jpayne@69: if data['alt'] == []: alt = "." jpayne@69: else: alt = ",".join(data['alt']) jpayne@69: if data['filter'] == None: filter = "." jpayne@69: elif data['filter'] == []: jpayne@69: if self._version == 33: filter = "0" jpayne@69: else: filter = "PASS" jpayne@69: else: filter = ';'.join(data['filter']) jpayne@69: if data['qual'] == -1: qual = "." jpayne@69: else: qual = str(data['qual']) jpayne@69: jpayne@69: output = [data['chrom'], jpayne@69: str(data['pos']+1), # change to 1-based position jpayne@69: data['id'], jpayne@69: data['ref'], jpayne@69: alt, jpayne@69: qual, jpayne@69: filter, jpayne@69: self.format_formatdata( jpayne@69: data['info'], self._info, separator=";"), jpayne@69: self.format_formatdata( jpayne@69: data['format'], self._format, value=False)] jpayne@69: jpayne@69: for s in self._samples: jpayne@69: output.append(self.format_formatdata( jpayne@69: data[s], self._format, key=False)) jpayne@69: jpayne@69: stream.write( "\t".join(output) + "\n" ) jpayne@69: jpayne@69: def _parse_header(self, stream): jpayne@69: self._lineno = 0 jpayne@69: for line in stream: jpayne@69: line = force_str(line, self.encoding) jpayne@69: self._lineno += 1 jpayne@69: if line.startswith('##'): jpayne@69: self.parse_header(line.strip()) jpayne@69: elif line.startswith('#'): jpayne@69: self.parse_heading(line.strip()) jpayne@69: self.enter_default_format() jpayne@69: else: jpayne@69: break jpayne@69: return line jpayne@69: jpayne@69: def _parse(self, line, stream): jpayne@69: # deal with files with header only jpayne@69: if line.startswith("##"): return jpayne@69: if len(line.strip()) > 0: jpayne@69: d = self.parse_data( line.strip() ) jpayne@69: if d: yield d jpayne@69: for line in stream: jpayne@69: self._lineno += 1 jpayne@69: if self._lines and self._lineno > self._lines: raise StopIteration jpayne@69: d = self.parse_data( line.strip() ) jpayne@69: if d: yield d jpayne@69: jpayne@69: ###################################################################################################### jpayne@69: # jpayne@69: # API follows jpayne@69: # jpayne@69: ###################################################################################################### jpayne@69: jpayne@69: def getsamples(self): jpayne@69: """ List of samples in VCF file """ jpayne@69: return self._samples jpayne@69: jpayne@69: def setsamples(self,samples): jpayne@69: """ List of samples in VCF file """ jpayne@69: self._samples = samples jpayne@69: jpayne@69: def getheader(self): jpayne@69: """ List of header key-value pairs (strings) """ jpayne@69: return self._header jpayne@69: jpayne@69: def setheader(self,header): jpayne@69: """ List of header key-value pairs (strings) """ jpayne@69: self._header = header jpayne@69: jpayne@69: def getinfo(self): jpayne@69: """ Dictionary of ##INFO tags, as VCF.FORMAT values """ jpayne@69: return self._info jpayne@69: jpayne@69: def setinfo(self,info): jpayne@69: """ Dictionary of ##INFO tags, as VCF.FORMAT values """ jpayne@69: self._info = info jpayne@69: jpayne@69: def getformat(self): jpayne@69: """ Dictionary of ##FORMAT tags, as VCF.FORMAT values """ jpayne@69: return self._format jpayne@69: jpayne@69: def setformat(self,format): jpayne@69: """ Dictionary of ##FORMAT tags, as VCF.FORMAT values """ jpayne@69: self._format = format jpayne@69: jpayne@69: def getfilter(self): jpayne@69: """ Dictionary of ##FILTER tags, as VCF.FORMAT values """ jpayne@69: return self._filter jpayne@69: jpayne@69: def setfilter(self,filter): jpayne@69: """ Dictionary of ##FILTER tags, as VCF.FORMAT values """ jpayne@69: self._filter = filter jpayne@69: jpayne@69: def setversion(self, version): jpayne@69: if version != 33 and version != 40: raise ValueError("Can only handle v3.3 and v4.0 VCF files") jpayne@69: self._version = version jpayne@69: jpayne@69: def setregions(self, regions): jpayne@69: self._regions = regions jpayne@69: jpayne@69: def setreference(self, ref): jpayne@69: """ Provide a reference sequence; a Python class supporting a fetch(chromosome, start, end) method, e.g. PySam.FastaFile """ jpayne@69: self._reference = ref jpayne@69: jpayne@69: def ignoreerror(self, errorstring): jpayne@69: try: self._ignored_errors.add(self.__dict__[errorstring]) jpayne@69: except KeyError: raise ValueError("Invalid error string: %s" % errorstring) jpayne@69: jpayne@69: def warnerror(self, errorstring): jpayne@69: try: self._warn_errors.add(self.__dict__[errorstring]) jpayne@69: except KeyError: raise ValueError("Invalid error string: %s" % errorstring) jpayne@69: jpayne@69: def parse(self, stream): jpayne@69: """ Parse a stream of VCF-formatted lines. Initializes class instance and return generator """ jpayne@69: last_line = self._parse_header(stream) jpayne@69: # now return a generator that does the actual work. In this way the pre-processing is done jpayne@69: # before the first piece of data is yielded jpayne@69: return self._parse(last_line, stream) jpayne@69: jpayne@69: def write(self, stream, datagenerator): jpayne@69: """ Writes a VCF file to a stream, using a data generator (or list) """ jpayne@69: self.write_header(stream) jpayne@69: self.write_heading(stream) jpayne@69: for data in datagenerator: self.write_data(stream,data) jpayne@69: jpayne@69: def writeheader(self, stream): jpayne@69: """ Writes a VCF header """ jpayne@69: self.write_header(stream) jpayne@69: self.write_heading(stream) jpayne@69: jpayne@69: def compare_calls(self, pos1, ref1, alt1, pos2, ref2, alt2): jpayne@69: """ Utility function: compares two calls for equality """ jpayne@69: # a variant should always be assigned to a unique position, one base before jpayne@69: # the leftmost position of the alignment gap. If this rule is implemented jpayne@69: # correctly, the two positions must be equal for the calls to be identical. jpayne@69: if pos1 != pos2: return False jpayne@69: # from both calls, trim rightmost bases when identical. Do this safely, i.e. jpayne@69: # only when the reference bases are not Ns jpayne@69: while len(ref1)>0 and len(alt1)>0 and ref1[-1] == alt1[-1]: jpayne@69: ref1 = ref1[:-1] jpayne@69: alt1 = alt1[:-1] jpayne@69: while len(ref2)>0 and len(alt2)>0 and ref2[-1] == alt2[-1]: jpayne@69: ref2 = ref2[:-1] jpayne@69: alt2 = alt2[:-1] jpayne@69: # now, the alternative alleles must be identical jpayne@69: return alt1 == alt2 jpayne@69: jpayne@69: ########################################################################################################### jpayne@69: ########################################################################################################### jpayne@69: ## API functions added by Andreas jpayne@69: ########################################################################################################### jpayne@69: jpayne@69: def connect(self, filename, encoding="ascii"): jpayne@69: '''connect to tabix file.''' jpayne@69: self.encoding=encoding jpayne@69: self.tabixfile = pysam.Tabixfile(filename, encoding=encoding) jpayne@69: self._parse_header(self.tabixfile.header) jpayne@69: jpayne@69: def __del__(self): jpayne@69: self.close() jpayne@69: self.tabixfile = None jpayne@69: jpayne@69: def close(self): jpayne@69: if self.tabixfile: jpayne@69: self.tabixfile.close() jpayne@69: self.tabixfile = None jpayne@69: jpayne@69: def fetch(self, jpayne@69: reference=None, jpayne@69: start=None, jpayne@69: end=None, jpayne@69: region=None ): jpayne@69: """ Parse a stream of VCF-formatted lines. jpayne@69: Initializes class instance and return generator """ jpayne@69: return self.tabixfile.fetch( jpayne@69: reference, jpayne@69: start, jpayne@69: end, jpayne@69: region, jpayne@69: parser = asVCFRecord(self)) jpayne@69: jpayne@69: def validate(self, record): jpayne@69: '''validate vcf record. jpayne@69: jpayne@69: returns a validated record. jpayne@69: ''' jpayne@69: jpayne@69: raise NotImplementedError("needs to be checked") jpayne@69: jpayne@69: chrom, pos = record.chrom, record.pos jpayne@69: jpayne@69: # check reference jpayne@69: ref = record.ref jpayne@69: if ref == ".": jpayne@69: self.error(str(record),self.MISSING_REF) jpayne@69: if self._version == 33: ref = get_sequence(chrom,pos,pos+1,self._reference) jpayne@69: else: ref = "" jpayne@69: else: jpayne@69: for c in ref: jpayne@69: if c not in "ACGTN": self.error(str(record),self.UNKNOWN_CHAR_IN_REF) jpayne@69: if "N" in ref: ref = get_sequence(chrom, jpayne@69: pos, jpayne@69: pos+len(ref), jpayne@69: self._reference) jpayne@69: jpayne@69: # make sure reference is sane jpayne@69: if self._reference: jpayne@69: left = max(0,self.pos-100) jpayne@69: faref_leftflank = get_sequence(chrom,left,self.pos+len(ref),self._reference) jpayne@69: faref = faref_leftflank[pos-left:] jpayne@69: if faref != ref: self.error(str(record),self.WRONG_REF,"(reference is %s, VCF says %s)" % (faref,ref)) jpayne@69: ref = faref jpayne@69: jpayne@69: # check: format fields are defined jpayne@69: for f in record.format: jpayne@69: if f not in self._format: self.error(str(record),self.FORMAT_NOT_DEFINED, f) jpayne@69: jpayne@69: # check: all filters are defined jpayne@69: for f in record.filter: jpayne@69: if f not in self._filter: self.error(str(record),self.FILTER_NOT_DEFINED, f) jpayne@69: jpayne@69: # convert v3.3 alleles jpayne@69: if self._version == 33: jpayne@69: if len(ref) != 1: self.error(str(record),self.V33_BAD_REF) jpayne@69: newalts = [] jpayne@69: have_deletions = False jpayne@69: for a in alt: jpayne@69: if len(a) == 1: a = a + ref[1:] # SNP; add trailing reference jpayne@69: elif a.startswith('I'): a = ref[0] + a[1:] + ref[1:] # insertion just beyond pos; add first and trailing reference jpayne@69: elif a.startswith('D'): # allow D and D jpayne@69: have_deletions = True jpayne@69: try: jpayne@69: l = int(a[1:]) # throws ValueError if sequence jpayne@69: if len(ref) < l: # add to reference if necessary jpayne@69: addns = get_sequence(chrom,pos+len(ref),pos+l,self._reference) jpayne@69: ref += addns jpayne@69: for i,na in enumerate(newalts): newalts[i] = na+addns jpayne@69: a = ref[l:] # new deletion, deleting pos...pos+l jpayne@69: except ValueError: jpayne@69: s = a[1:] jpayne@69: if len(ref) < len(s): # add Ns to reference if necessary jpayne@69: addns = get_sequence(chrom,pos+len(ref),pos+len(s),self._reference) jpayne@69: if not s.endswith(addns) and addns != 'N'*len(addns): jpayne@69: self.error(str(record),self.V33_UNMATCHED_DELETION, jpayne@69: "(deletion is %s, reference is %s)" % (a,get_sequence(chrom,pos,pos+len(s),self._reference))) jpayne@69: ref += addns jpayne@69: for i,na in enumerate(newalts): newalts[i] = na+addns jpayne@69: a = ref[len(s):] # new deletion, deleting from pos jpayne@69: else: jpayne@69: self.error(str(record),self.V33_BAD_ALLELE) jpayne@69: newalts.append(a) jpayne@69: alt = newalts jpayne@69: # deletion alleles exist, add dummy 1st reference allele, and account for leading base jpayne@69: if have_deletions: jpayne@69: if pos == 0: jpayne@69: # Petr Danacek's: we can't have a leading nucleotide at (1-based) position 1 jpayne@69: addn = get_sequence(chrom,pos+len(ref),pos+len(ref)+1,self._reference) jpayne@69: ref += addn jpayne@69: alt = [allele+addn for allele in alt] jpayne@69: else: jpayne@69: addn = get_sequence(chrom,pos-1,pos,self._reference) jpayne@69: ref = addn + ref jpayne@69: alt = [addn + allele for allele in alt] jpayne@69: pos -= 1 jpayne@69: else: jpayne@69: # format v4.0 -- just check for nucleotides jpayne@69: for allele in alt: jpayne@69: if not alleleRegEx.match(allele): jpayne@69: self.error(str(record),self.V40_BAD_ALLELE,allele) jpayne@69: jpayne@69: jpayne@69: # check for leading nucleotide in indel calls jpayne@69: for allele in alt: jpayne@69: if len(allele) != len(ref): jpayne@69: if len(allele) == 0: self.error(str(record),self.ZERO_LENGTH_ALLELE) jpayne@69: if ref[0].upper() != allele[0].upper() and "N" not in (ref[0]+allele[0]).upper(): jpayne@69: self.error(str(record),self.MISSING_INDEL_ALLELE_REF_BASE) jpayne@69: jpayne@69: # trim trailing bases in alleles jpayne@69: # AH: not certain why trimming this needs to be added jpayne@69: # disabled now for unit testing jpayne@69: # for i in range(1,min(len(ref),min(map(len,alt)))): jpayne@69: # if len(set(allele[-1].upper() for allele in alt)) > 1 or ref[-1].upper() != alt[0][-1].upper(): jpayne@69: # break jpayne@69: # ref, alt = ref[:-1], [allele[:-1] for allele in alt] jpayne@69: jpayne@69: # left-align alleles, if a reference is available jpayne@69: if self._leftalign and self._reference: jpayne@69: while left < pos: jpayne@69: movable = True jpayne@69: for allele in alt: jpayne@69: if len(allele) > len(ref): jpayne@69: longest, shortest = allele, ref jpayne@69: else: jpayne@69: longest, shortest = ref, allele jpayne@69: if len(longest) == len(shortest) or longest[:len(shortest)].upper() != shortest.upper(): jpayne@69: movable = False jpayne@69: if longest[-1].upper() != longest[len(shortest)-1].upper(): jpayne@69: movable = False jpayne@69: if not movable: jpayne@69: break jpayne@69: ref = ref[:-1] jpayne@69: alt = [allele[:-1] for allele in alt] jpayne@69: if min([len(allele) for allele in alt]) == 0 or len(ref) == 0: jpayne@69: ref = faref_leftflank[pos-left-1] + ref jpayne@69: alt = [faref_leftflank[pos-left-1] + allele for allele in alt] jpayne@69: pos -= 1 jpayne@69: jpayne@69: __all__ = [ jpayne@69: "VCF", "VCFRecord", ]