CSP2/bin/runSNPPipeline.py @ 0:01431fa12065 ("planemo upload")

author:    rliterman
date:      Mon, 02 Dec 2024 10:40:55 -0500
parents:
children:  792274118b2e
1 #!/usr/bin/env python3 | |
2 | |
3 import sys | |
4 import os | |
5 import pandas as pd | |
6 import datetime | |
7 from pybedtools import BedTool,helpers | |
8 import concurrent.futures | |
9 import time | |
10 from Bio.Seq import Seq | |
11 from Bio.SeqRecord import SeqRecord | |
12 from Bio.Align import MultipleSeqAlignment | |
13 from Bio import AlignIO | |
14 from itertools import combinations | |
15 import numpy as np | |
16 import uuid | |
17 import traceback | |
18 import shutil | |
19 import argparse | |
20 | |
21 def fetchHeaders(snpdiffs_file): | |
22 | |
23 with open(snpdiffs_file, 'r') as file: | |
24 top_line = file.readline().strip().split('\t')[1:] | |
25 | |
26 header_cols = [item.split(':')[0] for item in top_line] | |
27 header_vals = [item.split(':')[1] for item in top_line] | |
28 | |
29 header_data = pd.DataFrame(columns = header_cols) | |
30 header_data.loc[0] = header_vals | |
31 header_data.loc[:, 'File_Path'] = snpdiffs_file | |
32 | |
33 return header_data | |
34 | |
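# processBED and processSNPs normalize rows parsed from a snpdiffs file. When the
# file was generated with query and reference swapped (orientation -1), the columns
# are re-mapped so downstream code always sees the reference fields first.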
35 def processBED(bed_rows,snpdiffs_orientation): | |
36 | |
37 bed_columns = ['Ref_Contig','Ref_Start','Ref_End','Ref_Length','Ref_Aligned', | |
38 'Query_Contig','Query_Start','Query_End','Query_Length','Query_Aligned', | |
39 'Perc_Iden'] | |
40 | |
41 reverse_columns = ['Query_Contig','Query_Start','Query_End','Query_Length','Query_Aligned', | |
42 'Ref_Contig','Ref_Start','Ref_End','Ref_Length','Ref_Aligned', | |
43 'Perc_Iden'] | |
44 | |
45 int_columns = ['Ref_Start', 'Ref_End', 'Ref_Length', 'Ref_Aligned', | |
46 'Query_Start', 'Query_End', 'Query_Length', 'Query_Aligned'] | |
47 | |
48 float_columns = ['Perc_Iden'] | |
49 | |
50 if len(bed_rows) > 0: | |
51 | |
52 bed_df = pd.DataFrame(bed_rows, columns=bed_columns) | |
53 | |
54 # Swap columns if reversed | |
55 if snpdiffs_orientation == -1: | |
56 bed_df = bed_df[reverse_columns].copy() | |
57 bed_df.columns = bed_columns | |
58 | |
59 # Gather covered loci | |
60 covered_bed_df = bed_df[(bed_df['Ref_Start'] != ".") & (bed_df['Query_Start'] != ".")].copy() | |
61 if covered_bed_df.shape[0] > 0: | |
62 for col in int_columns: | |
63 covered_bed_df.loc[:, col] = covered_bed_df.loc[:, col].astype(float).astype(int) | |
64 for col in float_columns: | |
65 covered_bed_df.loc[:, col] = covered_bed_df.loc[:, col].astype(float) | |
66 | |
67 return covered_bed_df | |
68 else: | |
69 return pd.DataFrame(columns=bed_columns) | |
70 | |
71 def processSNPs(snp_rows,snpdiffs_orientation): | |
72 | |
73 snp_columns = ['Ref_Contig','Start_Ref','Ref_Pos', | |
74 'Query_Contig','Start_Query','Query_Pos', | |
75 'Ref_Loc','Query_Loc', | |
76 'Ref_Start','Ref_End', | |
77 'Query_Start','Query_End', | |
78 'Ref_Base','Query_Base', | |
79 'Dist_to_Ref_End','Dist_to_Query_End', | |
80 'Ref_Aligned','Query_Aligned', | |
81 'Query_Direction','Perc_Iden','Cat'] | |
82 | |
83 return_columns = ['Ref_Contig','Start_Ref','Ref_Pos', | |
84 'Query_Contig','Start_Query','Query_Pos', | |
85 'Ref_Loc','Query_Loc', | |
86 'Ref_Start','Ref_End', | |
87 'Query_Start','Query_End', | |
88 'Ref_Base','Query_Base', | |
89 'Dist_to_Ref_End','Dist_to_Query_End', | |
90 'Ref_Aligned','Query_Aligned', | |
91 'Perc_Iden','Cat'] | |
92 | |
93 reverse_columns = ['Query_Contig','Start_Query','Query_Pos', | |
94 'Ref_Contig','Start_Ref','Ref_Pos', | |
95 'Query_Loc','Ref_Loc', | |
96 'Query_Start','Query_End', | |
97 'Ref_Start','Ref_End', | |
98 'Query_Base','Ref_Base', | |
99 'Dist_to_Query_End','Dist_to_Ref_End', | |
100 'Query_Aligned','Ref_Aligned', | |
101 'Query_Direction','Perc_Iden','Cat'] | |
102 | |
103 reverse_complement = {'A':'T','T':'A','G':'C','C':'G', | |
104 'a':'T','t':'A','c':'G','g':'C'} | |
105 | |
106 # Columns to convert to integer | |
107 int_columns = ['Start_Ref', 'Ref_Pos', 'Start_Query', 'Query_Pos', | |
108 'Dist_to_Ref_End', 'Dist_to_Query_End', 'Ref_Aligned', 'Query_Aligned'] | |
109 | |
110 # Columns to convert to float | |
111 float_columns = ['Perc_Iden'] | |
112 | |
113 if len(snp_rows) > 0: | |
114 snp_df = pd.DataFrame(snp_rows, columns= snp_columns).copy() | |
115 | |
116 if snpdiffs_orientation == -1: | |
117 snp_df = snp_df[reverse_columns].copy() | |
118 snp_df.columns = snp_columns | |
119 | |
120 # Replace Query_Base and Ref_Base with their reverse complements if Query_Direction is -1 and the base is in ['A','T','G','C','a','c','t','g'] | |
121 snp_df.loc[snp_df['Query_Direction'] == '-1','Query_Base'] = snp_df.loc[snp_df['Query_Direction'] == '-1','Query_Base'].apply(lambda x: reverse_complement[x] if x in reverse_complement else x) | |
122 snp_df.loc[snp_df['Query_Direction'] == '-1','Ref_Base'] = snp_df.loc[snp_df['Query_Direction'] == '-1','Ref_Base'].apply(lambda x: reverse_complement[x] if x in reverse_complement else x) | |
123 | |
124 for col in int_columns: | |
125 snp_df.loc[:, col] = snp_df.loc[:, col].astype(float).astype(int) | |
126 for col in float_columns: | |
127 snp_df.loc[:, col] = snp_df.loc[:, col].astype(float) | |
128 | |
129 else: | |
130 snp_df = pd.DataFrame(columns = return_columns) | |
131 | |
132 return snp_df[return_columns] | |
133 | |
134 def swapHeader(header_data): | |
135 | |
136 raw_header_cols = [x for x in header_data.columns] | |
137 reverse_header_cols = [item.replace('Reference', 'temp').replace('Query', 'Reference').replace('temp', 'Query') for item in raw_header_cols] | |
138 reversed_header_data = header_data[reverse_header_cols].copy() | |
139 reversed_header_data.columns = raw_header_cols | |
140 | |
141 return reversed_header_data | |
142 | |
143 def parseSNPDiffs(snpdiffs_file,snpdiffs_orientation): | |
144 | |
145 bed_rows = [] | |
146 snp_rows = [] | |
147 | |
148 with open(snpdiffs_file, 'r') as file: | |
149 lines = file.readlines() | |
150 | |
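# snpdiffs layout: lines starting with "#\t" are header fields (read separately by
# fetchHeaders), "##\t" lines are alignment intervals (BED rows), and all other
# lines are individual SNP records.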
151 for line in lines: | |
152 if line[0:2] == "#\t": | |
153 pass | |
154 elif line[0:3] == "##\t": | |
155 bed_rows.append(line.strip().split("\t")[1:]) | |
156 else: | |
157 snp_rows.append(line.strip().split("\t")) | |
158 | |
159 bed_df = processBED(bed_rows,snpdiffs_orientation) | |
160 snp_df = processSNPs(snp_rows,snpdiffs_orientation) | |
161 return (bed_df,snp_df) | |
162 | |
163 def calculate_total_length(bedtool): | |
164 return sum(len(interval) for interval in bedtool) | |
165 | |
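# filterSNPs tags every raw site with a final category: sites failing the alignment
# length/identity thresholds, invalid bases, indels, heterozygous calls, duplicate hits
# (keeping the longest, highest-identity alignment), SNPs in dense windows, and SNPs
# too close to a reference or query contig edge are purged; the remainder stay "SNP".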
166 def filterSNPs(raw_snp_df,bed_df,log_file, min_len, min_iden, ref_edge, query_edge, density_windows, max_snps): | |
167 | |
168 if temp_dir != "": | |
169 helpers.set_tempdir(temp_dir) | |
170 | |
171 # Grab raw data | |
172 total_snp_count = raw_snp_df.shape[0] | |
173 | |
174 # Get unique SNPs relative to the reference genome | |
175 unique_ref_snps = raw_snp_df['Ref_Loc'].unique() | |
176 unique_snp_count = len(unique_ref_snps) | |
177 | |
178 snp_tally_df = pd.DataFrame() | |
179 | |
180 with open(log_file,"a+") as log: | |
181 log.write(f"\n\t- Raw SNP + indel + invalid site count: {total_snp_count}\n") | |
182 log.write(f"\n\t- Unique SNP positions in reference genome: {unique_snp_count}\n") | |
183 | |
184 # Set all sites to SNP | |
185 raw_snp_df['Filter_Cat'] = "SNP" | |
186 | |
187 # Filter out SNPs based on --min_len and --min_iden | |
188 reject_length = raw_snp_df.loc[(raw_snp_df['Ref_Aligned'] < min_len) & (raw_snp_df['Perc_Iden'] >= min_iden)].copy() | |
189 if reject_length.shape[0] > 0: | |
190 with open(log_file,"a+") as log: | |
191 log.write(f"\t\t- Purged (Alignment Length): {reject_length.shape[0]}\n") | |
192 reject_length['Filter_Cat'] = "Purged_Length" | |
193 snp_tally_df = pd.concat([snp_tally_df,reject_length]).reset_index(drop=True) | |
194 | |
195 reject_iden = raw_snp_df.loc[(raw_snp_df['Ref_Aligned'] >= min_len) & (raw_snp_df['Perc_Iden'] < min_iden)].copy() | |
196 if reject_iden.shape[0] > 0: | |
197 with open(log_file,"a+") as log: | |
198 log.write(f"\t\t- Purged (Alignment Identity): {reject_iden.shape[0]}\n") | |
199 reject_iden['Filter_Cat'] = "Purged_Identity" | |
200 snp_tally_df = pd.concat([snp_tally_df,reject_iden]).reset_index(drop=True) | |
201 | |
202 reject_lenIden = raw_snp_df.loc[(raw_snp_df['Ref_Aligned'] < min_len) & (raw_snp_df['Perc_Iden'] < min_iden)].copy() | |
203 if reject_lenIden.shape[0] > 0: | |
204 with open(log_file,"a+") as log: | |
205 log.write(f"\t\t- Purged (Alignment Length + Identity): {reject_lenIden.shape[0]}\n") | |
206 reject_lenIden['Filter_Cat'] = "Purged_LengthIdentity" | |
207 snp_tally_df = pd.concat([snp_tally_df,reject_lenIden]).reset_index(drop=True) | |
208 | |
209 pass_filter = raw_snp_df.loc[(raw_snp_df['Ref_Aligned'] >= min_len) & (raw_snp_df['Perc_Iden'] >= min_iden)].copy().reset_index(drop=True) | |
210 | |
211 # Invalid processing | |
212 reject_invalid = pass_filter[pass_filter['Cat'] == "Invalid"].copy() | |
213 if reject_invalid.shape[0] > 0: | |
214 with open(log_file,"a+") as log: | |
215 log.write(f"\t\t- Purged (Invalid Base): {reject_invalid.shape[0]}\n") | |
216 reject_invalid['Filter_Cat'] = "Purged_Invalid" | |
217 snp_tally_df = pd.concat([snp_tally_df,reject_invalid]).reset_index(drop=True) | |
218 pass_filter = pass_filter.loc[pass_filter['Cat'] != "Invalid"].copy() | |
219 | |
220 # Indel processing | |
221 reject_indel = pass_filter[pass_filter['Cat'] == "Indel"].copy() | |
222 if reject_indel.shape[0] > 0: | |
223 with open(log_file,"a+") as log: | |
224 log.write(f"\t\t- Purged (Indel): {reject_indel.shape[0]}\n") | |
225 reject_indel['Filter_Cat'] = "Purged_Indel" | |
226 snp_tally_df = pd.concat([snp_tally_df,reject_indel]).reset_index(drop=True) | |
227 pass_filter = pass_filter.loc[pass_filter['Cat'] != "Indel"].copy() | |
228 | |
229 # Check for heterozygous SNPs | |
230 check_heterozygous = pass_filter.groupby('Ref_Loc').filter(lambda x: x['Query_Base'].nunique() > 1) | |
231 if check_heterozygous.shape[0] > 0: | |
232 reject_heterozygous = pass_filter.loc[pass_filter['Ref_Loc'].isin(check_heterozygous['Ref_Loc'])].copy() | |
233 reject_heterozygous['Filter_Cat'] = "Purged_Heterozygous" | |
234 with open(log_file,"a+") as log: | |
235 log.write(f"\t\t- Purged (Heterozygotes): {reject_heterozygous.shape[0]}\n") | |
236 snp_tally_df = pd.concat([snp_tally_df,reject_heterozygous]).reset_index(drop=True) | |
237 pass_filter = pass_filter.loc[~pass_filter['Ref_Loc'].isin(check_heterozygous['Ref_Loc'])].copy() | |
238 | |
239 # Check for duplicate SNPs and take the longest, best hit | |
240 check_duplicates = pass_filter.groupby('Ref_Loc').filter(lambda x: x.shape[0] > 1) | |
241 if check_duplicates.shape[0] > 0: | |
242 reject_duplicate = pass_filter.loc[pass_filter['Ref_Loc'].isin(check_duplicates['Ref_Loc'])].copy() | |
243 pass_filter = pass_filter.loc[~pass_filter['Ref_Loc'].isin(check_duplicates['Ref_Loc'])].copy() | |
244 | |
245 best_snp = reject_duplicate.sort_values(by=['Ref_Aligned', 'Perc_Iden'], ascending=[False, False]).groupby('Ref_Loc').head(1) | |
246 pass_filter = pd.concat([pass_filter,best_snp]).reset_index(drop=True) | |
247 | |
248 dup_snps = reject_duplicate.loc[~reject_duplicate.index.isin(best_snp.index)].copy() | |
249 dup_snps['Filter_Cat'] = "Purged_Duplicate" | |
250 | |
251 snp_tally_df = pd.concat([snp_tally_df,dup_snps]).reset_index(drop=True) | |
252 | |
253 with open(log_file,"a+") as log: | |
254 log.write(f"\t\t- Purged (Duplicates): {dup_snps.shape[0]}\n") | |
255 | |
256 # Assert that Ref_Loc and Query_Loc are unique in pass_filter | |
257 helpers.cleanup(verbose=False,remove_all = False) | |
258 assert pass_filter['Ref_Loc'].nunique() == pass_filter.shape[0] | |
259 assert pass_filter['Query_Loc'].nunique() == pass_filter.shape[0] | |
260 | |
261 # Density filtering | |
262 density_locs = [] | |
263 ref_locs = pass_filter['Ref_Loc'].tolist() | |
264 | |
265 if len(density_windows) == 0: | |
266 with open(log_file,"a+") as log: | |
267 log.write("\n\t- Density filtering disabled...\n") | |
268 elif len(ref_locs) > 0: | |
269 density_df = pd.DataFrame([item.split('/') for item in ref_locs], columns=['Ref_Contig','Ref_End']) | |
270 density_df['Ref_Start'] = density_df['Ref_End'].astype(float).astype(int) - 1 | |
271 density_bed = BedTool.from_dataframe(density_df[['Ref_Contig','Ref_Start','Ref_End']]) | |
272 | |
273 # For each density window, remove all SNPs that fall in a window with > max_snps | |
274 for i in range(0,len(density_windows)): | |
275 window_df = density_bed.window(density_bed,c=True, w=density_windows[i]).to_dataframe() | |
276 problematic_windows = window_df[window_df['name'] > max_snps[i]].copy() | |
277 if not problematic_windows.empty: | |
278 temp_locs = [] | |
279 for _, row in problematic_windows.iterrows(): | |
280 purge_window_df = window_df[window_df['chrom'] == row['chrom']].copy() | |
281 purge_window_df['Dist'] = abs(purge_window_df['end'] - row['end']) | |
282 window_snps = purge_window_df.sort_values(by=['Dist'],ascending=True).head(row['name']) | |
283 temp_locs = temp_locs + ["/".join([str(x[0]),str(x[1])]) for x in list(zip(window_snps.chrom, window_snps.end))] | |
284 density_locs.extend(list(set(temp_locs))) | |
285 | |
286 density_locs = list(set(density_locs)) | |
287 reject_density = pass_filter[pass_filter['Ref_Loc'].isin(density_locs)].copy() | |
288 | |
289 if reject_density.shape[0] > 0: | |
290 with open(log_file,"a+") as log: | |
291 log.write(f"\t\t- Purged (Density): {reject_density.shape[0]}\n") | |
292 reject_density['Filter_Cat'] = "Purged_Density" | |
293 snp_tally_df = pd.concat([snp_tally_df,reject_density]).reset_index(drop=True) | |
294 pass_filter = pass_filter[~pass_filter['Ref_Loc'].isin(density_locs)].copy() | |
295 | |
296 reject_query_edge = pass_filter[(pass_filter['Dist_to_Query_End'] < query_edge) & (pass_filter['Dist_to_Ref_End'] >= ref_edge)].copy() | |
297 reject_ref_edge = pass_filter[(pass_filter['Dist_to_Ref_End'] < ref_edge) & (pass_filter['Dist_to_Query_End'] >= query_edge)].copy() | |
298 reject_both_edge = pass_filter[(pass_filter['Dist_to_Query_End'] < query_edge) & (pass_filter['Dist_to_Ref_End'] < ref_edge)].copy() | |
299 | |
300 if reject_query_edge.shape[0] > 0: | |
301 with open(log_file,"a+") as log: | |
302 log.write(f"\t\t- Purged (Query Edge): {reject_query_edge.shape[0]}\n") | |
303 reject_query_edge['Filter_Cat'] = "Filtered_Query_Edge" | |
304 snp_tally_df = pd.concat([snp_tally_df,reject_query_edge]).reset_index(drop=True) | |
305 | |
306 if reject_ref_edge.shape[0] > 0: | |
307 with open(log_file,"a+") as log: | |
308 log.write(f"\t\t- Purged (Ref Edge): {reject_ref_edge.shape[0]}\n") | |
309 reject_ref_edge['Filter_Cat'] = "Filtered_Ref_Edge" | |
310 snp_tally_df = pd.concat([snp_tally_df,reject_ref_edge]).reset_index(drop=True) | |
311 | |
312 if reject_both_edge.shape[0] > 0: | |
313 with open(log_file,"a+") as log: | |
314 log.write(f"\t\t- Purged (Both Edge): {reject_both_edge.shape[0]}\n") | |
315 reject_both_edge['Filter_Cat'] = "Filtered_Both_Edge" | |
316 snp_tally_df = pd.concat([snp_tally_df,reject_both_edge]).reset_index(drop=True) | |
317 | |
318 pass_filter = pass_filter[(pass_filter['Dist_to_Query_End'] >= query_edge) & (pass_filter['Dist_to_Ref_End'] >= ref_edge)].copy() | |
319 | |
320 helpers.cleanup(verbose=False,remove_all = False) | |
321 | |
322 assert snp_tally_df.shape[0] + pass_filter.shape[0] == total_snp_count | |
323 return_df = pd.concat([pass_filter,snp_tally_df]).reset_index(drop=True).sort_values(by=['Ref_Loc']) | |
324 | |
325 return return_df.drop(columns=['Cat']).rename({'Filter_Cat':'Cat'}, axis=1) | |
326 | |
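# screenSNPDiffs processes a single snpdiffs file against the chosen reference: it
# orients the comparison, halts early on low raw coverage or >10,000 raw SNPs/indels/
# invalid sites, filters alignments by --min_len/--min_iden, recomputes coverage,
# filters SNPs via filterSNPs, and returns the summary row, the passing reference
# intervals, and the filtered SNP table.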
327 def screenSNPDiffs(snpdiffs_file,trim_name, min_cov, min_len, min_iden, ref_edge, query_edge, density_windows, max_snps,reference_id,log_directory): | |
328 | |
329 screen_start_time = time.time() | |
330 | |
331 if temp_dir != "": | |
332 helpers.set_tempdir(temp_dir) | |
333 | |
334 # Set CSP2 variables to NA | |
335 csp2_screen_snps = purged_length = purged_identity = purged_invalid = purged_indel = purged_lengthIdentity = purged_duplicate = purged_het = purged_density = filtered_ref_edge = filtered_query_edge = filtered_both_edge = "NA" | |
336 filtered_snp_df = pd.DataFrame() | |
337 good_reference_bed_df = pd.DataFrame() | |
338 | |
339 # Ensure snpdiffs file exists | |
340 if not os.path.exists(snpdiffs_file) or not snpdiffs_file.endswith('.snpdiffs'): | |
341 run_failed = True | |
342 sys.exit(f"Invalid snpdiffs file provided: {snpdiffs_file}") | |
343 | |
344 # Ensure header can be read in | |
345 try: | |
346 header_data = fetchHeaders(snpdiffs_file) | |
347 header_query = header_data['Query_ID'][0].replace(trim_name,'') | |
348 header_ref = header_data['Reference_ID'][0].replace(trim_name,'') | |
349 except Exception: | |
350 run_failed = True | |
351 sys.exit(f"Error reading headers from snpdiffs file: {snpdiffs_file}") | |
352 | |
353 # Check snpdiffs orientation | |
354 if (header_ref == reference_id): | |
355 snpdiffs_orientation = 1 | |
356 query_id = header_query | |
357 elif (header_query == reference_id): | |
358 snpdiffs_orientation = -1 | |
359 query_id = header_ref | |
360 header_data = swapHeader(header_data) | |
361 else: | |
362 run_failed = True | |
363 sys.exit(f"Error: Reference ID not found in header of {snpdiffs_file}...") | |
364 | |
365 # Establish log file | |
366 log_file = f"{log_directory}/{query_id}__vs__{reference_id}.log" | |
367 with open(log_file,"w+") as log: | |
368 log.write("Reference Screening for SNP Pipeline Analysis\n") | |
369 log.write(f"Query Isolate: {query_id}\n") | |
370 log.write(f"Reference Isolate: {reference_id}\n") | |
371 log.write(str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))+"\n") | |
372 log.write("-------------------------------------------------------\n\n") | |
373 if snpdiffs_orientation == 1: | |
374 log.write("\t- SNPDiffs file is in the forward orientation\n") | |
375 log.write("-------------------------------------------------------\n\n") | |
376 else: | |
377 log.write("\t- SNPDiffs file is in the reverse orientation\n") | |
378 log.write("-------------------------------------------------------\n\n") | |
379 | |
380 | |
381 # Set variables from header data | |
382 raw_snps = int(header_data['SNPs'][0]) | |
383 raw_indels = int(header_data['Indels'][0]) | |
384 raw_invalid = int(header_data['Invalid'][0]) | |
385 | |
386 kmer_similarity = float(header_data['Kmer_Similarity'][0]) | |
387 shared_kmers = int(header_data['Shared_Kmers'][0]) | |
388 query_unique_kmers = int(header_data['Query_Unique_Kmers'][0]) | |
389 reference_unique_kmers = int(header_data['Reference_Unique_Kmers'][0]) | |
390 mummer_gsnps = int(header_data['gSNPs'][0]) | |
391 mummer_gindels = int(header_data['gIndels'][0]) | |
392 | |
393 query_bases = int(header_data['Query_Assembly_Bases'][0]) | |
394 reference_bases = int(header_data['Reference_Assembly_Bases'][0]) | |
395 | |
396 query_contigs = int(header_data['Query_Contig_Count'][0]) | |
397 reference_contigs = int(header_data['Reference_Contig_Count'][0]) | |
398 | |
399 raw_query_percent_aligned = float(header_data['Query_Percent_Aligned'][0]) | |
400 raw_ref_percent_aligned = float(header_data['Reference_Percent_Aligned'][0]) | |
401 | |
402 # If the reference is not covered by at least min_cov, STOP | |
403 if raw_ref_percent_aligned < min_cov: | |
404 query_percent_aligned = raw_query_percent_aligned | |
405 reference_percent_aligned = raw_ref_percent_aligned | |
406 screen_category = "Low_Coverage" | |
407 with open(log_file,"a+") as log: | |
408 log.write(f"\t- Reference genome coverage: {raw_ref_percent_aligned}% \n") | |
409 log.write(f"\t- Query covers less than --min_cov ({min_cov}%)...Screen halted...\n") | |
410 log.write("-------------------------------------------------------\n\n") | |
411 | |
412 elif raw_snps + raw_indels + raw_invalid > 10000: | |
413 query_percent_aligned = raw_query_percent_aligned | |
414 reference_percent_aligned = raw_ref_percent_aligned | |
415 screen_category = "SNP_Cutoff" | |
416 with open(log_file,"a+") as log: | |
417 log.write(f"\t- {raw_snps} SNPs, {raw_indels} indels, and {raw_invalid} invalid sites detected...\n") | |
418 log.write("\t- > 10,000 SNPs, indels, or invalid sites detected by MUMmer...Screen halted...\n") | |
419 log.write("-------------------------------------------------------\n\n") | |
420 | |
421 else: | |
422 | |
423 ##### 02: Read in BED/SNP data ##### | |
424 with open(log_file,"a+") as log: | |
425 log.write("Step 1: Reading in snpdiffs BED/SNP data...") | |
426 try: | |
427 bed_df,snp_df = parseSNPDiffs(snpdiffs_file,snpdiffs_orientation) | |
428 | |
429 with open(log_file,"a+") as log: | |
430 log.write("Done!\n") | |
431 log.write("-------------------------------------------------------\n\n") | |
432 | |
433 except Exception as e: | |
434 run_failed = True | |
435 with open(log_file,"a+") as log: | |
436 log.write(f"\nError reading BED/SNP data from file: {snpdiffs_file}\n{str(e)}") | |
437 sys.exit(f"Error reading BED/SNP data from file: {snpdiffs_file}\n{str(e)}") | |
438 | |
439 ##### 03: Filter genome overlaps ##### | |
440 with open(log_file,"a+") as log: | |
441 log.write("Step 2: Filtering for short overlaps and low percent identity...") | |
442 | |
443 good_bed_df = bed_df[(bed_df['Ref_Aligned'] >= min_len) & (bed_df['Perc_Iden'] >= min_iden)].copy() | |
444 | |
445 if good_bed_df.shape[0] == 0: | |
446 screen_category = "Low_Quality_Coverage" | |
447 with open(log_file,"a+") as log: | |
448 log.write(f"\n\t- After filtering based on --min_len ({min_len}) and --min_iden ({min_iden}), no valid alignments remain...Screen halted...\n") | |
449 log.write("-------------------------------------------------------\n\n") | |
450 | |
451 | |
452 else: | |
453 # Create a BED file for alignments that pass basic QC | |
454 good_query_bed_df = good_bed_df[['Query_Contig','Query_Start','Query_End']].copy() | |
455 good_reference_bed_df = good_bed_df[['Ref_Contig','Ref_Start','Ref_End']].copy() | |
456 good_reference_bed_df.loc[:, 'Query_ID'] = query_id | |
457 | |
458 good_query_aligned = calculate_total_length(BedTool.from_dataframe(good_query_bed_df).sort().merge()) | |
459 good_reference_aligned = calculate_total_length(BedTool.from_dataframe(good_reference_bed_df[['Ref_Contig','Ref_Start','Ref_End']]).sort().merge()) | |
460 | |
461 query_percent_aligned = (good_query_aligned / query_bases) * 100 | |
462 reference_percent_aligned = (good_reference_aligned / reference_bases) * 100 | |
463 | |
464 if reference_percent_aligned < min_cov: | |
465 screen_category = "Low_Quality_Coverage" | |
466 with open(log_file,"a+") as log: | |
467 log.write(f"\n\t- Raw reference genome coverage was {raw_ref_percent_aligned}% \n") | |
468 log.write(f"\t- After filtering based on --min_len ({min_len}) and --min_iden ({min_iden}), reference genome coverage was {reference_percent_aligned:.2f}% \n") | |
469 log.write(f"\t- Query covers less than --min_cov ({min_cov}%) of reference after filtering...Screen halted...\n") | |
470 log.write("-------------------------------------------------------\n\n") | |
471 | |
472 else: | |
473 screen_category = "Pass" | |
474 with open(log_file,"a+") as log: | |
475 log.write("Done!\n") | |
476 log.write(f"\t- Raw reference genome coverage was {raw_ref_percent_aligned}% \n") | |
477 log.write(f"\t- After filtering based on --min_len ({min_len}) and --min_iden ({min_iden}), reference genome coverage was {reference_percent_aligned:.2f}% \n") | |
478 log.write("-------------------------------------------------------\n\n") | |
479 | |
480 | |
481 # Filter SNPs | |
482 with open(log_file,"a+") as log: | |
483 log.write("Step 3: Filtering SNPs to get final SNP distances...") | |
484 | |
485 if raw_snps == 0: | |
486 csp2_screen_snps = purged_length = purged_identity = purged_lengthIdentity = purged_indel = purged_invalid = purged_duplicate = purged_het = purged_density = filtered_ref_edge = filtered_query_edge = filtered_both_edge = 0 | |
487 with open(log_file,"a+") as log: | |
488 log.write("Done!\n") | |
489 log.write("\t- No SNPs detected in MUMmer output, no filtering required\n") | |
490 log.write("-------------------------------------------------------\n\n") | |
491 | |
492 else: | |
493 filtered_snp_df = filterSNPs(snp_df,bed_df,log_file, min_len, min_iden, ref_edge, query_edge, density_windows, max_snps) | |
494 | |
495 csp2_screen_snps = filtered_snp_df[filtered_snp_df.Cat == "SNP"].shape[0] | |
496 purged_length = filtered_snp_df[filtered_snp_df.Cat == "Purged_Length"].shape[0] | |
497 purged_identity = filtered_snp_df[filtered_snp_df.Cat == "Purged_Identity"].shape[0] | |
498 purged_lengthIdentity = filtered_snp_df[filtered_snp_df.Cat == "Purged_LengthIdentity"].shape[0] | |
499 purged_invalid = filtered_snp_df[filtered_snp_df.Cat == "Purged_Invalid"].shape[0] | |
500 purged_indel = filtered_snp_df[filtered_snp_df.Cat == "Purged_Indel"].shape[0] | |
501 purged_het = filtered_snp_df[filtered_snp_df.Cat == "Purged_Heterozygous"].shape[0] | |
502 purged_duplicate = filtered_snp_df[filtered_snp_df.Cat == "Purged_Duplicate"].shape[0] | |
503 purged_density = filtered_snp_df[filtered_snp_df.Cat == "Purged_Density"].shape[0] | |
504 filtered_query_edge = filtered_snp_df[filtered_snp_df.Cat == "Filtered_Query_Edge"].shape[0] | |
505 filtered_ref_edge = filtered_snp_df[filtered_snp_df.Cat == "Filtered_Ref_Edge"].shape[0] | |
506 filtered_both_edge = filtered_snp_df[filtered_snp_df.Cat == "Filtered_Both_Edge"].shape[0] | |
507 | |
508 # Write filtered SNP data to file | |
509 snp_file = log_file.replace(".log","_SNPs.tsv") | |
510 filtered_snp_df.to_csv(snp_file, sep="\t", index=False) | |
511 | |
512 filtered_snp_df.loc[:, 'Query_ID'] = query_id | |
513 | |
514 with open(log_file,"a+") as log: | |
515 log.write("Done!\n") | |
516 log.write(f"\t- {csp2_screen_snps} SNPs detected between {query_id} and {reference_id} after filtering\n") | |
517 log.write(f"\t- Full SNP data saved to {snp_file}\n") | |
518 log.write("-------------------------------------------------------\n\n") | |
519 | |
520 screen_end_time = time.time() | |
521 with open(log_file,"a+") as log: | |
522 log.write(f"Screening Time: {screen_end_time - screen_start_time:.2f} seconds\n") | |
523 | |
524 # Clean up pybedtools temp | |
525 helpers.cleanup(verbose=False, remove_all=False) | |
526 | |
527 return ([str(item) for item in [query_id,reference_id,screen_category,csp2_screen_snps, | |
528 f"{query_percent_aligned:.2f}",f"{reference_percent_aligned:.2f}", | |
529 query_contigs,query_bases,reference_contigs,reference_bases, | |
530 raw_snps,purged_length,purged_identity,purged_lengthIdentity,purged_invalid,purged_indel,purged_duplicate,purged_het,purged_density, | |
531 filtered_query_edge,filtered_ref_edge,filtered_both_edge, | |
532 kmer_similarity,shared_kmers,query_unique_kmers,reference_unique_kmers, | |
533 mummer_gsnps,mummer_gindels]],good_reference_bed_df,filtered_snp_df) | |
534 | |
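# assessCoverage checks reference SNP positions for which a query has no base call:
# positions overlapping the query's passing alignment intervals are scored as
# "Ref_Base" (query matches the reference), the rest as "Uncovered".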
535 def assessCoverage(query_id,site_list): | |
536 | |
537 if temp_dir != "": | |
538 helpers.set_tempdir(temp_dir) | |
539 | |
540 if len(site_list) == 0: | |
541 return pd.DataFrame(columns=['Ref_Loc','Query_ID','Cat']) | |
542 else: | |
543 coverage_df = pass_filter_coverage_df[pass_filter_coverage_df['Query_ID'] == query_id].copy() | |
544 | |
545 if coverage_df.shape[0] == 0: | |
546 uncovered_loc_df = pd.DataFrame({ | |
547 'Ref_Loc': site_list, | |
548 'Query_ID': [query_id] * len(site_list), | |
549 'Cat': ["Uncovered"] * len(site_list) | |
550 }) | |
551 return uncovered_loc_df | |
552 else: | |
553 coverage_bed = BedTool.from_dataframe(coverage_df[['Ref_Contig','Ref_Start','Ref_End']]).sort() | |
554 snp_bed_df = pd.DataFrame([item.split('/') for item in site_list], columns=['Ref_Contig','Ref_End']) | |
555 snp_bed_df['Ref_Start'] = snp_bed_df['Ref_End'].astype(float).astype(int) - 1 | |
556 snp_bed_df['Ref_Loc'] = site_list | |
557 snp_bed = BedTool.from_dataframe(snp_bed_df[['Ref_Contig','Ref_Start','Ref_End','Ref_Loc']]).sort() | |
558 | |
559 # Ref_Locs from snp_bed that intersect with coverage_bed go into covered_locs, the rest go into uncovered_locs | |
560 covered_locs = snp_bed.intersect(coverage_bed, wa=True) | |
561 uncovered_locs = snp_bed.intersect(coverage_bed, v=True, wa=True) | |
562 | |
563 covered_loc_df = pd.DataFrame({ | |
564 'Ref_Loc': [snp.fields[3] for snp in covered_locs], | |
565 'Query_ID': [query_id] * covered_locs.count(), | |
566 'Cat': ["Ref_Base"] * covered_locs.count() | |
567 }) if covered_locs.count() > 0 else pd.DataFrame(columns=['Ref_Loc','Query_ID','Cat']) | |
568 | |
569 uncovered_loc_df = pd.DataFrame({ | |
570 'Ref_Loc': [snp.fields[3] for snp in uncovered_locs], | |
571 'Query_ID': [query_id] * uncovered_locs.count(), | |
572 'Cat': ["Uncovered"] * uncovered_locs.count() | |
573 }) if uncovered_locs.count() > 0 else pd.DataFrame(columns=['Ref_Loc','Query_ID','Cat']) | |
574 | |
575 # Clean up pybedtools temp | |
576 helpers.cleanup(verbose=False, remove_all=False) | |
577 | |
578 return pd.concat([covered_loc_df.drop_duplicates(['Ref_Loc']),uncovered_loc_df]) | |
579 | |
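# getPairwise scores one chunk of sample pairs: positions where both sequences carry
# an unambiguous A/C/G/T base are "cocalled", and the SNP distance is the number of
# cocalled positions at which the two bases differ.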
580 def getPairwise(chunk, sequences, ids): | |
581 results = [] | |
582 | |
583 for i, j in chunk: | |
584 seq1, seq2 = sequences[i], sequences[j] | |
585 actg_mask1 = np.isin(seq1, list('ACTGactg')) | |
586 actg_mask2 = np.isin(seq2, list('ACTGactg')) | |
587 cocalled_mask = actg_mask1 & actg_mask2 | |
588 | |
589 snps_cocalled = np.sum(cocalled_mask) | |
590 snp_distance = np.sum((seq1 != seq2) & cocalled_mask) | |
591 | |
592 results.append([ids[i], ids[j], snp_distance, snps_cocalled]) | |
593 | |
594 return results | |
595 | |
596 def parallelAlignment(alignment, chunk_size=5000): | |
597 sequences = [np.array(list(record.seq)) for record in alignment] | |
598 ids = [record.id for record in alignment] | |
599 pairwise_combinations = list(combinations(range(len(sequences)), 2)) | |
600 | |
601 # Create chunks of pairwise combinations | |
602 chunks = [pairwise_combinations[i:i + chunk_size] for i in range(0, len(pairwise_combinations), chunk_size)] | |
603 | |
604 results = [] | |
605 with concurrent.futures.ProcessPoolExecutor() as executor: | |
606 future_to_chunk = {executor.submit(getPairwise, chunk, sequences, ids): chunk for chunk in chunks} | |
607 for future in concurrent.futures.as_completed(future_to_chunk): | |
608 chunk_results = future.result() | |
609 results.extend(chunk_results) | |
610 return results | |
611 | |
612 def getFinalPurge(df): | |
613 # Returns the 'farthest along' category for a given Ref_Loc | |
614 if "Purged_Density" in df['Cat'].values: | |
615 return "Purged_Density" | |
616 elif "Purged_Heterozygous" in df['Cat'].values: | |
617 return "Purged_Heterozygous" | |
618 elif "Purged_Indel" in df['Cat'].values: | |
619 return "Purged_Indel" | |
620 elif "Purged_Invalid" in df['Cat'].values: | |
621 return "Purged_Invalid" | |
622 elif "Purged_Length" in df['Cat'].values: | |
623 return "Purged_Length" | |
624 else: | |
625 return "Purged_Identity" | |
626 | |
627 | |
628 # Read in arguments | |
629 global run_failed | |
630 run_failed = False | |
631 | |
632 start_time = time.time() | |
633 | |
634 parser = argparse.ArgumentParser(description='CSP2 SNP Pipeline Analysis') | |
635 parser.add_argument('--reference_id', type=str, help='Reference Isolate') | |
636 parser.add_argument('--output_directory', type=str, help='Output Directory') | |
637 parser.add_argument('--log_directory', type=str, help='Log Directory') | |
638 parser.add_argument('--snpdiffs_file', type=str, help='Path to SNPdiffs file') | |
639 parser.add_argument('--min_cov', type=float, help='Minimum coverage') | |
640 parser.add_argument('--min_len', type=int, help='Minimum length') | |
641 parser.add_argument('--min_iden', type=float, help='Minimum identity') | |
642 parser.add_argument('--ref_edge', type=int, help='Reference edge') | |
643 parser.add_argument('--query_edge', type=int, help='Query edge') | |
644 parser.add_argument('--density_windows', type=str, help='Density windows') | |
645 parser.add_argument('--max_snps', type=str, help='Maximum SNPs') | |
646 parser.add_argument('--trim_name', type=str, help='Trim name') | |
647 parser.add_argument('--max_missing', type=float, help='Maximum missing') | |
648 parser.add_argument('--tmp_dir', type=str, help='Temporary directory') | |
649 parser.add_argument('--rescue', type=str, help='Rescue edge SNPs (rescue/norescue)') | |
650 args = parser.parse_args() | |
651 | |
652 reference_id = args.reference_id | |
653 output_directory = os.path.abspath(args.output_directory) | |
654 log_directory = os.path.abspath(args.log_directory) | |
655 log_file = f"{output_directory}/CSP2_SNP_Pipeline.log" | |
656 snpdiffs_file = args.snpdiffs_file | |
657 min_cov = args.min_cov | |
658 min_len = args.min_len | |
659 min_iden = args.min_iden | |
660 ref_edge = args.ref_edge | |
661 query_edge = args.query_edge | |
662 density_windows = [int(x) for x in args.density_windows.split(",")] | |
663 max_snps = [int(x) for x in args.max_snps.split(",")] | |
664 trim_name = args.trim_name | |
665 max_missing = args.max_missing | |
666 | |
667 # Establish log file | |
668 with open(log_file,"w+") as log: | |
669 log.write("CSP2 SNP Pipeline Analysis\n") | |
670 log.write(f"Reference Isolate: {reference_id}\n") | |
671 log.write(str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))+"\n") | |
672 log.write("-------------------------------------------------------\n\n") | |
673 log.write("Reading in SNPDiffs files...") | |
674 | |
675 | |
676 # Read in all lines and ensure each file exists | |
677 snpdiffs_list = [line.strip() for line in open(snpdiffs_file, 'r')] | |
678 snpdiffs_list = [line for line in snpdiffs_list if line] | |
679 for snpdiffs_file in snpdiffs_list: | |
680 if not os.path.exists(snpdiffs_file): | |
681 run_failed = True | |
682 sys.exit("Error: File does not exist: " + snpdiffs_file) | |
683 | |
684 snpdiffs_list = list(set(snpdiffs_list)) | |
685 | |
686 if len(snpdiffs_list) == 0: | |
687 run_failed = True | |
688 sys.exit("No SNPdiffs files provided...") | |
689 | |
690 with open(log_file, "a+") as log: | |
691 log.write("Done!\n") | |
692 log.write(f"\t- Read in {len(snpdiffs_list)} SNPdiffs files\n") | |
693 log.write("-------------------------------------------------------\n\n") | |
694 | |
695 global temp_dir | |
696 if args.tmp_dir != "": | |
697 random_temp_id = str(uuid.uuid4()) | |
698 temp_dir = f"{os.path.normpath(os.path.abspath(args.tmp_dir))}/{random_temp_id}" | |
699 try: | |
700 os.mkdir(temp_dir) | |
701 helpers.set_tempdir(temp_dir) | |
702 except OSError as e: | |
703 run_failed = True | |
704 print(f"Error: Failed to create directory '{temp_dir}': {e}") | |
705 else: | |
706 temp_dir = "" | |
707 | |
708 rescue_edge = str(args.rescue) | |
709 if rescue_edge not in ["rescue","norescue"]: | |
710 with open(log_file,"a+") as log: | |
711 log.write(f"\t- Unexpected rescue_edge variable ('{rescue_edge}'). Not performing SNP edge rescuing (any SNPs found within {query_edge}bp of a query contig edge will be purged)...\n") | |
712 log.write("-------------------------------------------------------\n\n") | |
713 rescue_edge = "norescue" | |
714 elif rescue_edge == "rescue": | |
715 with open(log_file,"a+") as log: | |
716 log.write(f"\t- Rescuing edge SNPs within {query_edge}bp of query contig edges if found more centrally in another query...\n") | |
717 log.write("-------------------------------------------------------\n\n") | |
718 else: | |
719 with open(log_file,"a+") as log: | |
720 log.write(f"\t- Not performing SNP edge rescuing (any SNPs found within {query_edge}bp of a query contig edge will be purged)...\n") | |
721 log.write("-------------------------------------------------------\n\n") | |
722 | |
723 try: | |
724 # Establish output files | |
725 reference_screening_file = f"{output_directory}/Reference_Screening.tsv" | |
726 locus_category_file = f"{output_directory}/Locus_Categories.tsv" | |
727 query_coverage_file = f"{output_directory}/Query_Coverage.tsv" | |
728 raw_loclist = f"{output_directory}/snplist.txt" | |
729 raw_alignment = f"{output_directory}/snpma.fasta" | |
730 preserved_loclist = f"{output_directory}/snplist_preserved.txt" | |
731 preserved_alignment_file = f"{output_directory}/snpma_preserved.fasta" | |
732 raw_pairwise = f"{output_directory}/snp_distance_pairwise.tsv" | |
733 raw_matrix = f"{output_directory}/snp_distance_matrix.tsv" | |
734 preserved_pairwise = f"{output_directory}/snp_distance_pairwise_preserved.tsv" | |
735 preserved_matrix = f"{output_directory}/snp_distance_matrix_preserved.tsv" | |
736 | |
737 with open(log_file,"a+") as log: | |
738 log.write("Screening all queries against reference...") | |
739 with concurrent.futures.ProcessPoolExecutor() as executor: | |
740 results = [executor.submit(screenSNPDiffs,snp_diff_file,trim_name, min_cov, min_len, min_iden, ref_edge, query_edge, density_windows, max_snps,reference_id,log_directory) for snp_diff_file in snpdiffs_list] | |
741 | |
742 # Combine results into a dataframe | |
743 output_columns = ['Query_ID','Reference_ID','Screen_Category','CSP2_Screen_SNPs', | |
744 'Query_Percent_Aligned','Reference_Percent_Aligned', | |
745 'Query_Contigs','Query_Bases','Reference_Contigs','Reference_Bases', | |
746 'Raw_SNPs','Purged_Length','Purged_Identity','Purged_LengthIdentity','Purged_Invalid','Purged_Indel','Purged_Duplicate','Purged_Het','Purged_Density', | |
747 'Filtered_Query_Edge','Filtered_Ref_Edge','Filtered_Both_Edge', | |
748 'Kmer_Similarity','Shared_Kmers','Query_Unique_Kmers','Reference_Unique_Kmers', | |
749 'MUMmer_gSNPs','MUMmer_gIndels'] | |
750 | |
751 # Save reference screening | |
752 results_df = pd.DataFrame([item.result()[0] for item in results], columns = output_columns) | |
753 results_df.to_csv(reference_screening_file, sep="\t", index=False) | |
754 | |
755 # Get reference bed dfs | |
756 covered_df = pd.concat([item.result()[1] for item in results]) | |
757 | |
758 # Get snp dfs | |
759 filtered_snp_df = pd.concat([item.result()[2] for item in results]) | |
760 | |
761 # Separate isolates that pass QC | |
762 pass_qc_isolates = list(set(results_df[results_df['Screen_Category'] == "Pass"]['Query_ID'])) | |
763 fail_qc_isolates = list(set(results_df[results_df['Screen_Category'] != "Pass"]['Query_ID'])) | |
764 | |
765 if len(pass_qc_isolates) == 0: | |
766 with open(log_file,"a+") as log: | |
767 log.write("Done!\n") | |
768 log.write(f"\t- Reference screening data saved to {reference_screening_file}\n") | |
769 log.write(f"\t- Of {len(snpdiffs_list)} comparisons, no isolates passed QC. Pipeline cannot continue.\n") | |
770 log.write(f"\t- {len(fail_qc_isolates)} comparisons failed QC\n") | |
771 for isolate in fail_qc_isolates: | |
772 isolate_category = results_df[results_df['Query_ID'] == isolate]['Screen_Category'].values[0] | |
773 log.write(f"\t\t- {isolate}: {isolate_category}\n") | |
774 log.write("-------------------------------------------------------\n\n") | |
775 sys.exit(0) | |
776 else: | |
777 with open(log_file,"a+") as log: | |
778 log.write("Done!\n") | |
779 log.write(f"\t- Reference screening data saved to {reference_screening_file}\n") | |
780 log.write(f"\t- Of {len(snpdiffs_list)} comparisons, {len(pass_qc_isolates)} covered at least {min_cov}% of the reference genome after removing poor alignments\n") | |
781 if len(fail_qc_isolates) > 0: | |
782 log.write(f"\t- {len(fail_qc_isolates)} comparisons failed QC\n") | |
783 for isolate in fail_qc_isolates: | |
784 isolate_category = results_df[results_df['Query_ID'] == isolate]['Screen_Category'].values[0] | |
785 log.write(f"\t\t- {isolate}: {isolate_category}\n") | |
786 log.write("-------------------------------------------------------\n\n") | |
787 | |
788 with open(log_file,"a+") as log: | |
789 log.write(f"Compiling SNPs across {len(pass_qc_isolates)} samples...\n") | |
790 | |
791 if filtered_snp_df.shape[0] == 0: | |
792 snp_count = 0 | |
793 with open(log_file,"a+") as log: | |
794 log.write("\t- No SNPs detected across samples...Skipping to output...\n") | |
795 else: | |
796 | |
797 # Remove samples that failed QC | |
798 pass_filter_snps = filtered_snp_df[filtered_snp_df['Query_ID'].isin(pass_qc_isolates)].copy() | |
799 pass_filter_snp_list = list(set(pass_filter_snps['Ref_Loc'])) | |
800 pass_filter_snp_count = len(pass_filter_snp_list) | |
801 | |
802 global pass_filter_coverage_df | |
803 pass_filter_coverage_df = covered_df[covered_df['Query_ID'].isin(pass_qc_isolates)].copy() | |
804 | |
805 # Get SNP counts | |
806 snp_df = pass_filter_snps[pass_filter_snps['Cat'] == "SNP"].copy() | |
807 | |
808 if snp_df.shape[0] == 0: | |
809 snp_count = 0 | |
810 with open(log_file,"a+") as log: | |
811 log.write(f"\t- {pass_filter_snp_count} total SNPs detected across all samples...\n") | |
812 log.write("\t- No SNPs passed QC filtering in any sample...Skipping to output...\n") | |
813 | |
814 else: | |
815 snp_list = list(set(snp_df['Ref_Loc'])) | |
816 snp_count = len(snp_list) | |
817 | |
818 with open(log_file,"a+") as log: | |
819 log.write(f"\t- {pass_filter_snp_count} total SNPs detected across all samples...\n") | |
820 log.write(f"\t- {snp_count} unique SNPs passed QC filtering in at least one sample...\n") | |
821 | |
822 # Note SNPs lost irrevocably to reference edge trimming | |
823 ref_edge_df = pass_filter_snps[pass_filter_snps['Cat'].isin(["Filtered_Ref_Edge",'Filtered_Both_Edge'])].copy() | |
824 if ref_edge_df.shape[0] > 0: | |
825 with open(log_file,"a+") as log: | |
826 log.write(f"\t- {ref_edge_df['Ref_Loc'].nunique()} unique SNPs were within {ref_edge}bp of a reference contig end and were not considered in any query...\n") | |
827 | |
828 # Create Ref_Base df | |
829 ref_base_df = snp_df[['Ref_Loc', 'Ref_Base']].copy().drop_duplicates().rename(columns = {'Ref_Base':'Query_Base'}) | |
830 ref_base_df.loc[:,'Query_ID'] = reference_id | |
831 ref_base_df.loc[:,'Cat'] = "Reference_Isolate" | |
832 ref_base_df = ref_base_df.loc[:,['Ref_Loc','Query_ID','Query_Base','Cat']] | |
833 | |
834 # Rescue SNPs that are near the edge if they are valid SNPs in other samples | |
835 rescued_edge_df = pass_filter_snps[(pass_filter_snps['Cat'] == "Filtered_Query_Edge") & (pass_filter_snps['Ref_Loc'].isin(snp_list))].copy() | |
836 | |
837 if rescue_edge == "norescue": | |
838 with open(log_file,"a+") as log: | |
839 log.write("\t- Skipping edge rescuing...\n") | |
840 | |
841 elif rescued_edge_df.shape[0] > 0: | |
842 | |
843 # Remove rescued sites from pass_filter_snps | |
844 rescue_merge = pass_filter_snps.merge(rescued_edge_df, indicator=True, how='outer') | |
845 pass_filter_snps = rescue_merge[rescue_merge['_merge'] == 'left_only'].drop(columns=['_merge']).copy() | |
846 | |
847 # Add rescued SNPs to snp_df | |
848 rescued_edge_df.loc[:,'Cat'] = "Rescued_SNP" | |
849 snp_df = pd.concat([snp_df,rescued_edge_df]).reset_index(drop=True) | |
850 with open(log_file,"a+") as log: | |
851 log.write(f"\t- Rescued {rescued_edge_df.shape[0]} query SNPs that fell within {query_edge}bp of the query contig edge...\n") | |
852 | |
853 else: | |
854 with open(log_file,"a+") as log: | |
855 log.write(f"\t- No query SNPs that fell within {query_edge}bp of the query contig edge were rescued...\n") | |
856 | |
857 # Gather base data for all valid SNPs | |
858 snp_base_df = snp_df[['Ref_Loc','Query_ID','Query_Base','Cat']].copy() | |
859 | |
860 # Process purged sites | |
861 purged_snp_df = pd.DataFrame(columns=['Ref_Loc','Query_ID','Query_Base','Cat']) | |
862 purged_df = pass_filter_snps[pass_filter_snps['Cat'] != "SNP"].copy() | |
863 | |
864 if purged_df.shape[0] > 0: | |
865 | |
866 # Remove rows from purged_df if the Ref_Loc/Query_ID pair is already in snp_base_df | |
867 purged_df = purged_df[~purged_df[['Ref_Loc','Query_ID']].apply(tuple,1).isin(snp_base_df[['Ref_Loc','Query_ID']].apply(tuple,1))].copy() | |
868 | |
869 if purged_df.shape[0] > 0: | |
870 | |
871 # Get purged SNPs where no query has a valid SNP | |
872 non_snp_df = purged_df[~purged_df['Ref_Loc'].isin(snp_list)].copy() | |
873 if non_snp_df.shape[0] > 0: | |
874 non_snp_merge = purged_df.merge(non_snp_df, indicator=True, how='outer') | |
875 purged_df = non_snp_merge[non_snp_merge['_merge'] == 'left_only'].drop(columns=['_merge']).copy() | |
876 | |
877 with open(log_file,"a+") as log: | |
878 log.write(f"\t- {len(list(set(non_snp_df['Ref_Loc'])))} unique SNPs were purged in all queries they were found in, and were not considered in the final dataset...\n") | |
879 | |
880 if purged_df.shape[0] > 0: | |
881 | |
882 purged_snp_df = purged_df[['Ref_Loc','Query_ID']].copy().drop_duplicates() | |
883 purged_snp_df.loc[:, 'Query_Base'] = "N" | |
884 final_purge_df = purged_df.groupby(['Ref_Loc','Query_ID']).apply(getFinalPurge).reset_index().rename(columns={0:'Cat'}) | |
885 purged_snp_df = purged_snp_df.merge(final_purge_df, on=['Ref_Loc','Query_ID'], how='inner') | |
886 | |
887 # Genomic positions that do not occur in the SNP data are either uncovered or match the reference base | |
888 missing_df = pd.DataFrame(columns=['Ref_Loc','Query_ID','Query_Base','Cat']) | |
889 | |
890 covered_snps = pd.concat([snp_base_df,purged_snp_df]).copy() | |
891 ref_loc_sets = covered_snps.groupby('Query_ID')['Ref_Loc'].agg(set).to_dict() | |
892 isolates_with_missing = [isolate for isolate in pass_qc_isolates if len(set(snp_list) - ref_loc_sets.get(isolate, set())) > 0] | |
893 | |
894 uncovered_df = pd.DataFrame() | |
895 | |
896 if isolates_with_missing: | |
897 isolate_data = [(isolate, list(set(snp_list) - ref_loc_sets.get(isolate, set()))) for isolate in isolates_with_missing] | |
898 | |
899 with concurrent.futures.ProcessPoolExecutor() as executor: | |
900 results = [executor.submit(assessCoverage, query, sites) for query, sites in isolate_data] | |
901 coverage_dfs = [result.result() for result in concurrent.futures.as_completed(results)] | |
902 | |
903 coverage_df = pd.concat(coverage_dfs) | |
904 covered_df = coverage_df[coverage_df['Cat'] == 'Ref_Base'] | |
905 uncovered_df = coverage_df[coverage_df['Cat'] == 'Uncovered'] | |
906 | |
907 if not uncovered_df.empty: | |
908 uncovered_df.loc[:, 'Query_Base'] = "?" | |
909 missing_df = pd.concat([missing_df, uncovered_df[['Ref_Loc', 'Query_ID', 'Query_Base', 'Cat']]]) | |
910 | |
911 if not covered_df.empty: | |
912 ref_base_snp_df = covered_df.merge(ref_base_df[['Ref_Loc', 'Query_Base']], on='Ref_Loc', how='left') | |
913 missing_df = pd.concat([missing_df, ref_base_snp_df[['Ref_Loc', 'Query_ID', 'Query_Base', 'Cat']]]) | |
914 | |
915 with open(log_file,"a+") as log: | |
916 log.write("\t- Processed coverage information...\n") | |
917 | |
918 final_snp_df = pd.concat([snp_base_df,purged_snp_df,missing_df,ref_base_df]).sort_values(by=['Ref_Loc','Query_ID']).reset_index(drop=True) | |
919 snp_counts = final_snp_df.groupby('Query_ID')['Ref_Loc'].count().reset_index().rename(columns={'Ref_Loc':'SNP_Count'}) | |
920 | |
921 # Assert that all snp_counts == snp_count | |
922 assert snp_counts['SNP_Count'].nunique() == 1 | |
923 assert snp_counts['SNP_Count'].values[0] == snp_count | |
924 | |
925 # Get locus coverage stats | |
926 snp_coverage_df = final_snp_df[final_snp_df['Cat'].isin(['SNP','Rescued_SNP'])].groupby('Ref_Loc')['Query_ID'].count().reset_index().rename(columns={'Query_ID':'SNP_Count'}) | |
927 ref_base_coverage_df = final_snp_df[final_snp_df['Cat'].isin(["Ref_Base","Reference_Isolate"])].groupby('Ref_Loc')['Query_ID'].count().reset_index().rename(columns={'Query_ID':'Ref_Base_Count'}) | |
928 | |
929 if uncovered_df.shape[0] > 0: | |
930 uncovered_count_df = final_snp_df[final_snp_df['Cat'] == "Uncovered"].groupby('Ref_Loc')['Query_ID'].count().reset_index().rename(columns={'Query_ID':'Uncovered_Count'}).copy() | |
931 else: | |
932 uncovered_count_df = pd.DataFrame(columns=['Ref_Loc','Uncovered_Count']) | |
933 | |
934 possible_purged_cols = ['Purged_Length','Purged_Identity','Purged_Invalid','Purged_Indel','Purged_Heterozygous','Purged_Density'] | |
935 if purged_snp_df.shape[0] > 0: | |
936 purged_count_df = final_snp_df[final_snp_df['Cat'].isin(possible_purged_cols)].groupby('Ref_Loc')['Query_ID'].count().reset_index().rename(columns={'Query_ID':'Purged_Count'}).copy() | |
937 else: | |
938 purged_count_df = pd.DataFrame(columns=['Ref_Loc','Purged_Count']) | |
939 | |
940 locus_coverage_df = snp_coverage_df.merge(ref_base_coverage_df, how='outer', on='Ref_Loc').merge(uncovered_count_df, how='outer', on='Ref_Loc').merge(purged_count_df, how='outer', on='Ref_Loc').fillna(0) | |
941 locus_coverage_df.loc[:, ['SNP_Count','Ref_Base_Count','Uncovered_Count','Purged_Count']] = locus_coverage_df.loc[:, ['SNP_Count','Ref_Base_Count','Uncovered_Count','Purged_Count']].astype(int) | |
942 locus_coverage_df['Missing_Ratio'] = ((locus_coverage_df['Uncovered_Count'] + locus_coverage_df['Purged_Count']) / (1+len(pass_qc_isolates))) * 100 | |
943 locus_coverage_df.to_csv(locus_category_file, sep="\t", index=False) | |
944 | |
945 # Get isolate coverage stats | |
946 min_isolate_cols = ['Query_ID','SNP','Ref_Base','Percent_Missing','Purged','Uncovered','Rescued_SNP','Purged_Ref_Edge'] | |
947 isolate_coverage_df = final_snp_df.groupby('Query_ID')['Cat'].value_counts().unstack().fillna(0).astype(float).astype(int).reset_index().drop(columns=['Reference_Isolate']) | |
948 isolate_coverage_df.loc[isolate_coverage_df['Query_ID'] == reference_id, 'Ref_Base'] = snp_count | |
949 | |
950 if "Rescued_SNP" not in isolate_coverage_df.columns.tolist(): | |
951 isolate_coverage_df.loc[:,'Rescued_SNP'] = 0 | |
952 isolate_coverage_df['SNP'] = isolate_coverage_df['SNP'] + isolate_coverage_df['Rescued_SNP'] | |
953 | |
954 for col in ['Uncovered'] + possible_purged_cols: | |
955 if col not in isolate_coverage_df.columns.tolist(): | |
956 isolate_coverage_df.loc[:,col] = 0 | |
957 | |
958 isolate_coverage_df['Purged'] = isolate_coverage_df[possible_purged_cols].sum(axis=1) | |
959 | |
960 isolate_coverage_df['Percent_Missing'] = (isolate_coverage_df['Uncovered'] + isolate_coverage_df['Purged'])/(isolate_coverage_df['Uncovered'] + isolate_coverage_df['Purged'] + isolate_coverage_df['Ref_Base'] + isolate_coverage_df['SNP']) * 100 | |
961 | |
962 isolate_coverage_df.loc[:,'Purged_Ref_Edge'] = 0 | |
963 if ref_edge_df.shape[0] > 0: | |
964 isolate_coverage_df.loc[isolate_coverage_df['Query_ID'] == reference_id, 'Purged_Ref_Edge'] = ref_edge_df['Ref_Loc'].nunique() | |
965 | |
966 isolate_coverage_df = isolate_coverage_df[min_isolate_cols + possible_purged_cols].sort_values(by = 'Percent_Missing',ascending = False).reset_index(drop=True) | |
967 isolate_coverage_df.to_csv(query_coverage_file, sep="\t", index=False) | |
968 | |
969 with open(log_file,"a+") as log: | |
970 log.write(f"\t- SNP coverage information: {locus_category_file}\n") | |
971 log.write(f"\t- Query coverage information: {query_coverage_file}\n") | |
972 log.write("-------------------------------------------------------\n\n") | |
973 | |
974 with open(log_file,"a+") as log: | |
975 log.write("Processing alignment data...") | |
976 | |
977 alignment_df = final_snp_df[['Query_ID','Ref_Loc','Query_Base']].copy().rename(columns={'Query_Base':'Base'}).pivot(index='Query_ID', columns='Ref_Loc', values='Base') | |
978 csp2_ordered = alignment_df.columns | |
979 | |
980 with open(raw_loclist,"w+") as loclist: | |
981 loclist.write("\n".join(csp2_ordered)+"\n") | |
982 | |
983 seq_records = [SeqRecord(Seq(''.join(row)), id=query,description='') for query,row in alignment_df.iterrows()] | |
984 | |
985 alignment = MultipleSeqAlignment(seq_records) | |
986 AlignIO.write(alignment,raw_alignment,"fasta") | |
987 | |
988 with open(log_file,"a+") as log: | |
989 log.write("Done!\n") | |
990 log.write(f"\t- Saved alignment of {snp_count} SNPs to {raw_alignment}\n") | |
991 log.write(f"\t- Saved ordered loc list to {raw_loclist}\n") | |
992 | |
993 if max_missing == float(100): | |
994 locs_pass_missing = csp2_ordered | |
995 preserved_alignment = alignment | |
996 | |
997 AlignIO.write(preserved_alignment,preserved_alignment_file,"fasta") | |
998 with open(preserved_loclist,"w+") as loclist: | |
999 loclist.write("\n".join(csp2_ordered)+"\n") | |
1000 | |
1001 with open(log_file,"a+") as log: | |
1002 log.write("Skipping SNP preservation step...\n") | |
1003 log.write(f"\t- Saved duplicate alignment to {preserved_alignment_file}\n") | |
1004 log.write(f"\t- Saved duplicate ordered loc list to {preserved_loclist}\n") | |
1005 else: | |
1006 with open(log_file,"a+") as log: | |
1007 log.write(f"\t- Preserving SNPs with at most {max_missing}% missing data...\n") | |
1008 | |
1009 # Parse missing data | |
1010 locs_pass_missing = list(set(locus_coverage_df[locus_coverage_df['Missing_Ratio'] <= max_missing]['Ref_Loc'])) | |
1011 | |
1012 if len(locs_pass_missing) == 0: | |
1013 with open(log_file,"a+") as log: | |
1014 log.write(f"\t- Of {snp_count} SNPs, no SNPs pass the {max_missing}% missing data threshold...\n") | |
1015 log.write("-------------------------------------------------------\n\n") | |
1016 else: | |
1017 preserved_alignment_df = alignment_df[locs_pass_missing].copy() | |
1018 | |
1019 preserved_ordered = preserved_alignment_df.columns | |
1020 with open(preserved_loclist,"w+") as loclist: | |
1021 loclist.write("\n".join(preserved_ordered)+"\n") | |
1022 | |
1023 seq_records = [SeqRecord(Seq(''.join(row)), id=query,description='') for query,row in preserved_alignment_df.iterrows()] | |
1024 preserved_alignment = MultipleSeqAlignment(seq_records) | |
1025 AlignIO.write(preserved_alignment,preserved_alignment_file,"fasta") | |
1026 with open(log_file,"a+") as log: | |
1027 log.write(f"\t- Of {snp_count} SNPs, {len(locs_pass_missing)} SNPs pass the {max_missing}% missing data threshold...\n") | |
1028 log.write(f"\t- Saved preserved alignment to {preserved_alignment_file}\n") | |
1029 log.write(f"\t- Saved preserved ordered loc list to {preserved_loclist}\n") | |
1030 log.write("-------------------------------------------------------\n\n") | |
1031 | |
1032 with open(log_file,"a+") as log: | |
1033 log.write("Processing pairwise comparisons files...") | |
1034 | |
1035 # Get pairwise comparisons between all pass_qc_isolates and reference_id | |
1036 pairwise_combinations = [sorted(x) for x in list(combinations([reference_id] + pass_qc_isolates, 2))] | |
1037 | |
1038 if snp_count == 0: | |
1039 pairwise_df = pd.DataFrame([(pairwise[0], pairwise[1], 0,np.nan) for pairwise in pairwise_combinations],columns = ['Query_1','Query_2','SNP_Distance','SNPs_Cocalled']) | |
1040 preserved_pairwise_df = pairwise_df.copy() | |
1041 | |
1042 pairwise_df.to_csv(raw_pairwise, sep="\t", index=False) | |
1043 preserved_pairwise_df.to_csv(preserved_pairwise, sep="\t", index=False) | |
1044 | |
1045 # Create matrix | |
1046 idx = sorted(set(pairwise_df['Query_1']).union(pairwise_df['Query_2'])) | |
1047 mirrored_distance_df = pairwise_df.pivot(index='Query_1', columns='Query_2', values='SNP_Distance').reindex(index=idx, columns=idx).fillna(0, downcast='infer').pipe(lambda x: x+x.values.T).applymap(lambda x: format(x, '.0f')) | |
1048 mirrored_distance_df.index.name = '' | |
1049 mirrored_distance_df.to_csv(raw_matrix,sep="\t") | |
1050 mirrored_distance_df.to_csv(preserved_matrix,sep="\t") | |
1051 | |
1052 else: | |
1053 raw_distance_results = parallelAlignment(alignment) | |
1054 raw_pairwise_df = pd.DataFrame(raw_distance_results, columns=['Query_1', 'Query_2', 'SNP_Distance', 'SNPs_Cocalled']) | |
1055 raw_pairwise_df.to_csv(raw_pairwise, sep="\t", index=False) | |
1056 | |
1057 if len(locs_pass_missing) == snp_count: | |
1058 preserved_pairwise_df = raw_pairwise_df.copy() | |
1059 preserved_pairwise_df.to_csv(preserved_pairwise, sep="\t", index=False) | |
1060 elif len(locs_pass_missing) == 0: | |
1061 preserved_pairwise_df = pd.DataFrame([(pairwise[0], pairwise[1], 0,np.nan) for pairwise in pairwise_combinations],columns = ['Query_1','Query_2','SNP_Distance','SNPs_Cocalled']) | |
1062 preserved_pairwise_df.to_csv(preserved_pairwise, sep="\t", index=False) | |
1063 else: | |
1064 preserved_distance_results = parallelAlignment(preserved_alignment) | |
1065 preserved_pairwise_df = pd.DataFrame(preserved_distance_results, columns=['Query_1', 'Query_2', 'SNP_Distance', 'SNPs_Cocalled']) | |
1066 preserved_pairwise_df.to_csv(preserved_pairwise, sep="\t", index=False) | |
1067 | |
1068 # Create matrix | |
1069 idx = sorted(set(raw_pairwise_df['Query_1']).union(raw_pairwise_df['Query_2'])) | |
1070 mirrored_distance_df = raw_pairwise_df.pivot(index='Query_1', columns='Query_2', values='SNP_Distance').reindex(index=idx, columns=idx).fillna(0, downcast='infer').pipe(lambda x: x+x.values.T).applymap(lambda x: format(x, '.0f')) | |
1071 mirrored_distance_df.index.name = '' | |
1072 mirrored_distance_df.to_csv(raw_matrix,sep="\t") | |
1073 | |
1074 idx = sorted(set(preserved_pairwise_df['Query_1']).union(preserved_pairwise_df['Query_2'])) | |
1075 mirrored_distance_df = preserved_pairwise_df.pivot(index='Query_1', columns='Query_2', values='SNP_Distance').reindex(index=idx, columns=idx).fillna(0, downcast='infer').pipe(lambda x: x+x.values.T).applymap(lambda x: format(x, '.0f')) | |
1076 mirrored_distance_df.index.name = '' | |
1077 mirrored_distance_df.to_csv(preserved_matrix,sep="\t") | |
1078 | |
1079 # Clean up pybedtools temp | |
1080 helpers.cleanup(verbose=False,remove_all = False) | |
1081 | |
1082 end_time = time.time() | |
1083 with open(log_file,"a+") as log: | |
1084 log.write("Done!\n") | |
1085 if snp_count == 0: | |
1086 log.write(f"\t- No SNPs detected, zeroed pairwise distance files saved to {raw_pairwise}/{preserved_pairwise}/{raw_matrix}/{preserved_matrix}\n") | |
1087 else: | |
1088 log.write(f"\t- Saved raw pairwise distances to {raw_pairwise}\n") | |
1089 log.write(f"\t- Saved raw pairwise matrix to {raw_matrix}\n") | |
1090 | |
1091 if max_missing == float(100): | |
1092 log.write("Skipped SNP preservation step...\n") | |
1093 log.write(f"\t- Saved duplicated preserved pairwise distances to {preserved_pairwise}\n") | |
1094 log.write(f"\t- Saved duplicated preserved pairwise matrix to {preserved_matrix}\n") | |
1095 elif len(locs_pass_missing) == 0: | |
1096 log.write(f"\t- No SNPs passed the {max_missing}% missing data threshold, zeroed pairwise distance files saved to {preserved_pairwise}/{preserved_matrix}\n") | |
1097 else: | |
1098 log.write(f"\t- Saved preserved pairwise distances to {preserved_pairwise}\n") | |
1099 log.write(f"\t- Saved preserved pairwise matrix to {preserved_matrix}\n") | |
1100 log.write(f"Total Time: {end_time - start_time:.2f} seconds\n") | |
1101 log.write("-------------------------------------------------------\n\n") | |
1102 | |
1103 except: | |
1104 run_failed = True | |
1105 print("Exception occurred:\n", traceback.format_exc()) | |
1106 finally: | |
1107 helpers.cleanup(verbose=False, remove_all=False) | |
1108 if temp_dir != "": | |
1109 shutil.rmtree(temp_dir) | |
1110 if run_failed: | |
1111 sys.exit(1) |