comparison 0.4.2/workflows/centriflaken.nf @ 105:52045ea4679d

"planemo upload"
author kkonganti
date Thu, 27 Jun 2024 14:17:26 -0400
parents
children
comparison
equal deleted inserted replaced
104:17890124001d 105:52045ea4679d
1 // Define any required imports for this specific workflow
2 import java.nio.file.Paths
3 import nextflow.file.FileHelper
4
5 // Include any necessary methods
6 include { \
7 summaryOfParams; stopNow; fastqEntryPointHelp; sendMail; \
8 addPadding; wrapUpHelp } from "${params.routines}"
9 include { kraken2Help } from "${params.toolshelp}${params.fs}kraken2"
10 include { centrifugeHelp } from "${params.toolshelp}${params.fs}centrifuge"
11 include { flyeHelp } from "${params.toolshelp}${params.fs}flye"
12 include { serotypefinderHelp } from "${params.toolshelp}${params.fs}serotypefinder"
13 include { seqsero2Help } from "${params.toolshelp}${params.fs}seqsero2"
14 include { mlstHelp } from "${params.toolshelp}${params.fs}mlst"
15 include { abricateHelp } from "${params.toolshelp}${params.fs}abricate"
16
17 // Exit if help requested before any subworkflows
// When --help is given, log the assembled help text and terminate with a
// zero exit status before any module or subworkflow is included.
if (params.help) {
    log.info(help())
    exit(0)
}
22
23 // Include any necessary modules and subworkflows
24 include { PROCESS_FASTQ } from "${params.subworkflows}${params.fs}process_fastq"
25 include { FASTQC } from "${params.modules}${params.fs}fastqc${params.fs}main"
26 include { CENTRIFUGE_CLASSIFY } from "${params.modules}${params.fs}centrifuge${params.fs}classify${params.fs}main"
27 include { CENTRIFUGE_PROCESS } from "${params.modules}${params.fs}centrifuge${params.fs}process${params.fs}main"
28 include { SEQKIT_GREP } from "${params.modules}${params.fs}seqkit${params.fs}grep${params.fs}main"
29 include { FLYE_ASSEMBLE } from "${params.modules}${params.fs}flye${params.fs}assemble${params.fs}main"
30 include { KRAKEN2_CLASSIFY } from "${params.modules}${params.fs}kraken2${params.fs}classify${params.fs}main"
31 include { KRAKEN2_EXTRACT_CONTIGS } from "${params.modules}${params.fs}kraken2${params.fs}extract_contigs${params.fs}main"
32 include { SEROTYPEFINDER } from "${params.modules}${params.fs}serotypefinder${params.fs}main"
33 include { SEQSERO2 } from "${params.modules}${params.fs}seqsero2${params.fs}main"
34 include { MLST } from "${params.modules}${params.fs}mlst${params.fs}main"
35 include { ABRICATE_RUN } from "${params.modules}${params.fs}abricate${params.fs}run${params.fs}main"
36 include { ABRICATE_SUMMARY } from "${params.modules}${params.fs}abricate${params.fs}summary${params.fs}main"
37 include { TABLE_SUMMARY } from "${params.modules}${params.fs}cat${params.fs}tables${params.fs}main"
38 include { MULTIQC } from "${params.modules}${params.fs}multiqc${params.fs}main"
39 include { DUMP_SOFTWARE_VERSIONS } from "${params.modules}${params.fs}custom${params.fs}dump_software_versions${params.fs}main"
40
41
42
43 /*
44 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
45 INPUTS AND ANY CHECKS FOR THE CENTRIFLAKEN WORKFLOW
46 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
47 */
48
// Resolve the database locations given on the command line.
def kraken2_db_dir = file ( "${params.kraken2_db}" )
def centrifuge_x = file ( "${params.centrifuge_x}" )

// abricate databases passed to ABRICATE_RUN by the workflow below.
def abricate_dbs = [ 'ncbiamrplus', 'resfinder', 'megares', 'argannot' ]

// Number of flye read platform options that evaluate to true.
// Exactly one of them must be selected.
def reads_platform = [
    params.flye_nano_raw,
    params.flye_nano_corr,
    params.flye_nano_hq,
    params.flye_pacbio_raw,
    params.flye_pacbio_corr,
    params.flye_pacbio_hifi
].count { it }

// The kraken2 database path itself must exist; for centrifuge only the parent
// of the given path is checked. The safe-navigation operator makes a path
// without a parent fail this check instead of raising a NullPointerException.
if (!kraken2_db_dir.exists() || !centrifuge_x.getParent()?.exists()) {
    stopNow("Please check if the following absolute paths are valid:\n" +
        "${params.kraken2_db}\n${params.centrifuge_x}\n" +
        "Cannot proceed further!")
}

if (reads_platform != 1) {
    // Declared with `def` so that the variable stays local to this block
    // rather than being added to the script binding.
    def msg_0 = (reads_platform > 1 ? "only" : "at least")
    stopNow("Please mention ${msg_0} one read platform for use with the flye assembler\n" +
        "using any one of the following options:\n" +
        "--flye_nano_raw\n--flye_nano_corr\n--flye_nano_hq\n" +
        "--flye_pacbio_raw\n--flye_pacbio_corr\n--flye_pacbio_hifi")
}

// Both extraction steps must target the same organism.
if (params.centrifuge_extract_bug != params.kraken2_extract_bug) {
    stopNow("Please make sure that the bug to be extracted is same\n" +
        "for both --centrifuge_extract_bug and --kraken2_extract_bug options.")
}
79
80 /*
81 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
82 RUN THE CENTRIFLAKEN WORKFLOW
83 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
84 */
85
// CENTRIFLAKEN workflow.
//
// Order of operations, as wired below:
//   1. PROCESS_FASTQ produces the reads; FASTQC and CENTRIFUGE_CLASSIFY run on them.
//   2. CENTRIFUGE_PROCESS consumes the joined centrifuge report and output, and
//      SEQKIT_GREP runs on the reads joined with its `extracted` output.
//   3. FLYE_ASSEMBLE runs on the SEQKIT_GREP output; the assembly goes through
//      KRAKEN2_CLASSIFY and KRAKEN2_EXTRACT_CONTIGS.
//   4. SEROTYPEFINDER, SEQSERO2, MLST and ABRICATE_RUN run on the filtered contigs.
//   5. Tables, software versions and classification reports are collected and
//      handed to MULTIQC.
//
// The meta element of each tuple is never modified in place: every `map`
// closure emits a copy (`meta + [...]`). Mutating the incoming object would
// also change it for any other channel item that references the same object.
// NOTE(review): this assumes meta is a java.util.Map -- confirm against PROCESS_FASTQ.
workflow CENTRIFLAKEN {
    main:
        ch_asm_filtered_contigs = Channel.empty()
        ch_mqc_custom_tbl = Channel.empty()

        log.info summaryOfParams()

        // Attach the database locations to a copy of each sample's meta.
        PROCESS_FASTQ()
            .processed_reads
            .map { meta, fastq ->
                [
                    meta + [
                        centrifuge_x: params.centrifuge_x,
                        kraken2_db: params.kraken2_db
                    ],
                    fastq
                ]
            }
            .set { ch_processed_reads }

        PROCESS_FASTQ
            .out
            .versions
            .set { software_versions }

        FASTQC ( ch_processed_reads )

        CENTRIFUGE_CLASSIFY ( ch_processed_reads )

        CENTRIFUGE_PROCESS (
            CENTRIFUGE_CLASSIFY.out.report
                .join( CENTRIFUGE_CLASSIFY.out.output )
        )

        ch_processed_reads.join ( CENTRIFUGE_PROCESS.out.extracted )
            .set { ch_centrifuge_extracted }

        SEQKIT_GREP ( ch_centrifuge_extracted )

        FLYE_ASSEMBLE ( SEQKIT_GREP.out.fastx )

        // Flag the items as assemblies on a copy of meta.
        FLYE_ASSEMBLE
            .out
            .assembly
            .map { meta, assembly -> [ meta + [ is_assembly: true ], assembly ] }
            .set { ch_flye_assembly }

        // A previous `ch_flye_assembly.ifEmpty { [ false, false ] }` statement
        // was dropped here: its result was never assigned or consumed.

        KRAKEN2_CLASSIFY ( ch_flye_assembly )

        KRAKEN2_EXTRACT_CONTIGS (
            ch_flye_assembly
                .join( KRAKEN2_CLASSIFY.out.kraken_output ),
            params.kraken2_extract_bug
        )

        // organism is the first whitespace-separated word of
        // --kraken2_extract_bug, capitalized.
        KRAKEN2_EXTRACT_CONTIGS
            .out
            .asm_filtered_contigs
            .map { meta, contigs ->
                [
                    meta + [
                        organism: params.kraken2_extract_bug.split(/\s+/)[0].capitalize(),
                        serotypefinder_db: params.serotypefinder_db
                    ],
                    contigs
                ]
            }
            .set { ch_asm_filtered_contigs }

        SEROTYPEFINDER ( ch_asm_filtered_contigs )

        SEQSERO2 ( ch_asm_filtered_contigs )

        MLST ( ch_asm_filtered_contigs )

        ABRICATE_RUN (
            ch_asm_filtered_contigs,
            abricate_dbs
        )

        // Key every abricate result by the full database list so that
        // groupTuple gathers all of them into a single item.
        ABRICATE_RUN
            .out
            .abricated
            .map { meta, abres -> [ abricate_dbs, abres ] }
            .groupTuple(by: [0])
            .map { it -> tuple ( it[0], it[1].flatten() ) }
            .set { ch_abricated }

        ABRICATE_SUMMARY ( ch_abricated )

        // ABRICATE_SUMMARY.out.ecoli_vf.set { ch_abricate_summary_ecoli_vf }
        // ch_abricate_summary_ecoli_vf.ifEmpty { [ false, false ] }

        // Report files only (meta dropped) from centrifuge, kraken2 and FastQC.
        CENTRIFUGE_CLASSIFY.out.kreport
            .map { meta, kreport -> [ kreport ] }
            .flatten()
            .concat (
                KRAKEN2_CLASSIFY.out.kraken_report
                    .map { meta, kreport -> [ kreport ] }
                    .flatten(),
                FASTQC.out.zip
                    .map { meta, zip -> [ zip ] }
                    .flatten()
            )
            .set { ch_mqc_classify }

        // Serotyping table: serotypefinder takes precedence over seqsero2.
        if (params.serotypefinder_run) {
            SEROTYPEFINDER
                .out
                .serotyped
                .map { meta, tsv -> [ 'serotypefinder', tsv ] }
                .groupTuple(by: [0])
                .map { it -> tuple ( it[0], it[1].flatten() ) }
                .set { ch_mqc_custom_tbl }
        } else if (params.seqsero2_run) {
            SEQSERO2
                .out
                .serotyped
                .map { meta, tsv -> [ 'seqsero2', tsv ] }
                .groupTuple(by: [0])
                .map { it -> tuple ( it[0], it[1].flatten() ) }
                .set { ch_mqc_custom_tbl }
        }

        ch_mqc_custom_tbl
            .concat (
                ABRICATE_SUMMARY.out.ncbiamrplus.map{ it -> tuple ( it[0], it[1] )},
                ABRICATE_SUMMARY.out.resfinder.map{ it -> tuple ( it[0], it[1] )},
                ABRICATE_SUMMARY.out.megares.map{ it -> tuple ( it[0], it[1] )},
                ABRICATE_SUMMARY.out.argannot.map{ it -> tuple ( it[0], it[1] )}
            )
            .groupTuple(by: [0])
            .map { it -> [ it[0], it[1].flatten() ]}
            .set { ch_mqc_custom_tbl }

        TABLE_SUMMARY ( ch_mqc_custom_tbl )

        DUMP_SOFTWARE_VERSIONS (
            software_versions
                .mix (
                    FASTQC.out.versions,
                    CENTRIFUGE_CLASSIFY.out.versions,
                    CENTRIFUGE_PROCESS.out.versions,
                    SEQKIT_GREP.out.versions,
                    FLYE_ASSEMBLE.out.versions.ifEmpty(null),
                    KRAKEN2_CLASSIFY.out.versions.ifEmpty(null),
                    KRAKEN2_EXTRACT_CONTIGS.out.versions.ifEmpty(null),
                    SEROTYPEFINDER.out.versions.ifEmpty(null),
                    SEQSERO2.out.versions.ifEmpty(null),
                    MLST.out.versions.ifEmpty(null),
                    ABRICATE_RUN.out.versions.ifEmpty(null),
                    ABRICATE_SUMMARY.out.versions.ifEmpty(null),
                    TABLE_SUMMARY.out.versions.ifEmpty(null)
                )
                .unique()
                .collectFile(name: 'collected_versions.yml')
        )

        DUMP_SOFTWARE_VERSIONS
            .out
            .mqc_yml
            .concat (
                ch_mqc_classify,
                TABLE_SUMMARY.out.mqc_yml
            )
            .collect()
            .set { ch_multiqc }

        MULTIQC ( ch_multiqc )
}
259
260 /*
261 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
262 ON COMPLETE, SHOW GORY DETAILS OF ALL PARAMS WHICH WILL BE HELPFUL TO DEBUG
263 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
264 */
265
// Completion handler: on a successful run, rearrange the output directory
// into "<pipeline>-steps" and "<pipeline>-results" and then call sendMail().
// Nothing is done here when the run did not succeed.
workflow.onComplete {
    if (!workflow.success) {
        return
    }

    // CREATE APPROPRIATE DIRECTORIES AND MOVE AS REQUESTED BY STAKEHOLDER(S)
    //
    // Nextflow's .moveTo will error out if directories contain files and it
    // would be complex to include logic to skip directories
    //
    def steps_path = "${params.output}${params.fs}${params.pipeline}-steps"
    def results_path = "${params.output}${params.fs}${params.pipeline}-results"
    def contigs_in_steps = file( "${steps_path}${params.fs}kraken2_extract_contigs", type: 'dir' )
    def steps_dir = file( steps_path, type: 'dir' )
    def results_dir = file( results_path, type: 'dir' )
    // Resolved as in the original handler; not referenced further below.
    def output_root = file( params.output, type: 'dir' )

    // First pass only: create the steps directory and move into it every
    // entry of the output directory whose name is not matched by the
    // exclusion pattern below.
    if ( !steps_dir.exists() ) {
        steps_dir.mkdirs()

        FileHelper.visitFiles(Paths.get("${params.output}"), '*') { entry ->
            def stays_in_place = entry.name ==~ /^(${params.cfsanpipename}|multiqc|\.nextflow|${workflow.workDir.name}|${params.pipeline}).*/

            if ( !stays_in_place ) {
                FileHelper.movePath(
                    entry, Paths.get( "${steps_path}${params.fs}${entry.name}" )
                )
            }
        }
    }

    // Promote the extracted contigs to the results directory, but only when
    // that directory does not exist yet.
    if ( contigs_in_steps.exists() && !results_dir.exists() ) {
        results_dir.mkdirs()

        FileHelper.movePath(
            Paths.get( "${steps_path}${params.fs}kraken2_extract_contigs" ),
            Paths.get( "${results_path}${params.fs}kraken2_extract_contigs" )
        )
    }

    sendMail()
}
304
// Error handler: call the imported sendMail() routine when the run fails.
workflow.onError { sendMail() }
308
309 /*
310 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
311 HELPER METHODS FOR CENTRIFLAKEN WORKFLOW
312 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
313 */
314
// Assemble the help text for this workflow.
//
// The entry-point help, the `.text` of each tool-specific help helper and the
// wrap-up help are merged left to right with `+`, copied into a fresh map and
// passed to addPadding(), whose result is returned.
def help() {

    def help_sections = [
        fastqEntryPointHelp(),
        kraken2Help(params).text,
        centrifugeHelp(params).text,
        flyeHelp(params).text,
        serotypefinderHelp(params).text,
        seqsero2Help(params).text,
        mlstHelp(params).text,
        abricateHelp(params).text,
        wrapUpHelp()
    ]

    // Same left-to-right `+` chain as writing the sum out by hand.
    def merged_sections = help_sections.inject { merged, section -> merged + section }

    Map helptext = [:]
    helptext.putAll(merged_sections)

    return addPadding(helptext)
}