comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/Pileup.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 69:33d812a61356
1 '''Tools for working with files in the samtools pileup -c format.'''
2 import collections
3 import pysam
4
5 PileupSubstitution = collections.namedtuple("PileupSubstitution",
6 (
7 "chromosome",
8 "pos",
9 "reference_base",
10 "genotype",
11 "consensus_quality",
12 "snp_quality",
13 "mapping_quality",
14 "coverage",
15 "read_bases",
16 "base_qualities"))
17
18 PileupIndel = collections.namedtuple("PileupIndel",
19 (
20 "chromosome",
21 "pos",
22 "reference_base",
23 "genotype",
24 "consensus_quality",
25 "snp_quality",
26 "mapping_quality",
27 "coverage",
28 "first_allele",
29 "second_allele",
30 "reads_first",
31 "reads_second",
32 "reads_diff"))
33
34
35 def iterate(infile):
36 '''iterate over ``samtools pileup -c`` formatted file.
37
38 *infile* can be any iterator over a lines.
39
40 The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution`
41 or :class:`pysam.Pileup.PileupIndel`.
42
43 .. note::
44
45 The parser converts to 0-based coordinates
46 '''
47
48 conv_subst = (str, lambda x: int(x) - 1, str,
49 str, int, int, int, int, str, str)
50 conv_indel = (str, lambda x: int(x) - 1, str, str, int,
51 int, int, int, str, str, int, int, int)
52
53 for line in infile:
54 d = line[:-1].split()
55 if d[2] == "*":
56 try:
57 yield PileupIndel(*[x(y) for x, y in zip(conv_indel, d)])
58 except TypeError:
59 raise pysam.SamtoolsError("parsing error in line: `%s`" % line)
60 else:
61 try:
62 yield PileupSubstitution(*[x(y) for x, y in zip(conv_subst, d)])
63 except TypeError:
64 raise pysam.SamtoolsError("parsing error in line: `%s`" % line)
65
66
67 ENCODE_GENOTYPE = {
68 'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T',
69 'AA': 'A', 'CC': 'C', 'GG': 'G', 'TT': 'T', 'UU': 'U',
70 'AG': 'r', 'GA': 'R',
71 'CT': 'y', 'TC': 'Y',
72 'AC': 'm', 'CA': 'M',
73 'GT': 'k', 'TG': 'K',
74 'CG': 's', 'GC': 'S',
75 'AT': 'w', 'TA': 'W',
76 }
77
78 DECODE_GENOTYPE = {
79 'A': 'AA',
80 'C': 'CC',
81 'G': 'GG',
82 'T': 'TT',
83 'r': 'AG', 'R': 'AG',
84 'y': 'CT', 'Y': 'CT',
85 'm': 'AC', 'M': 'AC',
86 'k': 'GT', 'K': 'GT',
87 's': 'CG', 'S': 'CG',
88 'w': 'AT', 'W': 'AT',
89 }
90
91 # ------------------------------------------------------------
92
93
94 def encodeGenotype(code):
95 '''encode genotypes like GG, GA into a one-letter code.
96 The returned code is lower case if code[0] < code[1], otherwise
97 it is uppercase.
98 '''
99 return ENCODE_GENOTYPE[code.upper()]
100
101
102 def decodeGenotype(code):
103 '''decode single letter genotypes like m, M into two letters.
104 This is the reverse operation to :meth:`encodeGenotype`.
105 '''
106 return DECODE_GENOTYPE[code]
107
108
109 def translateIndelGenotypeFromVCF(vcf_genotypes, ref):
110 '''translate indel from vcf to pileup format.'''
111
112 # indels
113 def getPrefix(s1, s2):
114 '''get common prefix of strings s1 and s2.'''
115 n = min(len(s1), len(s2))
116 for x in range(n):
117 if s1[x] != s2[x]:
118 return s1[:x]
119 return s1[:n]
120
121 def getSuffix(s1, s2):
122 '''get common sufix of strings s1 and s2.'''
123 n = min(len(s1), len(s2))
124 if s1[-1] != s2[-1]:
125 return ""
126 for x in range(-2, -n - 1, -1):
127 if s1[x] != s2[x]:
128 return s1[x + 1:]
129 return s1[-n:]
130
131 def getGenotype(variant, ref):
132
133 if variant == ref:
134 return "*", 0
135
136 if len(ref) > len(variant):
137 # is a deletion
138 if ref.startswith(variant):
139 return "-%s" % ref[len(variant):], len(variant) - 1
140 elif ref.endswith(variant):
141 return "-%s" % ref[:-len(variant)], -1
142 else:
143 prefix = getPrefix(ref, variant)
144 suffix = getSuffix(ref, variant)
145 shared = len(prefix) + len(suffix) - len(variant)
146 # print "-", prefix, suffix, ref, variant, shared, len(prefix), len(suffix), len(ref)
147 if shared < 0:
148 raise ValueError()
149 return "-%s" % ref[len(prefix):-(len(suffix) - shared)], len(prefix) - 1
150
151 elif len(ref) < len(variant):
152 # is an insertion
153 if variant.startswith(ref):
154 return "+%s" % variant[len(ref):], len(ref) - 1
155 elif variant.endswith(ref):
156 return "+%s" % variant[:len(ref)], 0
157 else:
158 prefix = getPrefix(ref, variant)
159 suffix = getSuffix(ref, variant)
160 shared = len(prefix) + len(suffix) - len(ref)
161 if shared < 0:
162 raise ValueError()
163
164 return "+%s" % variant[len(prefix):-(len(suffix) - shared)], len(prefix)
165 else:
166 assert 0, "snp?"
167
168 # in pileup, the position refers to the base
169 # after the coordinate, hence subtract 1
170 # pos -= 1
171
172 genotypes, offsets = [], []
173 is_error = True
174
175 for variant in vcf_genotypes:
176 try:
177 g, offset = getGenotype(variant, ref)
178 except ValueError:
179 break
180
181 genotypes.append(g)
182 if g != "*":
183 offsets.append(offset)
184
185 else:
186 is_error = False
187
188 if is_error:
189 raise ValueError()
190
191 assert len(set(offsets)) == 1, "multiple offsets for indel"
192 offset = offsets[0]
193
194 genotypes = "/".join(genotypes)
195 return genotypes, offset
196
197
198 def vcf2pileup(vcf, sample):
199 '''convert vcf record to pileup record.'''
200
201 chromosome = vcf.contig
202 pos = vcf.pos
203 reference = vcf.ref
204 allelles = [reference] + vcf.alt
205
206 data = vcf[sample]
207
208 # get genotype
209 genotypes = data["GT"]
210 if len(genotypes) > 1:
211 raise ValueError("only single genotype per position, %s" % (str(vcf)))
212
213 genotypes = genotypes[0]
214
215 # not a variant
216 if genotypes[0] == ".":
217 return None
218
219 genotypes = [allelles[int(x)] for x in genotypes if x != "/"]
220
221 # snp_quality is "genotype quality"
222 snp_quality = consensus_quality = data.get("GQ", [0])[0]
223 mapping_quality = vcf.info.get("MQ", [0])[0]
224 coverage = data.get("DP", 0)
225
226 if len(reference) > 1 or max([len(x) for x in vcf.alt]) > 1:
227 # indel
228 genotype, offset = translateIndelGenotypeFromVCF(genotypes, reference)
229
230 return PileupIndel(chromosome,
231 pos + offset,
232 "*",
233 genotype,
234 consensus_quality,
235 snp_quality,
236 mapping_quality,
237 coverage,
238 genotype,
239 "<" * len(genotype),
240 0,
241 0,
242 0)
243
244 else:
245 genotype = encodeGenotype("".join(genotypes))
246 read_bases = ""
247 base_qualities = ""
248
249 return PileupSubstitution(chromosome, pos, reference,
250 genotype, consensus_quality,
251 snp_quality, mapping_quality,
252 coverage, read_bases,
253 base_qualities)
254
255
256 def iterate_from_vcf(infile, sample):
257 '''iterate over a vcf-formatted file.
258
259 *infile* can be any iterator over a lines.
260
261 The function yields named tuples of the type
262 :class:`pysam.Pileup.PileupSubstitution` or
263 :class:`pysam.Pileup.PileupIndel`.
264
265 Positions without a snp will be skipped.
266
267 This method is wasteful and written to support same legacy code
268 that expects samtools pileup output.
269
270 Better use the vcf parser directly.
271
272 '''
273 vcf = pysam.VCF()
274 vcf.connect(infile)
275
276 if sample not in vcf.getsamples():
277 raise KeyError("sample %s not vcf file")
278
279 for row in vcf.fetch():
280 result = vcf2pileup(row, sample)
281 if result:
282 yield result