annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/Pileup.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 '''Tools for working with files in the samtools pileup -c format.'''
jpayne@69 2 import collections
jpayne@69 3 import pysam
jpayne@69 4
jpayne@69 5 PileupSubstitution = collections.namedtuple("PileupSubstitution",
jpayne@69 6 (
jpayne@69 7 "chromosome",
jpayne@69 8 "pos",
jpayne@69 9 "reference_base",
jpayne@69 10 "genotype",
jpayne@69 11 "consensus_quality",
jpayne@69 12 "snp_quality",
jpayne@69 13 "mapping_quality",
jpayne@69 14 "coverage",
jpayne@69 15 "read_bases",
jpayne@69 16 "base_qualities"))
jpayne@69 17
jpayne@69 18 PileupIndel = collections.namedtuple("PileupIndel",
jpayne@69 19 (
jpayne@69 20 "chromosome",
jpayne@69 21 "pos",
jpayne@69 22 "reference_base",
jpayne@69 23 "genotype",
jpayne@69 24 "consensus_quality",
jpayne@69 25 "snp_quality",
jpayne@69 26 "mapping_quality",
jpayne@69 27 "coverage",
jpayne@69 28 "first_allele",
jpayne@69 29 "second_allele",
jpayne@69 30 "reads_first",
jpayne@69 31 "reads_second",
jpayne@69 32 "reads_diff"))
jpayne@69 33
jpayne@69 34
jpayne@69 35 def iterate(infile):
jpayne@69 36 '''iterate over ``samtools pileup -c`` formatted file.
jpayne@69 37
jpayne@69 38 *infile* can be any iterator over a lines.
jpayne@69 39
jpayne@69 40 The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution`
jpayne@69 41 or :class:`pysam.Pileup.PileupIndel`.
jpayne@69 42
jpayne@69 43 .. note::
jpayne@69 44
jpayne@69 45 The parser converts to 0-based coordinates
jpayne@69 46 '''
jpayne@69 47
jpayne@69 48 conv_subst = (str, lambda x: int(x) - 1, str,
jpayne@69 49 str, int, int, int, int, str, str)
jpayne@69 50 conv_indel = (str, lambda x: int(x) - 1, str, str, int,
jpayne@69 51 int, int, int, str, str, int, int, int)
jpayne@69 52
jpayne@69 53 for line in infile:
jpayne@69 54 d = line[:-1].split()
jpayne@69 55 if d[2] == "*":
jpayne@69 56 try:
jpayne@69 57 yield PileupIndel(*[x(y) for x, y in zip(conv_indel, d)])
jpayne@69 58 except TypeError:
jpayne@69 59 raise pysam.SamtoolsError("parsing error in line: `%s`" % line)
jpayne@69 60 else:
jpayne@69 61 try:
jpayne@69 62 yield PileupSubstitution(*[x(y) for x, y in zip(conv_subst, d)])
jpayne@69 63 except TypeError:
jpayne@69 64 raise pysam.SamtoolsError("parsing error in line: `%s`" % line)
jpayne@69 65
jpayne@69 66
jpayne@69 67 ENCODE_GENOTYPE = {
jpayne@69 68 'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T',
jpayne@69 69 'AA': 'A', 'CC': 'C', 'GG': 'G', 'TT': 'T', 'UU': 'U',
jpayne@69 70 'AG': 'r', 'GA': 'R',
jpayne@69 71 'CT': 'y', 'TC': 'Y',
jpayne@69 72 'AC': 'm', 'CA': 'M',
jpayne@69 73 'GT': 'k', 'TG': 'K',
jpayne@69 74 'CG': 's', 'GC': 'S',
jpayne@69 75 'AT': 'w', 'TA': 'W',
jpayne@69 76 }
jpayne@69 77
jpayne@69 78 DECODE_GENOTYPE = {
jpayne@69 79 'A': 'AA',
jpayne@69 80 'C': 'CC',
jpayne@69 81 'G': 'GG',
jpayne@69 82 'T': 'TT',
jpayne@69 83 'r': 'AG', 'R': 'AG',
jpayne@69 84 'y': 'CT', 'Y': 'CT',
jpayne@69 85 'm': 'AC', 'M': 'AC',
jpayne@69 86 'k': 'GT', 'K': 'GT',
jpayne@69 87 's': 'CG', 'S': 'CG',
jpayne@69 88 'w': 'AT', 'W': 'AT',
jpayne@69 89 }
jpayne@69 90
jpayne@69 91 # ------------------------------------------------------------
jpayne@69 92
jpayne@69 93
jpayne@69 94 def encodeGenotype(code):
jpayne@69 95 '''encode genotypes like GG, GA into a one-letter code.
jpayne@69 96 The returned code is lower case if code[0] < code[1], otherwise
jpayne@69 97 it is uppercase.
jpayne@69 98 '''
jpayne@69 99 return ENCODE_GENOTYPE[code.upper()]
jpayne@69 100
jpayne@69 101
jpayne@69 102 def decodeGenotype(code):
jpayne@69 103 '''decode single letter genotypes like m, M into two letters.
jpayne@69 104 This is the reverse operation to :meth:`encodeGenotype`.
jpayne@69 105 '''
jpayne@69 106 return DECODE_GENOTYPE[code]
jpayne@69 107
jpayne@69 108
jpayne@69 109 def translateIndelGenotypeFromVCF(vcf_genotypes, ref):
jpayne@69 110 '''translate indel from vcf to pileup format.'''
jpayne@69 111
jpayne@69 112 # indels
jpayne@69 113 def getPrefix(s1, s2):
jpayne@69 114 '''get common prefix of strings s1 and s2.'''
jpayne@69 115 n = min(len(s1), len(s2))
jpayne@69 116 for x in range(n):
jpayne@69 117 if s1[x] != s2[x]:
jpayne@69 118 return s1[:x]
jpayne@69 119 return s1[:n]
jpayne@69 120
jpayne@69 121 def getSuffix(s1, s2):
jpayne@69 122 '''get common sufix of strings s1 and s2.'''
jpayne@69 123 n = min(len(s1), len(s2))
jpayne@69 124 if s1[-1] != s2[-1]:
jpayne@69 125 return ""
jpayne@69 126 for x in range(-2, -n - 1, -1):
jpayne@69 127 if s1[x] != s2[x]:
jpayne@69 128 return s1[x + 1:]
jpayne@69 129 return s1[-n:]
jpayne@69 130
jpayne@69 131 def getGenotype(variant, ref):
jpayne@69 132
jpayne@69 133 if variant == ref:
jpayne@69 134 return "*", 0
jpayne@69 135
jpayne@69 136 if len(ref) > len(variant):
jpayne@69 137 # is a deletion
jpayne@69 138 if ref.startswith(variant):
jpayne@69 139 return "-%s" % ref[len(variant):], len(variant) - 1
jpayne@69 140 elif ref.endswith(variant):
jpayne@69 141 return "-%s" % ref[:-len(variant)], -1
jpayne@69 142 else:
jpayne@69 143 prefix = getPrefix(ref, variant)
jpayne@69 144 suffix = getSuffix(ref, variant)
jpayne@69 145 shared = len(prefix) + len(suffix) - len(variant)
jpayne@69 146 # print "-", prefix, suffix, ref, variant, shared, len(prefix), len(suffix), len(ref)
jpayne@69 147 if shared < 0:
jpayne@69 148 raise ValueError()
jpayne@69 149 return "-%s" % ref[len(prefix):-(len(suffix) - shared)], len(prefix) - 1
jpayne@69 150
jpayne@69 151 elif len(ref) < len(variant):
jpayne@69 152 # is an insertion
jpayne@69 153 if variant.startswith(ref):
jpayne@69 154 return "+%s" % variant[len(ref):], len(ref) - 1
jpayne@69 155 elif variant.endswith(ref):
jpayne@69 156 return "+%s" % variant[:len(ref)], 0
jpayne@69 157 else:
jpayne@69 158 prefix = getPrefix(ref, variant)
jpayne@69 159 suffix = getSuffix(ref, variant)
jpayne@69 160 shared = len(prefix) + len(suffix) - len(ref)
jpayne@69 161 if shared < 0:
jpayne@69 162 raise ValueError()
jpayne@69 163
jpayne@69 164 return "+%s" % variant[len(prefix):-(len(suffix) - shared)], len(prefix)
jpayne@69 165 else:
jpayne@69 166 assert 0, "snp?"
jpayne@69 167
jpayne@69 168 # in pileup, the position refers to the base
jpayne@69 169 # after the coordinate, hence subtract 1
jpayne@69 170 # pos -= 1
jpayne@69 171
jpayne@69 172 genotypes, offsets = [], []
jpayne@69 173 is_error = True
jpayne@69 174
jpayne@69 175 for variant in vcf_genotypes:
jpayne@69 176 try:
jpayne@69 177 g, offset = getGenotype(variant, ref)
jpayne@69 178 except ValueError:
jpayne@69 179 break
jpayne@69 180
jpayne@69 181 genotypes.append(g)
jpayne@69 182 if g != "*":
jpayne@69 183 offsets.append(offset)
jpayne@69 184
jpayne@69 185 else:
jpayne@69 186 is_error = False
jpayne@69 187
jpayne@69 188 if is_error:
jpayne@69 189 raise ValueError()
jpayne@69 190
jpayne@69 191 assert len(set(offsets)) == 1, "multiple offsets for indel"
jpayne@69 192 offset = offsets[0]
jpayne@69 193
jpayne@69 194 genotypes = "/".join(genotypes)
jpayne@69 195 return genotypes, offset
jpayne@69 196
jpayne@69 197
jpayne@69 198 def vcf2pileup(vcf, sample):
jpayne@69 199 '''convert vcf record to pileup record.'''
jpayne@69 200
jpayne@69 201 chromosome = vcf.contig
jpayne@69 202 pos = vcf.pos
jpayne@69 203 reference = vcf.ref
jpayne@69 204 allelles = [reference] + vcf.alt
jpayne@69 205
jpayne@69 206 data = vcf[sample]
jpayne@69 207
jpayne@69 208 # get genotype
jpayne@69 209 genotypes = data["GT"]
jpayne@69 210 if len(genotypes) > 1:
jpayne@69 211 raise ValueError("only single genotype per position, %s" % (str(vcf)))
jpayne@69 212
jpayne@69 213 genotypes = genotypes[0]
jpayne@69 214
jpayne@69 215 # not a variant
jpayne@69 216 if genotypes[0] == ".":
jpayne@69 217 return None
jpayne@69 218
jpayne@69 219 genotypes = [allelles[int(x)] for x in genotypes if x != "/"]
jpayne@69 220
jpayne@69 221 # snp_quality is "genotype quality"
jpayne@69 222 snp_quality = consensus_quality = data.get("GQ", [0])[0]
jpayne@69 223 mapping_quality = vcf.info.get("MQ", [0])[0]
jpayne@69 224 coverage = data.get("DP", 0)
jpayne@69 225
jpayne@69 226 if len(reference) > 1 or max([len(x) for x in vcf.alt]) > 1:
jpayne@69 227 # indel
jpayne@69 228 genotype, offset = translateIndelGenotypeFromVCF(genotypes, reference)
jpayne@69 229
jpayne@69 230 return PileupIndel(chromosome,
jpayne@69 231 pos + offset,
jpayne@69 232 "*",
jpayne@69 233 genotype,
jpayne@69 234 consensus_quality,
jpayne@69 235 snp_quality,
jpayne@69 236 mapping_quality,
jpayne@69 237 coverage,
jpayne@69 238 genotype,
jpayne@69 239 "<" * len(genotype),
jpayne@69 240 0,
jpayne@69 241 0,
jpayne@69 242 0)
jpayne@69 243
jpayne@69 244 else:
jpayne@69 245 genotype = encodeGenotype("".join(genotypes))
jpayne@69 246 read_bases = ""
jpayne@69 247 base_qualities = ""
jpayne@69 248
jpayne@69 249 return PileupSubstitution(chromosome, pos, reference,
jpayne@69 250 genotype, consensus_quality,
jpayne@69 251 snp_quality, mapping_quality,
jpayne@69 252 coverage, read_bases,
jpayne@69 253 base_qualities)
jpayne@69 254
jpayne@69 255
jpayne@69 256 def iterate_from_vcf(infile, sample):
jpayne@69 257 '''iterate over a vcf-formatted file.
jpayne@69 258
jpayne@69 259 *infile* can be any iterator over a lines.
jpayne@69 260
jpayne@69 261 The function yields named tuples of the type
jpayne@69 262 :class:`pysam.Pileup.PileupSubstitution` or
jpayne@69 263 :class:`pysam.Pileup.PileupIndel`.
jpayne@69 264
jpayne@69 265 Positions without a snp will be skipped.
jpayne@69 266
jpayne@69 267 This method is wasteful and written to support same legacy code
jpayne@69 268 that expects samtools pileup output.
jpayne@69 269
jpayne@69 270 Better use the vcf parser directly.
jpayne@69 271
jpayne@69 272 '''
jpayne@69 273 vcf = pysam.VCF()
jpayne@69 274 vcf.connect(infile)
jpayne@69 275
jpayne@69 276 if sample not in vcf.getsamples():
jpayne@69 277 raise KeyError("sample %s not vcf file")
jpayne@69 278
jpayne@69 279 for row in vcf.fetch():
jpayne@69 280 result = vcf2pileup(row, sample)
jpayne@69 281 if result:
jpayne@69 282 yield result