annotate CSP2/subworkflows/fetchData/main.nf @ 25:5c609acba34e

"planemo upload"
author rliterman
date Tue, 03 Dec 2024 15:56:32 -0500
parents 75070dbe67b1
children 792274118b2e
rev   line source
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads

// Set path variables
output_directory = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory = file(params.log_directory)

ref_id_file = file(params.ref_id_file)

// Set ref_mode
ref_mode = params.ref_mode

// Set file headers
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Set paths to accessory files/scripts
assembly_log = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs = file("${projectDir}/bin/userSNPDiffs.py")

// Cap SKESA at 4 CPUs; honor a smaller --cores value
skesa_cpus = Math.min(params.cores as Integer, 4)

// Use the Nextflow logger rather than bare println for runtime diagnostics
log.info "params.cores: ${params.cores}"
log.info "skesa_cpus: ${skesa_cpus}"
// Entry workflow: fetch all input data and publish the three result channels.
workflow {
    main:
    input_data = fetchData()
    query_data = input_data.query_data
    reference_data = input_data.reference_data
    // Bug fix: the original assigned 'snpdiffs_data' here but published the
    // undefined name 'snpdiff_data' below; use one consistent name.
    snpdiff_data = input_data.snpdiff_data

    publish:
    query_data >> 'query_data.tsv'
    reference_data >> 'reference_data.tsv'
    snpdiff_data >> 'snpdiff_data.tsv'
}
rliterman@0 39
rliterman@0 40 // Top-level workflow //
workflow fetchData{

    emit:
    query_data      // (Isolate_ID, Assembly_Path) tuples for query isolates
    reference_data  // (Isolate_ID, Assembly_Path) tuples for reference isolates
    snpdiff_data    // (Query_ID, Reference_ID, SNPDiff_Path) tuples

    main:
    // Get any excluded IDs (2-item tuples: (ID, "Exclude"))
    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

    // Process snpdiffs alignments
    // If assembly file cannot be found, it will be 'null'
    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

    // Mark a snpdiffs file as excluded when either its query ID (field 1)
    // or its reference ID (field 10) appears in exclude_ids
    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
        .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
        .join(exclude_ids,by:0)
        .unique{it -> it[1]}
        .map{it -> tuple(it[1],"Exclude")}

    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
    snpdiff_data = user_snpdiffs
        .map{it -> tuple(it[0],it[1],it[10])}
        .join(excluded_snpdiffs,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .unique{it -> it[0]}
        .map{it -> tuple(it[1],it[2],it[0])}
        .collect().flatten().collate(3)

    // Get assembly data from snpdiffs
    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
        .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it -> tuple(it[0],it[1],'SNPDiff')}
        .collect().flatten().collate(3)

    // Keep only snpdiffs entries whose assembly path could be resolved
    assembled_snpdiffs = snpdiff_assemblies
        .filter{it -> it[1].toString() != "null"}
        .unique{it->it[0]}.collect().flatten().collate(3)

    // Process any data provided as assemblies
    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

    // All isolates that already have an assembly (snpdiffs entries first,
    // then user FASTAs), minus excluded IDs
    pre_assembled = assembled_snpdiffs
        .map{it -> tuple(it[0],it[1])}
        .concat(query_fasta)
        .concat(ref_fasta)
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1])}
        .collect().flatten().collate(2)

    // Process any data provided as reads
    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

    all_reads = query_reads
        .concat(ref_reads)
        .unique{it->it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1],it[2])}
        .collect().flatten().collate(3)

    // Figure out if any assembly is necessary:
    //   assembly: isolate has an assembly but no reads -> use as-is
    //   read:     isolate has reads but no assembly    -> needs SKESA
    //   combo:    isolate has both; the existing assembly wins
    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
        branch{it ->
            assembly: it[1].toString() == "null"
            return(tuple(it[0],it[2]))
            read: it[3].toString() == "null"
            return(tuple(it[0],it[1],it[2]))
            combo: true
            return(tuple(it[0],it[3]))}

    // Assemble reads if necessary
    assembled_reads = fasta_read_combo.read
        .collect().flatten().collate(3) | assembleReads

    // If runmode is 'assemble', tasks are complete
    if(params.runmode == "assemble"){
        query_data = Channel.empty()
        reference_data = Channel.empty()
    } else{

        // If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used)
        user_fastas = query_fasta
            .concat(ref_fasta)
            .concat(assembled_reads)
            .unique{it -> it[0]}
            .join(exclude_ids,by:0,remainder:true)
            .filter{it -> it[0].toString() != "null"}
            .filter{it -> it[2].toString() != "Exclude"}
            .map{it->tuple(it[0],it[1],'User')}
            .collect().flatten().collate(3)
            .join(assembled_snpdiffs,by:0,remainder:true)
            .filter{it -> it[3].toString() == "null"}
            .map{it->tuple(it[0],it[1])}

        // Get all assemblies
        all_assembled = assembled_snpdiffs
            .map{it -> tuple(it[0],it[1])}
            .concat(user_fastas)
            .unique{it->it[0]}.collect().flatten().collate(2)

        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
        no_assembly = snpdiff_assemblies
            .map{it -> tuple(it[0],it[1])}
            .filter{it -> it[1].toString() == "null"}
            .unique{it -> it[0]}
            .join(all_assembled,by:0,remainder:true)
            .filter{it -> it[2].toString() == "null"}
            .map{it->tuple(it[0],it[1])}
            .collect().flatten().collate(2)

        // Compile all samples
        all_samples = all_assembled
            .concat(no_assembly)
            .unique{it-> it[0]}.collect().flatten().collate(2)

        // If no reference data is provided return a blank channel
        if(!ref_mode){
            reference_data = Channel.empty()

            query_data = all_samples
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

        } else{

            // Process additional reference IDs
            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

            // All reference IDs (from --ref_fasta, --ref_reads, and --ref_id),
            // tagged "Reference", minus excluded IDs
            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
                .concat(ref_reads.map{it->tuple(it[0])})
                .concat(user_ref_ids)
                .unique{it-> it[0]}.collect().flatten().collate(1)
                .map{it -> tuple(it[0],"Reference")}
                .join(exclude_ids,by:0,remainder:true)
                .filter{it -> it[0].toString() != "null"}
                .filter{it -> it[2].toString() != "Exclude"}
                .map{it -> tuple(it[0],it[1])}

            reference_data = all_samples
                .join(all_ref_ids,by:0,remainder:true)
                .filter{it -> it[2].toString() == "Reference"}
                .map{it->tuple(it[0],it[1])}
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

            // Save reference data to file
            reference_data
                .collect{it -> it[0]}
                | saveRefIDs

            // screen/align: queries are the non-reference samples;
            // snp: every sample (references included) is a query
            if(params.runmode == "screen" || params.runmode == "align"){
                query_data = all_samples
                    .join(all_ref_ids,by:0,remainder:true)
                    .filter{it -> it[2].toString() != "Reference"}
                    .map{it->tuple(it[0],it[1])}
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            } else if(params.runmode == "snp"){
                query_data = all_samples
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            }
        }
    }
}
rliterman@0 220
rliterman@0 221 // Fetching preassembled data //
// Emits (Isolate_ID, Assembly_Path) tuples for --fasta inputs,
// or an empty channel when --fasta is unset.
workflow fetchQueryFasta{

    emit:
    query_fasta

    main:

    if("${params.fasta}" != ""){
        query_fasta = getAssemblies(params.fasta)
    } else{
        query_fasta = Channel.empty()
    }
}
// Emits (Isolate_ID, Assembly_Path) tuples for --ref_fasta inputs,
// or an empty channel when --ref_fasta is unset.
workflow fetchRefFasta{

    emit:
    ref_fasta

    main:

    if("${params.ref_fasta}" != ""){
        ref_fasta = getAssemblies(params.ref_fasta)
    } else{
        ref_fasta = Channel.empty()
    }
}
// Resolves --fasta/--ref_fasta input (a directory, a single FASTA, or a text
// file listing FASTA paths) into (Isolate_ID, Assembly_Path) tuples.
workflow getAssemblies{

    take:
    fasta_loc    // value of --fasta or --ref_fasta

    emit:
    fasta_data   // (Isolate_ID, Assembly_Path) tuples

    main:
    def trim_this = "${params.trim_name}"

    // Accepted assembly extensions, kept in one place instead of being
    // repeated across the directory glob and the file-extension checks
    def fasta_exts = ["fa","fasta","fna"]

    if(fasta_loc == ""){
        error "No assembly data provided via --fasta/--ref_fasta"
    } else{

        fasta_dir = file(fasta_loc)

        // If --fasta is a directory, glob all FASTA files inside it
        if(fasta_dir.isDirectory()){
            ch_fasta = Channel.fromPath(fasta_exts.collect{ ext -> "${fasta_dir}/*.${ext}" })
        }
        // If --fasta is a file...
        else if(fasta_dir.isFile()){

            // ...either a single FASTA file...
            if(fasta_exts.contains(fasta_dir.getExtension())){
                ch_fasta = Channel.from(fasta_dir).map{it-> file(it)}
            }
            // ...or a file with paths to FASTAs (non-FASTA lines are dropped).
            // The closure parameter is named 'line' so it does not shadow
            // Nextflow's global file() helper.
            else{
                ch_fasta = Channel.from(fasta_dir.readLines()).filter{ line -> line =~ /\.(fa|fasta|fna)$/}.map{it-> file(it)}
            }
        } else{
            error "$fasta_dir is not a valid directory or file..."
        }
        // Drop missing paths; derive sample IDs from basenames with the
        // trim_name pattern removed
        fasta_data = ch_fasta
            .filter { file(it).exists() }
            .map { filePath ->
                def fileName = file(filePath).getBaseName()
                def sampleName = fileName.replaceAll(trim_this, "")
                tuple(sampleName, filePath)}
    }
}
// Resolves --snpdiffs input (a directory, a single .snpdiffs file, or a text
// file listing .snpdiffs paths) into 19-field records of alignment metadata.
workflow processSNPDiffs{

    emit:
    snpdiffs_data

    main:

    if("${params.snpdiffs}" == ""){
        error "No assembly data provided via --snpdiffs"
    } else{

        snpdiffs_dir = file("${params.snpdiffs}")

        // If --snpdiffs is a directory, take every .snpdiffs file in it
        if(snpdiffs_dir.isDirectory()){
            ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
        }
        // If --snpdiffs is a file...
        else if(snpdiffs_dir.isFile()){

            // ...either a single .snpdiffs file...
            if(snpdiffs_dir.getExtension() == "snpdiffs"){
                ch_snpdiffs = Channel.from(snpdiffs_dir)
            }
            // Otherwise, assume a file with paths to SNPDiffs
            else{
                ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{it->it.endsWith('.snpdiffs') }
            }
        } else{
            error "$snpdiffs_dir is not a valid directory or file..."
        }

        // Drop missing files, then parse stats for all snpdiffs in one
        // batch; each output record is a 19-field tuple (fields below)
        snpdiffs_data = ch_snpdiffs
            .filter { file(it).exists() }
            .collect() | getSNPDiffsData | splitCsv | collect | flatten | collate(19)

        // (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
        // (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
        // (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
        // (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
    }
}
// Runs userSNPDiffs.py over the collected .snpdiffs list and emits its
// CSV output on stdout (parsed by the caller via splitCsv).
process getSNPDiffsData{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(snpdiffs_paths)   // collected list of .snpdiffs file paths

    output:
    stdout

    script:

    // NOTE(review): this write happens at script-evaluation time on the head
    // node (not inside the task sandbox); the list file lands in log_directory
    user_snpdiffs_list.write(snpdiffs_paths.join('\n') + "\n")
    """
    $params.load_python_module
    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
    """
}
rliterman@0 346
rliterman@0 347
rliterman@0 348 // Fetching read data //
// Emits (Isolate_ID, Read_Type, Read_Path) tuples for --reads inputs,
// or an empty channel when --reads is unset.
workflow fetchQueryReads{

    emit:
    query_reads

    main:

    if("${params.reads}" != ""){
        query_reads = processReads(params.reads,params.readext,params.forward,params.reverse)
    } else{
        query_reads = Channel.empty()
    }
}
// Emits (Isolate_ID, Read_Type, Read_Path) tuples for --ref_reads inputs,
// or an empty channel when --ref_reads is unset.
workflow fetchRefReads{

    emit:
    ref_reads

    main:

    if("${params.ref_reads}" != ""){
        ref_reads = processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse)
    } else{
        ref_reads = Channel.empty()
    }
}
// Locates paired/single read files for --reads/--ref_reads. The argument may
// be a directory of reads or a text file listing read directories.
workflow processReads{

    take:
    read_loc     // directory of reads, or file listing read directories
    read_ext     // read-file extension (e.g., fastq.gz)
    forward      // forward-read suffix
    reverse      // reverse-read suffix

    emit:
    read_info    // (Isolate_ID, Read_Type, Read_Path) tuples

    main:

    if(read_loc == ""){
        error "No data provided to --reads/--ref_reads"
    }

    read_dir = file(read_loc)

    if(read_dir.isDirectory()){
        // A single directory of reads: scan it directly
        read_info = fetchReads(read_dir,read_ext,forward,reverse) | splitCsv
    } else if(read_dir.isFile()){
        // A text file of directory paths: scan each listed directory
        read_info = fetchReads(Channel.from(read_dir.readLines()),read_ext,forward,reverse) | splitCsv
    } else{
        error "$read_dir is neither a valid file or directory..."
    }
}
// Runs fetchReads.py on one read directory and emits its CSV rows
// (one (ID, Read_Type, Read_Path) record per isolate) on stdout.
process fetchReads{

    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val dir // Directory containing read files
    val read_ext // Extension for read files (e.g., fastq.gz or fq)
    val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
    val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

    output:
    stdout

    script:

    // Fail fast when the supplied path is not a directory
    if(!file(dir).isDirectory()){
        error "$dir is not a valid directory..."
    } else{
        """
        $params.load_python_module
        python ${findReads} --read_dir ${dir} --read_filetype ${read_ext} --forward_suffix ${forward_suffix} --reverse_suffix ${reverse_suffix} --trim_name ${params.trim_name}
        """
    }
}
rliterman@0 429
rliterman@0 430 // Fetch reference IDs //
// Parses the comma-delimited --ref_id argument into individual reference IDs.
workflow processRefIDs{

    emit:
    ref_ids

    main:
    def trim_this = "${params.trim_name}"

    // Split on commas, drop duplicates, then apply the same name trimming
    // used for sample IDs elsewhere in this subworkflow.
    def distinct_tokens = params.ref_id.tokenize(',').unique()
    ref_ids = distinct_tokens
        .collect { raw -> "${raw}".replaceAll(trim_this, "") }
        .flatten()
}
rliterman@0 446
rliterman@0 447 // Fetch excluded IDs //
// Parses the comma-delimited --exclude argument into (ID, "Exclude") tuples.
workflow processExclude{

    emit:
    exclude_ids

    main:
    def trim_this = "${params.trim_name}"

    // Split on commas and trim names, then tag each distinct ID "Exclude"
    def trimmed_ids = params.exclude
        .tokenize(',')
        .collect { raw -> "${raw}".replaceAll(trim_this, "") }
    exclude_ids = Channel.from(trimmed_ids)
        .map{ id -> tuple(id.toString(),"Exclude") }
        .unique{ entry -> entry[0] }
}
rliterman@0 462
// Records the collected reference IDs in ref_id_file (one per line).
process saveRefIDs{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(ref_ids)   // collected list of reference isolate IDs

    script:
    // append() (not write), so IDs accumulate across invocations;
    // the shell script itself is intentionally empty
    ref_id_file.append(ref_ids.join('\n') + '\n')
    """
    """
}
rliterman@0 476
rliterman@0 477 // Assembly //
// Assembles read sets with SKESA and returns (Isolate_ID, Assembly_Path).
workflow assembleReads{

    take:
    to_assemble     // (Isolate_ID, Read_Type, Read_Location) tuples

    emit:
    assembled_data  // (Isolate_ID, Assembly_Path) tuples

    main:

    // Each SKESA task echoes one CSV row describing its assembly
    assembly_output = to_assemble | skesaAssemble | splitCsv

    // Record every assembly as a tab-separated row in the assembly log
    saveAssemblyLog(assembly_output.map { row -> row.join("\t") }.collect())

    // Keep only the ID and the resulting assembly path
    assembled_data = assembly_output.map{ row -> tuple(row[0],row[3]) }
}
rliterman@0 497 process skesaAssemble{
rliterman@22 498 // label 'skesaMem'
rliterman@0 499
rliterman@15 500 cpus = skesa_cpus
rliterman@15 501
rliterman@0 502 input:
rliterman@0 503 tuple val(sample_name),val(read_type),val(read_location)
rliterman@0 504
rliterman@0 505 output:
rliterman@0 506 stdout
rliterman@0 507
rliterman@0 508 script:
rliterman@0 509 assembly_file = file("${assembly_directory}/${sample_name}.fasta")
rliterman@0 510
rliterman@0 511 // Ensure folder exists and file doesn't
rliterman@0 512 if(!assembly_directory.isDirectory()){
rliterman@0 513 error "$assembly_directory is not a valid directory..."
rliterman@0 514 } else if(assembly_file.isFile()){
rliterman@0 515 error "$assembly_file already exists..."
rliterman@0 516 } else if(read_type == "Paired"){
rliterman@0 517 forward_reverse = read_location.split(";")
rliterman@0 518 """
rliterman@0 519 $params.load_skesa_module
rliterman@0 520 skesa --cores ${skesa_cpus} --use_paired_ends --fastq ${forward_reverse[0]} ${forward_reverse[1]} --contigs_out ${assembly_file}
rliterman@0 521 echo "${sample_name},${read_type},${read_location},${assembly_file}"
rliterman@0 522 """
rliterman@0 523 } else if(read_type == "Single"){
rliterman@0 524 """
rliterman@0 525 $params.load_skesa_module
rliterman@0 526 skesa --cores ${skesa_cpus} --fastq ${read_location} --contigs_out ${assembly_file}
rliterman@0 527 echo "${sample_name},${read_type},${read_location},${assembly_file}"
rliterman@0 528 """
rliterman@0 529 } else{
rliterman@0 530 error "read_type should be Paired or Single, not $read_type..."
rliterman@0 531 }
rliterman@0 532 }
// Writes the assembly log TSV (header + one row per assembly).
process saveAssemblyLog{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(assembly_data)   // collected list of tab-joined assembly records

    script:
    // write() truncates, so the log is rebuilt (header first) on each call;
    // the shell script itself is intentionally empty
    assembly_log.write(assembly_header)
    assembly_log.append(assembly_data.join('\n') + '\n')
    """
    """
}