annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/Pileup.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 '''Tools for working with files in the samtools pileup -c format.'''
jpayne@68 2 import collections
jpayne@68 3 import pysam
jpayne@68 4
jpayne@68 5 PileupSubstitution = collections.namedtuple("PileupSubstitution",
jpayne@68 6 (
jpayne@68 7 "chromosome",
jpayne@68 8 "pos",
jpayne@68 9 "reference_base",
jpayne@68 10 "genotype",
jpayne@68 11 "consensus_quality",
jpayne@68 12 "snp_quality",
jpayne@68 13 "mapping_quality",
jpayne@68 14 "coverage",
jpayne@68 15 "read_bases",
jpayne@68 16 "base_qualities"))
jpayne@68 17
jpayne@68 18 PileupIndel = collections.namedtuple("PileupIndel",
jpayne@68 19 (
jpayne@68 20 "chromosome",
jpayne@68 21 "pos",
jpayne@68 22 "reference_base",
jpayne@68 23 "genotype",
jpayne@68 24 "consensus_quality",
jpayne@68 25 "snp_quality",
jpayne@68 26 "mapping_quality",
jpayne@68 27 "coverage",
jpayne@68 28 "first_allele",
jpayne@68 29 "second_allele",
jpayne@68 30 "reads_first",
jpayne@68 31 "reads_second",
jpayne@68 32 "reads_diff"))
jpayne@68 33
jpayne@68 34
jpayne@68 35 def iterate(infile):
jpayne@68 36 '''iterate over ``samtools pileup -c`` formatted file.
jpayne@68 37
jpayne@68 38 *infile* can be any iterator over a lines.
jpayne@68 39
jpayne@68 40 The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution`
jpayne@68 41 or :class:`pysam.Pileup.PileupIndel`.
jpayne@68 42
jpayne@68 43 .. note::
jpayne@68 44
jpayne@68 45 The parser converts to 0-based coordinates
jpayne@68 46 '''
jpayne@68 47
jpayne@68 48 conv_subst = (str, lambda x: int(x) - 1, str,
jpayne@68 49 str, int, int, int, int, str, str)
jpayne@68 50 conv_indel = (str, lambda x: int(x) - 1, str, str, int,
jpayne@68 51 int, int, int, str, str, int, int, int)
jpayne@68 52
jpayne@68 53 for line in infile:
jpayne@68 54 d = line[:-1].split()
jpayne@68 55 if d[2] == "*":
jpayne@68 56 try:
jpayne@68 57 yield PileupIndel(*[x(y) for x, y in zip(conv_indel, d)])
jpayne@68 58 except TypeError:
jpayne@68 59 raise pysam.SamtoolsError("parsing error in line: `%s`" % line)
jpayne@68 60 else:
jpayne@68 61 try:
jpayne@68 62 yield PileupSubstitution(*[x(y) for x, y in zip(conv_subst, d)])
jpayne@68 63 except TypeError:
jpayne@68 64 raise pysam.SamtoolsError("parsing error in line: `%s`" % line)
jpayne@68 65
jpayne@68 66
jpayne@68 67 ENCODE_GENOTYPE = {
jpayne@68 68 'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T',
jpayne@68 69 'AA': 'A', 'CC': 'C', 'GG': 'G', 'TT': 'T', 'UU': 'U',
jpayne@68 70 'AG': 'r', 'GA': 'R',
jpayne@68 71 'CT': 'y', 'TC': 'Y',
jpayne@68 72 'AC': 'm', 'CA': 'M',
jpayne@68 73 'GT': 'k', 'TG': 'K',
jpayne@68 74 'CG': 's', 'GC': 'S',
jpayne@68 75 'AT': 'w', 'TA': 'W',
jpayne@68 76 }
jpayne@68 77
jpayne@68 78 DECODE_GENOTYPE = {
jpayne@68 79 'A': 'AA',
jpayne@68 80 'C': 'CC',
jpayne@68 81 'G': 'GG',
jpayne@68 82 'T': 'TT',
jpayne@68 83 'r': 'AG', 'R': 'AG',
jpayne@68 84 'y': 'CT', 'Y': 'CT',
jpayne@68 85 'm': 'AC', 'M': 'AC',
jpayne@68 86 'k': 'GT', 'K': 'GT',
jpayne@68 87 's': 'CG', 'S': 'CG',
jpayne@68 88 'w': 'AT', 'W': 'AT',
jpayne@68 89 }
jpayne@68 90
jpayne@68 91 # ------------------------------------------------------------
jpayne@68 92
jpayne@68 93
jpayne@68 94 def encodeGenotype(code):
jpayne@68 95 '''encode genotypes like GG, GA into a one-letter code.
jpayne@68 96 The returned code is lower case if code[0] < code[1], otherwise
jpayne@68 97 it is uppercase.
jpayne@68 98 '''
jpayne@68 99 return ENCODE_GENOTYPE[code.upper()]
jpayne@68 100
jpayne@68 101
jpayne@68 102 def decodeGenotype(code):
jpayne@68 103 '''decode single letter genotypes like m, M into two letters.
jpayne@68 104 This is the reverse operation to :meth:`encodeGenotype`.
jpayne@68 105 '''
jpayne@68 106 return DECODE_GENOTYPE[code]
jpayne@68 107
jpayne@68 108
jpayne@68 109 def translateIndelGenotypeFromVCF(vcf_genotypes, ref):
jpayne@68 110 '''translate indel from vcf to pileup format.'''
jpayne@68 111
jpayne@68 112 # indels
jpayne@68 113 def getPrefix(s1, s2):
jpayne@68 114 '''get common prefix of strings s1 and s2.'''
jpayne@68 115 n = min(len(s1), len(s2))
jpayne@68 116 for x in range(n):
jpayne@68 117 if s1[x] != s2[x]:
jpayne@68 118 return s1[:x]
jpayne@68 119 return s1[:n]
jpayne@68 120
jpayne@68 121 def getSuffix(s1, s2):
jpayne@68 122 '''get common sufix of strings s1 and s2.'''
jpayne@68 123 n = min(len(s1), len(s2))
jpayne@68 124 if s1[-1] != s2[-1]:
jpayne@68 125 return ""
jpayne@68 126 for x in range(-2, -n - 1, -1):
jpayne@68 127 if s1[x] != s2[x]:
jpayne@68 128 return s1[x + 1:]
jpayne@68 129 return s1[-n:]
jpayne@68 130
jpayne@68 131 def getGenotype(variant, ref):
jpayne@68 132
jpayne@68 133 if variant == ref:
jpayne@68 134 return "*", 0
jpayne@68 135
jpayne@68 136 if len(ref) > len(variant):
jpayne@68 137 # is a deletion
jpayne@68 138 if ref.startswith(variant):
jpayne@68 139 return "-%s" % ref[len(variant):], len(variant) - 1
jpayne@68 140 elif ref.endswith(variant):
jpayne@68 141 return "-%s" % ref[:-len(variant)], -1
jpayne@68 142 else:
jpayne@68 143 prefix = getPrefix(ref, variant)
jpayne@68 144 suffix = getSuffix(ref, variant)
jpayne@68 145 shared = len(prefix) + len(suffix) - len(variant)
jpayne@68 146 # print "-", prefix, suffix, ref, variant, shared, len(prefix), len(suffix), len(ref)
jpayne@68 147 if shared < 0:
jpayne@68 148 raise ValueError()
jpayne@68 149 return "-%s" % ref[len(prefix):-(len(suffix) - shared)], len(prefix) - 1
jpayne@68 150
jpayne@68 151 elif len(ref) < len(variant):
jpayne@68 152 # is an insertion
jpayne@68 153 if variant.startswith(ref):
jpayne@68 154 return "+%s" % variant[len(ref):], len(ref) - 1
jpayne@68 155 elif variant.endswith(ref):
jpayne@68 156 return "+%s" % variant[:len(ref)], 0
jpayne@68 157 else:
jpayne@68 158 prefix = getPrefix(ref, variant)
jpayne@68 159 suffix = getSuffix(ref, variant)
jpayne@68 160 shared = len(prefix) + len(suffix) - len(ref)
jpayne@68 161 if shared < 0:
jpayne@68 162 raise ValueError()
jpayne@68 163
jpayne@68 164 return "+%s" % variant[len(prefix):-(len(suffix) - shared)], len(prefix)
jpayne@68 165 else:
jpayne@68 166 assert 0, "snp?"
jpayne@68 167
jpayne@68 168 # in pileup, the position refers to the base
jpayne@68 169 # after the coordinate, hence subtract 1
jpayne@68 170 # pos -= 1
jpayne@68 171
jpayne@68 172 genotypes, offsets = [], []
jpayne@68 173 is_error = True
jpayne@68 174
jpayne@68 175 for variant in vcf_genotypes:
jpayne@68 176 try:
jpayne@68 177 g, offset = getGenotype(variant, ref)
jpayne@68 178 except ValueError:
jpayne@68 179 break
jpayne@68 180
jpayne@68 181 genotypes.append(g)
jpayne@68 182 if g != "*":
jpayne@68 183 offsets.append(offset)
jpayne@68 184
jpayne@68 185 else:
jpayne@68 186 is_error = False
jpayne@68 187
jpayne@68 188 if is_error:
jpayne@68 189 raise ValueError()
jpayne@68 190
jpayne@68 191 assert len(set(offsets)) == 1, "multiple offsets for indel"
jpayne@68 192 offset = offsets[0]
jpayne@68 193
jpayne@68 194 genotypes = "/".join(genotypes)
jpayne@68 195 return genotypes, offset
jpayne@68 196
jpayne@68 197
jpayne@68 198 def vcf2pileup(vcf, sample):
jpayne@68 199 '''convert vcf record to pileup record.'''
jpayne@68 200
jpayne@68 201 chromosome = vcf.contig
jpayne@68 202 pos = vcf.pos
jpayne@68 203 reference = vcf.ref
jpayne@68 204 allelles = [reference] + vcf.alt
jpayne@68 205
jpayne@68 206 data = vcf[sample]
jpayne@68 207
jpayne@68 208 # get genotype
jpayne@68 209 genotypes = data["GT"]
jpayne@68 210 if len(genotypes) > 1:
jpayne@68 211 raise ValueError("only single genotype per position, %s" % (str(vcf)))
jpayne@68 212
jpayne@68 213 genotypes = genotypes[0]
jpayne@68 214
jpayne@68 215 # not a variant
jpayne@68 216 if genotypes[0] == ".":
jpayne@68 217 return None
jpayne@68 218
jpayne@68 219 genotypes = [allelles[int(x)] for x in genotypes if x != "/"]
jpayne@68 220
jpayne@68 221 # snp_quality is "genotype quality"
jpayne@68 222 snp_quality = consensus_quality = data.get("GQ", [0])[0]
jpayne@68 223 mapping_quality = vcf.info.get("MQ", [0])[0]
jpayne@68 224 coverage = data.get("DP", 0)
jpayne@68 225
jpayne@68 226 if len(reference) > 1 or max([len(x) for x in vcf.alt]) > 1:
jpayne@68 227 # indel
jpayne@68 228 genotype, offset = translateIndelGenotypeFromVCF(genotypes, reference)
jpayne@68 229
jpayne@68 230 return PileupIndel(chromosome,
jpayne@68 231 pos + offset,
jpayne@68 232 "*",
jpayne@68 233 genotype,
jpayne@68 234 consensus_quality,
jpayne@68 235 snp_quality,
jpayne@68 236 mapping_quality,
jpayne@68 237 coverage,
jpayne@68 238 genotype,
jpayne@68 239 "<" * len(genotype),
jpayne@68 240 0,
jpayne@68 241 0,
jpayne@68 242 0)
jpayne@68 243
jpayne@68 244 else:
jpayne@68 245 genotype = encodeGenotype("".join(genotypes))
jpayne@68 246 read_bases = ""
jpayne@68 247 base_qualities = ""
jpayne@68 248
jpayne@68 249 return PileupSubstitution(chromosome, pos, reference,
jpayne@68 250 genotype, consensus_quality,
jpayne@68 251 snp_quality, mapping_quality,
jpayne@68 252 coverage, read_bases,
jpayne@68 253 base_qualities)
jpayne@68 254
jpayne@68 255
jpayne@68 256 def iterate_from_vcf(infile, sample):
jpayne@68 257 '''iterate over a vcf-formatted file.
jpayne@68 258
jpayne@68 259 *infile* can be any iterator over a lines.
jpayne@68 260
jpayne@68 261 The function yields named tuples of the type
jpayne@68 262 :class:`pysam.Pileup.PileupSubstitution` or
jpayne@68 263 :class:`pysam.Pileup.PileupIndel`.
jpayne@68 264
jpayne@68 265 Positions without a snp will be skipped.
jpayne@68 266
jpayne@68 267 This method is wasteful and written to support same legacy code
jpayne@68 268 that expects samtools pileup output.
jpayne@68 269
jpayne@68 270 Better use the vcf parser directly.
jpayne@68 271
jpayne@68 272 '''
jpayne@68 273 vcf = pysam.VCF()
jpayne@68 274 vcf.connect(infile)
jpayne@68 275
jpayne@68 276 if sample not in vcf.getsamples():
jpayne@68 277 raise KeyError("sample %s not vcf file")
jpayne@68 278
jpayne@68 279 for row in vcf.fetch():
jpayne@68 280 result = vcf2pileup(row, sample)
jpayne@68 281 if result:
jpayne@68 282 yield result