comparison CSP2/subworkflows/fetchData/main.nf @ 0:01431fa12065

"planemo upload"
author rliterman
date Mon, 02 Dec 2024 10:40:55 -0500
parents
children 0dee732cfad2
comparison
equal deleted inserted replaced
-1:000000000000 0:01431fa12065
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads

// Output/working directories, resolved once and reused throughout the subworkflow
output_directory = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory = file(params.log_directory)

// File that accumulates reference isolate IDs (appended to by saveRefIDs)
ref_id_file = file(params.ref_id_file)

// Set ref_mode (checked in fetchData to decide whether reference channels are built)
ref_mode = params.ref_mode

// Header line for the assembly log TSV written by saveAssemblyLog
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Set paths to accessory files/scripts
assembly_log = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs = file("${projectDir}/bin/userSNPDiffs.py")

// Cap SKESA at 5 cores; Math.min avoids evaluating params.cores twice
skesa_cpus = Math.min(params.cores as Integer, 5)
// Entry workflow: fetch all input data and publish the three result channels.
workflow {
    main:
    input_data = fetchData()
    query_data = input_data.query_data
    reference_data = input_data.reference_data
    // Fix: this local was previously named 'snpdiffs_data', leaving the
    // 'snpdiff_data' referenced in publish undefined at runtime
    snpdiff_data = input_data.snpdiff_data

    publish:
    query_data >> 'query_data.tsv'
    reference_data >> 'reference_data.tsv'
    snpdiff_data >> 'snpdiff_data.tsv'
}
37
38 // Top-level workflow //
// Top-level data-gathering workflow.
// Collects isolates from --snpdiffs/--fasta/--reads/--ref_fasta/--ref_reads,
// assembles reads where no FASTA exists, applies --exclude filtering, and
// splits the results into query/reference sets according to params.runmode.
workflow fetchData{

    emit:
    query_data      // (Isolate_ID, Assembly_Path) tuples for query isolates
    reference_data  // (Isolate_ID, Assembly_Path) tuples for reference isolates
    snpdiff_data    // (Query_ID, Reference_ID, SNPDiff_Path) tuples for imported alignments

    main:
    // Get any excluded IDs as (ID, "Exclude") tuples
    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

    // Process snpdiffs alignments into 19-column tuples (layout documented in processSNPDiffs)
    // If assembly file cannot be found, it will be 'null'
    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

    // Flag snpdiffs files whose query (it[1]) or reference (it[10]) ID is excluded:
    // yields (SNPDiffs_File, "Exclude")
    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
    .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
    .join(exclude_ids,by:0)
    .unique{it -> it[1]}
    .map{it -> tuple(it[1],"Exclude")}

    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
    snpdiff_data = user_snpdiffs
    .map{it -> tuple(it[0],it[1],it[10])}
    .join(excluded_snpdiffs,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[3].toString() != "Exclude"}
    .unique{it -> it[0]}
    .map{it -> tuple(it[1],it[2],it[0])}
    .collect().flatten().collate(3)

    // Get assembly data from snpdiffs, both query (it[1],it[2]) and reference
    // (it[10],it[11]) sides, minus exclusions: (ID, Assembly_Path, 'SNPDiff')
    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
    .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[2].toString() != "Exclude"}
    .map{it -> tuple(it[0],it[1],'SNPDiff')}
    .collect().flatten().collate(3)

    // Keep only snpdiffs isolates whose assembly file could actually be located
    assembled_snpdiffs = snpdiff_assemblies
    .filter{it -> it[1].toString() != "null"}
    .unique{it->it[0]}.collect().flatten().collate(3)

    // Process any data provided as assemblies
    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

    // Pool of everything already assembled (snpdiffs + user FASTAs), minus exclusions
    pre_assembled = assembled_snpdiffs
    .map{it -> tuple(it[0],it[1])}
    .concat(query_fasta)
    .concat(ref_fasta)
    .unique{it -> it[0]}
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[2].toString() != "Exclude"}
    .map{it->tuple(it[0],it[1])}
    .collect().flatten().collate(2)

    // Process any data provided as reads
    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

    // All read sets from query + reference sources, minus exclusions
    all_reads = query_reads
    .concat(ref_reads)
    .unique{it->it[0]}
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[3].toString() != "Exclude"}
    .map{it->tuple(it[0],it[1],it[2])}
    .collect().flatten().collate(3)

    // Figure out if any assembly is necessary by joining reads against FASTAs.
    // NOTE(review): branch names suggest 'assembly' = FASTA only, 'read' = reads
    // without a FASTA, 'combo' = both; only the 'read' branch is consumed below.
    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
    branch{it ->
        assembly: it[1].toString() == "null"
        return(tuple(it[0],it[2]))
        read: it[3].toString() == "null"
        return(tuple(it[0],it[1],it[2]))
        combo: true
        return(tuple(it[0],it[3]))}

    // Assemble reads if necessary
    assembled_reads = fasta_read_combo.read
    .collect().flatten().collate(3) | assembleReads

    // If runmode is 'assemble', tasks are complete
    if(params.runmode == "assemble"){
        query_data = Channel.empty()
        reference_data = Channel.empty()
    } else{

        // If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used):
        // keep user/assembled FASTAs only for isolates absent from assembled_snpdiffs
        user_fastas = query_fasta
        .concat(ref_fasta)
        .concat(assembled_reads)
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1],'User')}
        .collect().flatten().collate(3)
        .join(assembled_snpdiffs,by:0,remainder:true)
        .filter{it -> it[3].toString() == "null"}
        .map{it->tuple(it[0],it[1])}

        // Get all assemblies
        all_assembled = assembled_snpdiffs
        .map{it -> tuple(it[0],it[1])}
        .concat(user_fastas)
        .unique{it->it[0]}.collect().flatten().collate(2)

        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
        no_assembly = snpdiff_assemblies
        .map{it -> tuple(it[0],it[1])}
        .filter{it -> it[1].toString() == "null"}
        .unique{it -> it[0]}
        .join(all_assembled,by:0,remainder:true)
        .filter{it -> it[2].toString() == "null"}
        .map{it->tuple(it[0],it[1])}
        .collect().flatten().collate(2)

        // Compile all samples
        all_samples = all_assembled
        .concat(no_assembly)
        .unique{it-> it[0]}.collect().flatten().collate(2)

        // If no reference data is provided return a blank channel
        if(!ref_mode){
            reference_data = Channel.empty()

            query_data = all_samples
            .unique{it -> it[0]}
            .collect().flatten().collate(2)

        } else{

            // Process additional reference IDs supplied via --ref_id
            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

            // Reference IDs from --ref_fasta, --ref_reads, and --ref_id, minus exclusions
            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
            .concat(ref_reads.map{it->tuple(it[0])})
            .concat(user_ref_ids)
            .unique{it-> it[0]}.collect().flatten().collate(1)
            .map{it -> tuple(it[0],"Reference")}
            .join(exclude_ids,by:0,remainder:true)
            .filter{it -> it[0].toString() != "null"}
            .filter{it -> it[2].toString() != "Exclude"}
            .map{it -> tuple(it[0],it[1])}

            // Samples tagged "Reference" become the reference set
            reference_data = all_samples
            .join(all_ref_ids,by:0,remainder:true)
            .filter{it -> it[2].toString() == "Reference"}
            .map{it->tuple(it[0],it[1])}
            .unique{it -> it[0]}
            .collect().flatten().collate(2)

            // Save reference data to file
            reference_data
            .collect{it -> it[0]}
            | saveRefIDs

            // screen/align: queries are the non-reference samples;
            // snp: every sample (references included) is a query
            if(params.runmode == "screen" || params.runmode == "align"){
                query_data = all_samples
                .join(all_ref_ids,by:0,remainder:true)
                .filter{it -> it[2].toString() != "Reference"}
                .map{it->tuple(it[0],it[1])}
                .unique{it -> it[0]}
                .collect().flatten().collate(2)
            } else if(params.runmode == "snp"){
                query_data = all_samples
                .unique{it -> it[0]}
                .collect().flatten().collate(2)
            }
        }
    }
}
218
219 // Fetching preassembled data //
// Wraps getAssemblies for the --fasta input; emits (Isolate_ID, Assembly_Path)
// tuples, or an empty channel when --fasta was not supplied.
workflow fetchQueryFasta{

    emit:
    query_fasta

    main:

    if("${params.fasta}" != ""){
        query_fasta = getAssemblies(params.fasta)
    } else{
        query_fasta = Channel.empty()
    }
}
// Wraps getAssemblies for the --ref_fasta input; emits (Isolate_ID, Assembly_Path)
// tuples, or an empty channel when --ref_fasta was not supplied.
workflow fetchRefFasta{

    emit:
    ref_fasta

    main:

    if("${params.ref_fasta}" != ""){
        ref_fasta = getAssemblies(params.ref_fasta)
    } else{
        ref_fasta = Channel.empty()
    }
}
// Resolves a --fasta/--ref_fasta argument into (Isolate_ID, Assembly_Path) tuples.
// Accepts a directory of FASTAs, a single FASTA file, or a text file listing
// FASTA paths; sample IDs are base names with params.trim_name stripped.
workflow getAssemblies{

    take:
    fasta_loc

    emit:
    fasta_data

    main:
    def trim_this = "${params.trim_name}"

    if(fasta_loc == ""){
        error "No assembly data provided via --fasta/--ref_fasta"
    } else{

        def fasta_exts = ["fa", "fasta", "fna"]
        fasta_dir = file(fasta_loc)

        if(fasta_dir.isDirectory()){
            // Directory: glob every FASTA it contains
            ch_fasta = Channel.fromPath(fasta_exts.collect{ ext -> "${fasta_dir}/*.${ext}" })
        } else if(fasta_dir.isFile()){
            if(fasta_exts.contains(fasta_dir.getExtension())){
                // A single FASTA file
                ch_fasta = Channel.from(fasta_dir).map{ entry -> file(entry) }
            } else{
                // Otherwise, treat it as a text file listing FASTA paths
                ch_fasta = Channel.from(fasta_dir.readLines()).filter{ line -> line =~ /\.(fa|fasta|fna)$/ }.map{ entry -> file(entry) }
            }
        } else{
            error "$fasta_dir is not a valid directory or file..."
        }

        // Drop paths that don't exist, then key each assembly by its trimmed base name
        fasta_data = ch_fasta
            .filter{ candidate -> file(candidate).exists() }
            .map{ fastaPath ->
                def sampleName = file(fastaPath).getBaseName().replaceAll(trim_this, "")
                tuple(sampleName, fastaPath) }
    }
}
// Resolves --snpdiffs (directory, single file, or list file) into parsed
// 19-column tuples describing each imported SNPDiffs alignment.
workflow processSNPDiffs{

    emit:
    snpdiffs_data

    main:

    if("${params.snpdiffs}" == ""){
        error "No assembly data provided via --snpdiffs"
    } else{

        snpdiffs_dir = file("${params.snpdiffs}")

        // If --snpdiffs is a directory, take every .snpdiffs file in it...
        if(snpdiffs_dir.isDirectory()){
            ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
        }
        // If --snpdiffs is a file...
        else if(snpdiffs_dir.isFile()){

            // Check if it is a single snpdiffs file...
            if(snpdiffs_dir.getExtension() == "snpdiffs"){
                ch_snpdiffs = Channel.from(snpdiffs_dir)
            }
            // Otherwise, assume a file with paths to SNPDiffs
            else{
                ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{it->it.endsWith('.snpdiffs') }
            }
        } else{
            error "$snpdiffs_dir is not a valid directory or file..."
        }

        // Drop missing files, parse all headers in one batch (getSNPDiffsData emits
        // CSV on stdout), then regroup the flat fields into 19-item tuples
        snpdiffs_data = ch_snpdiffs
        .filter { file(it).exists() }
        .collect() | getSNPDiffsData | splitCsv | collect | flatten | collate(19)

        // (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
        // (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
        // (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
        // (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
    }
}
// Local single-shot task: records the imported snpdiffs paths to a log file,
// then runs userSNPDiffs.py over that list, emitting parsed CSV on stdout.
process getSNPDiffsData{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(snpdiffs_paths)

    output:
    stdout

    script:

    // Write the full path list before the script runs (exec-side side effect)
    user_snpdiffs_list.write(snpdiffs_paths.join('\n') + "\n")
    """
    $params.load_python_module
    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
    """
}
344
345
346 // Fetching read data //
// Wraps processReads for the --reads input; emits (Isolate_ID, Read_Type, Read_Path)
// tuples, or an empty channel when --reads was not supplied.
workflow fetchQueryReads{

    emit:
    query_reads

    main:

    if("${params.reads}" != ""){
        query_reads = processReads(params.reads,params.readext,params.forward,params.reverse)
    } else{
        query_reads = Channel.empty()
    }
}
// Wraps processReads for the --ref_reads input; emits (Isolate_ID, Read_Type, Read_Path)
// tuples, or an empty channel when --ref_reads was not supplied.
workflow fetchRefReads{

    emit:
    ref_reads

    main:

    if("${params.ref_reads}" != ""){
        ref_reads = processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse)
    } else{
        ref_reads = Channel.empty()
    }
}
// Resolves a --reads/--ref_reads argument into (Isolate_ID, Read_Type, Read_Path)
// tuples via the fetchReads process. Accepts either a single read directory or
// a text file listing one read directory per line.
workflow processReads{

    take:
    read_loc
    read_ext
    forward
    reverse

    emit:
    read_info

    main:

    if(read_loc == ""){
        error "No data provided to --reads/--ref_reads"
    } else{

        read_dir = file(read_loc)

        // Reject anything that is neither a directory nor a readable file up front
        if(!read_dir.isDirectory() && !read_dir.isFile()){
            error "$read_dir is neither a valid file or directory..."
        }

        if(read_dir.isDirectory()){
            // One directory: scan it directly
            read_info = fetchReads(read_dir,read_ext,forward,reverse) | splitCsv
        } else{
            // A list file: scan every directory named in it
            read_info = fetchReads(Channel.from(read_dir.readLines()),read_ext,forward,reverse) | splitCsv
        }
    }
}
// Local helper process: shells out to fetchReads.py to enumerate read files in
// a directory, emitting CSV rows (ID, type, path) on stdout.
process fetchReads{

    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val dir // Directory containing read files
    val read_ext // Extension for read files (e.g., fastq.gz or fq)
    val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
    val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

    output:
    stdout

    script:

    // Validate the directory exec-side before launching the script
    if(!file(dir).isDirectory()){
        error "$dir is not a valid directory..."
    } else{
        """
        $params.load_python_module
        python ${findReads} --read_dir ${dir} --read_filetype ${read_ext} --forward_suffix ${forward_suffix} --reverse_suffix ${reverse_suffix} --trim_name ${params.trim_name}
        """
    }
}
427
428 // Fetch reference IDs //
// Splits the comma-delimited --ref_id string into unique, name-trimmed IDs.
// NOTE(review): this emits a plain Groovy list rather than wrapping it in
// Channel.from as processExclude does — confirm downstream concat handling.
workflow processRefIDs{

    emit:
    ref_ids

    main:
    def trim_this = "${params.trim_name}"

    ref_ids = params.ref_id
        .tokenize(',')
        .unique()
        .collect{ rawId -> rawId.toString().replaceAll(trim_this, "") }
        .flatten()
}
444
445 // Fetch reference IDs //
// Turns the comma-delimited --exclude string into a channel of unique
// (ID, "Exclude") tuples, with params.trim_name stripped from each ID.
workflow processExclude{

    emit:
    exclude_ids

    main:
    def trim_this = "${params.trim_name}"

    def trimmed_ids = params.exclude.tokenize(',').collect{ rawId -> rawId.toString().replaceAll(trim_this, "") }

    exclude_ids = Channel.from(trimmed_ids)
        .map{ id -> tuple(id.toString(), "Exclude") }
        .unique{ entry -> entry[0] }
}
460
// Local bookkeeping task: records reference isolate IDs in ref_id_file.
// The shell script is intentionally empty; all work happens exec-side.
process saveRefIDs{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(ref_ids)

    script:
    // Append (not overwrite) so IDs accumulate across invocations
    ref_id_file.append(ref_ids.join('\n') + '\n')
    """
    """
}
474
475 // Assembly //
// Assembles (Isolate_ID, Read_Type, Read_Location) entries with SKESA,
// logs the results, and emits (Isolate_ID, Assembly_Path) tuples.
workflow assembleReads{

    take:
    to_assemble

    emit:
    assembled_data

    main:

    // skesaAssemble prints one CSV row per assembly on stdout
    assembly_output = skesaAssemble(to_assemble).splitCsv()

    // Record every assembly as a tab-separated log row
    saveAssemblyLog(assembly_output.map{ row -> row.join("\t") }.collect())

    // Keep only the ID and assembly path for downstream use
    assembled_data = assembly_output.map{ row -> tuple(row[0], row[3]) }
}
// Runs SKESA on one isolate's reads and echoes a CSV row
// (sample,type,location,assembly) for downstream parsing.
process skesaAssemble{
    memory '12 GB' // Add readcount/memory check?

    input:
    tuple val(sample_name),val(read_type),val(read_location)

    output:
    stdout

    script:
    // Target path for this isolate's new assembly
    assembly_file = file("${assembly_directory}/${sample_name}.fasta")

    // Ensure folder exists and file doesn't (never overwrite a prior assembly)
    if(!assembly_directory.isDirectory()){
        error "$assembly_directory is not a valid directory..."
    } else if(assembly_file.isFile()){
        error "$assembly_file already exists..."
    } else if(read_type == "Paired"){
        // Paired reads arrive as a single 'forward;reverse' string
        forward_reverse = read_location.split(";")
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --use_paired_ends --fastq ${forward_reverse[0]} ${forward_reverse[1]} --contigs_out ${assembly_file}
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else if(read_type == "Single"){
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --fastq ${read_location} --contigs_out ${assembly_file}
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else{
        error "read_type should be Paired or Single, not $read_type..."
    }
}
// Local bookkeeping task: writes the assembly log TSV.
// The shell script is intentionally empty; all work happens exec-side.
process saveAssemblyLog{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(assembly_data)

    script:
    // Header first (overwrites any previous log), then one row per assembly
    assembly_log.write(assembly_header)
    assembly_log.append(assembly_data.join('\n') + '\n')
    """
    """
}