CSP2/subworkflows/fetchData/main.nf @ 0:01431fa12065
"planemo upload"
author: rliterman
date: Mon, 02 Dec 2024 10:40:55 -0500
children: 0dee732cfad2
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads
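// Example invocation (hypothetical paths; the actual entry script and defaults
// may differ, but the parameter names below are all read by this subworkflow):
//   nextflow run main.nf --fasta /data/assemblies --reads /data/reads \
//       --ref_fasta /data/refs --output_directory results --cores 8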

// Set path variables
output_directory = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory = file(params.log_directory)

ref_id_file = file(params.ref_id_file)

// Set ref_mode
ref_mode = params.ref_mode

// Set file headers
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Set paths to accessory files/scripts
assembly_log = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs = file("${projectDir}/bin/userSNPDiffs.py")

// Set SKESA cores to 5 or fewer
skesa_cpus = (params.cores as Integer) >= 5 ? 5 : (params.cores as Integer)

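// Entry workflow: run fetchData and publish its three output channels.
// Note: the publish: section relies on Nextflow's workflow output definitions,
// a preview feature in recent Nextflow releases.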
workflow {
    main:
    input_data = fetchData()
    query_data = input_data.query_data
    reference_data = input_data.reference_data
    snpdiff_data = input_data.snpdiff_data

    publish:
    query_data >> 'query_data.tsv'
    reference_data >> 'reference_data.tsv'
    snpdiff_data >> 'snpdiff_data.tsv'
}

// Top-level workflow //
workflow fetchData{

    emit:
    query_data
    reference_data
    snpdiff_data

    main:
    // Get any excluded IDs
    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

    // Process snpdiffs alignments
    // If assembly file cannot be found, it will be 'null'
    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

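    // Flag a snpdiffs file for exclusion if either its query ID (it[1]) or its
    // reference ID (it[10]) is on the exclude list (column layout is documented
    // in processSNPDiffs below)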
    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
        .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
        .join(exclude_ids,by:0)
        .unique{it -> it[1]}
        .map{it -> tuple(it[1],"Exclude")}

    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
    snpdiff_data = user_snpdiffs
        .map{it -> tuple(it[0],it[1],it[10])}
        .join(excluded_snpdiffs,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .unique{it -> it[0]}
        .map{it -> tuple(it[1],it[2],it[0])}
        .collect().flatten().collate(3)
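    // collect().flatten().collate(3) regroups the joined/filtered output back
    // into clean 3-item tuples; the same idiom recurs throughout this workflow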

    // Get assembly data from snpdiffs
    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
        .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it -> tuple(it[0],it[1],'SNPDiff')}
        .collect().flatten().collate(3)

    assembled_snpdiffs = snpdiff_assemblies
        .filter{it -> it[1].toString() != "null"}
        .unique{it->it[0]}.collect().flatten().collate(3)

    // Process any data provided as assemblies
    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

    pre_assembled = assembled_snpdiffs
        .map{it -> tuple(it[0],it[1])}
        .concat(query_fasta)
        .concat(ref_fasta)
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1])}
        .collect().flatten().collate(2)

    // Process any data provided as reads
    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

    all_reads = query_reads
        .concat(ref_reads)
        .unique{it->it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1],it[2])}
        .collect().flatten().collate(3)

    // Figure out if any assembly is necessary
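    // join(...,remainder:true) pads the missing side with null, so:
    //   assembly: no read data (it[1] is null) -> isolate is already assembled
    //   read: no assembly yet (it[3] is null) -> send reads to assembleReads
    //   combo: both reads and an assembly exist -> keep the existing assembly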
    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
    branch{it ->
        assembly: it[1].toString() == "null"
            return(tuple(it[0],it[2]))
        read: it[3].toString() == "null"
            return(tuple(it[0],it[1],it[2]))
        combo: true
            return(tuple(it[0],it[3]))}

    // Assemble reads if necessary
    assembled_reads = fasta_read_combo.read
        .collect().flatten().collate(3) | assembleReads

    // If runmode is 'assemble', tasks are complete
    if(params.runmode == "assemble"){
        query_data = Channel.empty()
        reference_data = Channel.empty()
    } else{

        // If FASTAs are provided both directly (--fasta/--ref_fasta/assembled reads)
        // and via snpdiffs, prefer the snpdiffs copy (it has already been processed)
        user_fastas = query_fasta
            .concat(ref_fasta)
            .concat(assembled_reads)
            .unique{it -> it[0]}
            .join(exclude_ids,by:0,remainder:true)
            .filter{it -> it[0].toString() != "null"}
            .filter{it -> it[2].toString() != "Exclude"}
            .map{it->tuple(it[0],it[1],'User')}
            .collect().flatten().collate(3)
            .join(assembled_snpdiffs,by:0,remainder:true)
            .filter{it -> it[3].toString() == "null"}
            .map{it->tuple(it[0],it[1])}

        // Get all assemblies
        all_assembled = assembled_snpdiffs
            .map{it -> tuple(it[0],it[1])}
            .concat(user_fastas)
            .unique{it->it[0]}.collect().flatten().collate(2)

        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
        no_assembly = snpdiff_assemblies
            .map{it -> tuple(it[0],it[1])}
            .filter{it -> it[1].toString() == "null"}
            .unique{it -> it[0]}
            .join(all_assembled,by:0,remainder:true)
            .filter{it -> it[2].toString() == "null"}
            .map{it->tuple(it[0],it[1])}
            .collect().flatten().collate(2)

        // Compile all samples
        all_samples = all_assembled
            .concat(no_assembly)
            .unique{it-> it[0]}.collect().flatten().collate(2)

        // If no reference data is provided, return a blank channel
        if(!ref_mode){
            reference_data = Channel.empty()

            query_data = all_samples
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

        } else{

            // Process additional reference IDs
            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
                .concat(ref_reads.map{it->tuple(it[0])})
                .concat(user_ref_ids)
                .unique{it-> it[0]}.collect().flatten().collate(1)
                .map{it -> tuple(it[0],"Reference")}
                .join(exclude_ids,by:0,remainder:true)
                .filter{it -> it[0].toString() != "null"}
                .filter{it -> it[2].toString() != "Exclude"}
                .map{it -> tuple(it[0],it[1])}

            reference_data = all_samples
                .join(all_ref_ids,by:0,remainder:true)
                .filter{it -> it[2].toString() == "Reference"}
                .map{it->tuple(it[0],it[1])}
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

            // Save reference data to file
            reference_data
                .collect{it -> it[0]}
                | saveRefIDs

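            // In 'screen'/'align' mode, references are dropped from the query set;
            // in 'snp' mode, every sample (references included) is used as a query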
            if(params.runmode == "screen" || params.runmode == "align"){
                query_data = all_samples
                    .join(all_ref_ids,by:0,remainder:true)
                    .filter{it -> it[2].toString() != "Reference"}
                    .map{it->tuple(it[0],it[1])}
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            } else if(params.runmode == "snp"){
                query_data = all_samples
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            }
        }
    }
}

// Fetching preassembled data //
workflow fetchQueryFasta{

    emit:
    query_fasta

    main:

    // If --fasta is set, grab assembly paths and characterize assemblies
    ("${params.fasta}" != "" ? getAssemblies(params.fasta) : Channel.empty()).set{query_fasta}
}
workflow fetchRefFasta{

    emit:
    ref_fasta

    main:

    // If --ref_fasta is set, grab assembly paths and characterize assemblies
    ("${params.ref_fasta}" != "" ? getAssemblies(params.ref_fasta) : Channel.empty()).set{ref_fasta}
}
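
// --fasta/--ref_fasta may point to a directory of FASTAs, a single FASTA file,
// or a text file listing one FASTA path per line, e.g. (hypothetical paths):
//   /data/assemblies/sampleA.fasta
//   /data/assemblies/sampleB.fna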
workflow getAssemblies{

    take:
    fasta_loc

    emit:
    fasta_data

    main:
    def trim_this = "${params.trim_name}"

    if(fasta_loc == ""){
        error "No assembly data provided via --fasta/--ref_fasta"
    } else{

        fasta_dir = file(fasta_loc)

        // If --fasta is a directory...
        if(fasta_dir.isDirectory()){
            ch_fasta = Channel.fromPath(["${fasta_dir}/*.fa","${fasta_dir}/*.fasta","${fasta_dir}/*.fna"])
        }
        // If --fasta is a file...
        else if(fasta_dir.isFile()){

            // Check if it is a single fasta file...
            if(fasta_dir.getExtension() == "fa" || fasta_dir.getExtension() == "fna" || fasta_dir.getExtension() == "fasta"){
                ch_fasta = Channel.from(fasta_dir).map{it-> file(it)}
            }
            // Otherwise, assume a file with paths to FASTAs
            else{
                ch_fasta = Channel.from(fasta_dir.readLines()).filter{ file -> file =~ /\.(fa|fasta|fna)$/}.map{it-> file(it)}
            }
        } else{
            error "$fasta_dir is not a valid directory or file..."
        }
        fasta_data = ch_fasta
            .filter { file(it).exists() }
            .map { filePath ->
                def fileName = file(filePath).getBaseName()
                def sampleName = fileName.replaceAll(trim_this, "")
                tuple(sampleName, filePath)}
    }
}
workflow processSNPDiffs{

    emit:
    snpdiffs_data

    main:

    if("${params.snpdiffs}" == ""){
        error "No SNPDiffs data provided via --snpdiffs"
    } else{

        snpdiffs_dir = file("${params.snpdiffs}")

        // If --snpdiffs is a directory...
        if(snpdiffs_dir.isDirectory()){
            ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
        }
        // If --snpdiffs is a file...
        else if(snpdiffs_dir.isFile()){

            // Check if it is a single snpdiffs file...
            if(snpdiffs_dir.getExtension() == "snpdiffs"){
                ch_snpdiffs = Channel.from(snpdiffs_dir)
            }
            // Otherwise, assume a file with paths to SNPDiffs
            else{
                ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{it->it.endsWith('.snpdiffs') }
            }
        } else{
            error "$snpdiffs_dir is not a valid directory or file..."
        }

        snpdiffs_data = ch_snpdiffs
            .filter { file(it).exists() }
            .collect() | getSNPDiffsData | splitCsv | collect | flatten | collate(19)

        // (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
        // (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
        // (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
        // (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
    }
}
process getSNPDiffsData{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(snpdiffs_paths)

    output:
    stdout

    script:

    user_snpdiffs_list.write(snpdiffs_paths.join('\n') + "\n")
    """
    $params.load_python_module
    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
    """
}


// Fetching read data //
workflow fetchQueryReads{

    emit:
    query_reads

    main:

    // If --reads is set, grab read paths
    ("${params.reads}" != "" ? processReads(params.reads,params.readext,params.forward,params.reverse) : Channel.empty()).set{query_reads}
}
workflow fetchRefReads{

    emit:
    ref_reads

    main:

    // If --ref_reads is set, grab read paths
    ("${params.ref_reads}" != "" ? processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse) : Channel.empty()).set{ref_reads}
}
workflow processReads{

    take:
    read_loc
    read_ext
    forward
    reverse

    emit:
    read_info

    main:

    if(read_loc == ""){
        error "No data provided to --reads/--ref_reads"
    } else{

        read_dir = file(read_loc)

        // If --reads is a single directory, get all reads from that directory
        if(read_dir.isDirectory()){
            read_info = fetchReads(read_dir,read_ext,forward,reverse) | splitCsv
        }

        // If --reads is a file including paths to many directories, process reads from all directories
        else if(read_dir.isFile()){
            read_info = fetchReads(Channel.from(read_dir.readLines()),read_ext,forward,reverse) | splitCsv
        }
        // Error if --reads doesn't point to a valid file or directory
        else{
            error "$read_dir is neither a valid file nor a valid directory..."
        }
    }
}
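
// fetchReads.py is expected to print CSV rows that splitCsv parses into
// (Isolate_ID, Read_Type, Read_Path) tuples; for paired reads, Read_Path is
// assumed to hold the forward and reverse files joined with ';'
// (see the split(";") in skesaAssemble below)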
process fetchReads{

    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val dir // Directory containing read files
    val read_ext // Extension for read files (e.g., fastq.gz or fq)
    val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
    val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

    output:
    stdout

    script:

    if(!file(dir).isDirectory()){
        error "$dir is not a valid directory..."
    } else{
        """
        $params.load_python_module
        python ${findReads} --read_dir ${dir} --read_filetype ${read_ext} --forward_suffix ${forward_suffix} --reverse_suffix ${reverse_suffix} --trim_name ${params.trim_name}
        """
    }
}

// Fetch reference IDs //
workflow processRefIDs{

    emit:
    ref_ids

    main:
    def trim_this = "${params.trim_name}"

    ref_ids = params.ref_id
        .tokenize(',')
        .unique()
        .collect { it ->
            "${it}".replaceAll(trim_this, "")}
        .flatten()
}
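
// Both --ref_id and --exclude take comma-separated ID lists
// (e.g., --ref_id Sample_A,Sample_B), with params.trim_name stripped from each ID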

// Fetch excluded IDs //
workflow processExclude{

    emit:
    exclude_ids

    main:
    def trim_this = "${params.trim_name}"

    exclude_ids = Channel.from(params.exclude
        .tokenize(',')
        .collect { it -> "${it}".replaceAll(trim_this, "")})
        .map{it -> tuple(it.toString(),"Exclude")}
        .unique{it -> it[0]}
}

process saveRefIDs{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(ref_ids)

    script:
    ref_id_file.append(ref_ids.join('\n') + '\n')
    """
    """
}

// Assembly //
workflow assembleReads{

    take:
    to_assemble

    emit:
    assembled_data

    main:

    // Run SKESA on each entry
    assembly_output = skesaAssemble(to_assemble).splitCsv()

    // Print log of assemblies
    assembly_output.map {it -> it.join("\t")}.collect() | saveAssemblyLog

    // Return assembly data
    assembled_data = assembly_output.map{it->tuple(it[0],it[3])}
}
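
// For paired reads, read_location carries the forward and reverse paths
// joined with ';'; skesaAssemble splits them back out below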
process skesaAssemble{
    memory '12 GB' // Add readcount/memory check?

    input:
    tuple val(sample_name),val(read_type),val(read_location)

    output:
    stdout

    script:
    assembly_file = file("${assembly_directory}/${sample_name}.fasta")

    // Ensure folder exists and file doesn't
    if(!assembly_directory.isDirectory()){
        error "$assembly_directory is not a valid directory..."
    } else if(assembly_file.isFile()){
        error "$assembly_file already exists..."
    } else if(read_type == "Paired"){
        forward_reverse = read_location.split(";")
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --use_paired_ends --fastq ${forward_reverse[0]} ${forward_reverse[1]} --contigs_out ${assembly_file}
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else if(read_type == "Single"){
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --fastq ${read_location} --contigs_out ${assembly_file}
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else{
        error "read_type should be Paired or Single, not $read_type..."
    }
}
process saveAssemblyLog{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(assembly_data)

    script:
    assembly_log.write(assembly_header)
    assembly_log.append(assembly_data.join('\n') + '\n')
    """
    """
}