Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pysam/Pileup.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 '''Tools for working with files in the samtools pileup -c format.''' | |
2 import collections | |
3 import pysam | |
4 | |
5 PileupSubstitution = collections.namedtuple("PileupSubstitution", | |
6 ( | |
7 "chromosome", | |
8 "pos", | |
9 "reference_base", | |
10 "genotype", | |
11 "consensus_quality", | |
12 "snp_quality", | |
13 "mapping_quality", | |
14 "coverage", | |
15 "read_bases", | |
16 "base_qualities")) | |
17 | |
18 PileupIndel = collections.namedtuple("PileupIndel", | |
19 ( | |
20 "chromosome", | |
21 "pos", | |
22 "reference_base", | |
23 "genotype", | |
24 "consensus_quality", | |
25 "snp_quality", | |
26 "mapping_quality", | |
27 "coverage", | |
28 "first_allele", | |
29 "second_allele", | |
30 "reads_first", | |
31 "reads_second", | |
32 "reads_diff")) | |
33 | |
34 | |
35 def iterate(infile): | |
36 '''iterate over ``samtools pileup -c`` formatted file. | |
37 | |
38 *infile* can be any iterator over a lines. | |
39 | |
40 The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution` | |
41 or :class:`pysam.Pileup.PileupIndel`. | |
42 | |
43 .. note:: | |
44 | |
45 The parser converts to 0-based coordinates | |
46 ''' | |
47 | |
48 conv_subst = (str, lambda x: int(x) - 1, str, | |
49 str, int, int, int, int, str, str) | |
50 conv_indel = (str, lambda x: int(x) - 1, str, str, int, | |
51 int, int, int, str, str, int, int, int) | |
52 | |
53 for line in infile: | |
54 d = line[:-1].split() | |
55 if d[2] == "*": | |
56 try: | |
57 yield PileupIndel(*[x(y) for x, y in zip(conv_indel, d)]) | |
58 except TypeError: | |
59 raise pysam.SamtoolsError("parsing error in line: `%s`" % line) | |
60 else: | |
61 try: | |
62 yield PileupSubstitution(*[x(y) for x, y in zip(conv_subst, d)]) | |
63 except TypeError: | |
64 raise pysam.SamtoolsError("parsing error in line: `%s`" % line) | |
65 | |
66 | |
67 ENCODE_GENOTYPE = { | |
68 'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T', | |
69 'AA': 'A', 'CC': 'C', 'GG': 'G', 'TT': 'T', 'UU': 'U', | |
70 'AG': 'r', 'GA': 'R', | |
71 'CT': 'y', 'TC': 'Y', | |
72 'AC': 'm', 'CA': 'M', | |
73 'GT': 'k', 'TG': 'K', | |
74 'CG': 's', 'GC': 'S', | |
75 'AT': 'w', 'TA': 'W', | |
76 } | |
77 | |
78 DECODE_GENOTYPE = { | |
79 'A': 'AA', | |
80 'C': 'CC', | |
81 'G': 'GG', | |
82 'T': 'TT', | |
83 'r': 'AG', 'R': 'AG', | |
84 'y': 'CT', 'Y': 'CT', | |
85 'm': 'AC', 'M': 'AC', | |
86 'k': 'GT', 'K': 'GT', | |
87 's': 'CG', 'S': 'CG', | |
88 'w': 'AT', 'W': 'AT', | |
89 } | |
90 | |
91 # ------------------------------------------------------------ | |
92 | |
93 | |
94 def encodeGenotype(code): | |
95 '''encode genotypes like GG, GA into a one-letter code. | |
96 The returned code is lower case if code[0] < code[1], otherwise | |
97 it is uppercase. | |
98 ''' | |
99 return ENCODE_GENOTYPE[code.upper()] | |
100 | |
101 | |
102 def decodeGenotype(code): | |
103 '''decode single letter genotypes like m, M into two letters. | |
104 This is the reverse operation to :meth:`encodeGenotype`. | |
105 ''' | |
106 return DECODE_GENOTYPE[code] | |
107 | |
108 | |
109 def translateIndelGenotypeFromVCF(vcf_genotypes, ref): | |
110 '''translate indel from vcf to pileup format.''' | |
111 | |
112 # indels | |
113 def getPrefix(s1, s2): | |
114 '''get common prefix of strings s1 and s2.''' | |
115 n = min(len(s1), len(s2)) | |
116 for x in range(n): | |
117 if s1[x] != s2[x]: | |
118 return s1[:x] | |
119 return s1[:n] | |
120 | |
121 def getSuffix(s1, s2): | |
122 '''get common sufix of strings s1 and s2.''' | |
123 n = min(len(s1), len(s2)) | |
124 if s1[-1] != s2[-1]: | |
125 return "" | |
126 for x in range(-2, -n - 1, -1): | |
127 if s1[x] != s2[x]: | |
128 return s1[x + 1:] | |
129 return s1[-n:] | |
130 | |
131 def getGenotype(variant, ref): | |
132 | |
133 if variant == ref: | |
134 return "*", 0 | |
135 | |
136 if len(ref) > len(variant): | |
137 # is a deletion | |
138 if ref.startswith(variant): | |
139 return "-%s" % ref[len(variant):], len(variant) - 1 | |
140 elif ref.endswith(variant): | |
141 return "-%s" % ref[:-len(variant)], -1 | |
142 else: | |
143 prefix = getPrefix(ref, variant) | |
144 suffix = getSuffix(ref, variant) | |
145 shared = len(prefix) + len(suffix) - len(variant) | |
146 # print "-", prefix, suffix, ref, variant, shared, len(prefix), len(suffix), len(ref) | |
147 if shared < 0: | |
148 raise ValueError() | |
149 return "-%s" % ref[len(prefix):-(len(suffix) - shared)], len(prefix) - 1 | |
150 | |
151 elif len(ref) < len(variant): | |
152 # is an insertion | |
153 if variant.startswith(ref): | |
154 return "+%s" % variant[len(ref):], len(ref) - 1 | |
155 elif variant.endswith(ref): | |
156 return "+%s" % variant[:len(ref)], 0 | |
157 else: | |
158 prefix = getPrefix(ref, variant) | |
159 suffix = getSuffix(ref, variant) | |
160 shared = len(prefix) + len(suffix) - len(ref) | |
161 if shared < 0: | |
162 raise ValueError() | |
163 | |
164 return "+%s" % variant[len(prefix):-(len(suffix) - shared)], len(prefix) | |
165 else: | |
166 assert 0, "snp?" | |
167 | |
168 # in pileup, the position refers to the base | |
169 # after the coordinate, hence subtract 1 | |
170 # pos -= 1 | |
171 | |
172 genotypes, offsets = [], [] | |
173 is_error = True | |
174 | |
175 for variant in vcf_genotypes: | |
176 try: | |
177 g, offset = getGenotype(variant, ref) | |
178 except ValueError: | |
179 break | |
180 | |
181 genotypes.append(g) | |
182 if g != "*": | |
183 offsets.append(offset) | |
184 | |
185 else: | |
186 is_error = False | |
187 | |
188 if is_error: | |
189 raise ValueError() | |
190 | |
191 assert len(set(offsets)) == 1, "multiple offsets for indel" | |
192 offset = offsets[0] | |
193 | |
194 genotypes = "/".join(genotypes) | |
195 return genotypes, offset | |
196 | |
197 | |
198 def vcf2pileup(vcf, sample): | |
199 '''convert vcf record to pileup record.''' | |
200 | |
201 chromosome = vcf.contig | |
202 pos = vcf.pos | |
203 reference = vcf.ref | |
204 allelles = [reference] + vcf.alt | |
205 | |
206 data = vcf[sample] | |
207 | |
208 # get genotype | |
209 genotypes = data["GT"] | |
210 if len(genotypes) > 1: | |
211 raise ValueError("only single genotype per position, %s" % (str(vcf))) | |
212 | |
213 genotypes = genotypes[0] | |
214 | |
215 # not a variant | |
216 if genotypes[0] == ".": | |
217 return None | |
218 | |
219 genotypes = [allelles[int(x)] for x in genotypes if x != "/"] | |
220 | |
221 # snp_quality is "genotype quality" | |
222 snp_quality = consensus_quality = data.get("GQ", [0])[0] | |
223 mapping_quality = vcf.info.get("MQ", [0])[0] | |
224 coverage = data.get("DP", 0) | |
225 | |
226 if len(reference) > 1 or max([len(x) for x in vcf.alt]) > 1: | |
227 # indel | |
228 genotype, offset = translateIndelGenotypeFromVCF(genotypes, reference) | |
229 | |
230 return PileupIndel(chromosome, | |
231 pos + offset, | |
232 "*", | |
233 genotype, | |
234 consensus_quality, | |
235 snp_quality, | |
236 mapping_quality, | |
237 coverage, | |
238 genotype, | |
239 "<" * len(genotype), | |
240 0, | |
241 0, | |
242 0) | |
243 | |
244 else: | |
245 genotype = encodeGenotype("".join(genotypes)) | |
246 read_bases = "" | |
247 base_qualities = "" | |
248 | |
249 return PileupSubstitution(chromosome, pos, reference, | |
250 genotype, consensus_quality, | |
251 snp_quality, mapping_quality, | |
252 coverage, read_bases, | |
253 base_qualities) | |
254 | |
255 | |
256 def iterate_from_vcf(infile, sample): | |
257 '''iterate over a vcf-formatted file. | |
258 | |
259 *infile* can be any iterator over a lines. | |
260 | |
261 The function yields named tuples of the type | |
262 :class:`pysam.Pileup.PileupSubstitution` or | |
263 :class:`pysam.Pileup.PileupIndel`. | |
264 | |
265 Positions without a snp will be skipped. | |
266 | |
267 This method is wasteful and written to support same legacy code | |
268 that expects samtools pileup output. | |
269 | |
270 Better use the vcf parser directly. | |
271 | |
272 ''' | |
273 vcf = pysam.VCF() | |
274 vcf.connect(infile) | |
275 | |
276 if sample not in vcf.getsamples(): | |
277 raise KeyError("sample %s not vcf file") | |
278 | |
279 for row in vcf.fetch(): | |
280 result = vcf2pileup(row, sample) | |
281 if result: | |
282 yield result |