annotate CSP2/subworkflows/fetchData/main.nf @ 29:b6ec322b1f05

"planemo upload"
author rliterman
date Wed, 04 Dec 2024 16:02:07 -0500
parents 893a6993efe3
children
rev   line source
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads

// Set path variables
output_directory = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory = file(params.log_directory)

// File that accumulates the chosen reference isolate IDs (see saveRefIDs)
ref_id_file = file(params.ref_id_file)

// Set ref_mode
ref_mode = params.ref_mode

// Set file headers
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Set paths to accessory files/scripts
assembly_log = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs = file("${projectDir}/bin/userSNPDiffs.py")

// Cap SKESA at 4 cores; Math.min avoids the double cast of params.cores
skesa_cpus = Math.min(params.cores as Integer, 4)
rliterman@0 24
// Entry point: fetch all input data and publish the three result channels.
workflow {
    main:
    input_data = fetchData()
    query_data = input_data.query_data
    reference_data = input_data.reference_data
    // BUG FIX: this was assigned to 'snpdiffs_data', leaving the
    // 'snpdiff_data' name used in the publish block undefined.
    snpdiff_data = input_data.snpdiff_data

    publish:
    query_data >> 'query_data.tsv'
    reference_data >> 'reference_data.tsv'
    snpdiff_data >> 'snpdiff_data.tsv'
}
rliterman@0 37
// Top-level workflow //
// Gathers all inputs (user FASTAs, reads, precomputed SNPDiffs), applies
// --exclude filtering, assembles reads where no FASTA exists, and emits the
// query/reference/snpdiff channels according to params.runmode and ref_mode.
workflow fetchData{

    emit:
    query_data      // 2-item tuples: (Isolate_ID, Assembly_Path)
    reference_data  // 2-item tuples: (Isolate_ID, Assembly_Path)
    snpdiff_data    // 3-item tuples: (Query_ID, Reference_ID, SNPDiff_Path)

    main:
    // Get any excluded IDs
    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

    // Process snpdiffs alignments
    // If assembly file cannot be found, it will be 'null'
    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

    // A snpdiffs file is excluded when either its query ID (it[1]) or its
    // reference ID (it[10]) appears in exclude_ids
    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
        .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
        .join(exclude_ids,by:0)
        .unique{it -> it[1]}
        .map{it -> tuple(it[1],"Exclude")}

    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
    // (join with remainder:true pads non-matching rows with null placeholders)
    snpdiff_data = user_snpdiffs
        .map{it -> tuple(it[0],it[1],it[10])}
        .join(excluded_snpdiffs,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .unique{it -> it[0]}
        .map{it -> tuple(it[1],it[2],it[0])}
        .collect().flatten().collate(3)

    // Get assembly data from snpdiffs
    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
        .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it -> tuple(it[0],it[1],'SNPDiff')}
        .collect().flatten().collate(3)

    // Keep only snpdiffs isolates whose assembly path could be resolved
    assembled_snpdiffs = snpdiff_assemblies
        .filter{it -> it[1].toString() != "null"}
        .unique{it->it[0]}.collect().flatten().collate(3)

    // Process any data provided as assemblies
    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

    // All pre-assembled isolates (snpdiffs take precedence via unique), minus exclusions
    pre_assembled = assembled_snpdiffs
        .map{it -> tuple(it[0],it[1])}
        .concat(query_fasta)
        .concat(ref_fasta)
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1])}
        .collect().flatten().collate(2)

    // Process any data provided as reads
    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

    all_reads = query_reads
        .concat(ref_reads)
        .unique{it->it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1],it[2])}
        .collect().flatten().collate(3)

    // Figure out if any assembly is necessary
    // Joined rows carry null placeholders for whichever side is missing:
    //  - assembly: assembly only (no reads)        -> (ID, Assembly_Path)
    //  - read:     reads only (needs assembly)     -> (ID, Read_Type, Read_Path)
    //  - combo:    both reads and assembly present -> (ID, Assembly_Path)
    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
    branch{it ->
        assembly: it[1].toString() == "null"
        return(tuple(it[0],it[2]))
        read: it[3].toString() == "null"
        return(tuple(it[0],it[1],it[2]))
        combo: true
        return(tuple(it[0],it[3]))}

    // Assemble reads if necessary
    assembled_reads = fasta_read_combo.read
        .collect().flatten().collate(3) | assembleReads

    // If runmode is 'assemble', tasks are complete
    if(params.runmode == "assemble"){
        query_data = Channel.empty()
        reference_data = Channel.empty()
    } else{

        // If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used)
        user_fastas = query_fasta
            .concat(ref_fasta)
            .concat(assembled_reads)
            .unique{it -> it[0]}
            .join(exclude_ids,by:0,remainder:true)
            .filter{it -> it[0].toString() != "null"}
            .filter{it -> it[2].toString() != "Exclude"}
            .map{it->tuple(it[0],it[1],'User')}
            .collect().flatten().collate(3)
            .join(assembled_snpdiffs,by:0,remainder:true)
            .filter{it -> it[3].toString() == "null"}
            .map{it->tuple(it[0],it[1])}

        // Get all assemblies
        all_assembled = assembled_snpdiffs
            .map{it -> tuple(it[0],it[1])}
            .concat(user_fastas)
            .unique{it->it[0]}.collect().flatten().collate(2)

        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
        no_assembly = snpdiff_assemblies
            .map{it -> tuple(it[0],it[1])}
            .filter{it -> it[1].toString() == "null"}
            .unique{it -> it[0]}
            .join(all_assembled,by:0,remainder:true)
            .filter{it -> it[2].toString() == "null"}
            .map{it->tuple(it[0],it[1])}
            .collect().flatten().collate(2)

        // Compile all samples
        all_samples = all_assembled
            .concat(no_assembly)
            .unique{it-> it[0]}.collect().flatten().collate(2)

        // If no reference data is provided return a blank channel
        if(!ref_mode){
            reference_data = Channel.empty()

            query_data = all_samples
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

        } else{

            // Process additional reference IDs
            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

            // Reference IDs come from --ref_fasta, --ref_reads, and --ref_id
            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
                .concat(ref_reads.map{it->tuple(it[0])})
                .concat(user_ref_ids)
                .unique{it-> it[0]}.collect().flatten().collate(1)
                .map{it -> tuple(it[0],"Reference")}
                .join(exclude_ids,by:0,remainder:true)
                .filter{it -> it[0].toString() != "null"}
                .filter{it -> it[2].toString() != "Exclude"}
                .map{it -> tuple(it[0],it[1])}

            reference_data = all_samples
                .join(all_ref_ids,by:0,remainder:true)
                .filter{it -> it[2].toString() == "Reference"}
                .map{it->tuple(it[0],it[1])}
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

            // Save reference data to file
            reference_data
                .collect{it -> it[0]}
                | saveRefIDs

            if(params.runmode == "screen" || params.runmode == "align"){
                // screen/align: queries are all samples NOT tagged as references
                query_data = all_samples
                    .join(all_ref_ids,by:0,remainder:true)
                    .filter{it -> it[2].toString() != "Reference"}
                    .map{it->tuple(it[0],it[1])}
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            } else if(params.runmode == "snp"){
                // snp: queries include every sample (references included)
                query_data = all_samples
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            }
        }
    }
}
rliterman@0 218
// Fetching preassembled data //
// Emits (Isolate_ID, Assembly_Path) tuples for assemblies given via --fasta.
workflow fetchQueryFasta{

    emit:
    query_fasta

    main:

    // If --fasta is set, grab assembly paths and characterize assemblies
    if("${params.fasta}" != ""){
        query_fasta = getAssemblies(params.fasta)
    } else{
        query_fasta = Channel.empty()
    }
}
// Emits (Isolate_ID, Assembly_Path) tuples for assemblies given via --ref_fasta.
workflow fetchRefFasta{

    emit:
    ref_fasta

    main:

    // If --ref_fasta is set, grab assembly paths and characterize assemblies
    if("${params.ref_fasta}" != ""){
        ref_fasta = getAssemblies(params.ref_fasta)
    } else{
        ref_fasta = Channel.empty()
    }
}
// Resolves a FASTA source (directory, single file, or path-list file) into
// (Isolate_ID, Assembly_Path) tuples; IDs are basenames with trim_name removed.
workflow getAssemblies{

    take:
    fasta_loc

    emit:
    fasta_data

    main:
    // Pattern stripped from file basenames to derive isolate IDs
    def trim_this = "${params.trim_name}"

    if(fasta_loc == ""){
        error "No assembly data provided via --fasta/--ref_fasta"
    } else{

        fasta_dir = file(fasta_loc)

        if(fasta_dir.isDirectory()){
            // A directory: collect every FASTA inside it
            ch_fasta = Channel.fromPath(["${fasta_dir}/*.fa","${fasta_dir}/*.fasta","${fasta_dir}/*.fna"])
        } else if(fasta_dir.isFile()){
            if(["fa","fna","fasta"].contains(fasta_dir.getExtension())){
                // A single FASTA file
                ch_fasta = Channel.from(fasta_dir).map{it-> file(it)}
            } else{
                // Otherwise treat it as a manifest listing FASTA paths, one per line
                ch_fasta = Channel.from(fasta_dir.readLines()).filter{ line -> line =~ /\.(fa|fasta|fna)$/}.map{it-> file(it)}
            }
        } else{
            error "$fasta_dir is not a valid directory or file..."
        }

        // Verify each path exists, then emit (Isolate_ID, Assembly_Path)
        fasta_data = ch_fasta
            .map { filePath ->
                if (!file(filePath).exists()) { error "$filePath is not a valid directory or file..." }
                def sampleID = file(filePath).getBaseName().replaceAll(trim_this, "")
                tuple(sampleID, filePath)
            }
    }
}
// Resolves the --snpdiffs source (directory, single file, or manifest) and
// parses each .snpdiffs header into 19-field records via userSNPDiffs.py.
workflow processSNPDiffs{

    emit:
    snpdiffs_data

    main:

    if("${params.snpdiffs}" == ""){
        error "No assembly data provided via --snpdiffs"
    } else{

        snpdiffs_dir = file("${params.snpdiffs}")

        if(snpdiffs_dir.isDirectory()){
            // A directory: take every .snpdiffs file inside
            ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
        } else if(snpdiffs_dir.isFile()){
            if(snpdiffs_dir.getExtension() == "snpdiffs"){
                // A single .snpdiffs file
                ch_snpdiffs = Channel.from(snpdiffs_dir)
            } else{
                // Otherwise a manifest listing .snpdiffs paths
                ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{it->it.endsWith('.snpdiffs') }
            }
        } else{
            error "$snpdiffs_dir is not a valid directory or file..."
        }

        // Fail fast on any missing path before handing the list to the parser
        checked_snpdiffs = ch_snpdiffs
            .map { filePath ->
                if (!file(filePath).exists()) { error "$filePath is not a valid directory or file..." }
                return filePath }

        snpdiffs_data = getSNPDiffsData(checked_snpdiffs.collect()).splitCsv().collect().flatten().collate(19)

        // (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
        // (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
        // (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
        // (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
    }
}
// Runs userSNPDiffs.py over the collected list of imported .snpdiffs files
// and emits the script's stdout (CSV rows parsed downstream via splitCsv).
process getSNPDiffsData{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(snpdiffs_paths)   // collected list of .snpdiffs file paths

    output:
    stdout

    script:

    // Side effect: log the imported file list before the task runs
    user_snpdiffs_list.write(snpdiffs_paths.join('\n') + "\n")
    """
    $params.load_python_module
    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
    """
}
rliterman@0 350
rliterman@0 351
// Fetching read data //
// Emits (Isolate_ID, Read_Type, Read_Path) tuples for reads given via --reads.
workflow fetchQueryReads{

    emit:
    query_reads

    main:

    // If --reads is set, locate and pair read files using the query suffixes
    if("${params.reads}" != ""){
        query_reads = processReads(params.reads,params.readext,params.forward,params.reverse)
    } else{
        query_reads = Channel.empty()
    }
}
// Emits (Isolate_ID, Read_Type, Read_Path) tuples for reads given via --ref_reads.
workflow fetchRefReads{

    emit:
    ref_reads

    main:

    // If --ref_reads is set, locate and pair read files using the reference suffixes
    if("${params.ref_reads}" != ""){
        ref_reads = processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse)
    } else{
        ref_reads = Channel.empty()
    }
}
// Resolves a read source (directory, or file listing directories) into
// (Isolate_ID, Read_Type, Read_Path) tuples via the fetchReads process.
//
// take:
//   read_loc - directory of reads, or file listing read directories
//   read_ext - read file extension (e.g., fastq.gz)
//   forward  - forward-read suffix (e.g., _1.fastq)
//   reverse  - reverse-read suffix (e.g., _2.fastq)
workflow processReads{

    take:
    read_loc
    read_ext
    forward
    reverse

    emit:
    read_info

    main:

    if(read_loc == ""){
        error "No data provided to --reads/--ref_reads"
    } else{

        read_dir = file(read_loc)

        // If --reads is a single directory, get all reads from that directory
        if(read_dir.isDirectory()){
            read_info = fetchReads(read_dir,read_ext,forward,reverse) | splitCsv
        }

        // If --reads is a file including paths to many directories, process reads from all directories
        else if(read_dir.isFile()){
            read_info = fetchReads(Channel.from(read_dir.readLines()),read_ext,forward,reverse) | splitCsv
        }
        // Error if --reads doesn't point to a valid file or directory
        else{
            // FIX: message was ungrammatical ("neither...or") and inconsistent
            // with the equivalent errors elsewhere in this file
            error "$read_dir is not a valid directory or file..."
        }
    }
}
// Runs fetchReads.py against a read directory and emits its stdout.
// NOTE(review): downstream splitCsv/collate(3) implies rows of
// (Isolate_ID, Read_Type, Read_Path) — confirm against bin/fetchReads.py.
process fetchReads{

    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val dir // Directory containing read files
    val read_ext // Extension for read files (e.g., fastq.gz or fq)
    val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
    val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

    output:
    stdout

    script:

    // Fail at script-evaluation time if the supplied path is not a directory
    if(!file(dir).isDirectory()){
        error "$dir is not a valid directory..."
    } else{
    """
    $params.load_python_module
    python ${findReads} --read_dir ${dir} --read_filetype ${read_ext} --forward_suffix ${forward_suffix} --reverse_suffix ${reverse_suffix} --trim_name ${params.trim_name}
    """
    }
}
rliterman@0 433
// Fetch reference IDs //
// Splits the comma-delimited --ref_id value into trimmed, de-duplicated IDs.
workflow processRefIDs{

    emit:
    ref_ids

    main:
    // Pattern stripped from each ID, mirroring assembly-basename trimming
    def trim_this = "${params.trim_name}"

    ref_ids = params.ref_id.tokenize(',').unique().collect{ raw_id -> raw_id.toString().replaceAll(trim_this, "") }.flatten()
}
rliterman@0 450
// Fetch excluded IDs //
// Splits the comma-delimited --exclude value into trimmed (ID, "Exclude")
// tuples, de-duplicated by ID.
workflow processExclude{

    emit:
    exclude_ids

    main:
    def trim_this = "${params.trim_name}"

    exclude_ids = Channel.from(params.exclude.tokenize(',').collect{ raw_id -> raw_id.toString().replaceAll(trim_this, "") })
        .map{ entry -> tuple(entry.toString(),"Exclude") }
        .unique{ entry -> entry[0] }
}
rliterman@0 466
// Appends the chosen reference isolate IDs to ref_id_file.
process saveRefIDs{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(ref_ids)   // collected list of reference isolate IDs

    script:
    // Side effect happens at script-evaluation time; the task itself is a no-op
    ref_id_file.append(ref_ids.join('\n') + '\n')
    """
    """
}
rliterman@0 480
// Assembly //
// Assembles each read set with SKESA, logs the results, and returns
// (Isolate_ID, Assembly_Path) tuples.
workflow assembleReads{

    take:
    to_assemble

    emit:
    assembled_data

    main:

    // One SKESA run per (ID, Read_Type, Read_Location) tuple; stdout is CSV
    assembly_output = skesaAssemble(to_assemble).splitCsv()

    // Print log of assemblies
    saveAssemblyLog(assembly_output.map{ row -> row.join("\t") }.collect())

    // Return assembly data: keep only (Isolate_ID, Assembly_Path)
    assembled_data = assembly_output.map{ row -> tuple(row[0],row[3]) }
}
rliterman@0 501 process skesaAssemble{
rliterman@28 502 label 'skesaMem'
rliterman@28 503
rliterman@28 504 cpus = skesa_cpus
rliterman@28 505
rliterman@0 506 input:
rliterman@0 507 tuple val(sample_name),val(read_type),val(read_location)
rliterman@0 508
rliterman@0 509 output:
rliterman@0 510 stdout
rliterman@0 511
rliterman@0 512 script:
rliterman@0 513 assembly_file = file("${assembly_directory}/${sample_name}.fasta")
rliterman@0 514
rliterman@0 515 // Ensure folder exists and file doesn't
rliterman@0 516 if(!assembly_directory.isDirectory()){
rliterman@0 517 error "$assembly_directory is not a valid directory..."
rliterman@0 518 } else if(assembly_file.isFile()){
rliterman@0 519 error "$assembly_file already exists..."
rliterman@0 520 } else if(read_type == "Paired"){
rliterman@0 521 forward_reverse = read_location.split(";")
rliterman@0 522 """
rliterman@0 523 $params.load_skesa_module
rliterman@0 524 skesa --cores ${skesa_cpus} --use_paired_ends --fastq ${forward_reverse[0]} ${forward_reverse[1]} --contigs_out ${assembly_file}
rliterman@0 525 echo "${sample_name},${read_type},${read_location},${assembly_file}"
rliterman@0 526 """
rliterman@0 527 } else if(read_type == "Single"){
rliterman@0 528 """
rliterman@0 529 $params.load_skesa_module
rliterman@0 530 skesa --cores ${skesa_cpus} --fastq ${read_location} --contigs_out ${assembly_file}
rliterman@0 531 echo "${sample_name},${read_type},${read_location},${assembly_file}"
rliterman@0 532 """
rliterman@0 533 } else{
rliterman@0 534 error "read_type should be Paired or Single, not $read_type..."
rliterman@0 535 }
rliterman@0 536 }
// Writes the assembly log TSV: header line, then one row per assembled isolate.
process saveAssemblyLog{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(assembly_data)   // collected tab-joined rows matching assembly_header columns

    script:
    // .write truncates/creates the log; .append then adds the data rows
    assembly_log.write(assembly_header)
    assembly_log.append(assembly_data.join('\n') + '\n')
    """
    """
}