Mercurial > repos > kkonganti > cfsan_centriflaken
diff 0.2.1/workflows/centriflaken_hy.nf @ 0:77494b0fa3c7
"planemo upload"
author | kkonganti |
---|---|
date | Mon, 27 Jun 2022 15:55:37 -0400 |
parents | |
children | 30191f39a957 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/0.2.1/workflows/centriflaken_hy.nf Mon Jun 27 15:55:37 2022 -0400 @@ -0,0 +1,338 @@ +// Define any required imports for this specific workflow +import java.nio.file.Paths +import nextflow.file.FileHelper + +// Include any necessary methods +include { \ + summaryOfParams; stopNow; fastqEntryPointHelp; sendMail; \ + addPadding; wrapUpHelp } from "${params.routines}" +include { kraken2Help } from "${params.toolshelp}${params.fs}kraken2" +include { centrifugeHelp } from "${params.toolshelp}${params.fs}centrifuge" +include { spadesHelp } from "${params.toolshelp}${params.fs}spades" +include { serotypefinderHelp } from "${params.toolshelp}${params.fs}serotypefinder" +include { seqsero2Help } from "${params.toolshelp}${params.fs}seqsero2" +include { mlstHelp } from "${params.toolshelp}${params.fs}mlst" +include { abricateHelp } from "${params.toolshelp}${params.fs}abricate" + +// Exit if help requested before any subworkflows +if (params.help) { + log.info help() + exit 0 +} + +// Include any necessary modules and subworkflows +include { PROCESS_FASTQ } from "${params.subworkflows}${params.fs}process_fastq" +include { FASTQC } from "${params.modules}${params.fs}fastqc${params.fs}main" +include { CENTRIFUGE_CLASSIFY } from "${params.modules}${params.fs}centrifuge${params.fs}classify${params.fs}main" +include { CENTRIFUGE_PROCESS } from "${params.modules}${params.fs}centrifuge${params.fs}process${params.fs}main" +include { SEQKIT_GREP } from "${params.modules}${params.fs}seqkit${params.fs}grep${params.fs}main" +include { SPADES_ASSEMBLE } from "${params.modules}${params.fs}spades${params.fs}assemble${params.fs}main" +include { KRAKEN2_CLASSIFY } from "${params.modules}${params.fs}kraken2${params.fs}classify${params.fs}main" +include { KRAKEN2_EXTRACT_CONTIGS } from "${params.modules}${params.fs}kraken2${params.fs}extract_contigs${params.fs}main" +include { SEROTYPEFINDER } from "${params.modules}${params.fs}serotypefinder${params.fs}main" +include { SEQSERO2 } from "${params.modules}${params.fs}seqsero2${params.fs}main" +include { MLST } from "${params.modules}${params.fs}mlst${params.fs}main" +include { ABRICATE_RUN } from "${params.modules}${params.fs}abricate${params.fs}run${params.fs}main" +include { ABRICATE_SUMMARY } from "${params.modules}${params.fs}abricate${params.fs}summary${params.fs}main" +include { TABLE_SUMMARY } from "${params.modules}${params.fs}cat${params.fs}tables${params.fs}main" +include { MULTIQC } from "${params.modules}${params.fs}multiqc${params.fs}main" +include { DUMP_SOFTWARE_VERSIONS } from "${params.modules}${params.fs}custom${params.fs}dump_software_versions${params.fs}main" + + + +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + INPUTS AND ANY CHECKS FOR THE CENTRIFLAKEN-HY WORKFLOW +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +*/ + +def kraken2_db_dir = file ( "${params.kraken2_db}" ) +def centrifuge_x = file ( "${params.centrifuge_x}" ) +def spades_custom_hmm = (params.spades_hmm ? file ( "${params.spades_hmm}" ) : false) +def reads_platform = 0 +def abricate_dbs = [ 'ncbiamrplus', 'resfinder', 'megares', 'argannot' ] + +reads_platform += (params.input ? 1 : 0) + +if (!kraken2_db_dir.exists() || !centrifuge_x.getParent().exists()) { + stopNow("Please check if the following absolute paths are valid:\n" + + "${params.kraken2_db}\n${params.centrifuge_x}\n" + + "Cannot proceed further!") +} + +if (spades_custom_hmm && !spades_custom_hmm.exists()) { + stopNow("Please check if the following SPAdes' custom HMM directory\n" + + "path is valid:\n${params.spades_hmm}\nCannot proceed further!") +} + +if (reads_platform < 1 || reads_platform == 0) { + stopNow("Please mention at least one absolute path to input folder which contains\n" + + "FASTQ files sequenced using the --input option.\n" + + "Ex: --input (Illumina or Generic short reads in FASTQ format)") +} + +if (params.centrifuge_extract_bug != params.kraken2_extract_bug) { + stopNow("Please make sure that the bug to be extracted is same\n" + + "for both --centrifuge_extract_bug and --kraken2_extract_bug options.") +} + +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + RUN THE CENTRIFLAKEN-HY WORKFLOW +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +*/ + +workflow CENTRIFLAKEN_HY { + main: + ch_asm_filtered_contigs = Channel.empty() + ch_mqc_custom_tbl = Channel.empty() + ch_dummy = Channel.fromPath("${params.dummyfile}") + ch_dummy2 = Channel.fromPath("${params.dummyfile2}") + + log.info summaryOfParams() + + PROCESS_FASTQ() + .processed_reads + .map { + meta, fastq -> + meta.centrifuge_x = params.centrifuge_x + meta.kraken2_db = params.kraken2_db + [meta, fastq] + } + .set { ch_processed_reads } + + PROCESS_FASTQ + .out + .versions + .set { software_versions } + + FASTQC ( ch_processed_reads ) + + CENTRIFUGE_CLASSIFY( ch_processed_reads ) + + CENTRIFUGE_PROCESS( + CENTRIFUGE_CLASSIFY.out.report + .join( CENTRIFUGE_CLASSIFY.out.output ) + ) + + ch_processed_reads.join( CENTRIFUGE_PROCESS.out.extracted ) + .set { ch_centrifuge_extracted } + + SEQKIT_GREP ( ch_centrifuge_extracted ) + + // As of 06/02/2022, with the upcoming newer versions of NextFlow, we will be able to do + // allowNull: true for both input and output, but until then, we have to use dummy files. + // and work arounds. + // https://github.com/nextflow-io/nextflow/pull/2893 + SPADES_ASSEMBLE ( + SEQKIT_GREP.out.fastx + .combine(ch_dummy) + .combine(ch_dummy2) + ) + + SPADES_ASSEMBLE + .out + .assembly + .set { ch_spades_assembly } + + ch_spades_assembly + .map { + meta, fastq -> + meta.is_assembly = true + [meta, fastq] + } + .set { ch_spades_assembly } + + ch_spades_assembly.ifEmpty { [ false, false ] } + + KRAKEN2_CLASSIFY( ch_spades_assembly ) + + KRAKEN2_EXTRACT_CONTIGS ( + ch_spades_assembly + .join( KRAKEN2_CLASSIFY.out.kraken_output ), + params.kraken2_extract_bug + ) + + KRAKEN2_EXTRACT_CONTIGS + .out + .asm_filtered_contigs + .map { + meta, fastq -> + meta.organism = params.kraken2_extract_bug.split(/\s+/)[0].capitalize() + meta.serotypefinder_db = params.serotypefinder_db + [meta, fastq] + } + .set { ch_asm_filtered_contigs } + + SEROTYPEFINDER ( ch_asm_filtered_contigs ) + + SEQSERO2 ( ch_asm_filtered_contigs ) + + MLST ( ch_asm_filtered_contigs ) + + ABRICATE_RUN ( + ch_asm_filtered_contigs, + abricate_dbs + ) + + ABRICATE_RUN + .out + .abricated + .map { meta, abres -> [ abricate_dbs, abres ] } + .groupTuple(by: [0]) + .map { it -> tuple ( it[0], it[1].flatten() ) } + .set { ch_abricated } + + ABRICATE_SUMMARY ( ch_abricated ) + + CENTRIFUGE_CLASSIFY.out.kreport + .map { meta, kreport -> [ kreport ] } + .flatten() + .concat ( + KRAKEN2_CLASSIFY.out.kraken_report + .map { meta, kreport -> [ kreport ] } + .flatten(), + FASTQC.out.zip + .map { meta, zip -> [ zip ] } + .flatten() + ) + .set { ch_mqc_classify } + + if (params.serotypefinder_run) { + SEROTYPEFINDER + .out + .serotyped + .map { meta, tsv -> [ 'serotypefinder', tsv ] } + .groupTuple(by: [0]) + .map { it -> tuple ( it[0], it[1].flatten() ) } + .set { ch_mqc_custom_tbl } + } else if (params.seqsero2_run) { + SEQSERO2 + .out + .serotyped + .map { meta, tsv -> [ 'seqsero2', tsv ] } + .groupTuple(by: [0]) + .map { it -> tuple ( it[0], it[1].flatten() ) } + .set { ch_mqc_custom_tbl } + } + + ch_mqc_custom_tbl + .concat ( + ABRICATE_SUMMARY.out.ncbiamrplus.map{ it -> tuple ( it[0], it[1] )}, + ABRICATE_SUMMARY.out.resfinder.map{ it -> tuple ( it[0], it[1] )}, + ABRICATE_SUMMARY.out.megares.map{ it -> tuple ( it[0], it[1] )}, + ABRICATE_SUMMARY.out.argannot.map{ it -> tuple ( it[0], it[1] )}, + ) + .groupTuple(by: [0]) + .map { it -> [ it[0], it[1].flatten() ]} + .set { ch_mqc_custom_tbl } + + TABLE_SUMMARY( ch_mqc_custom_tbl ) + + DUMP_SOFTWARE_VERSIONS ( + software_versions + .mix ( + FASTQC.out.versions, + CENTRIFUGE_CLASSIFY.out.versions, + CENTRIFUGE_PROCESS.out.versions, + SEQKIT_GREP.out.versions, + SPADES_ASSEMBLE.out.versions.ifEmpty(null), + KRAKEN2_CLASSIFY.out.versions.ifEmpty(null), + KRAKEN2_EXTRACT_CONTIGS.out.versions.ifEmpty(null), + SEROTYPEFINDER.out.versions.ifEmpty(null), + SEQSERO2.out.versions.ifEmpty(null), + MLST.out.versions.ifEmpty(null), + ABRICATE_RUN.out.versions.ifEmpty(null), + ABRICATE_SUMMARY.out.versions.ifEmpty(null), + TABLE_SUMMARY.out.versions.ifEmpty(null) + ) + .unique() + .collectFile(name: 'collected_versions.yml') + ) + + DUMP_SOFTWARE_VERSIONS + .out + .mqc_yml + .concat ( + ch_mqc_classify, + TABLE_SUMMARY.out.mqc_yml + ) + .collect() + .set { ch_multiqc } + + MULTIQC( ch_multiqc ) +} + +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + ON COMPLETE, SHOW GORY DETAILS OF ALL PARAMS WHICH WILL BE HELPFUL TO DEBUG +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +*/ + +workflow.onComplete { + if (workflow.success) { + // CREATE APPROPRIATE DIRECTORIES AND MOVE AS REQUESTED BY STAKEHOLDER(S) + // + // Nextflow's .moveTo will error out if directories contain files and it + // would be complex to include logic to skip directories + // + def final_intermediate_dir = "${params.output}${params.fs}${params.pipeline}-steps" + def final_results_dir = "${params.output}${params.fs}${params.pipeline}-results" + def kraken2_ext_contigs = file( "${final_intermediate_dir}${params.fs}kraken2_extract_contigs", type: 'dir' ) + def final_intermediate = file( final_intermediate_dir, type: 'dir' ) + def final_results = file( final_results_dir, type: 'dir' ) + def pipeline_output = file( params.output, type: 'dir' ) + + if ( !final_intermediate.exists() ) { + final_intermediate.mkdirs() + + FileHelper.visitFiles(Paths.get("${params.output}"), '*') { + if ( !(it.name ==~ /^(${params.cfsanpipename}|multiqc|\.nextflow|${workflow.workDir.name}|${params.pipeline}).*/) ) { + FileHelper.movePath( + it, Paths.get( "${final_intermediate_dir}${params.fs}${it.name}" ) + ) + } + } + } + + if ( kraken2_ext_contigs.exists() && !final_results.exists() ) { + final_results.mkdirs() + + FileHelper.movePath( + Paths.get( "${final_intermediate_dir}${params.fs}kraken2_extract_contigs" ), + Paths.get( "${final_results_dir}${params.fs}kraken2_extract_contigs" ) + ) + } + + sendMail() + } +} + +workflow.onError { + sendMail() +} + +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + HELPER METHODS FOR CENTRIFLAKEN-HY WORKFLOW +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +*/ + +def help() { + + Map helptext = [:] + + helptext.putAll( + fastqEntryPointHelp() + + kraken2Help(params).text + + centrifugeHelp(params).text + + spadesHelp(params).text + + serotypefinderHelp(params).text + + seqsero2Help(params).text + + mlstHelp(params).text + + abricateHelp(params).text + + wrapUpHelp() + ) + + return addPadding(helptext) +} \ No newline at end of file