diff CSP2/subworkflows/fetchData/main.nf @ 0:01431fa12065

"planemo upload"
author rliterman
date Mon, 02 Dec 2024 10:40:55 -0500
parents
children 0dee732cfad2
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CSP2/subworkflows/fetchData/main.nf	Mon Dec 02 10:40:55 2024 -0500
@@ -0,0 +1,542 @@
+// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads
+
+// Set path variables
+output_directory = file(params.output_directory)
+assembly_directory = file(params.assembly_directory)
+log_directory = file(params.log_directory)
+
+ref_id_file = file(params.ref_id_file)
+
+// Set ref_mode
+ref_mode = params.ref_mode
+
+// Set file headers
+assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"
+
+// Set paths to accessory files/scripts
+assembly_log = file("${log_directory}/Assembly_Data.tsv")
+user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
+findReads = file("${projectDir}/bin/fetchReads.py")
+userSNPDiffs = file("${projectDir}/bin/userSNPDiffs.py")
+
+// Set SKESA cores to 5 or fewer
+skesa_cpus = (params.cores as Integer) >= 5 ? 5 : (params.cores as Integer)
+
+workflow {
+    main:
+    input_data = fetchData()
+    query_data = input_data.query_data
+    reference_data = input_data.reference_data
+    snpdiffs_data = input_data.snpdiff_data
+    
+    publish:
+    query_data >> 'query_data.tsv'
+    reference_data >> 'reference_data.tsv'
+    snpdiff_data >> 'snpdiff_data.tsv'
+}
+
+// Top-level workflow //
+workflow fetchData{
+
+    emit:
+    query_data
+    reference_data
+    snpdiff_data
+
+    main:
+    // Get any excluded IDs
+    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids} 
+    
+    // Process snpdiffs alignments
+    // If assembly file cannot be found, it will be 'null'
+    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}
+
+    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
+    .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
+    .join(exclude_ids,by:0)
+    .unique{it -> it[1]}
+    .map{it -> tuple(it[1],"Exclude")}
+
+    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
+    snpdiff_data = user_snpdiffs
+    .map{it -> tuple(it[0],it[1],it[10])}
+    .join(excluded_snpdiffs,by:0,remainder:true)
+    .filter{it -> it[0].toString() != "null"}
+    .filter{it -> it[3].toString() != "Exclude"}
+    .unique{it -> it[0]}
+    .map{it -> tuple(it[1],it[2],it[0])}
+    .collect().flatten().collate(3)
+
+    // Get assembly data from snpdiffs
+    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
+    .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
+    .join(exclude_ids,by:0,remainder:true)
+    .filter{it -> it[0].toString() != "null"}
+    .filter{it -> it[2].toString() != "Exclude"} 
+    .map{it -> tuple(it[0],it[1],'SNPDiff')}
+    .collect().flatten().collate(3)
+
+    assembled_snpdiffs = snpdiff_assemblies
+    .filter{it -> it[1].toString() != "null"}
+    .unique{it->it[0]}.collect().flatten().collate(3)
+
+    // Process any data provided as assemblies
+    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
+    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
+    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}
+
+    pre_assembled = assembled_snpdiffs
+    .map{it -> tuple(it[0],it[1])}
+    .concat(query_fasta)
+    .concat(ref_fasta)
+    .unique{it -> it[0]}
+    .join(exclude_ids,by:0,remainder:true)
+    .filter{it -> it[0].toString() != "null"}
+    .filter{it -> it[2].toString() != "Exclude"}
+    .map{it->tuple(it[0],it[1])}
+    .collect().flatten().collate(2)
+
+    // Process any data provided as reads
+    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
+    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
+    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}
+
+    all_reads = query_reads
+    .concat(ref_reads)
+    .unique{it->it[0]}
+    .join(exclude_ids,by:0,remainder:true)
+    .filter{it -> it[0].toString() != "null"}
+    .filter{it -> it[3].toString() != "Exclude"}
+    .map{it->tuple(it[0],it[1],it[2])}            
+    .collect().flatten().collate(3)
+
+    // Figure out if any assembly is necessary
+    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
+    branch{it ->
+        assembly: it[1].toString() == "null"
+            return(tuple(it[0],it[2]))
+        read: it[3].toString() == "null"
+           return(tuple(it[0],it[1],it[2]))
+        combo: true
+           return(tuple(it[0],it[3]))}
+
+    // Assemble reads if necessary
+    assembled_reads = fasta_read_combo.read
+    .collect().flatten().collate(3) | assembleReads
+
+    // If runmode is 'assemble', tasks are complete
+    if(params.runmode == "assemble"){
+        query_data = Channel.empty()
+        reference_data = Channel.empty()
+    } else{
+        
+        // If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used)
+        user_fastas = query_fasta
+        .concat(ref_fasta)
+        .concat(assembled_reads)
+        .unique{it -> it[0]}
+        .join(exclude_ids,by:0,remainder:true)
+        .filter{it -> it[0].toString() != "null"}
+        .filter{it -> it[2].toString() != "Exclude"}
+        .map{it->tuple(it[0],it[1],'User')}
+        .collect().flatten().collate(3)
+        .join(assembled_snpdiffs,by:0,remainder:true)
+        .filter{it -> it[3].toString() == "null"}
+        .map{it->tuple(it[0],it[1])}
+
+        // Get all assemblies
+        all_assembled = assembled_snpdiffs
+        .map{it -> tuple(it[0],it[1])}
+        .concat(user_fastas)
+        .unique{it->it[0]}.collect().flatten().collate(2)
+
+        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
+        no_assembly = snpdiff_assemblies
+        .map{it -> tuple(it[0],it[1])}
+        .filter{it -> it[1].toString() == "null"}
+        .unique{it -> it[0]}
+        .join(all_assembled,by:0,remainder:true)
+        .filter{it -> it[2].toString() == "null"}
+        .map{it->tuple(it[0],it[1])}
+        .collect().flatten().collate(2)
+
+        // Compile all samples
+        all_samples = all_assembled
+        .concat(no_assembly)
+        .unique{it-> it[0]}.collect().flatten().collate(2)
+
+        // If no reference data is provided return a blank channel
+        if(!ref_mode){
+            reference_data = Channel.empty()
+            
+            query_data = all_samples                
+            .unique{it -> it[0]}
+            .collect().flatten().collate(2)
+
+        } else{
+
+            // Process additional reference IDs
+            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}
+            
+            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
+            .concat(ref_reads.map{it->tuple(it[0])})
+            .concat(user_ref_ids)
+            .unique{it-> it[0]}.collect().flatten().collate(1)
+            .map{it -> tuple(it[0],"Reference")}
+            .join(exclude_ids,by:0,remainder:true)
+            .filter{it -> it[0].toString() != "null"}
+            .filter{it -> it[2].toString() != "Exclude"}
+            .map{it -> tuple(it[0],it[1])}
+
+            reference_data = all_samples
+            .join(all_ref_ids,by:0,remainder:true)
+            .filter{it -> it[2].toString() == "Reference"}
+            .map{it->tuple(it[0],it[1])}
+            .unique{it -> it[0]}
+            .collect().flatten().collate(2)
+
+            // Save reference data to file
+            reference_data
+            .collect{it -> it[0]}
+            | saveRefIDs
+
+            if(params.runmode == "screen" || params.runmode == "align"){
+                query_data = all_samples
+                .join(all_ref_ids,by:0,remainder:true)
+                .filter{it -> it[2].toString() != "Reference"}
+                .map{it->tuple(it[0],it[1])}
+                .unique{it -> it[0]}
+                .collect().flatten().collate(2)
+            } else if(params.runmode == "snp"){
+                query_data = all_samples
+                .unique{it -> it[0]}
+                .collect().flatten().collate(2)
+            }
+        }
+    }
+}
+
+// Fetching preassembled data //
+workflow fetchQueryFasta{
+    
+    emit:
+    query_fasta
+
+    main:
+
+    // If --fasta is set, grab assembly paths and characterize assemblies
+    ("${params.fasta}" != "" ? getAssemblies(params.fasta) : Channel.empty()).set{query_fasta}
+}
+workflow fetchRefFasta{
+    
+    emit:
+    ref_fasta
+
+    main:
+
+    // If --fasta is set, grab assembly paths and characterize assemblies
+    ("${params.ref_fasta}" != "" ? getAssemblies(params.ref_fasta) : Channel.empty()).set{ref_fasta}
+}
+workflow getAssemblies{
+
+    take:
+    fasta_loc
+
+    emit:
+    fasta_data
+    
+    main:
+    def trim_this = "${params.trim_name}"
+
+    if(fasta_loc == ""){
+        error "No assembly data provided via --fasta/--ref_fasta"
+    } else{
+
+        fasta_dir = file(fasta_loc)
+
+        // If --fasta is a directory...
+        if(fasta_dir.isDirectory()){
+            ch_fasta = Channel.fromPath(["${fasta_dir}/*.fa","${fasta_dir}/*.fasta","${fasta_dir}/*.fna"])
+        } 
+        // If --fasta is a file...
+        else if(fasta_dir.isFile()){
+            
+            // Check if it is a single fasta file...
+            if(fasta_dir.getExtension() == "fa" || fasta_dir.getExtension() == "fna" || fasta_dir.getExtension() == "fasta"){
+                ch_fasta = Channel.from(fasta_dir).map{it-> file(it)}
+            } 
+            // Otherwise, assume a file with paths to FASTAs
+            else{
+                ch_fasta = Channel.from(fasta_dir.readLines()).filter{ file -> file =~ /\.(fa|fasta|fna)$/}.map{it-> file(it)}
+            }
+        } else{
+            error "$fasta_dir is not a valid directory or file..."
+        }
+        fasta_data = ch_fasta
+        .filter { file(it).exists() }
+        .map { filePath ->
+            def fileName = file(filePath).getBaseName()
+            def sampleName = fileName.replaceAll(trim_this, "")
+            tuple(sampleName, filePath)}
+    }
+}
+workflow processSNPDiffs{
+
+    emit:
+    snpdiffs_data
+    
+    main:
+
+    if("${params.snpdiffs}" == ""){
+        error "No assembly data provided via --snpdiffs"
+    } else{
+
+        snpdiffs_dir = file("${params.snpdiffs}")
+
+        // If --fasta is a directory...
+        if(snpdiffs_dir.isDirectory()){
+            ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
+        } 
+        // If --fasta is a file...
+        else if(snpdiffs_dir.isFile()){
+            
+            // Check if it is a single fasta file...
+            if(snpdiffs_dir.getExtension() == "snpdiffs"){
+                ch_snpdiffs = Channel.from(snpdiffs_dir)
+            } 
+            // Otherwise, assume a file with paths to SNPDiffs
+            else{
+                ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{it->it.endsWith('.snpdiffs') }
+            }
+        } else{
+            error "$snpdiffs_dir is not a valid directory or file..."
+        }
+
+        snpdiffs_data = ch_snpdiffs
+            .filter { file(it).exists() }
+            .collect() | getSNPDiffsData | splitCsv | collect | flatten | collate(19)
+
+        // (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases, 
+        // (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
+        // (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
+        // (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
+    }
+}
+process getSNPDiffsData{
+    executor = 'local'
+    cpus = 1
+    maxForks = 1
+
+    input:
+    val(snpdiffs_paths)
+
+    output:
+    stdout
+
+    script:
+
+    user_snpdiffs_list.write(snpdiffs_paths.join('\n') + "\n")
+    """
+    $params.load_python_module
+    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
+    """
+}
+
+
+// Fetching read data //
+workflow fetchQueryReads{
+    
+    emit:
+    query_reads
+
+    main:
+
+    // If --fasta is set, grab assembly paths and characterize assemblies
+    ("${params.reads}" != "" ? processReads(params.reads,params.readext,params.forward,params.reverse) : Channel.empty()).set{query_reads}
+}
+workflow fetchRefReads{
+    
+    emit:
+    ref_reads
+
+    main:
+
+    // If --fasta is set, grab assembly paths and characterize assemblies
+    ("${params.ref_reads}" != "" ? processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse) : Channel.empty()).set{ref_reads}
+}
+workflow processReads{
+
+    take:
+    read_loc
+    read_ext
+    forward
+    reverse
+
+    emit:
+    read_info
+    
+    main:
+
+    if(read_loc == ""){
+        error "No data provided to --reads/--ref_reads"
+    } else{
+
+        read_dir = file(read_loc)
+
+        // If --reads is a single directory, get all reads from that directory
+        if(read_dir.isDirectory()){
+            read_info = fetchReads(read_dir,read_ext,forward,reverse) | splitCsv
+        } 
+
+        // If --reads is a file including paths to many directories, process reads from all directories
+        else if(read_dir.isFile()){
+            read_info = fetchReads(Channel.from(read_dir.readLines()),read_ext,forward,reverse) | splitCsv
+        }
+        // Error if --reads doesn't point to a valid file or directory
+        else{
+            error "$read_dir is neither a valid file or directory..."
+        }
+    }      
+}
+process fetchReads{
+
+    executor = 'local'
+    cpus = 1
+    maxForks = 1
+
+    input:
+    val dir // Directory containing read files
+    val read_ext // Extention for read files (e.g., fastq.gz or fq)
+    val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
+    val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)
+
+    output:
+    stdout
+
+    script:
+
+    if(!file(dir).isDirectory()){
+        error "$dir is not a valid directory..."
+    } else{
+    """
+    $params.load_python_module
+    python ${findReads} --read_dir ${dir} --read_filetype ${read_ext} --forward_suffix ${forward_suffix} --reverse_suffix ${reverse_suffix} --trim_name ${params.trim_name}
+    """
+    }
+}
+
+// Fetch reference IDs //
+workflow processRefIDs{
+
+    emit:
+    ref_ids
+    
+    main:
+    def trim_this = "${params.trim_name}"
+
+    ref_ids = params.ref_id
+    .tokenize(',')
+    .unique()
+    .collect { it ->
+        "${it}".replaceAll(trim_this, "")}
+    .flatten()
+}
+
+// Fetch reference IDs //
+workflow processExclude{
+
+    emit:
+    exclude_ids
+    
+    main:
+    def trim_this = "${params.trim_name}"
+
+    exclude_ids = Channel.from(params.exclude
+    .tokenize(',')
+    .collect { it -> "${it}".replaceAll(trim_this, "")})
+    .map{it -> tuple(it.toString(),"Exclude")}
+    .unique{it -> it[0]}
+}
+
+process saveRefIDs{
+    executor = 'local'
+    cpus = 1
+    maxForks = 1
+    
+    input:
+    val(ref_ids)
+
+    script:
+    ref_id_file.append(ref_ids.join('\n') + '\n')        
+    """
+    """
+}
+
+// Assembly //
+workflow assembleReads{
+
+    take:
+    to_assemble
+    
+    emit:
+    assembled_data
+
+    main:
+    
+    // Run SKESA on each entry
+    assembly_output = skesaAssemble(to_assemble).splitCsv()
+
+    // Print log of assemblies
+    assembly_output.map {it -> it.join("\t")}.collect() | saveAssemblyLog
+
+    // Return assembly data
+    assembled_data = assembly_output.map{it->tuple(it[0],it[3])}
+}
+process skesaAssemble{
+    memory '12 GB' // Add readcount/memory check?
+
+    input:
+    tuple val(sample_name),val(read_type),val(read_location)
+
+    output:
+    stdout
+ 
+    script:
+    assembly_file = file("${assembly_directory}/${sample_name}.fasta")
+    
+    // Ensure folder exists and file doesn't
+    if(!assembly_directory.isDirectory()){
+        error "$assembly_directory is not a valid directory..."
+    } else if(assembly_file.isFile()){
+        error "$assembly_file already exists..."
+    } else if(read_type == "Paired"){
+        forward_reverse = read_location.split(";")
+        """
+        $params.load_skesa_module
+        skesa --cores ${skesa_cpus} --use_paired_ends --fastq ${forward_reverse[0]} ${forward_reverse[1]} --contigs_out ${assembly_file}
+        echo "${sample_name},${read_type},${read_location},${assembly_file}"
+        """
+    } else if(read_type == "Single"){
+        """
+        $params.load_skesa_module
+        skesa --cores ${skesa_cpus} --fastq ${read_location} --contigs_out ${assembly_file}
+        echo "${sample_name},${read_type},${read_location},${assembly_file}"
+        """
+    } else{
+        error "read_type should be Paired or Single, not $read_type..."
+    }
+}
+process saveAssemblyLog{
+    executor = 'local'
+    cpus = 1
+    maxForks = 1
+    
+    input:
+    val(assembly_data)
+
+    script:
+    assembly_log.write(assembly_header)    
+    assembly_log.append(assembly_data.join('\n') + '\n')    
+    """
+    """
+}