jpayne@69: '''Tools for working with files in the samtools pileup -c format.''' jpayne@69: import collections jpayne@69: import pysam jpayne@69: jpayne@69: PileupSubstitution = collections.namedtuple("PileupSubstitution", jpayne@69: ( jpayne@69: "chromosome", jpayne@69: "pos", jpayne@69: "reference_base", jpayne@69: "genotype", jpayne@69: "consensus_quality", jpayne@69: "snp_quality", jpayne@69: "mapping_quality", jpayne@69: "coverage", jpayne@69: "read_bases", jpayne@69: "base_qualities")) jpayne@69: jpayne@69: PileupIndel = collections.namedtuple("PileupIndel", jpayne@69: ( jpayne@69: "chromosome", jpayne@69: "pos", jpayne@69: "reference_base", jpayne@69: "genotype", jpayne@69: "consensus_quality", jpayne@69: "snp_quality", jpayne@69: "mapping_quality", jpayne@69: "coverage", jpayne@69: "first_allele", jpayne@69: "second_allele", jpayne@69: "reads_first", jpayne@69: "reads_second", jpayne@69: "reads_diff")) jpayne@69: jpayne@69: jpayne@69: def iterate(infile): jpayne@69: '''iterate over ``samtools pileup -c`` formatted file. jpayne@69: jpayne@69: *infile* can be any iterator over a lines. jpayne@69: jpayne@69: The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution` jpayne@69: or :class:`pysam.Pileup.PileupIndel`. jpayne@69: jpayne@69: .. note:: jpayne@69: jpayne@69: The parser converts to 0-based coordinates jpayne@69: ''' jpayne@69: jpayne@69: conv_subst = (str, lambda x: int(x) - 1, str, jpayne@69: str, int, int, int, int, str, str) jpayne@69: conv_indel = (str, lambda x: int(x) - 1, str, str, int, jpayne@69: int, int, int, str, str, int, int, int) jpayne@69: jpayne@69: for line in infile: jpayne@69: d = line[:-1].split() jpayne@69: if d[2] == "*": jpayne@69: try: jpayne@69: yield PileupIndel(*[x(y) for x, y in zip(conv_indel, d)]) jpayne@69: except TypeError: jpayne@69: raise pysam.SamtoolsError("parsing error in line: `%s`" % line) jpayne@69: else: jpayne@69: try: jpayne@69: yield PileupSubstitution(*[x(y) for x, y in zip(conv_subst, d)]) jpayne@69: except TypeError: jpayne@69: raise pysam.SamtoolsError("parsing error in line: `%s`" % line) jpayne@69: jpayne@69: jpayne@69: ENCODE_GENOTYPE = { jpayne@69: 'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T', jpayne@69: 'AA': 'A', 'CC': 'C', 'GG': 'G', 'TT': 'T', 'UU': 'U', jpayne@69: 'AG': 'r', 'GA': 'R', jpayne@69: 'CT': 'y', 'TC': 'Y', jpayne@69: 'AC': 'm', 'CA': 'M', jpayne@69: 'GT': 'k', 'TG': 'K', jpayne@69: 'CG': 's', 'GC': 'S', jpayne@69: 'AT': 'w', 'TA': 'W', jpayne@69: } jpayne@69: jpayne@69: DECODE_GENOTYPE = { jpayne@69: 'A': 'AA', jpayne@69: 'C': 'CC', jpayne@69: 'G': 'GG', jpayne@69: 'T': 'TT', jpayne@69: 'r': 'AG', 'R': 'AG', jpayne@69: 'y': 'CT', 'Y': 'CT', jpayne@69: 'm': 'AC', 'M': 'AC', jpayne@69: 'k': 'GT', 'K': 'GT', jpayne@69: 's': 'CG', 'S': 'CG', jpayne@69: 'w': 'AT', 'W': 'AT', jpayne@69: } jpayne@69: jpayne@69: # ------------------------------------------------------------ jpayne@69: jpayne@69: jpayne@69: def encodeGenotype(code): jpayne@69: '''encode genotypes like GG, GA into a one-letter code. jpayne@69: The returned code is lower case if code[0] < code[1], otherwise jpayne@69: it is uppercase. jpayne@69: ''' jpayne@69: return ENCODE_GENOTYPE[code.upper()] jpayne@69: jpayne@69: jpayne@69: def decodeGenotype(code): jpayne@69: '''decode single letter genotypes like m, M into two letters. jpayne@69: This is the reverse operation to :meth:`encodeGenotype`. jpayne@69: ''' jpayne@69: return DECODE_GENOTYPE[code] jpayne@69: jpayne@69: jpayne@69: def translateIndelGenotypeFromVCF(vcf_genotypes, ref): jpayne@69: '''translate indel from vcf to pileup format.''' jpayne@69: jpayne@69: # indels jpayne@69: def getPrefix(s1, s2): jpayne@69: '''get common prefix of strings s1 and s2.''' jpayne@69: n = min(len(s1), len(s2)) jpayne@69: for x in range(n): jpayne@69: if s1[x] != s2[x]: jpayne@69: return s1[:x] jpayne@69: return s1[:n] jpayne@69: jpayne@69: def getSuffix(s1, s2): jpayne@69: '''get common sufix of strings s1 and s2.''' jpayne@69: n = min(len(s1), len(s2)) jpayne@69: if s1[-1] != s2[-1]: jpayne@69: return "" jpayne@69: for x in range(-2, -n - 1, -1): jpayne@69: if s1[x] != s2[x]: jpayne@69: return s1[x + 1:] jpayne@69: return s1[-n:] jpayne@69: jpayne@69: def getGenotype(variant, ref): jpayne@69: jpayne@69: if variant == ref: jpayne@69: return "*", 0 jpayne@69: jpayne@69: if len(ref) > len(variant): jpayne@69: # is a deletion jpayne@69: if ref.startswith(variant): jpayne@69: return "-%s" % ref[len(variant):], len(variant) - 1 jpayne@69: elif ref.endswith(variant): jpayne@69: return "-%s" % ref[:-len(variant)], -1 jpayne@69: else: jpayne@69: prefix = getPrefix(ref, variant) jpayne@69: suffix = getSuffix(ref, variant) jpayne@69: shared = len(prefix) + len(suffix) - len(variant) jpayne@69: # print "-", prefix, suffix, ref, variant, shared, len(prefix), len(suffix), len(ref) jpayne@69: if shared < 0: jpayne@69: raise ValueError() jpayne@69: return "-%s" % ref[len(prefix):-(len(suffix) - shared)], len(prefix) - 1 jpayne@69: jpayne@69: elif len(ref) < len(variant): jpayne@69: # is an insertion jpayne@69: if variant.startswith(ref): jpayne@69: return "+%s" % variant[len(ref):], len(ref) - 1 jpayne@69: elif variant.endswith(ref): jpayne@69: return "+%s" % variant[:len(ref)], 0 jpayne@69: else: jpayne@69: prefix = getPrefix(ref, variant) jpayne@69: suffix = getSuffix(ref, variant) jpayne@69: shared = len(prefix) + len(suffix) - len(ref) jpayne@69: if shared < 0: jpayne@69: raise ValueError() jpayne@69: jpayne@69: return "+%s" % variant[len(prefix):-(len(suffix) - shared)], len(prefix) jpayne@69: else: jpayne@69: assert 0, "snp?" jpayne@69: jpayne@69: # in pileup, the position refers to the base jpayne@69: # after the coordinate, hence subtract 1 jpayne@69: # pos -= 1 jpayne@69: jpayne@69: genotypes, offsets = [], [] jpayne@69: is_error = True jpayne@69: jpayne@69: for variant in vcf_genotypes: jpayne@69: try: jpayne@69: g, offset = getGenotype(variant, ref) jpayne@69: except ValueError: jpayne@69: break jpayne@69: jpayne@69: genotypes.append(g) jpayne@69: if g != "*": jpayne@69: offsets.append(offset) jpayne@69: jpayne@69: else: jpayne@69: is_error = False jpayne@69: jpayne@69: if is_error: jpayne@69: raise ValueError() jpayne@69: jpayne@69: assert len(set(offsets)) == 1, "multiple offsets for indel" jpayne@69: offset = offsets[0] jpayne@69: jpayne@69: genotypes = "/".join(genotypes) jpayne@69: return genotypes, offset jpayne@69: jpayne@69: jpayne@69: def vcf2pileup(vcf, sample): jpayne@69: '''convert vcf record to pileup record.''' jpayne@69: jpayne@69: chromosome = vcf.contig jpayne@69: pos = vcf.pos jpayne@69: reference = vcf.ref jpayne@69: allelles = [reference] + vcf.alt jpayne@69: jpayne@69: data = vcf[sample] jpayne@69: jpayne@69: # get genotype jpayne@69: genotypes = data["GT"] jpayne@69: if len(genotypes) > 1: jpayne@69: raise ValueError("only single genotype per position, %s" % (str(vcf))) jpayne@69: jpayne@69: genotypes = genotypes[0] jpayne@69: jpayne@69: # not a variant jpayne@69: if genotypes[0] == ".": jpayne@69: return None jpayne@69: jpayne@69: genotypes = [allelles[int(x)] for x in genotypes if x != "/"] jpayne@69: jpayne@69: # snp_quality is "genotype quality" jpayne@69: snp_quality = consensus_quality = data.get("GQ", [0])[0] jpayne@69: mapping_quality = vcf.info.get("MQ", [0])[0] jpayne@69: coverage = data.get("DP", 0) jpayne@69: jpayne@69: if len(reference) > 1 or max([len(x) for x in vcf.alt]) > 1: jpayne@69: # indel jpayne@69: genotype, offset = translateIndelGenotypeFromVCF(genotypes, reference) jpayne@69: jpayne@69: return PileupIndel(chromosome, jpayne@69: pos + offset, jpayne@69: "*", jpayne@69: genotype, jpayne@69: consensus_quality, jpayne@69: snp_quality, jpayne@69: mapping_quality, jpayne@69: coverage, jpayne@69: genotype, jpayne@69: "<" * len(genotype), jpayne@69: 0, jpayne@69: 0, jpayne@69: 0) jpayne@69: jpayne@69: else: jpayne@69: genotype = encodeGenotype("".join(genotypes)) jpayne@69: read_bases = "" jpayne@69: base_qualities = "" jpayne@69: jpayne@69: return PileupSubstitution(chromosome, pos, reference, jpayne@69: genotype, consensus_quality, jpayne@69: snp_quality, mapping_quality, jpayne@69: coverage, read_bases, jpayne@69: base_qualities) jpayne@69: jpayne@69: jpayne@69: def iterate_from_vcf(infile, sample): jpayne@69: '''iterate over a vcf-formatted file. jpayne@69: jpayne@69: *infile* can be any iterator over a lines. jpayne@69: jpayne@69: The function yields named tuples of the type jpayne@69: :class:`pysam.Pileup.PileupSubstitution` or jpayne@69: :class:`pysam.Pileup.PileupIndel`. jpayne@69: jpayne@69: Positions without a snp will be skipped. jpayne@69: jpayne@69: This method is wasteful and written to support same legacy code jpayne@69: that expects samtools pileup output. jpayne@69: jpayne@69: Better use the vcf parser directly. jpayne@69: jpayne@69: ''' jpayne@69: vcf = pysam.VCF() jpayne@69: vcf.connect(infile) jpayne@69: jpayne@69: if sample not in vcf.getsamples(): jpayne@69: raise KeyError("sample %s not vcf file") jpayne@69: jpayne@69: for row in vcf.fetch(): jpayne@69: result = vcf2pileup(row, sample) jpayne@69: if result: jpayne@69: yield result