annotate CSP2/subworkflows/fetchData/main.nf @ 28:893a6993efe3

"planemo upload"
author rliterman
date Wed, 04 Dec 2024 13:48:13 -0500
parents 792274118b2e
children b6ec322b1f05
rev   line source
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads

// Set path variables
// NOTE(review): these params are expected to be supplied by the calling pipeline config.
output_directory = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory = file(params.log_directory)

// File that accumulates reference isolate IDs (appended to by saveRefIDs)
ref_id_file = file(params.ref_id_file)

// Set ref_mode
ref_mode = params.ref_mode

// Set file headers
// Header row for the assembly log written by saveAssemblyLog
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Set paths to accessory files/scripts
assembly_log = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs = file("${projectDir}/bin/userSNPDiffs.py")

// Set SKESA cores to 4 or fewer
//skesa_cpus = (params.cores as Integer) >= 4 ? 4 : params.cores as Integer
// NOTE(review): SKESA is currently pinned to 1 CPU; the previous core-scaling logic is kept above for reference
skesa_cpus = 1
// Entry workflow: fetch all query/reference/snpdiffs data and publish the compiled channels.
workflow {
main:
input_data = fetchData()
query_data = input_data.query_data
reference_data = input_data.reference_data
// Bug fix: this was assigned to 'snpdiffs_data', leaving 'snpdiff_data' undefined
// in the publish: section below; the name now matches the published channel.
snpdiff_data = input_data.snpdiff_data

publish:
query_data >> 'query_data.tsv'
reference_data >> 'reference_data.tsv'
snpdiff_data >> 'snpdiff_data.tsv'
}
rliterman@0 38
rliterman@0 39 // Top-level workflow //
// Top-level data-fetching workflow.
// Emits:
//   query_data     - 2-item tuples (Isolate_ID, Assembly_Path) for query isolates
//   reference_data - 2-item tuples (Isolate_ID, Assembly_Path) for reference isolates
//   snpdiff_data   - 3-item tuples (Query_ID, Reference_ID, SNPDiff_Path)
// Exclusion handling: IDs from --exclude are joined against each channel and
// filtered out via the "Exclude" marker tuples produced by processExclude.
workflow fetchData{

emit:
query_data
reference_data
snpdiff_data

main:
// Get any excluded IDs (2-item tuples: (ID, "Exclude"))
("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

// Process snpdiffs alignments
// If assembly file cannot be found, it will be 'null'
// user_snpdiffs tuples: it[0]=SNPDiffs_File, it[1]=Query_ID, it[2]=Query_Assembly,
// it[10]=Reference_ID, it[11]=Reference_Assembly (see processSNPDiffs column layout)
("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

// Mark any snpdiffs whose query (it[1]) or reference (it[10]) ID is excluded
excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
.concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
.join(exclude_ids,by:0)
.unique{it -> it[1]}
.map{it -> tuple(it[1],"Exclude")}

// Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
snpdiff_data = user_snpdiffs
.map{it -> tuple(it[0],it[1],it[10])}
.join(excluded_snpdiffs,by:0,remainder:true)
.filter{it -> it[0].toString() != "null"}
.filter{it -> it[3].toString() != "Exclude"}
.unique{it -> it[0]}
.map{it -> tuple(it[1],it[2],it[0])}
.collect().flatten().collate(3)

// Get assembly data from snpdiffs
// 3-item tuples: (Isolate_ID, Assembly_Path-or-null, 'SNPDiff')
snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
.concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
.join(exclude_ids,by:0,remainder:true)
.filter{it -> it[0].toString() != "null"}
.filter{it -> it[2].toString() != "Exclude"}
.map{it -> tuple(it[0],it[1],'SNPDiff')}
.collect().flatten().collate(3)

// Subset of snpdiff_assemblies where an assembly file was actually located
assembled_snpdiffs = snpdiff_assemblies
.filter{it -> it[1].toString() != "null"}
.unique{it->it[0]}.collect().flatten().collate(3)

// Process any data provided as assemblies
// Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

// All isolates that already have an assembly (snpdiffs take precedence via unique)
pre_assembled = assembled_snpdiffs
.map{it -> tuple(it[0],it[1])}
.concat(query_fasta)
.concat(ref_fasta)
.unique{it -> it[0]}
.join(exclude_ids,by:0,remainder:true)
.filter{it -> it[0].toString() != "null"}
.filter{it -> it[2].toString() != "Exclude"}
.map{it->tuple(it[0],it[1])}
.collect().flatten().collate(2)

// Process any data provided as reads
// Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

all_reads = query_reads
.concat(ref_reads)
.unique{it->it[0]}
.join(exclude_ids,by:0,remainder:true)
.filter{it -> it[0].toString() != "null"}
.filter{it -> it[3].toString() != "Exclude"}
.map{it->tuple(it[0],it[1],it[2])}
.collect().flatten().collate(3)

// Figure out if any assembly is necessary
// After the remainder join: it[1]/it[2] come from reads, it[3] from pre_assembled;
// 'assembly' = assembly only, 'read' = reads only (needs assembling), 'combo' = both
fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
branch{it ->
assembly: it[1].toString() == "null"
return(tuple(it[0],it[2]))
read: it[3].toString() == "null"
return(tuple(it[0],it[1],it[2]))
combo: true
return(tuple(it[0],it[3]))}

// Assemble reads if necessary
assembled_reads = fasta_read_combo.read
.collect().flatten().collate(3) | assembleReads

// If runmode is 'assemble', tasks are complete
if(params.runmode == "assemble"){
query_data = Channel.empty()
reference_data = Channel.empty()
} else{

// If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used)
user_fastas = query_fasta
.concat(ref_fasta)
.concat(assembled_reads)
.unique{it -> it[0]}
.join(exclude_ids,by:0,remainder:true)
.filter{it -> it[0].toString() != "null"}
.filter{it -> it[2].toString() != "Exclude"}
.map{it->tuple(it[0],it[1],'User')}
.collect().flatten().collate(3)
.join(assembled_snpdiffs,by:0,remainder:true)
.filter{it -> it[3].toString() == "null"}
.map{it->tuple(it[0],it[1])}

// Get all assemblies
all_assembled = assembled_snpdiffs
.map{it -> tuple(it[0],it[1])}
.concat(user_fastas)
.unique{it->it[0]}.collect().flatten().collate(2)

// Get data for isolates where a SNPDiff was provided, but no FASTA could be located
no_assembly = snpdiff_assemblies
.map{it -> tuple(it[0],it[1])}
.filter{it -> it[1].toString() == "null"}
.unique{it -> it[0]}
.join(all_assembled,by:0,remainder:true)
.filter{it -> it[2].toString() == "null"}
.map{it->tuple(it[0],it[1])}
.collect().flatten().collate(2)

// Compile all samples
all_samples = all_assembled
.concat(no_assembly)
.unique{it-> it[0]}.collect().flatten().collate(2)

// If no reference data is provided return a blank channel
if(!ref_mode){
reference_data = Channel.empty()

query_data = all_samples
.unique{it -> it[0]}
.collect().flatten().collate(2)

} else{

// Process additional reference IDs
("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

// Union of reference IDs from --ref_fasta, --ref_reads, and --ref_id, minus exclusions
all_ref_ids = ref_fasta.map{it->tuple(it[0])}
.concat(ref_reads.map{it->tuple(it[0])})
.concat(user_ref_ids)
.unique{it-> it[0]}.collect().flatten().collate(1)
.map{it -> tuple(it[0],"Reference")}
.join(exclude_ids,by:0,remainder:true)
.filter{it -> it[0].toString() != "null"}
.filter{it -> it[2].toString() != "Exclude"}
.map{it -> tuple(it[0],it[1])}

reference_data = all_samples
.join(all_ref_ids,by:0,remainder:true)
.filter{it -> it[2].toString() == "Reference"}
.map{it->tuple(it[0],it[1])}
.unique{it -> it[0]}
.collect().flatten().collate(2)

// Save reference data to file
reference_data
.collect{it -> it[0]}
| saveRefIDs

// screen/align: queries are all non-reference samples; snp: queries include references
if(params.runmode == "screen" || params.runmode == "align"){
query_data = all_samples
.join(all_ref_ids,by:0,remainder:true)
.filter{it -> it[2].toString() != "Reference"}
.map{it->tuple(it[0],it[1])}
.unique{it -> it[0]}
.collect().flatten().collate(2)
} else if(params.runmode == "snp"){
query_data = all_samples
.unique{it -> it[0]}
.collect().flatten().collate(2)
}
}
}
}
rliterman@0 219
rliterman@0 220 // Fetching preassembled data //
// Wrapper: resolve --fasta into (Isolate_ID, Assembly_Path) tuples, or an empty channel when unset.
workflow fetchQueryFasta{

emit:
query_fasta

main:
// If --fasta is set, grab assembly paths and characterize assemblies
if("${params.fasta}" != ""){
query_fasta = getAssemblies(params.fasta)
} else{
query_fasta = Channel.empty()
}
}
// Wrapper: resolve --ref_fasta into (Isolate_ID, Assembly_Path) tuples, or an empty channel when unset.
workflow fetchRefFasta{

emit:
ref_fasta

main:
// If --ref_fasta is set, grab assembly paths and characterize assemblies
if("${params.ref_fasta}" != ""){
ref_fasta = getAssemblies(params.ref_fasta)
} else{
ref_fasta = Channel.empty()
}
}
// Resolve an assembly location (--fasta/--ref_fasta) into (Isolate_ID, Assembly_Path) tuples.
// Accepts a directory of FASTAs, a single FASTA file, or a text file listing FASTA paths.
workflow getAssemblies{

take:
fasta_loc

emit:
fasta_data

main:
def trim_this = "${params.trim_name}"

if(fasta_loc == ""){
error "No assembly data provided via --fasta/--ref_fasta"
} else{

fasta_dir = file(fasta_loc)

// Directory: collect every .fa/.fasta/.fna file inside it
if(fasta_dir.isDirectory()){
ch_fasta = Channel.fromPath(["${fasta_dir}/*.fa","${fasta_dir}/*.fasta","${fasta_dir}/*.fna"])
}
// Single file: either one FASTA, or a list of FASTA paths
else if(fasta_dir.isFile()){
if(fasta_dir.getExtension() in ["fa","fna","fasta"]){
ch_fasta = Channel.from(fasta_dir).map{it-> file(it)}
} else{
// Assume a text file of paths; keep only lines ending in a FASTA extension
ch_fasta = Channel.from(fasta_dir.readLines()).filter{ line -> line =~ /\.(fa|fasta|fna)$/}.map{it-> file(it)}
}
} else{
error "$fasta_dir is not a valid directory or file..."
}

// Verify each path exists, then derive the sample name by trimming the configured pattern
fasta_data = ch_fasta
.map { filePath ->
if (!file(filePath).exists()) { error "$filePath is not a valid directory or file..." }
def sampleName = file(filePath).getBaseName().replaceAll(trim_this, "")
tuple(sampleName, filePath)
}
}
}
// Resolve --snpdiffs into parsed snpdiffs records (19 fields each; layout below).
// Accepts a directory of .snpdiffs files, a single .snpdiffs file, or a text file listing paths.
workflow processSNPDiffs{

emit:
snpdiffs_data

main:

if("${params.snpdiffs}" == ""){
error "No assembly data provided via --snpdiffs"
} else{

snpdiffs_dir = file("${params.snpdiffs}")

// Directory: take every .snpdiffs file it contains
if(snpdiffs_dir.isDirectory()){
ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
}
// Single file: either one .snpdiffs, or a list of .snpdiffs paths
else if(snpdiffs_dir.isFile()){
if(snpdiffs_dir.getExtension() == "snpdiffs"){
ch_snpdiffs = Channel.from(snpdiffs_dir)
} else{
ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{line -> line.endsWith('.snpdiffs') }
}
} else{
error "$snpdiffs_dir is not a valid directory or file..."
}

// Verify each path exists, then parse all snpdiffs headers in a single batch job
checked_snpdiffs = ch_snpdiffs.map { filePath ->
if (!file(filePath).exists()) { error "$filePath is not a valid directory or file..." }
return filePath }
snpdiffs_data = getSNPDiffsData(checked_snpdiffs.collect()).splitCsv().collect().flatten().collate(19)

// (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
// (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
// (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
// (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
}
}
// Parse header metadata from a batch of .snpdiffs files via bin/userSNPDiffs.py.
// Input: collected list of snpdiffs paths. Output: CSV rows on stdout (19 fields; see caller).
process getSNPDiffsData{
executor = 'local'
cpus = 1
maxForks = 1

input:
val(snpdiffs_paths)

output:
stdout

script:

// Side effect at task-creation time: record the imported snpdiffs paths in the log directory
user_snpdiffs_list.write(snpdiffs_paths.join('\n') + "\n")
"""
$params.load_python_module
python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
"""
}
rliterman@0 351
rliterman@0 352
rliterman@0 353 // Fetching read data //
// Wrapper: resolve --reads into (Isolate_ID, Read_Type, Read_Path) tuples, or an empty channel when unset.
workflow fetchQueryReads{

emit:
query_reads

main:
// If --reads is set, locate read files using the query-specific extension/suffix params
if("${params.reads}" != ""){
query_reads = processReads(params.reads,params.readext,params.forward,params.reverse)
} else{
query_reads = Channel.empty()
}
}
// Wrapper: resolve --ref_reads into (Isolate_ID, Read_Type, Read_Path) tuples, or an empty channel when unset.
workflow fetchRefReads{

emit:
ref_reads

main:
// If --ref_reads is set, locate read files using the reference-specific extension/suffix params
if("${params.ref_reads}" != ""){
ref_reads = processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse)
} else{
ref_reads = Channel.empty()
}
}
// Resolve a read location (--reads/--ref_reads) into (Isolate_ID, Read_Type, Read_Path) tuples.
// take:
//   read_loc - directory of reads, or a file listing read directories
//   read_ext - read file extension (e.g., fastq.gz)
//   forward/reverse - suffixes identifying forward/reverse reads
workflow processReads{

take:
read_loc
read_ext
forward
reverse

emit:
read_info

main:

if(read_loc == ""){
error "No data provided to --reads/--ref_reads"
} else{

read_dir = file(read_loc)

// If --reads is a single directory, get all reads from that directory
if(read_dir.isDirectory()){
read_info = fetchReads(read_dir,read_ext,forward,reverse) | splitCsv
}

// If --reads is a file including paths to many directories, process reads from all directories
else if(read_dir.isFile()){
read_info = fetchReads(Channel.from(read_dir.readLines()),read_ext,forward,reverse) | splitCsv
}
// Error if --reads doesn't point to a valid file or directory
else{
// Fix: message was "is neither a valid file or directory..." (ungrammatical and
// inconsistent); now matches the wording used by the other fetch workflows
error "$read_dir is not a valid directory or file..."
}
}
}
// Locate paired/single read files in a directory via bin/fetchReads.py.
// Output: CSV rows on stdout with (Isolate_ID, Read_Type, Read_Path) per sample.
process fetchReads{

executor = 'local'
cpus = 1
maxForks = 1

input:
val dir // Directory containing read files
val read_ext // Extension for read files (e.g., fastq.gz or fq)
val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

output:
stdout

script:

// Validate the directory at task-creation time, before any work is submitted
if(!file(dir).isDirectory()){
error "$dir is not a valid directory..."
} else{
"""
$params.load_python_module
python ${findReads} --read_dir ${dir} --read_filetype ${read_ext} --forward_suffix ${forward_suffix} --reverse_suffix ${reverse_suffix} --trim_name ${params.trim_name}
"""
}
}
rliterman@0 434
rliterman@0 435 // Fetch reference IDs //
// Parse the comma-separated --ref_id string into trimmed, de-duplicated reference IDs.
workflow processRefIDs{

emit:
ref_ids

main:
def trim_this = "${params.trim_name}"

// Split on commas, drop duplicates, then strip the trim pattern from each ID
ref_ids = params.ref_id.tokenize(',').unique().collect { raw_id -> "${raw_id}".replaceAll(trim_this, "") }.flatten()
}
rliterman@0 451
rliterman@0 452 // Fetch reference IDs //
// Parse the comma-separated --exclude string into (ID, "Exclude") marker tuples.
workflow processExclude{

emit:
exclude_ids

main:
def trim_this = "${params.trim_name}"

// Trim each ID first, then emit one ("ID","Exclude") tuple per distinct ID
def trimmed_ids = params.exclude.tokenize(',').collect { raw_id -> "${raw_id}".replaceAll(trim_this, "") }
exclude_ids = Channel.from(trimmed_ids)
.map{ excluded -> tuple(excluded.toString(),"Exclude") }
.unique{ pair -> pair[0] }
}
rliterman@0 467
// Append the collected reference isolate IDs to ref_id_file (side effect only; no task output).
process saveRefIDs{
executor = 'local'
cpus = 1
maxForks = 1

input:
val(ref_ids)

script:
// Appends (not overwrites) so IDs accumulate across invocations/runs
ref_id_file.append(ref_ids.join('\n') + '\n')
"""
"""
}
rliterman@0 481
rliterman@0 482 // Assembly //
// Assemble read sets with SKESA and log the results.
// take: 3-item tuples (Isolate_ID, Read_Type, Read_Path)
// emit: 2-item tuples (Isolate_ID, Assembly_Path)
workflow assembleReads{

take:
to_assemble

emit:
assembled_data

main:

// Run SKESA on each entry; each task echoes one CSV row
assembly_output = to_assemble | skesaAssemble | splitCsv

// Write a TSV log of all completed assemblies
saveAssemblyLog(assembly_output.map { row -> row.join("\t") }.collect())

// Return (Isolate_ID, Assembly_Path) pairs
assembled_data = assembly_output.map{ row -> tuple(row[0],row[3]) }
}
// Run SKESA on one read set, writing the assembly to assembly_directory.
// Input: (sample_name, read_type "Paired"/"Single", read_location — ';'-separated for pairs).
// Output: one CSV row "sample,read_type,read_location,assembly_path" on stdout.
process skesaAssemble{
label 'skesaMem'

// Uses the module-level skesa_cpus setting (currently pinned to 1)
cpus = skesa_cpus

input:
tuple val(sample_name),val(read_type),val(read_location)

output:
stdout

script:
assembly_file = file("${assembly_directory}/${sample_name}.fasta")

// Ensure folder exists and file doesn't
// (refuses to overwrite an existing assembly rather than silently clobbering it)
if(!assembly_directory.isDirectory()){
error "$assembly_directory is not a valid directory..."
} else if(assembly_file.isFile()){
error "$assembly_file already exists..."
} else if(read_type == "Paired"){
// Paired reads arrive as "forward;reverse"
forward_reverse = read_location.split(";")
"""
$params.load_skesa_module
skesa --cores ${skesa_cpus} --use_paired_ends --fastq ${forward_reverse[0]} ${forward_reverse[1]} --contigs_out ${assembly_file}
echo "${sample_name},${read_type},${read_location},${assembly_file}"
"""
} else if(read_type == "Single"){
"""
$params.load_skesa_module
skesa --cores ${skesa_cpus} --fastq ${read_location} --contigs_out ${assembly_file}
echo "${sample_name},${read_type},${read_location},${assembly_file}"
"""
} else{
error "read_type should be Paired or Single, not $read_type..."
}
}
// Write the assembly log TSV (header + one row per assembly; side effect only, no task output).
process saveAssemblyLog{
executor = 'local'
cpus = 1
maxForks = 1

input:
val(assembly_data)

script:
// write() truncates first, so each run produces a fresh log with the header row
assembly_log.write(assembly_header)
assembly_log.append(assembly_data.join('\n') + '\n')
"""
"""
}