comparison CSP2/bin/runSNPPipeline.py @ 0:01431fa12065

"planemo upload"
author rliterman
date Mon, 02 Dec 2024 10:40:55 -0500
parents
children 792274118b2e
1 #!/usr/bin/env python3
2
3 import sys
4 import os
5 import pandas as pd
6 import datetime
7 from pybedtools import BedTool,helpers
8 import concurrent.futures
9 import time
10 from Bio.Seq import Seq
11 from Bio.SeqRecord import SeqRecord
12 from Bio.Align import MultipleSeqAlignment
13 from Bio import AlignIO
14 from itertools import combinations
15 import numpy as np
16 import uuid
17 import traceback
18 import shutil
19 import argparse
20
21 def fetchHeaders(snpdiffs_file):
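# Parse the first line of a .snpdiffs file: each tab-separated field is a 'Key:Value' pair,
# returned as a one-row DataFrame together with the file path.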
22
23 with open(snpdiffs_file, 'r') as file:
24 top_line = file.readline().strip().split('\t')[1:]
25
26 header_cols = [item.split(':')[0] for item in top_line]
27 header_vals = [item.split(':')[1] for item in top_line]
28
29 header_data = pd.DataFrame(columns = header_cols)
30 header_data.loc[0] = header_vals
31 header_data.loc[:, 'File_Path'] = snpdiffs_file
32
33 return header_data
34
35 def processBED(bed_rows,snpdiffs_orientation):
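# Convert raw alignment (BED) rows into a typed DataFrame. If the snpdiffs file is in the
# reverse orientation (-1), reference and query columns are swapped first; only rows where
# both genomes have coordinates (no '.') are kept and cast to numeric types.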
36
37 bed_columns = ['Ref_Contig','Ref_Start','Ref_End','Ref_Length','Ref_Aligned',
38 'Query_Contig','Query_Start','Query_End','Query_Length','Query_Aligned',
39 'Perc_Iden']
40
41 reverse_columns = ['Query_Contig','Query_Start','Query_End','Query_Length','Query_Aligned',
42 'Ref_Contig','Ref_Start','Ref_End','Ref_Length','Ref_Aligned',
43 'Perc_Iden']
44
45 int_columns = ['Ref_Start', 'Ref_End', 'Ref_Length', 'Ref_Aligned',
46 'Query_Start', 'Query_End', 'Query_Length', 'Query_Aligned']
47
48 float_columns = ['Perc_Iden']
49
50 if len(bed_rows) > 0:
51
52 bed_df = pd.DataFrame(bed_rows, columns=bed_columns)
53
54 # Swap columns if reversed
55 if snpdiffs_orientation == -1:
56 bed_df = bed_df[reverse_columns].copy()
57 bed_df.columns = bed_columns
58
59 # Gather covered loci
60 covered_bed_df = bed_df[(bed_df['Ref_Start'] != ".") & (bed_df['Query_Start'] != ".")].copy()
61 if covered_bed_df.shape[0] > 0:
62 for col in int_columns:
63 covered_bed_df.loc[:, col] = covered_bed_df.loc[:, col].astype(float).astype(int)
64 for col in float_columns:
65 covered_bed_df.loc[:, col] = covered_bed_df.loc[:, col].astype(float)
66
67 return covered_bed_df
68 else:
69 return pd.DataFrame(columns=bed_columns)
70
71 def processSNPs(snp_rows,snpdiffs_orientation):
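# Convert raw SNP rows into a typed DataFrame. For reverse-orientation files the reference
# and query columns are swapped and bases from reverse-strand query alignments are
# reverse-complemented; coordinate and identity columns are cast to numeric types.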
72
73 snp_columns = ['Ref_Contig','Start_Ref','Ref_Pos',
74 'Query_Contig','Start_Query','Query_Pos',
75 'Ref_Loc','Query_Loc',
76 'Ref_Start','Ref_End',
77 'Query_Start','Query_End',
78 'Ref_Base','Query_Base',
79 'Dist_to_Ref_End','Dist_to_Query_End',
80 'Ref_Aligned','Query_Aligned',
81 'Query_Direction','Perc_Iden','Cat']
82
83 return_columns = ['Ref_Contig','Start_Ref','Ref_Pos',
84 'Query_Contig','Start_Query','Query_Pos',
85 'Ref_Loc','Query_Loc',
86 'Ref_Start','Ref_End',
87 'Query_Start','Query_End',
88 'Ref_Base','Query_Base',
89 'Dist_to_Ref_End','Dist_to_Query_End',
90 'Ref_Aligned','Query_Aligned',
91 'Perc_Iden','Cat']
92
93 reverse_columns = ['Query_Contig','Start_Query','Query_Pos',
94 'Ref_Contig','Start_Ref','Ref_Pos',
95 'Query_Loc','Ref_Loc',
96 'Query_Start','Query_End',
97 'Ref_Start','Ref_End',
98 'Query_Base','Ref_Base',
99 'Dist_to_Query_End','Dist_to_Ref_End',
100 'Query_Aligned','Ref_Aligned',
101 'Query_Direction','Perc_Iden','Cat']
102
103 reverse_complement = {'A':'T','T':'A','G':'C','C':'G',
104 'a':'T','t':'A','c':'G','g':'C'}
105
106 # Columns to convert to integer
107 int_columns = ['Start_Ref', 'Ref_Pos', 'Start_Query', 'Query_Pos',
108 'Dist_to_Ref_End', 'Dist_to_Query_End', 'Ref_Aligned', 'Query_Aligned']
109
110 # Columns to convert to float
111 float_columns = ['Perc_Iden']
112
113 if len(snp_rows) > 0:
114 snp_df = pd.DataFrame(snp_rows, columns= snp_columns).copy()
115
116 if snpdiffs_orientation == -1:
117 snp_df = snp_df[reverse_columns].copy()
118 snp_df.columns = snp_columns
119
120 # Replace Query_Base and Ref_Base with their reverse complement if Query_Direction is -1 and the base is a standard nucleotide (upper- or lowercase A/T/G/C)
121 snp_df.loc[snp_df['Query_Direction'] == '-1','Query_Base'] = snp_df.loc[snp_df['Query_Direction'] == '-1','Query_Base'].apply(lambda x: reverse_complement[x] if x in reverse_complement else x)
122 snp_df.loc[snp_df['Query_Direction'] == '-1','Ref_Base'] = snp_df.loc[snp_df['Query_Direction'] == '-1','Ref_Base'].apply(lambda x: reverse_complement[x] if x in reverse_complement else x)
123
124 for col in int_columns:
125 snp_df.loc[:, col] = snp_df.loc[:, col].astype(float).astype(int)
126 for col in float_columns:
127 snp_df.loc[:, col] = snp_df.loc[:, col].astype(float)
128
129 else:
130 snp_df = pd.DataFrame(columns = return_columns)
131
132 return snp_df[return_columns]
133
134 def swapHeader(header_data):
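# Swap every Reference_*/Query_* column pair in the header DataFrame so the requested
# reference isolate can always be treated as the reference downstream.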
135
136 raw_header_cols = [x for x in header_data.columns]
137 reverse_header_cols = [item.replace('Reference', 'temp').replace('Query', 'Reference').replace('temp', 'Query') for item in raw_header_cols]
138 reversed_header_data = header_data[reverse_header_cols].copy()
139 reversed_header_data.columns = raw_header_cols
140
141 return reversed_header_data
142
143 def parseSNPDiffs(snpdiffs_file,snpdiffs_orientation):
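# Split a .snpdiffs file into alignment rows ('##'-prefixed, sent to processBED) and SNP rows
# (all remaining non-header lines, sent to processSNPs); '#'-prefixed header lines are skipped.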
144
145 bed_rows = []
146 snp_rows = []
147
148 with open(snpdiffs_file, 'r') as file:
149 lines = file.readlines()
150
151 for line in lines:
152 if line[0:2] == "#\t":
153 pass
154 elif line[0:3] == "##\t":
155 bed_rows.append(line.strip().split("\t")[1:])
156 else:
157 snp_rows.append(line.strip().split("\t"))
158
159 bed_df = processBED(bed_rows,snpdiffs_orientation)
160 snp_df = processSNPs(snp_rows,snpdiffs_orientation)
161 return (bed_df,snp_df)
162
163 def calculate_total_length(bedtool):
164 return sum(len(interval) for interval in bedtool)
165
166 def filterSNPs(raw_snp_df,bed_df,log_file, min_len, min_iden, ref_edge, query_edge, density_windows, max_snps):
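# Apply the per-SNP filters in order: alignment length/identity, invalid bases, indels,
# heterozygous calls, duplicate hits (keeping the longest, highest-identity alignment),
# SNP density windows, and proximity to contig edges. Each rejected site is tagged with a
# Filter_Cat and the tallies are written to the per-comparison log.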
167
168 if temp_dir != "":
169 helpers.set_tempdir(temp_dir)
170
171 # Grab raw data
172 total_snp_count = raw_snp_df.shape[0]
173
174 # Get unique SNPs relative to the reference genome
175 unique_ref_snps = raw_snp_df['Ref_Loc'].unique()
176 unique_snp_count = len(unique_ref_snps)
177
178 snp_tally_df = pd.DataFrame()
179
180 with open(log_file,"a+") as log:
181 log.write(f"\n\t- Raw SNP + indel count: {total_snp_count}\n")
182 log.write(f"\n\t- Unique SNP positions in reference genome: {unique_snp_count}\n")
183
184 # Set all sites to SNP
185 raw_snp_df['Filter_Cat'] = "SNP"
186
187 # Filter out SNPs based on --min_len and --min_iden
188 reject_length = raw_snp_df.loc[(raw_snp_df['Ref_Aligned'] < min_len) & (raw_snp_df['Perc_Iden'] >= min_iden)].copy()
189 if reject_length.shape[0] > 0:
190 with open(log_file,"a+") as log:
191 log.write(f"\t\t- Purged (Alignment Length): {reject_length.shape[0]}\n")
192 reject_length['Filter_Cat'] = "Purged_Length"
193 snp_tally_df = pd.concat([snp_tally_df,reject_length]).reset_index(drop=True)
194
195 reject_iden = raw_snp_df.loc[(raw_snp_df['Ref_Aligned'] >= min_len) & (raw_snp_df['Perc_Iden'] < min_iden)].copy()
196 if reject_iden.shape[0] > 0:
197 with open(log_file,"a+") as log:
198 log.write(f"\t\t- Purged (Alignment Identity): {reject_iden.shape[0]}\n")
199 reject_iden['Filter_Cat'] = "Purged_Identity"
200 snp_tally_df = pd.concat([snp_tally_df,reject_iden]).reset_index(drop=True)
201
202 reject_lenIden = raw_snp_df.loc[(raw_snp_df['Ref_Aligned'] < min_len) & (raw_snp_df['Perc_Iden'] < min_iden)].copy()
203 if reject_lenIden.shape[0] > 0:
204 with open(log_file,"a+") as log:
205 log.write(f"\t\t- Purged (Alignment Length + Identity): {reject_lenIden.shape[0]}\n")
206 reject_lenIden['Filter_Cat'] = "Purged_LengthIdentity"
207 snp_tally_df = pd.concat([snp_tally_df,reject_lenIden]).reset_index(drop=True)
208
209 pass_filter = raw_snp_df.loc[(raw_snp_df['Ref_Aligned'] >= min_len) & (raw_snp_df['Perc_Iden'] >= min_iden)].copy().reset_index(drop=True)
210
211 # Invalid processing
212 reject_invalid = pass_filter[pass_filter['Cat'] == "Invalid"].copy()
213 if reject_invalid.shape[0] > 0:
214 with open(log_file,"a+") as log:
215 log.write(f"\t\t- Purged (Invalid Base): {reject_invalid.shape[0]}\n")
216 reject_invalid['Filter_Cat'] = "Purged_Invalid"
217 snp_tally_df = pd.concat([snp_tally_df,reject_invalid]).reset_index(drop=True)
218 pass_filter = pass_filter.loc[pass_filter['Cat'] != "Invalid"].copy()
219
220 # Indel processing
221 reject_indel = pass_filter[pass_filter['Cat'] == "Indel"].copy()
222 if reject_indel.shape[0] > 0:
223 with open(log_file,"a+") as log:
224 log.write(f"\t\t- Purged (Indel): {reject_indel.shape[0]}\n")
225 reject_indel['Filter_Cat'] = "Purged_Indel"
226 snp_tally_df = pd.concat([snp_tally_df,reject_indel]).reset_index(drop=True)
227 pass_filter = pass_filter.loc[pass_filter['Cat'] != "Indel"].copy()
228
229 # Check for heterozygous SNPs
230 check_heterozygous = pass_filter.groupby('Ref_Loc').filter(lambda x: x['Query_Base'].nunique() > 1)
231 if check_heterozygous.shape[0] > 0:
232 reject_heterozygous = pass_filter.loc[pass_filter['Ref_Loc'].isin(check_heterozygous['Ref_Loc'])].copy()
233 reject_heterozygous['Filter_Cat'] = "Purged_Heterozygous"
234 with open(log_file,"a+") as log:
235 log.write(f"\t\t- Purged (Heterozygotes): {reject_heterozygous.shape[0]}\n")
236 snp_tally_df = pd.concat([snp_tally_df,reject_heterozygous]).reset_index(drop=True)
237 pass_filter = pass_filter.loc[~pass_filter['Ref_Loc'].isin(check_heterozygous['Ref_Loc'])].copy()
238
239 # Check for duplicate SNPs and take the longest, best hit
240 check_duplicates = pass_filter.groupby('Ref_Loc').filter(lambda x: x.shape[0] > 1)
241 if check_duplicates.shape[0] > 0:
242 reject_duplicate = pass_filter.loc[pass_filter['Ref_Loc'].isin(check_duplicates['Ref_Loc'])].copy()
243 pass_filter = pass_filter.loc[~pass_filter['Ref_Loc'].isin(check_duplicates['Ref_Loc'])].copy()
244
245 best_snp = reject_duplicate.groupby('Ref_Loc').apply(lambda x: x.sort_values(by=['Ref_Aligned', 'Perc_Iden'], ascending=[False, False]).head(1))
246 pass_filter = pd.concat([pass_filter,best_snp]).reset_index(drop=True)
247
248 dup_snps = reject_duplicate.loc[~reject_duplicate.index.isin(best_snp.index.get_level_values(-1))].copy()  # duplicate rows not retained as the best hit
249 dup_snps['Filter_Cat'] = "Purged_Duplicate"
250
251 snp_tally_df = pd.concat([snp_tally_df,dup_snps]).reset_index(drop=True)
252
253 with open(log_file,"a+") as log:
254 log.write(f"\t\t- Purged (Duplicates): {dup_snps.shape[0]}\n")
255
256 # Assert that Ref_Loc and Query_Loc are unique in pass_filter
257 helpers.cleanup(verbose=False,remove_all = False)
258 assert pass_filter['Ref_Loc'].nunique() == pass_filter.shape[0]
259 assert pass_filter['Query_Loc'].nunique() == pass_filter.shape[0]
260
261 # Density filtering
262 density_locs = []
263 ref_locs = pass_filter['Ref_Loc'].tolist()
264
265 if len(density_windows) == 0:
266 with open(log_file,"a+") as log:
267 log.write("\n\t- Density filtering disabled...\n")
268 elif len(ref_locs) > 0:
269 density_df = pd.DataFrame([item.split('/') for item in ref_locs], columns=['Ref_Contig','Ref_End'])
270 density_df['Ref_Start'] = density_df['Ref_End'].astype(float).astype(int) - 1
271 density_bed = BedTool.from_dataframe(density_df[['Ref_Contig','Ref_Start','Ref_End']])
272
273 # For each density window, remove all SNPs that fall in a window with > max_snps
274 for i in range(0,len(density_windows)):
275 window_df = density_bed.window(density_bed,c=True, w=density_windows[i]).to_dataframe()
276 problematic_windows = window_df[window_df['name'] > max_snps[i]].copy()
277 if not problematic_windows.empty:
278 temp_locs = []
279 for _, row in problematic_windows.iterrows():
280 purge_window_df = window_df[window_df['chrom'] == row['chrom']].copy()
281 purge_window_df['Dist'] = abs(purge_window_df['end'] - row['end'])
282 window_snps = purge_window_df.sort_values(by=['Dist'],ascending=True).head(row['name'])
283 temp_locs = temp_locs + ["/".join([str(x[0]),str(x[1])]) for x in list(zip(window_snps.chrom, window_snps.end))]
284 density_locs.extend(list(set(temp_locs)))
285
286 density_locs = list(set(density_locs))
287 reject_density = pass_filter[pass_filter['Ref_Loc'].isin(density_locs)].copy()
288
289 if reject_density.shape[0] > 0:
290 with open(log_file,"a+") as log:
291 log.write(f"\t\t- Purged (Density): {reject_density.shape[0]}\n")
292 reject_density['Filter_Cat'] = "Purged_Density"
293 snp_tally_df = pd.concat([snp_tally_df,reject_density]).reset_index(drop=True)
294 pass_filter = pass_filter[~pass_filter['Ref_Loc'].isin(density_locs)].copy()
295
296 reject_query_edge = pass_filter[(pass_filter['Dist_to_Query_End'] < query_edge) & (pass_filter['Dist_to_Ref_End'] >= ref_edge)].copy()
297 reject_ref_edge = pass_filter[(pass_filter['Dist_to_Ref_End'] < ref_edge) & (pass_filter['Dist_to_Query_End'] >= query_edge)].copy()
298 reject_both_edge = pass_filter[(pass_filter['Dist_to_Query_End'] < query_edge) & (pass_filter['Dist_to_Ref_End'] < ref_edge)].copy()
299
300 if reject_query_edge.shape[0] > 0:
301 with open(log_file,"a+") as log:
302 log.write(f"\t\t- Purged (Query Edge): {reject_query_edge.shape[0]}\n")
303 reject_query_edge['Filter_Cat'] = "Filtered_Query_Edge"
304 snp_tally_df = pd.concat([snp_tally_df,reject_query_edge]).reset_index(drop=True)
305
306 if reject_ref_edge.shape[0] > 0:
307 with open(log_file,"a+") as log:
308 log.write(f"\t\t- Purged (Ref Edge): {reject_ref_edge.shape[0]}\n")
309 reject_ref_edge['Filter_Cat'] = "Filtered_Ref_Edge"
310 snp_tally_df = pd.concat([snp_tally_df,reject_ref_edge]).reset_index(drop=True)
311
312 if reject_both_edge.shape[0] > 0:
313 with open(log_file,"a+") as log:
314 log.write(f"\t\t- Purged (Both Edge): {reject_both_edge.shape[0]}\n")
315 reject_both_edge['Filter_Cat'] = "Filtered_Both_Edge"
316 snp_tally_df = pd.concat([snp_tally_df,reject_both_edge]).reset_index(drop=True)
317
318 pass_filter = pass_filter[(pass_filter['Dist_to_Query_End'] >= query_edge) & (pass_filter['Dist_to_Ref_End'] >= ref_edge)].copy()
319
320 helpers.cleanup(verbose=False,remove_all = False)
321
322 assert snp_tally_df.shape[0] + pass_filter.shape[0] == total_snp_count
323 return_df = pd.concat([pass_filter,snp_tally_df]).reset_index(drop=True).sort_values(by=['Ref_Loc'])
324
325 return return_df.drop(columns=['Cat']).rename({'Filter_Cat':'Cat'}, axis=1)
326
327 def screenSNPDiffs(snpdiffs_file,trim_name, min_cov, min_len, min_iden, ref_edge, query_edge, density_windows, max_snps,reference_id,log_directory):
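# Screen a single snpdiffs file against the reference: check orientation, apply the raw
# coverage and MUMmer site-count cutoffs, filter alignments and SNPs, and return the summary
# row, the passing reference BED intervals, and the filtered SNP table.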
328
329 screen_start_time = time.time()
330
331 if temp_dir != "":
332 helpers.set_tempdir(temp_dir)
333
334 # Set CSP2 variables to NA
335 csp2_screen_snps = purged_length = purged_identity = purged_invalid = purged_indel = purged_lengthIdentity = purged_duplicate = purged_het = purged_density = filtered_ref_edge = filtered_query_edge = filtered_both_edge = "NA"
336 filtered_snp_df = pd.DataFrame()
337 good_reference_bed_df = pd.DataFrame()
338
339 # Ensure snpdiffs file exists
340 if not os.path.exists(snpdiffs_file) or not snpdiffs_file.endswith('.snpdiffs'):
341 run_failed = True
342 sys.exit(f"Invalid snpdiffs file provided: {snpdiffs_file}")
343
344 # Ensure header can be read in
345 try:
346 header_data = fetchHeaders(snpdiffs_file)
347 header_query = header_data['Query_ID'][0].replace(trim_name,'')
348 header_ref = header_data['Reference_ID'][0].replace(trim_name,'')
349 except:
350 run_failed = True
351 sys.exit(f"Error reading headers from snpdiffs file: {snpdiffs_file}")
352
353 # Check snpdiffs orientation
354 if (header_ref == reference_id):
355 snpdiffs_orientation = 1
356 query_id = header_query
357 elif (header_query == reference_id):
358 snpdiffs_orientation = -1
359 query_id = header_ref
360 header_data = swapHeader(header_data)
361 else:
362 run_failed = True
363 sys.exit(f"Error: Reference ID not found in header of {snpdiffs_file}...")
364
365 # Establish log file
366 log_file = f"{log_directory}/{query_id}__vs__{reference_id}.log"
367 with open(log_file,"w+") as log:
368 log.write("Reference Screening for SNP Pipeline Analysis\n")
369 log.write(f"Query Isolate: {query_id}\n")
370 log.write(f"Reference Isolate: {reference_id}\n")
371 log.write(str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))+"\n")
372 log.write("-------------------------------------------------------\n\n")
373 if snpdiffs_orientation == 1:
374 log.write("\t- SNPDiffs file is in the forward orientation\n")
375 log.write("-------------------------------------------------------\n\n")
376 else:
377 log.write("\t- SNPDiffs file is in the reverse orientation\n")
378 log.write("-------------------------------------------------------\n\n")
379
380
381 # Set variables from header data
382 raw_snps = int(header_data['SNPs'][0])
383 raw_indels = int(header_data['Indels'][0])
384 raw_invalid = int(header_data['Invalid'][0])
385
386 kmer_similarity = float(header_data['Kmer_Similarity'][0])
387 shared_kmers = int(header_data['Shared_Kmers'][0])
388 query_unique_kmers = int(header_data['Query_Unique_Kmers'][0])
389 reference_unique_kmers = int(header_data['Reference_Unique_Kmers'][0])
390 mummer_gsnps = int(header_data['gSNPs'][0])
391 mummer_gindels = int(header_data['gIndels'][0])
392
393 query_bases = int(header_data['Query_Assembly_Bases'][0])
394 reference_bases = int(header_data['Reference_Assembly_Bases'][0])
395
396 query_contigs = int(header_data['Query_Contig_Count'][0])
397 reference_contigs = int(header_data['Reference_Contig_Count'][0])
398
399 raw_query_percent_aligned = float(header_data['Query_Percent_Aligned'][0])
400 raw_ref_percent_aligned = float(header_data['Reference_Percent_Aligned'][0])
401
402 # If the reference is not covered by at least min_cov, STOP
403 if raw_ref_percent_aligned < min_cov:
404 query_percent_aligned = raw_query_percent_aligned
405 reference_percent_aligned = raw_ref_percent_aligned
406 screen_category = "Low_Coverage"
407 with open(log_file,"a+") as log:
408 log.write(f"\t- Reference genome coverage: {raw_ref_percent_aligned}% \n")
409 log.write(f"\t- Query covers less than --min_cov ({min_cov}%)...Screen halted...\n")
410 log.write("-------------------------------------------------------\n\n")
411
412 elif raw_snps + raw_indels + raw_invalid > 10000:
413 query_percent_aligned = raw_query_percent_aligned
414 reference_percent_aligned = raw_ref_percent_aligned
415 screen_category = "SNP_Cutoff"
416 with open(log_file,"a+") as log:
417 log.write(f"\t- {raw_snps} SNPs detected...\n")
418 log.write("\t- > 10,000 SNPs, indels, or invalid sites detected by MUMmer...Screen halted...\n")
419 log.write("-------------------------------------------------------\n\n")
420
421 else:
422
423 ##### 02: Read in BED/SNP data #####
424 with open(log_file,"a+") as log:
425 log.write("Step 1: Reading in snpdiffs BED/SNP data...")
426 try:
427 bed_df,snp_df = parseSNPDiffs(snpdiffs_file,snpdiffs_orientation)
428
429 with open(log_file,"a+") as log:
430 log.write("Done!\n")
431 log.write("-------------------------------------------------------\n\n")
432
433 except Exception as e:
434 run_failed = True
435 with open(log_file,"a+") as log:
436 log.write(f"\nError reading BED/SNP data from file: {snpdiffs_file}\n{str(e)}")
437 sys.exit(f"Error reading BED/SNP data from file: {snpdiffs_file}\n{str(e)}")
438
439 ##### 03: Filter genome overlaps #####
440 with open(log_file,"a+") as log:
441 log.write("Step 2: Filtering for short overlaps and low percent identity...")
442
443 good_bed_df = bed_df[(bed_df['Ref_Aligned'] >= min_len) & (bed_df['Perc_Iden'] >= min_iden)].copy()
444
445 if good_bed_df.shape[0] == 0:
446 screen_category = "Low_Quality_Coverage"
447 with open(log_file,"a+") as log:
448 log.write(f"\n\t- After filtering based on --min_len ({min_len}) and --min_iden ({min_iden}), no valid alignments remain...Screen halted...\n")
449 log.write("-------------------------------------------------------\n\n")
450
451
452 else:
453 # Create a BED file for alignments that pass basic QC
454 good_query_bed_df = good_bed_df[['Query_Contig','Query_Start','Query_End']].copy()
455 good_reference_bed_df = good_bed_df[['Ref_Contig','Ref_Start','Ref_End']].copy()
456 good_reference_bed_df.loc[:, 'Query_ID'] = query_id
457
458 good_query_aligned = calculate_total_length(BedTool.from_dataframe(good_query_bed_df).sort().merge())
459 good_reference_aligned = calculate_total_length(BedTool.from_dataframe(good_reference_bed_df[['Ref_Contig','Ref_Start','Ref_End']]).sort().merge())
460
461 query_percent_aligned = (good_query_aligned / query_bases) * 100
462 reference_percent_aligned = (good_reference_aligned / reference_bases) * 100
463
464 if reference_percent_aligned < min_cov:
465 screen_category = "Low_Quality_Coverage"
466 with open(log_file,"a+") as log:
467 log.write(f"\n\t- Raw reference genome coverage was {raw_ref_percent_aligned}% \n")
468 log.write(f"\t- After filtering based on --min_len ({min_len}) and --min_iden ({min_iden}), reference genome coverage was {reference_percent_aligned:.2f}% \n")
469 log.write(f"\t- Query covers less than --min_cov ({min_cov}%) of reference after filtering...Screen halted...\n")
470 log.write("-------------------------------------------------------\n\n")
471
472 else:
473 screen_category = "Pass"
474 with open(log_file,"a+") as log:
475 log.write("Done!\n")
476 log.write(f"\t- Raw reference genome coverage was {raw_ref_percent_aligned}% \n")
477 log.write(f"\t- After filtering based on --min_len ({min_len}) and --min_iden ({min_iden}), reference genome coverage was {reference_percent_aligned:.2f}% \n")
478 log.write("-------------------------------------------------------\n\n")
479
480
481 # Filter SNPs
482 with open(log_file,"a+") as log:
483 log.write("Step 3: Filtering SNPs to get final SNP distances...")
484
485 if raw_snps == 0:
486 csp2_screen_snps = purged_length = purged_identity = purged_lengthIdentity = purged_indel = purged_invalid = purged_duplicate = purged_het = purged_density = filtered_ref_edge = filtered_query_edge = filtered_both_edge = 0
487 with open(log_file,"a+") as log:
488 log.write("Done!\n")
489 log.write("\t- No SNPs detected in MUMmer output, no filtering required\n")
490 log.write("-------------------------------------------------------\n\n")
491
492 else:
493 filtered_snp_df = filterSNPs(snp_df,bed_df,log_file, min_len, min_iden, ref_edge, query_edge, density_windows, max_snps)
494
495 csp2_screen_snps = filtered_snp_df[filtered_snp_df.Cat == "SNP"].shape[0]
496 purged_length = filtered_snp_df[filtered_snp_df.Cat == "Purged_Length"].shape[0]
497 purged_identity = filtered_snp_df[filtered_snp_df.Cat == "Purged_Identity"].shape[0]
498 purged_lengthIdentity = filtered_snp_df[filtered_snp_df.Cat == "Purged_LengthIdentity"].shape[0]
499 purged_invalid = filtered_snp_df[filtered_snp_df.Cat == "Purged_Invalid"].shape[0]
500 purged_indel = filtered_snp_df[filtered_snp_df.Cat == "Purged_Indel"].shape[0]
501 purged_het = filtered_snp_df[filtered_snp_df.Cat == "Purged_Heterozygous"].shape[0]
502 purged_duplicate = filtered_snp_df[filtered_snp_df.Cat == "Purged_Duplicate"].shape[0]
503 purged_density = filtered_snp_df[filtered_snp_df.Cat == "Purged_Density"].shape[0]
504 filtered_query_edge = filtered_snp_df[filtered_snp_df.Cat == "Filtered_Query_Edge"].shape[0]
505 filtered_ref_edge = filtered_snp_df[filtered_snp_df.Cat == "Filtered_Ref_Edge"].shape[0]
506 filtered_both_edge = filtered_snp_df[filtered_snp_df.Cat == "Filtered_Both_Edge"].shape[0]
507
508 # Write filtered SNP data to file
509 snp_file = log_file.replace(".log","_SNPs.tsv")
510 filtered_snp_df.to_csv(snp_file, sep="\t", index=False)
511
512 filtered_snp_df.loc[:, 'Query_ID'] = query_id
513
514 with open(log_file,"a+") as log:
515 log.write("Done!\n")
516 log.write(f"\t- {csp2_screen_snps} SNPs detected between {query_id} and {reference_id} after filtering\n")
517 log.write(f"\t- Full SNP data saved to {snp_file}\n")
518 log.write("-------------------------------------------------------\n\n")
519
520 screen_end_time = time.time()
521 with open(log_file,"a+") as log:
522 log.write(f"Screening Time: {screen_end_time - screen_start_time:.2f} seconds\n")
523
524 # Clean up pybedtools temp
525 helpers.cleanup(verbose=False, remove_all=False)
526
527 return ([str(item) for item in [query_id,reference_id,screen_category,csp2_screen_snps,
528 f"{query_percent_aligned:.2f}",f"{reference_percent_aligned:.2f}",
529 query_contigs,query_bases,reference_contigs,reference_bases,
530 raw_snps,purged_length,purged_identity,purged_lengthIdentity,purged_invalid,purged_indel,purged_duplicate,purged_het,purged_density,
531 filtered_query_edge,filtered_ref_edge,filtered_both_edge,
532 kmer_similarity,shared_kmers,query_unique_kmers,reference_unique_kmers,
533 mummer_gsnps,mummer_gindels]],good_reference_bed_df,filtered_snp_df)
534
535 def assessCoverage(query_id,site_list):
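# For SNP positions with no call in a given query, intersect them with that query's passing
# alignment intervals: positions inside an alignment are reported as 'Ref_Base' (assumed to
# match the reference), positions outside are reported as 'Uncovered'.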
536
537 if temp_dir != "":
538 helpers.set_tempdir(temp_dir)
539
540 if len(site_list) == 0:
541 return pd.DataFrame(columns=['Ref_Loc','Query_ID','Cat'])
542 else:
543 coverage_df = pass_filter_coverage_df[pass_filter_coverage_df['Query_ID'] == query_id].copy()
544
545 if coverage_df.shape[0] == 0:
546 uncovered_loc_df = pd.DataFrame({
547 'Ref_Loc': site_list,
548 'Query_ID': [query_id] * len(site_list),
549 'Cat': ["Uncovered"] * len(site_list)
550 })
551 return uncovered_loc_df
552 else:
553 coverage_bed = BedTool.from_dataframe(coverage_df[['Ref_Contig','Ref_Start','Ref_End']]).sort()
554 snp_bed_df = pd.DataFrame([item.split('/') for item in site_list], columns=['Ref_Contig','Ref_End'])
555 snp_bed_df['Ref_Start'] = snp_bed_df['Ref_End'].astype(float).astype(int) - 1
556 snp_bed_df['Ref_Loc'] = site_list
557 snp_bed = BedTool.from_dataframe(snp_bed_df[['Ref_Contig','Ref_Start','Ref_End','Ref_Loc']]).sort()
558
559 # Ref_Locs from snp_bed that intersect with coverage_bed go into covered_locs, the rest go into uncovered_locs
560 covered_locs = snp_bed.intersect(coverage_bed, wa=True)
561 uncovered_locs = snp_bed.intersect(coverage_bed, v=True, wa=True)
562
563 covered_loc_df = pd.DataFrame({
564 'Ref_Loc': [snp.fields[3] for snp in covered_locs],
565 'Query_ID': [query_id] * covered_locs.count(),
566 'Cat': ["Ref_Base"] * covered_locs.count()
567 }) if covered_locs.count() > 0 else pd.DataFrame(columns=['Ref_Loc','Query_ID','Cat'])
568
569 uncovered_loc_df = pd.DataFrame({
570 'Ref_Loc': [snp.fields[3] for snp in uncovered_locs],
571 'Query_ID': [query_id] * uncovered_locs.count(),
572 'Cat': ["Uncovered"] * uncovered_locs.count()
573 }) if uncovered_locs.count() > 0 else pd.DataFrame(columns=['Ref_Loc','Query_ID','Cat'])
574
575 # Clean up pybedtools temp
576 helpers.cleanup(verbose=False, remove_all=False)
577
578 return pd.concat([covered_loc_df.drop_duplicates(['Ref_Loc']),uncovered_loc_df])
579
580 def getPairwise(chunk, sequences, ids):
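# Compute pairwise SNP distances for a chunk of sequence-index pairs: positions where both
# sequences carry an unambiguous A/C/T/G are counted as cocalled, and the distance is the
# number of cocalled positions where the bases differ (e.g. ACNG vs ATNG -> 3 cocalled, 1 SNP).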
581 results = []
582
583 for i, j in chunk:
584 seq1, seq2 = sequences[i], sequences[j]
585 actg_mask1 = np.isin(seq1, list('ACTGactg'))
586 actg_mask2 = np.isin(seq2, list('ACTGactg'))
587 cocalled_mask = actg_mask1 & actg_mask2
588
589 snps_cocalled = np.sum(cocalled_mask)
590 snp_distance = np.sum((seq1 != seq2) & cocalled_mask)
591
592 results.append([ids[i], ids[j], snp_distance, snps_cocalled])
593
594 return results
595
596 def parallelAlignment(alignment, chunk_size=5000):
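# Split all pairwise sequence combinations into chunks and score them in parallel with a
# ProcessPoolExecutor, collecting [id1, id2, SNP_distance, SNPs_cocalled] rows.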
597 sequences = [np.array(list(record.seq)) for record in alignment]
598 ids = [record.id for record in alignment]
599 pairwise_combinations = list(combinations(range(len(sequences)), 2))
600
601 # Create chunks of pairwise combinations
602 chunks = [pairwise_combinations[i:i + chunk_size] for i in range(0, len(pairwise_combinations), chunk_size)]
603
604 results = []
605 with concurrent.futures.ProcessPoolExecutor() as executor:
606 future_to_chunk = {executor.submit(getPairwise, chunk, sequences, ids): chunk for chunk in chunks}
607 for future in concurrent.futures.as_completed(future_to_chunk):
608 chunk_results = future.result()
609 results.extend(chunk_results)
610 return results
611
612 def getFinalPurge(df):
613 # Returns the 'farthest along' category for a given Ref_Loc
614 if "Purged_Density" in df['Cat'].values:
615 return "Purged_Density"
616 elif "Purged_Heterozygous" in df['Cat'].values:
617 return "Purged_Heterozygous"
618 elif "Purged_Indel" in df['Cat'].values:
619 return "Purged_Indel"
620 elif "Purged_Invalid" in df['Cat'].values:
621 return "Purged_Invalid"
622 elif "Purged_Length" in df['Cat'].values:
623 return "Purged_Length"
624 else:
625 return "Purged_Identity"
626
627
628 # Read in arguments
629 global run_failed
630 run_failed = False
631
632 start_time = time.time()
633
634 parser = argparse.ArgumentParser(description='CSP2 SNP Pipeline Analysis')
635 parser.add_argument('--reference_id', type=str, help='Reference Isolate')
636 parser.add_argument('--output_directory', type=str, help='Output Directory')
637 parser.add_argument('--log_directory', type=str, help='Log Directory')
638 parser.add_argument('--snpdiffs_file', type=str, help='Path to SNPdiffs file')
639 parser.add_argument('--min_cov', type=float, help='Minimum coverage')
640 parser.add_argument('--min_len', type=int, help='Minimum length')
641 parser.add_argument('--min_iden', type=float, help='Minimum identity')
642 parser.add_argument('--ref_edge', type=int, help='Reference edge')
643 parser.add_argument('--query_edge', type=int, help='Query edge')
644 parser.add_argument('--density_windows', type=str, help='Density windows')
645 parser.add_argument('--max_snps', type=str, help='Maximum SNPs')
646 parser.add_argument('--trim_name', type=str, help='Trim name')
647 parser.add_argument('--max_missing', type=float, help='Maximum missing')
648 parser.add_argument('--tmp_dir', type=str, help='Temporary directory')
649 parser.add_argument('--rescue', type=str, help='Rescue edge SNPs (rescue/norescue)')
650 args = parser.parse_args()
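# Example invocation (sketch only; flag names come from the parser above, all paths and
# values below are illustrative placeholders, not defaults):
#   runSNPPipeline.py --reference_id REF_01 --output_directory ./snp_out --log_directory ./snp_out/logs \
#     --snpdiffs_file snpdiffs_list.txt --min_cov 85 --min_len 500 --min_iden 99 \
#     --ref_edge 150 --query_edge 150 --density_windows 1000,125,15 --max_snps 3,2,1 \
#     --trim_name '_contigs' --max_missing 50 --tmp_dir ./tmp --rescue norescue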
651
652 reference_id = args.reference_id
653 output_directory = os.path.abspath(args.output_directory)
654 log_directory = os.path.abspath(args.log_directory)
655 log_file = f"{output_directory}/CSP2_SNP_Pipeline.log"
656 snpdiffs_file = args.snpdiffs_file
657 min_cov = args.min_cov
658 min_len = args.min_len
659 min_iden = args.min_iden
660 ref_edge = args.ref_edge
661 query_edge = args.query_edge
662 density_windows = [int(x) for x in args.density_windows.split(",")]
663 max_snps = [int(x) for x in args.max_snps.split(",")]
664 trim_name = args.trim_name
665 max_missing = args.max_missing
666
667 # Establish log file
668 with open(log_file,"w+") as log:
669 log.write("CSP2 SNP Pipeline Analysis\n")
670 log.write(f"Reference Isolate: {reference_id}\n")
671 log.write(str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))+"\n")
672 log.write("-------------------------------------------------------\n\n")
673 log.write("Reading in SNPDiffs files...")
674
675
676 # Read in all lines and ensure each file exists
677 snpdiffs_list = [line.strip() for line in open(snpdiffs_file, 'r')]
678 snpdiffs_list = [line for line in snpdiffs_list if line]
679 for snpdiffs_file in snpdiffs_list:
680 if not os.path.exists(snpdiffs_file):
681 run_failed = True
682 sys.exit("Error: File does not exist: " + snpdiffs_file)
683
684 snpdiffs_list = list(set(snpdiffs_list))
685
686 if len(snpdiffs_list) == 0:
687 run_failed = True
688 sys.exit("No SNPdiffs files provided...")
689
690 with open(log_file, "a+") as log:
691 log.write("Done!\n")
692 log.write(f"\t- Read in {len(snpdiffs_list)} SNPdiffs files\n")
693 log.write("-------------------------------------------------------\n\n")
694
695 global temp_dir
696 if args.tmp_dir != "":
697 random_temp_id = str(uuid.uuid4())
698 temp_dir = f"{os.path.normpath(os.path.abspath(args.tmp_dir))}/{random_temp_id}"
699 try:
700 os.mkdir(temp_dir)
701 helpers.set_tempdir(temp_dir)
702 except OSError as e:
703 run_failed = True
704 print(f"Error: Failed to create directory '{temp_dir}': {e}")
705 else:
706 temp_dir = ""
707
708 rescue_edge = str(args.rescue)
709 if rescue_edge not in ["rescue","norescue"]:
710 with open(log_file,"a+") as log:
711 log.write(f"\t- Unexpected rescue_edge variable ('{rescue_edge}'). Not performing SNP edge rescuing (any SNPs found within {query_edge}bp of a query contig edge will be purged)...\n")
712 log.write("-------------------------------------------------------\n\n")
713 rescue_edge = "norescue"
714 elif rescue_edge == "rescue":
715 with open(log_file,"a+") as log:
716 log.write(f"\t- Rescuing edge SNPs within {query_edge}bp of query contig edges if found more centrally in another query...\n")
717 log.write("-------------------------------------------------------\n\n")
718 else:
719 with open(log_file,"a+") as log:
720 log.write(f"\t- Not performing SNP edge rescuing (any SNPs found within {query_edge}bp of a query contig edge will be purged)...\n")
721 log.write("-------------------------------------------------------\n\n")
722
723 try:
724 # Establish output files
725 reference_screening_file = f"{output_directory}/Reference_Screening.tsv"
726 locus_category_file = f"{output_directory}/Locus_Categories.tsv"
727 query_coverage_file = f"{output_directory}/Query_Coverage.tsv"
728 raw_loclist = f"{output_directory}/snplist.txt"
729 raw_alignment = f"{output_directory}/snpma.fasta"
730 preserved_loclist = f"{output_directory}/snplist_preserved.txt"
731 preserved_alignment_file = f"{output_directory}/snpma_preserved.fasta"
732 raw_pairwise = f"{output_directory}/snp_distance_pairwise.tsv"
733 raw_matrix = f"{output_directory}/snp_distance_matrix.tsv"
734 preserved_pairwise = f"{output_directory}/snp_distance_pairwise_preserved.tsv"
735 preserved_matrix = f"{output_directory}/snp_distance_matrix_preserved.tsv"
736
737 with open(log_file,"a+") as log:
738 log.write("Screening all queries against reference...")
739 with concurrent.futures.ProcessPoolExecutor() as executor:
740 results = [executor.submit(screenSNPDiffs,snp_diff_file,trim_name, min_cov, min_len, min_iden, ref_edge, query_edge, density_windows, max_snps,reference_id,log_directory) for snp_diff_file in snpdiffs_list]
741
742 # Combine results into a dataframe
743 output_columns = ['Query_ID','Reference_ID','Screen_Category','CSP2_Screen_SNPs',
744 'Query_Percent_Aligned','Reference_Percent_Aligned',
745 'Query_Contigs','Query_Bases','Reference_Contigs','Reference_Bases',
746 'Raw_SNPs','Purged_Length','Purged_Identity','Purged_LengthIdentity','Purged_Invalid','Purged_Indel','Purged_Duplicate','Purged_Het','Purged_Density',
747 'Filtered_Query_Edge','Filtered_Ref_Edge','Filtered_Both_Edge',
748 'Kmer_Similarity','Shared_Kmers','Query_Unique_Kmers','Reference_Unique_Kmers',
749 'MUMmer_gSNPs','MUMmer_gIndels']
750
751 # Save reference screening
752 results_df = pd.DataFrame([item.result()[0] for item in results], columns = output_columns)
753 results_df.to_csv(reference_screening_file, sep="\t", index=False)
754
755 # Get reference bed dfs
756 covered_df = pd.concat([item.result()[1] for item in results])
757
758 # Get snp dfs
759 filtered_snp_df = pd.concat([item.result()[2] for item in results])
760
761 # Separate isolates that pass QC
762 pass_qc_isolates = list(set(results_df[results_df['Screen_Category'] == "Pass"]['Query_ID']))
763 fail_qc_isolates = list(set(results_df[results_df['Screen_Category'] != "Pass"]['Query_ID']))
764
765 if len(pass_qc_isolates) == 0:
766 with open(log_file,"a+") as log:
767 log.write("Done!\n")
768 log.write(f"\t- Reference screening data saved to {reference_screening_file}\n")
769 log.write(f"\t- Of {len(snpdiffs_list)} comparisons, no isolates passed QC. Pipeline cannot continue.\n")
770 log.write(f"\t- {len(fail_qc_isolates)} comparisons failed QC\n")
771 for isolate in fail_qc_isolates:
772 isolate_category = results_df[results_df['Query_ID'] == isolate]['Screen_Category'].values[0]
773 log.write(f"\t\t- {isolate}: {isolate_category}\n")
774 log.write("-------------------------------------------------------\n\n")
775 sys.exit(0)
776 else:
777 with open(log_file,"a+") as log:
778 log.write("Done!\n")
779 log.write(f"\t- Reference screening data saved to {reference_screening_file}\n")
780 log.write(f"\t- Of {len(snpdiffs_list)} comparisons, {len(pass_qc_isolates)} covered at least {min_cov}% of the reference genome after removing poor alignments\n")
781 if len(fail_qc_isolates) > 0:
782 log.write(f"\t- {len(fail_qc_isolates)} comparisons failed QC\n")
783 for isolate in fail_qc_isolates:
784 isolate_category = results_df[results_df['Query_ID'] == isolate]['Screen_Category'].values[0]
785 log.write(f"\t\t- {isolate}: {isolate_category}\n")
786 log.write("-------------------------------------------------------\n\n")
787
788 with open(log_file,"a+") as log:
789 log.write(f"Compiling SNPs across {len(pass_qc_isolates)} samples...\n")
790
791 if filtered_snp_df.shape[0] == 0:
792 snp_count = 0
793 with open(log_file,"a+") as log:
794 log.write("\t- No SNPs detected across samples...Skipping to output...\n")
795 else:
796
797 # Remove samples that failed QC
798 pass_filter_snps = filtered_snp_df[filtered_snp_df['Query_ID'].isin(pass_qc_isolates)].copy()
799 pass_filter_snp_list = list(set(pass_filter_snps['Ref_Loc']))
800 pass_filter_snp_count = len(pass_filter_snp_list)
801
802 global pass_filter_coverage_df
803 pass_filter_coverage_df = covered_df[covered_df['Query_ID'].isin(pass_qc_isolates)].copy()
804
805 # Get SNP counts
806 snp_df = pass_filter_snps[pass_filter_snps['Cat'] == "SNP"].copy()
807
808 if snp_df.shape[0] == 0:
809 snp_count = 0
810 with open(log_file,"a+") as log:
811 log.write(f"\t- {pass_filter_snp_count} total SNPs detected across all samples...\n")
812 log.write("\t- No SNPs passed QC filtering in any sample...Skipping to output...\n")
813
814 else:
815 snp_list = list(set(snp_df['Ref_Loc']))
816 snp_count = len(snp_list)
817
818 with open(log_file,"a+") as log:
819 log.write(f"\t- {pass_filter_snp_count} total SNPs detected across all samples...\n")
820 log.write(f"\t- {snp_count} unique SNPs passed QC filtering in at least one sample...\n")
821
822 # Note SNPs lost irrevocably to reference edge trimming
823 ref_edge_df = pass_filter_snps[pass_filter_snps['Cat'].isin(["Filtered_Ref_Edge",'Filtered_Both_Edge'])].copy()
824 if ref_edge_df.shape[0] > 0:
825 with open(log_file,"a+") as log:
826 log.write(f"\t- {ref_edge_df['Ref_Loc'].nunique()} unique SNPs were within {ref_edge}bp of a reference contig end and were not considered in any query...\n")
827
828 # Create Ref_Base df
829 ref_base_df = snp_df[['Ref_Loc', 'Ref_Base']].copy().drop_duplicates().rename(columns = {'Ref_Base':'Query_Base'})
830 ref_base_df.loc[:,'Query_ID'] = reference_id
831 ref_base_df.loc[:,'Cat'] = "Reference_Isolate"
832 ref_base_df = ref_base_df.loc[:,['Ref_Loc','Query_ID','Query_Base','Cat']]
833
834 # Rescue SNPs that are near the edge if they are valid SNPs in other samples
835 rescued_edge_df = pass_filter_snps[(pass_filter_snps['Cat'] == "Filtered_Query_Edge") & (pass_filter_snps['Ref_Loc'].isin(snp_list))].copy()
836
837 if rescue_edge == "norescue":
838 with open(log_file,"a+") as log:
839 log.write("\t- Skipping edge rescuing...\n")
840
841 elif rescued_edge_df.shape[0] > 0:
842
843 # Remove rescued sites from pass_filter_snps
844 rescue_merge = pass_filter_snps.merge(rescued_edge_df, indicator=True, how='outer')
845 pass_filter_snps = rescue_merge[rescue_merge['_merge'] == 'left_only'].drop(columns=['_merge']).copy()
846
847 # Add rescued SNPs to snp_df
848 rescued_edge_df.loc[:,'Cat'] = "Rescued_SNP"
849 snp_df = pd.concat([snp_df,rescued_edge_df]).reset_index(drop=True)
850 with open(log_file,"a+") as log:
851 log.write(f"\t- Rescued {rescued_edge_df.shape[0]} query SNPs that fell within {query_edge}bp of the query contig edge...\n")
852
853 else:
854 with open(log_file,"a+") as log:
855 log.write(f"\t- No query SNPs that fell within {query_edge}bp of the query contig edge were rescued...\n")
856
857 # Gather base data for all valid SNPs
858 snp_base_df = snp_df[['Ref_Loc','Query_ID','Query_Base','Cat']].copy()
859
860 # Process purged sites
861 purged_snp_df = pd.DataFrame(columns=['Ref_Loc','Query_ID','Query_Base','Cat'])
862 purged_df = pass_filter_snps[pass_filter_snps['Cat'] != "SNP"].copy()
863
864 if purged_df.shape[0] > 0:
865
866 # Remove rows from purged_df if the Ref_Loc/Query_ID pair is already in snp_base_df
867 purged_df = purged_df[~purged_df[['Ref_Loc','Query_ID']].apply(tuple,1).isin(snp_base_df[['Ref_Loc','Query_ID']].apply(tuple,1))].copy()
868
869 if purged_df.shape[0] > 0:
870
871 # Get purged SNPs where no query has a valid SNP
872 non_snp_df = purged_df[~purged_df['Ref_Loc'].isin(snp_list)].copy()
873 if non_snp_df.shape[0] > 0:
874 non_snp_merge = purged_df.merge(non_snp_df, indicator=True, how='outer')
875 purged_df = non_snp_merge[non_snp_merge['_merge'] == 'left_only'].drop(columns=['_merge']).copy()
876
877 with open(log_file,"a+") as log:
878 log.write(f"\t- {len(list(set(non_snp_df['Ref_Loc'])))} unique SNPs were purged in all queries they were found in, and were not considered in the final dataset...\n")
879
880 if purged_df.shape[0] > 0:
881
882 purged_snp_df = purged_df[['Ref_Loc','Query_ID']].copy().drop_duplicates()
883 purged_snp_df.loc[:, 'Query_Base'] = "N"
884 final_purge_df = purged_df.groupby(['Ref_Loc','Query_ID']).apply(getFinalPurge).reset_index().rename(columns={0:'Cat'})
885 purged_snp_df = purged_snp_df.merge(final_purge_df, on=['Ref_Loc','Query_ID'], how='inner')
886
887 # Genomic positions that do not occur in the SNP data are either uncovered or match the reference base
888 missing_df = pd.DataFrame(columns=['Ref_Loc','Query_ID','Query_Base','Cat'])
889
890 covered_snps = pd.concat([snp_base_df,purged_snp_df]).copy()
891 ref_loc_sets = covered_snps.groupby('Query_ID')['Ref_Loc'].agg(set).to_dict()
892 isolates_with_missing = [isolate for isolate in pass_qc_isolates if len(set(snp_list) - ref_loc_sets.get(isolate, set())) > 0]
893
894 uncovered_df = pd.DataFrame()
895
896 if isolates_with_missing:
897 isolate_data = [(isolate, list(set(snp_list) - ref_loc_sets.get(isolate, set()))) for isolate in isolates_with_missing]
898
899 with concurrent.futures.ProcessPoolExecutor() as executor:
900 results = [executor.submit(assessCoverage, query, sites) for query, sites in isolate_data]
901 coverage_dfs = [result.result() for result in concurrent.futures.as_completed(results)]
902
903 coverage_df = pd.concat(coverage_dfs)
904 covered_df = coverage_df[coverage_df['Cat'] == 'Ref_Base']
905 uncovered_df = coverage_df[coverage_df['Cat'] == 'Uncovered']
906
907 if not uncovered_df.empty:
908 uncovered_df.loc[:, 'Query_Base'] = "?"
909 missing_df = pd.concat([missing_df, uncovered_df[['Ref_Loc', 'Query_ID', 'Query_Base', 'Cat']]])
910
911 if not covered_df.empty:
912 ref_base_snp_df = covered_df.merge(ref_base_df[['Ref_Loc', 'Query_Base']], on='Ref_Loc', how='left')
913 missing_df = pd.concat([missing_df, ref_base_snp_df[['Ref_Loc', 'Query_ID', 'Query_Base', 'Cat']]])
914
915 with open(log_file,"a+") as log:
916 log.write("\t- Processed coverage information...\n")
917
918 final_snp_df = pd.concat([snp_base_df,purged_snp_df,missing_df,ref_base_df]).sort_values(by=['Ref_Loc','Query_ID']).reset_index(drop=True)
919 snp_counts = final_snp_df.groupby('Query_ID')['Ref_Loc'].count().reset_index().rename(columns={'Ref_Loc':'SNP_Count'})
920
921 # Assert that all snp_counts == snp_count
922 assert snp_counts['SNP_Count'].nunique() == 1
923 assert snp_counts['SNP_Count'].values[0] == snp_count
924
925 # Get locus coverage stats
926 snp_coverage_df = final_snp_df[final_snp_df['Cat'].isin(['SNP','Rescued_SNP'])].groupby('Ref_Loc')['Query_ID'].count().reset_index().rename(columns={'Query_ID':'SNP_Count'})
927 ref_base_coverage_df = final_snp_df[final_snp_df['Cat'].isin(["Ref_Base","Reference_Isolate"])].groupby('Ref_Loc')['Query_ID'].count().reset_index().rename(columns={'Query_ID':'Ref_Base_Count'})
928
929 if uncovered_df.shape[0] > 0:
930 uncovered_count_df = final_snp_df[final_snp_df['Cat'] == "Uncovered"].groupby('Ref_Loc')['Query_ID'].count().reset_index().rename(columns={'Query_ID':'Uncovered_Count'}).copy()
931 else:
932 uncovered_count_df = pd.DataFrame(columns=['Ref_Loc','Uncovered_Count'])
933
934 possible_purged_cols = ['Purged_Length','Purged_Identity','Purged_Invalid','Purged_Indel','Purged_Heterozygous','Purged_Density']
935 if purged_snp_df.shape[0] > 0:
936 purged_count_df = final_snp_df[final_snp_df['Cat'].isin(possible_purged_cols)].groupby('Ref_Loc')['Query_ID'].count().reset_index().rename(columns={'Query_ID':'Purged_Count'}).copy()
937 else:
938 purged_count_df = pd.DataFrame(columns=['Ref_Loc','Purged_Count'])
939
940 locus_coverage_df = snp_coverage_df.merge(ref_base_coverage_df, how='outer', on='Ref_Loc').merge(uncovered_count_df, how='outer', on='Ref_Loc').merge(purged_count_df, how='outer', on='Ref_Loc').fillna(0)
941 locus_coverage_df.loc[:, ['SNP_Count','Ref_Base_Count','Uncovered_Count','Purged_Count']] = locus_coverage_df.loc[:, ['SNP_Count','Ref_Base_Count','Uncovered_Count','Purged_Count']].astype(int)
942 locus_coverage_df['Missing_Ratio'] = ((locus_coverage_df['Uncovered_Count'] + locus_coverage_df['Purged_Count']) / (1+len(pass_qc_isolates))) * 100
943 locus_coverage_df.to_csv(locus_category_file, sep="\t", index=False)
944
945 # Get isolate coverage stats
946 min_isolate_cols = ['Query_ID','SNP','Ref_Base','Percent_Missing','Purged','Uncovered','Rescued_SNP','Purged_Ref_Edge']
947 isolate_coverage_df = final_snp_df.groupby('Query_ID')['Cat'].value_counts().unstack().fillna(0).astype(float).astype(int).reset_index().drop(columns=['Reference_Isolate'])
948 isolate_coverage_df.loc[isolate_coverage_df['Query_ID'] == reference_id, 'Ref_Base'] = snp_count
949
950 if "Rescued_SNP" not in isolate_coverage_df.columns.tolist():
951 isolate_coverage_df.loc[:,'Rescued_SNP'] = 0
952 isolate_coverage_df['SNP'] = isolate_coverage_df['SNP'] + isolate_coverage_df['Rescued_SNP']
953
954 for col in ['Uncovered'] + possible_purged_cols:
955 if col not in isolate_coverage_df.columns.tolist():
956 isolate_coverage_df.loc[:,col] = 0
957
958 isolate_coverage_df['Purged'] = isolate_coverage_df[possible_purged_cols].sum(axis=1)
959
960 isolate_coverage_df['Percent_Missing'] = (isolate_coverage_df['Uncovered'] + isolate_coverage_df['Purged'])/(isolate_coverage_df['Uncovered'] + isolate_coverage_df['Purged'] + isolate_coverage_df['Ref_Base'] + isolate_coverage_df['SNP']) * 100
961
962 isolate_coverage_df.loc[:,'Purged_Ref_Edge'] = 0
963 if ref_edge_df.shape[0] > 0:
964 isolate_coverage_df.loc[isolate_coverage_df['Query_ID'] == reference_id, 'Purged_Ref_Edge'] = ref_edge_df['Ref_Loc'].nunique()
965
966 isolate_coverage_df = isolate_coverage_df[min_isolate_cols + possible_purged_cols].sort_values(by = 'Percent_Missing',ascending = False).reset_index(drop=True)
967 isolate_coverage_df.to_csv(query_coverage_file, sep="\t", index=False)
968
969 with open(log_file,"a+") as log:
970 log.write(f"\t- SNP coverage information: {locus_category_file}\n")
971 log.write(f"\t- Query coverage information: {query_coverage_file}\n")
972 log.write("-------------------------------------------------------\n\n")
973
974 with open(log_file,"a+") as log:
975 log.write("Processing alignment data...")
976
977 alignment_df = final_snp_df[['Query_ID','Ref_Loc','Query_Base']].copy().rename(columns={'Query_Base':'Base'}).pivot(index='Query_ID', columns='Ref_Loc', values='Base')
978 csp2_ordered = alignment_df.columns
979
980 with open(raw_loclist,"w+") as loclist:
981 loclist.write("\n".join(csp2_ordered)+"\n")
982
983 seq_records = [SeqRecord(Seq(''.join(row)), id=query,description='') for query,row in alignment_df.iterrows()]
984
985 alignment = MultipleSeqAlignment(seq_records)
986 AlignIO.write(alignment,raw_alignment,"fasta")
987
988 with open(log_file,"a+") as log:
989 log.write("Done!\n")
990 log.write(f"\t- Saved alignment of {snp_count} SNPs to {raw_alignment}\n")
991 log.write(f"\t- Saved ordered loc list to {raw_loclist}\n")
992
993 if max_missing == float(100):
994 locs_pass_missing = csp2_ordered
995 preserved_alignment = alignment
996
997 AlignIO.write(preserved_alignment,preserved_alignment_file,"fasta")
998 with open(preserved_loclist,"w+") as loclist:
999 loclist.write("\n".join(csp2_ordered)+"\n")
1000
1001 with open(log_file,"a+") as log:
1002 log.write("Skipping SNP preservation step...\n")
1003 log.write(f"\t- Saved duplicate alignment to {preserved_alignment_file}\n")
1004 log.write(f"\t- Saved duplicate ordered loc list to {preserved_loclist}\n")
1005 else:
1006 with open(log_file,"a+") as log:
1007 log.write(f"\t- Preserving SNPs with at most {max_missing}% missing data...\n")
1008
1009 # Parse missing data
1010 locs_pass_missing = list(set(locus_coverage_df[locus_coverage_df['Missing_Ratio'] <= max_missing]['Ref_Loc']))
1011
1012 if len(locs_pass_missing) == 0:
1013 with open(log_file,"a+") as log:
1014 log.write(f"\t- Of {snp_count} SNPs, no SNPs pass the {max_missing}% missing data threshold...\n")
1015 log.write("-------------------------------------------------------\n\n")
1016 else:
1017 preserved_alignment_df = alignment_df[locs_pass_missing].copy()
1018
1019 preserved_ordered = preserved_alignment_df.columns
1020 with open(preserved_loclist,"w+") as loclist:
1021 loclist.write("\n".join(preserved_ordered)+"\n")
1022
1023 seq_records = [SeqRecord(Seq(''.join(row)), id=query,description='') for query,row in preserved_alignment_df.iterrows()]
1024 preserved_alignment = MultipleSeqAlignment(seq_records)
1025 AlignIO.write(preserved_alignment,preserved_alignment_file,"fasta")
1026 with open(log_file,"a+") as log:
1027 log.write(f"\t- Of {snp_count} SNPs, {len(locs_pass_missing)} SNPs pass the {max_missing}% missing data threshold...\n")
1028 log.write(f"\t- Saved preserved alignment to {preserved_alignment_file}\n")
1029 log.write(f"\t- Saved preserved ordered loc list to {preserved_loclist}\n")
1030 log.write("-------------------------------------------------------\n\n")
1031
1032 with open(log_file,"a+") as log:
1033 log.write("Processing pairwise comparisons files...")
1034
1035 # Get pairwise comparisons between all pass_qc_isolates and reference_id
1036 pairwise_combinations = [sorted(x) for x in list(combinations([reference_id] + pass_qc_isolates, 2))]
1037
1038 if snp_count == 0:
1039 pairwise_df = pd.DataFrame([(pairwise[0], pairwise[1], 0,np.nan) for pairwise in pairwise_combinations],columns = ['Query_1','Query_2','SNP_Distance','SNPs_Cocalled'])
1040 preserved_pairwise_df = pairwise_df.copy()
1041
1042 pairwise_df.to_csv(raw_pairwise, sep="\t", index=False)
1043 preserved_pairwise_df.to_csv(preserved_pairwise, sep="\t", index=False)
1044
1045 # Create matrix
1046 idx = sorted(set(pairwise_df['Query_1']).union(pairwise_df['Query_2']))
1047 mirrored_distance_df = pairwise_df.pivot(index='Query_1', columns='Query_2', values='SNP_Distance').reindex(index=idx, columns=idx).fillna(0, downcast='infer').pipe(lambda x: x+x.values.T).applymap(lambda x: format(x, '.0f'))
1048 mirrored_distance_df.index.name = ''
1049 mirrored_distance_df.to_csv(raw_matrix,sep="\t")
1050 mirrored_distance_df.to_csv(preserved_matrix,sep="\t")
1051
1052 else:
1053 raw_distance_results = parallelAlignment(alignment)
1054 raw_pairwise_df = pd.DataFrame(raw_distance_results, columns=['Query_1', 'Query_2', 'SNP_Distance', 'SNPs_Cocalled'])
1055 raw_pairwise_df.to_csv(raw_pairwise, sep="\t", index=False)
1056
1057 if len(locs_pass_missing) == snp_count:
1058 preserved_pairwise_df = raw_pairwise_df.copy()
1059 preserved_pairwise_df.to_csv(preserved_pairwise, sep="\t", index=False)
1060 elif len(locs_pass_missing) == 0:
1061 preserved_pairwise_df = pd.DataFrame([(pairwise[0], pairwise[1], 0,np.nan) for pairwise in pairwise_combinations],columns = ['Query_1','Query_2','SNP_Distance','SNPs_Cocalled'])
1062 preserved_pairwise_df.to_csv(preserved_pairwise, sep="\t", index=False)
1063 else:
1064 preserved_distance_results = parallelAlignment(preserved_alignment)
1065 preserved_pairwise_df = pd.DataFrame(preserved_distance_results, columns=['Query_1', 'Query_2', 'SNP_Distance', 'SNPs_Cocalled'])
1066 preserved_pairwise_df.to_csv(preserved_pairwise, sep="\t", index=False)
1067
1068 # Create matrix
1069 idx = sorted(set(raw_pairwise_df['Query_1']).union(raw_pairwise_df['Query_2']))
1070 mirrored_distance_df = raw_pairwise_df.pivot(index='Query_1', columns='Query_2', values='SNP_Distance').reindex(index=idx, columns=idx).fillna(0, downcast='infer').pipe(lambda x: x+x.values.T).applymap(lambda x: format(x, '.0f'))
1071 mirrored_distance_df.index.name = ''
1072 mirrored_distance_df.to_csv(raw_matrix,sep="\t")
1073
1074 idx = sorted(set(preserved_pairwise_df['Query_1']).union(preserved_pairwise_df['Query_2']))
1075 mirrored_distance_df = preserved_pairwise_df.pivot(index='Query_1', columns='Query_2', values='SNP_Distance').reindex(index=idx, columns=idx).fillna(0, downcast='infer').pipe(lambda x: x+x.values.T).applymap(lambda x: format(x, '.0f'))
1076 mirrored_distance_df.index.name = ''
1077 mirrored_distance_df.to_csv(preserved_matrix,sep="\t")
1078
1079 # Clean up pybedtools temp
1080 helpers.cleanup(verbose=False,remove_all = False)
1081
1082 end_time = time.time()
1083 with open(log_file,"a+") as log:
1084 log.write("Done!\n")
1085 if snp_count == 0:
1086 log.write(f"\t- No SNPs detected, zeroed pairwise distance files saved to {raw_pairwise}/{preserved_pairwise}/{raw_matrix}/{preserved_matrix}\n")
1087 else:
1088 log.write(f"\t- Saved raw pairwise distances to {raw_pairwise}\n")
1089 log.write(f"\t- Saved raw pairwise matrix to {raw_matrix}\n")
1090
1091 if max_missing == float(100):
1092 log.write("Skipped SNP preservation step...\n")
1093 log.write(f"\t- Saved duplicated preserved pairwise distances to {preserved_pairwise}\n")
1094 log.write(f"\t- Saved duplicated preserved pairwise matrix to {preserved_matrix}\n")
1095 elif len(locs_pass_missing) == 0:
1096 log.write(f"\t- No SNPs passed the {max_missing}% missing data threshold, zeroed pairwise distance files saved to {preserved_pairwise}/{preserved_matrix}\n")
1097 else:
1098 log.write(f"\t- Saved preserved pairwise distances to {preserved_pairwise}\n")
1099 log.write(f"\t- Saved preserved pairwise matrix to {preserved_matrix}\n")
1100 log.write(f"Total Time: {end_time - start_time:.2f} seconds\n")
1101 log.write("-------------------------------------------------------\n\n")
1102
1103 except:
1104 run_failed = True
1105 print("Exception occurred:\n", traceback.format_exc())
1106 finally:
1107 helpers.cleanup(verbose=False, remove_all=False)
1108 if temp_dir != "":
1109 shutil.rmtree(temp_dir)
1110 if run_failed:
1111 sys.exit(1)