comparison 0.2.1/workflows/centriflaken_hy.nf @ 0:77494b0fa3c7

"planemo upload"
author kkonganti
date Mon, 27 Jun 2022 15:55:37 -0400
parents
children 30191f39a957
comparison
equal deleted inserted replaced
-1:000000000000 0:77494b0fa3c7
1 // Define any required imports for this specific workflow
2 import java.nio.file.Paths
3 import nextflow.file.FileHelper
4
5 // Include any necessary methods
6 include { \
7 summaryOfParams; stopNow; fastqEntryPointHelp; sendMail; \
8 addPadding; wrapUpHelp } from "${params.routines}"
9 include { kraken2Help } from "${params.toolshelp}${params.fs}kraken2"
10 include { centrifugeHelp } from "${params.toolshelp}${params.fs}centrifuge"
11 include { spadesHelp } from "${params.toolshelp}${params.fs}spades"
12 include { serotypefinderHelp } from "${params.toolshelp}${params.fs}serotypefinder"
13 include { seqsero2Help } from "${params.toolshelp}${params.fs}seqsero2"
14 include { mlstHelp } from "${params.toolshelp}${params.fs}mlst"
15 include { abricateHelp } from "${params.toolshelp}${params.fs}abricate"
16
// When --help is given, print the assembled usage text and terminate with
// status 0 before the module/subworkflow includes below are processed.
if ( params.help ) {
    def usage = help()
    log.info(usage)
    exit 0
}
22
23 // Include any necessary modules and subworkflows
24 include { PROCESS_FASTQ } from "${params.subworkflows}${params.fs}process_fastq"
25 include { FASTQC } from "${params.modules}${params.fs}fastqc${params.fs}main"
26 include { CENTRIFUGE_CLASSIFY } from "${params.modules}${params.fs}centrifuge${params.fs}classify${params.fs}main"
27 include { CENTRIFUGE_PROCESS } from "${params.modules}${params.fs}centrifuge${params.fs}process${params.fs}main"
28 include { SEQKIT_GREP } from "${params.modules}${params.fs}seqkit${params.fs}grep${params.fs}main"
29 include { SPADES_ASSEMBLE } from "${params.modules}${params.fs}spades${params.fs}assemble${params.fs}main"
30 include { KRAKEN2_CLASSIFY } from "${params.modules}${params.fs}kraken2${params.fs}classify${params.fs}main"
31 include { KRAKEN2_EXTRACT_CONTIGS } from "${params.modules}${params.fs}kraken2${params.fs}extract_contigs${params.fs}main"
32 include { SEROTYPEFINDER } from "${params.modules}${params.fs}serotypefinder${params.fs}main"
33 include { SEQSERO2 } from "${params.modules}${params.fs}seqsero2${params.fs}main"
34 include { MLST } from "${params.modules}${params.fs}mlst${params.fs}main"
35 include { ABRICATE_RUN } from "${params.modules}${params.fs}abricate${params.fs}run${params.fs}main"
36 include { ABRICATE_SUMMARY } from "${params.modules}${params.fs}abricate${params.fs}summary${params.fs}main"
37 include { TABLE_SUMMARY } from "${params.modules}${params.fs}cat${params.fs}tables${params.fs}main"
38 include { MULTIQC } from "${params.modules}${params.fs}multiqc${params.fs}main"
39 include { DUMP_SOFTWARE_VERSIONS } from "${params.modules}${params.fs}custom${params.fs}dump_software_versions${params.fs}main"
40
41
42
43 /*
44 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
45 INPUTS AND ANY CHECKS FOR THE CENTRIFLAKEN-HY WORKFLOW
46 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
47 */
48
def kraken2_db_dir = file ( "${params.kraken2_db}" )
def centrifuge_x = file ( "${params.centrifuge_x}" )
// `false` when --spades_hmm is not set, so it can be used directly in conditionals.
def spades_custom_hmm = (params.spades_hmm ? file ( "${params.spades_hmm}" ) : false)
def reads_platform = 0
// ABRicate databases to run. Keep in sync with the ABRICATE_SUMMARY.out.<db>
// channels consumed in the CENTRIFLAKEN_HY workflow.
def abricate_dbs = [ 'ncbiamrplus', 'resfinder', 'megares', 'argannot' ]

// Count the supplied read inputs; this workflow only accepts --input.
reads_platform += (params.input ? 1 : 0)

// The Kraken2 DB path must exist. For Centrifuge only the parent directory of
// --centrifuge_x is checked (presumably the value is an index prefix rather
// than a single file - confirm against the centrifuge module).
if (!kraken2_db_dir.exists() || !centrifuge_x.getParent().exists()) {
    stopNow("Please check if the following absolute paths are valid:\n" +
        "${params.kraken2_db}\n${params.centrifuge_x}\n" +
        "Cannot proceed further!")
}

// Only validated when a custom HMM path was actually requested.
if (spades_custom_hmm && !spades_custom_hmm.exists()) {
    stopNow("Please check if the following SPAdes' custom HMM directory\n" +
        "path is valid:\n${params.spades_hmm}\nCannot proceed further!")
}

// reads_platform only ever counts up from 0, so `< 1` means "no --input given".
if (reads_platform < 1) {
    stopNow("Please mention at least one absolute path to input folder which contains\n" +
        "FASTQ files sequenced using the --input option.\n" +
        "Ex: --input (Illumina or Generic short reads in FASTQ format)")
}

// Centrifuge and Kraken2 must filter for the same organism, since the
// Centrifuge-extracted reads are assembled and then re-filtered by Kraken2.
if (params.centrifuge_extract_bug != params.kraken2_extract_bug) {
    stopNow("Please make sure that the bug to be extracted is same\n" +
        "for both --centrifuge_extract_bug and --kraken2_extract_bug options.")
}
78
79 /*
80 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
81 RUN THE CENTRIFLAKEN-HY WORKFLOW
82 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
83 */
84
workflow CENTRIFLAKEN_HY {
    main:
        // Default for the custom MultiQC table channel: it is only reassigned
        // when --serotypefinder_run or --seqsero2_run is set (see below).
        ch_mqc_custom_tbl = Channel.empty()
        ch_dummy = Channel.fromPath("${params.dummyfile}")
        ch_dummy2 = Channel.fromPath("${params.dummyfile2}")

        log.info summaryOfParams()

        // Attach the Centrifuge index and Kraken2 DB paths to every sample's
        // meta map. A new map is built (meta + [...]) instead of assigning into
        // `meta`, so the map object emitted upstream is never mutated.
        PROCESS_FASTQ()
            .processed_reads
            .map { meta, fastq ->
                [ meta + [ centrifuge_x: params.centrifuge_x, kraken2_db: params.kraken2_db ], fastq ]
            }
            .set { ch_processed_reads }

        PROCESS_FASTQ
            .out
            .versions
            .set { software_versions }

        FASTQC ( ch_processed_reads )

        CENTRIFUGE_CLASSIFY( ch_processed_reads )

        CENTRIFUGE_PROCESS(
            CENTRIFUGE_CLASSIFY.out.report
                .join( CENTRIFUGE_CLASSIFY.out.output )
        )

        // Pair each sample's reads with the Centrifuge extraction result (joined on meta).
        ch_processed_reads.join( CENTRIFUGE_PROCESS.out.extracted )
            .set { ch_centrifuge_extracted }

        SEQKIT_GREP ( ch_centrifuge_extracted )

        // As of 06/02/2022, upcoming Nextflow versions will allow `allowNull: true`
        // for process inputs and outputs, but until then we have to use dummy
        // files and workarounds.
        // https://github.com/nextflow-io/nextflow/pull/2893
        SPADES_ASSEMBLE (
            SEQKIT_GREP.out.fastx
                .combine(ch_dummy)
                .combine(ch_dummy2)
        )

        // Flag the SPAdes output as an assembly on a fresh meta map.
        SPADES_ASSEMBLE
            .out
            .assembly
            .map { meta, assembly ->
                [ meta + [ is_assembly: true ], assembly ]
            }
            .set { ch_spades_assembly }

        KRAKEN2_CLASSIFY( ch_spades_assembly )

        KRAKEN2_EXTRACT_CONTIGS (
            ch_spades_assembly
                .join( KRAKEN2_CLASSIFY.out.kraken_output ),
            params.kraken2_extract_bug
        )

        // Record the organism (first whitespace-separated word of
        // --kraken2_extract_bug, capitalized) and the SerotypeFinder DB on each
        // sample's filtered contigs, again without mutating the incoming meta.
        KRAKEN2_EXTRACT_CONTIGS
            .out
            .asm_filtered_contigs
            .map { meta, contigs ->
                [
                    meta + [
                        organism: params.kraken2_extract_bug.split(/\s+/)[0].capitalize(),
                        serotypefinder_db: params.serotypefinder_db
                    ],
                    contigs
                ]
            }
            .set { ch_asm_filtered_contigs }

        SEROTYPEFINDER ( ch_asm_filtered_contigs )

        SEQSERO2 ( ch_asm_filtered_contigs )

        MLST ( ch_asm_filtered_contigs )

        ABRICATE_RUN (
            ch_asm_filtered_contigs,
            abricate_dbs
        )

        // Every result gets the same key (the abricate_dbs list), so groupTuple
        // gathers all samples' ABRicate outputs into a single flattened tuple.
        ABRICATE_RUN
            .out
            .abricated
            .map { meta, abres -> [ abricate_dbs, abres ] }
            .groupTuple(by: [0])
            .map { it -> tuple ( it[0], it[1].flatten() ) }
            .set { ch_abricated }

        ABRICATE_SUMMARY ( ch_abricated )

        // Flat channel of classification reports and FastQC zips for MultiQC.
        CENTRIFUGE_CLASSIFY.out.kreport
            .map { meta, kreport -> [ kreport ] }
            .flatten()
            .concat (
                KRAKEN2_CLASSIFY.out.kraken_report
                    .map { meta, kreport -> [ kreport ] }
                    .flatten(),
                FASTQC.out.zip
                    .map { meta, zip -> [ zip ] }
                    .flatten()
            )
            .set { ch_mqc_classify }

        // SerotypeFinder takes precedence over SeqSero2 when both are enabled.
        if (params.serotypefinder_run) {
            SEROTYPEFINDER
                .out
                .serotyped
                .map { meta, tsv -> [ 'serotypefinder', tsv ] }
                .groupTuple(by: [0])
                .map { it -> tuple ( it[0], it[1].flatten() ) }
                .set { ch_mqc_custom_tbl }
        } else if (params.seqsero2_run) {
            SEQSERO2
                .out
                .serotyped
                .map { meta, tsv -> [ 'seqsero2', tsv ] }
                .groupTuple(by: [0])
                .map { it -> tuple ( it[0], it[1].flatten() ) }
                .set { ch_mqc_custom_tbl }
        }

        ch_mqc_custom_tbl
            .concat (
                ABRICATE_SUMMARY.out.ncbiamrplus.map{ it -> tuple ( it[0], it[1] )},
                ABRICATE_SUMMARY.out.resfinder.map{ it -> tuple ( it[0], it[1] )},
                ABRICATE_SUMMARY.out.megares.map{ it -> tuple ( it[0], it[1] )},
                ABRICATE_SUMMARY.out.argannot.map{ it -> tuple ( it[0], it[1] )}
            )
            .groupTuple(by: [0])
            .map { it -> [ it[0], it[1].flatten() ]}
            .set { ch_mqc_custom_tbl }

        TABLE_SUMMARY( ch_mqc_custom_tbl )

        DUMP_SOFTWARE_VERSIONS (
            software_versions
                .mix (
                    FASTQC.out.versions,
                    CENTRIFUGE_CLASSIFY.out.versions,
                    CENTRIFUGE_PROCESS.out.versions,
                    SEQKIT_GREP.out.versions,
                    SPADES_ASSEMBLE.out.versions.ifEmpty(null),
                    KRAKEN2_CLASSIFY.out.versions.ifEmpty(null),
                    KRAKEN2_EXTRACT_CONTIGS.out.versions.ifEmpty(null),
                    SEROTYPEFINDER.out.versions.ifEmpty(null),
                    SEQSERO2.out.versions.ifEmpty(null),
                    MLST.out.versions.ifEmpty(null),
                    ABRICATE_RUN.out.versions.ifEmpty(null),
                    ABRICATE_SUMMARY.out.versions.ifEmpty(null),
                    TABLE_SUMMARY.out.versions.ifEmpty(null)
                )
                .unique()
                .collectFile(name: 'collected_versions.yml')
        )

        DUMP_SOFTWARE_VERSIONS
            .out
            .mqc_yml
            .concat (
                ch_mqc_classify,
                TABLE_SUMMARY.out.mqc_yml
            )
            .collect()
            .set { ch_multiqc }

        MULTIQC( ch_multiqc )
}
265
266 /*
267 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
268 ON COMPLETE, SHOW GORY DETAILS OF ALL PARAMS WHICH WILL BE HELPFUL TO DEBUG
269 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
270 */
271
workflow.onComplete {
    if (workflow.success) {
        // CREATE APPROPRIATE DIRECTORIES AND MOVE AS REQUESTED BY STAKEHOLDER(S)
        //
        // Nextflow's .moveTo will error out if directories contain files and it
        // would be complex to include logic to skip directories
        //
        def final_intermediate_dir = "${params.output}${params.fs}${params.pipeline}-steps"
        def final_results_dir = "${params.output}${params.fs}${params.pipeline}-results"
        def kraken2_ext_contigs = file( "${final_intermediate_dir}${params.fs}kraken2_extract_contigs", type: 'dir' )
        def final_intermediate = file( final_intermediate_dir, type: 'dir' )
        def final_results = file( final_results_dir, type: 'dir' )

        // Top-level entries of --output whose names start with any of these
        // prefixes stay in place; everything else is moved into "<pipeline>-steps".
        // The values are compared as literal prefixes rather than interpolated
        // into a regex, so characters such as '.' or '+' in them match literally.
        def keep_prefixes = [
            "${params.cfsanpipename}",
            'multiqc',
            '.nextflow',
            "${workflow.workDir.name}",
            "${params.pipeline}"
        ].collect { it.toString() }

        // Only reorganise once: a pre-existing "-steps" directory means the
        // move already happened.
        if ( !final_intermediate.exists() ) {
            final_intermediate.mkdirs()

            FileHelper.visitFiles(Paths.get("${params.output}"), '*') { entry ->
                def keep = keep_prefixes.any { prefix -> entry.name.startsWith(prefix) }
                if ( !keep ) {
                    FileHelper.movePath(
                        entry, Paths.get( "${final_intermediate_dir}${params.fs}${entry.name}" )
                    )
                }
            }
        }

        // Promote the Kraken2-extracted contigs into the "-results" directory.
        if ( kraken2_ext_contigs.exists() && !final_results.exists() ) {
            final_results.mkdirs()

            FileHelper.movePath(
                Paths.get( "${final_intermediate_dir}${params.fs}kraken2_extract_contigs" ),
                Paths.get( "${final_results_dir}${params.fs}kraken2_extract_contigs" )
            )
        }

        sendMail()
    }
}

// Notify on failure as well; onComplete above only mails on success.
workflow.onError {
    sendMail()
}
314
315 /*
316 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
317 HELPER METHODS FOR CENTRIFLAKEN-HY WORKFLOW
318 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
319 */
320
/*
 * Build the padded --help text for this workflow.
 *
 * Collects the FASTQ entry-point help, each tool's help map (Kraken2,
 * Centrifuge, SPAdes, SerotypeFinder, SeqSero2, MLST, ABRicate) and the
 * wrap-up help. These are merged in that order, so a later section
 * overrides an earlier one on a duplicate key. The merged map is passed
 * through addPadding().
 */
def help() {

    def sections = [
        fastqEntryPointHelp(),
        kraken2Help(params).text,
        centrifugeHelp(params).text,
        spadesHelp(params).text,
        serotypefinderHelp(params).text,
        seqsero2Help(params).text,
        mlstHelp(params).text,
        abricateHelp(params).text,
        wrapUpHelp()
    ]

    Map merged = [:]
    sections.each { section -> merged.putAll(section) }

    return addPadding(merged)
}