annotate CSP2/subworkflows/fetchData/main.nf @ 18:eda1a8a50746

"planemo upload"
author rliterman
date Tue, 03 Dec 2024 12:44:32 -0500
parents 0d775868ee62
children 3d631131486f
rev   line source
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads

// Set path variables
output_directory = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory = file(params.log_directory)

// File that accumulates reference isolate IDs (appended to by saveRefIDs)
ref_id_file = file(params.ref_id_file)

// Set ref_mode
ref_mode = params.ref_mode

// Header row for the assembly log TSV written by saveAssemblyLog
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Set paths to accessory files/scripts
assembly_log = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs = file("${projectDir}/bin/userSNPDiffs.py")

// Cap SKESA at 4 cores; Math.min replaces the hand-rolled ternary and
// avoids casting params.cores twice
skesa_cpus = Math.min(params.cores as Integer, 4)
rliterman@0 24
// Entry workflow: run fetchData and publish its three output channels.
// BUG FIX: the main block assigned `snpdiffs_data` while the publish block
// referenced the undefined name `snpdiff_data`; the local is now named
// `snpdiff_data` consistently (matching the fetchData emit name).
workflow {
    main:
    input_data = fetchData()
    query_data = input_data.query_data
    reference_data = input_data.reference_data
    snpdiff_data = input_data.snpdiff_data

    publish:
    query_data >> 'query_data.tsv'
    reference_data >> 'reference_data.tsv'
    snpdiff_data >> 'snpdiff_data.tsv'
}
rliterman@0 37
rliterman@0 38 // Top-level workflow //
// Top-level data-fetching subworkflow.
//
// Emits:
//   query_data     - 2-tuples (Isolate_ID, Assembly_Path) for query isolates
//   reference_data - 2-tuples (Isolate_ID, Assembly_Path) for reference isolates
//   snpdiff_data   - 3-tuples (Query_ID, Reference_ID, SNPDiff_Path)
workflow fetchData{

    emit:
    query_data
    reference_data
    snpdiff_data

    main:
    // Get any excluded IDs as (ID, "Exclude") tuples
    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

    // Process snpdiffs alignments into 19-column records
    // (see processSNPDiffs for the column layout: it[0]=SNPDiffs_File,
    // it[1]=Query_ID, it[2]=Query_Assembly, it[10]=Reference_ID, it[11]=Reference_Assembly).
    // If assembly file cannot be found, it will be 'null'
    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

    // SNPDiffs files whose query (it[1]) or reference (it[10]) ID is excluded
    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
    .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
    .join(exclude_ids,by:0)
    .unique{it -> it[1]}
    .map{it -> tuple(it[1],"Exclude")}

    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
    snpdiff_data = user_snpdiffs
    .map{it -> tuple(it[0],it[1],it[10])}
    .join(excluded_snpdiffs,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[3].toString() != "Exclude"}
    .unique{it -> it[0]}
    .map{it -> tuple(it[1],it[2],it[0])}
    .collect().flatten().collate(3)

    // Get assembly data from snpdiffs: (ID, Assembly_Path, 'SNPDiff') records
    // for both the query (it[1],it[2]) and reference (it[10],it[11]) sides
    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
    .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[2].toString() != "Exclude"}
    .map{it -> tuple(it[0],it[1],'SNPDiff')}
    .collect().flatten().collate(3)

    // SNPDiffs-derived entries that came with a locatable assembly
    assembled_snpdiffs = snpdiff_assemblies
    .filter{it -> it[1].toString() != "null"}
    .unique{it->it[0]}.collect().flatten().collate(3)

    // Process any data provided as assemblies
    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

    // All non-excluded isolates that already have an assembly (from snpdiffs or FASTA)
    pre_assembled = assembled_snpdiffs
    .map{it -> tuple(it[0],it[1])}
    .concat(query_fasta)
    .concat(ref_fasta)
    .unique{it -> it[0]}
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[2].toString() != "Exclude"}
    .map{it->tuple(it[0],it[1])}
    .collect().flatten().collate(2)

    // Process any data provided as reads
    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

    // All non-excluded read sets (query + reference)
    all_reads = query_reads
    .concat(ref_reads)
    .unique{it->it[0]}
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[3].toString() != "Exclude"}
    .map{it->tuple(it[0],it[1],it[2])}
    .collect().flatten().collate(3)

    // Figure out if any assembly is necessary; only the 'read' branch
    // (reads with no pre-assembled FASTA) is consumed downstream
    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
    branch{it ->
        assembly: it[1].toString() == "null"
        return(tuple(it[0],it[2]))
        read: it[3].toString() == "null"
        return(tuple(it[0],it[1],it[2]))
        combo: true
        return(tuple(it[0],it[3]))}

    // Assemble reads if necessary
    assembled_reads = fasta_read_combo.read
    .collect().flatten().collate(3) | assembleReads

    // If runmode is 'assemble', tasks are complete
    if(params.runmode == "assemble"){
        query_data = Channel.empty()
        reference_data = Channel.empty()
    } else{

        // If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used):
        // the join + "null" filter drops any user FASTA whose ID already appears in assembled_snpdiffs
        user_fastas = query_fasta
        .concat(ref_fasta)
        .concat(assembled_reads)
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1],'User')}
        .collect().flatten().collate(3)
        .join(assembled_snpdiffs,by:0,remainder:true)
        .filter{it -> it[3].toString() == "null"}
        .map{it->tuple(it[0],it[1])}

        // Get all assemblies (snpdiffs-derived entries take precedence via the unique on ID)
        all_assembled = assembled_snpdiffs
        .map{it -> tuple(it[0],it[1])}
        .concat(user_fastas)
        .unique{it->it[0]}.collect().flatten().collate(2)

        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
        no_assembly = snpdiff_assemblies
        .map{it -> tuple(it[0],it[1])}
        .filter{it -> it[1].toString() == "null"}
        .unique{it -> it[0]}
        .join(all_assembled,by:0,remainder:true)
        .filter{it -> it[2].toString() == "null"}
        .map{it->tuple(it[0],it[1])}
        .collect().flatten().collate(2)

        // Compile all samples
        all_samples = all_assembled
        .concat(no_assembly)
        .unique{it-> it[0]}.collect().flatten().collate(2)

        // If no reference data is provided return a blank channel
        if(!ref_mode){
            reference_data = Channel.empty()

            query_data = all_samples
            .unique{it -> it[0]}
            .collect().flatten().collate(2)

        } else{

            // Process additional reference IDs supplied via --ref_id
            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

            // All non-excluded reference IDs, tagged with "Reference"
            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
            .concat(ref_reads.map{it->tuple(it[0])})
            .concat(user_ref_ids)
            .unique{it-> it[0]}.collect().flatten().collate(1)
            .map{it -> tuple(it[0],"Reference")}
            .join(exclude_ids,by:0,remainder:true)
            .filter{it -> it[0].toString() != "null"}
            .filter{it -> it[2].toString() != "Exclude"}
            .map{it -> tuple(it[0],it[1])}

            // Samples flagged as references
            reference_data = all_samples
            .join(all_ref_ids,by:0,remainder:true)
            .filter{it -> it[2].toString() == "Reference"}
            .map{it->tuple(it[0],it[1])}
            .unique{it -> it[0]}
            .collect().flatten().collate(2)

            // Save reference data to file
            reference_data
            .collect{it -> it[0]}
            | saveRefIDs

            // screen/align: queries are everything not flagged "Reference";
            // snp: all samples (references included) are queries.
            // NOTE(review): other runmodes leave query_data unset here —
            // confirm upstream validation restricts runmode to these values.
            if(params.runmode == "screen" || params.runmode == "align"){
                query_data = all_samples
                .join(all_ref_ids,by:0,remainder:true)
                .filter{it -> it[2].toString() != "Reference"}
                .map{it->tuple(it[0],it[1])}
                .unique{it -> it[0]}
                .collect().flatten().collate(2)
            } else if(params.runmode == "snp"){
                query_data = all_samples
                .unique{it -> it[0]}
                .collect().flatten().collate(2)
            }
        }
    }
}
rliterman@0 218
rliterman@0 219 // Fetching preassembled data //
workflow fetchQueryFasta{

    emit:
    query_fasta

    main:

    // When --fasta was supplied, collect and characterize the assemblies;
    // otherwise emit an empty channel
    if("${params.fasta}" != ""){
        query_fasta = getAssemblies(params.fasta)
    } else{
        query_fasta = Channel.empty()
    }
}
workflow fetchRefFasta{

    emit:
    ref_fasta

    main:

    // When --ref_fasta was supplied, collect and characterize the assemblies;
    // otherwise emit an empty channel
    if("${params.ref_fasta}" != ""){
        ref_fasta = getAssemblies(params.ref_fasta)
    } else{
        ref_fasta = Channel.empty()
    }
}
// Resolve --fasta/--ref_fasta input (directory, single FASTA, or list file)
// into 2-tuples of (Isolate_ID, Assembly_Path)
workflow getAssemblies{

    take:
    fasta_loc

    emit:
    fasta_data

    main:
    def trim_this = "${params.trim_name}"
    def fasta_exts = ["fa", "fasta", "fna"]

    if(fasta_loc == ""){
        error "No assembly data provided via --fasta/--ref_fasta"
    } else{

        fasta_dir = file(fasta_loc)

        if(fasta_dir.isDirectory()){
            // A directory: glob every FASTA file inside it
            ch_fasta = Channel.fromPath(["${fasta_dir}/*.fa","${fasta_dir}/*.fasta","${fasta_dir}/*.fna"])
        }
        else if(fasta_dir.isFile()){
            if(fasta_exts.contains(fasta_dir.getExtension())){
                // A single FASTA file
                ch_fasta = Channel.from(fasta_dir).map{it-> file(it)}
            }
            else{
                // Otherwise treat it as a text file listing FASTA paths, one per line
                ch_fasta = Channel.from(fasta_dir.readLines()).filter{ line -> line =~ /\.(fa|fasta|fna)$/}.map{it-> file(it)}
            }
        } else{
            error "$fasta_dir is not a valid directory or file..."
        }

        // Keep only paths that exist; derive the isolate ID by trimming the basename
        fasta_data = ch_fasta
        .filter { file(it).exists() }
        .map { fastaPath ->
            def baseName = file(fastaPath).getBaseName()
            def isolateId = baseName.replaceAll(trim_this, "")
            tuple(isolateId, fastaPath)}
    }
}
// Resolve --snpdiffs input (directory, single file, or list file) and parse
// each .snpdiffs file into a 19-column record via getSNPDiffsData
workflow processSNPDiffs{

    emit:
    snpdiffs_data

    main:

    if("${params.snpdiffs}" == ""){
        error "No assembly data provided via --snpdiffs"
    } else{

        snpdiffs_dir = file("${params.snpdiffs}")

        // If --snpdiffs is a directory, glob all .snpdiffs files inside it
        if(snpdiffs_dir.isDirectory()){
            ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
        }
        // If --snpdiffs is a file...
        else if(snpdiffs_dir.isFile()){

            // Check if it is a single .snpdiffs file...
            if(snpdiffs_dir.getExtension() == "snpdiffs"){
                ch_snpdiffs = Channel.from(snpdiffs_dir)
            }
            // Otherwise, assume a file with paths to SNPDiffs (one per line)
            else{
                ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{it->it.endsWith('.snpdiffs') }
            }
        } else{
            error "$snpdiffs_dir is not a valid directory or file..."
        }

        // Drop missing paths, then parse all files in one getSNPDiffsData task;
        // its CSV stdout is split and re-collated into 19-column records:
        snpdiffs_data = ch_snpdiffs
        .filter { file(it).exists() }
        .collect() | getSNPDiffsData | splitCsv | collect | flatten | collate(19)

        // (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
        // (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
        // (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
        // (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
    }
}
// Run userSNPDiffs.py over the full list of .snpdiffs paths and emit its
// CSV output on stdout (parsed downstream with splitCsv)
process getSNPDiffsData{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(snpdiffs_paths)   // collected list of .snpdiffs file paths

    output:
    stdout

    script:

    // Write the path list for the Python helper to read; this runs at
    // task-creation time, before the shell script executes
    user_snpdiffs_list.write(snpdiffs_paths.join('\n') + "\n")
    """
    $params.load_python_module
    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
    """
}
rliterman@0 344
rliterman@0 345
rliterman@0 346 // Fetching read data //
workflow fetchQueryReads{

    emit:
    query_reads

    main:

    // When --reads was supplied, locate read files and pair them up;
    // otherwise emit an empty channel
    if("${params.reads}" != ""){
        query_reads = processReads(params.reads,params.readext,params.forward,params.reverse)
    } else{
        query_reads = Channel.empty()
    }
}
workflow fetchRefReads{

    emit:
    ref_reads

    main:

    // When --ref_reads was supplied, locate read files and pair them up;
    // otherwise emit an empty channel
    if("${params.ref_reads}" != ""){
        ref_reads = processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse)
    } else{
        ref_reads = Channel.empty()
    }
}
// Resolve a reads location (directory, or file listing directories) into
// 3-tuples of (Isolate_ID, Read_Type, Read_Path) via the fetchReads process
workflow processReads{

    take:
    read_loc
    read_ext
    forward
    reverse

    emit:
    read_info

    main:

    if(read_loc == ""){
        error "No data provided to --reads/--ref_reads"
    } else{

        read_source = file(read_loc)

        if(read_source.isDirectory()){
            // A single directory: gather all reads it contains
            read_info = fetchReads(read_source,read_ext,forward,reverse) | splitCsv
        } else if(read_source.isFile()){
            // A file listing directories: gather reads from each listed directory
            read_info = fetchReads(Channel.from(read_source.readLines()),read_ext,forward,reverse) | splitCsv
        } else{
            // Neither a usable file nor directory
            error "$read_source is neither a valid file or directory..."
        }
    }
}
// Run fetchReads.py over one read directory; emits CSV rows of
// (Isolate_ID, Read_Type, Read_Path) on stdout
process fetchReads{

    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val dir              // Directory containing read files
    val read_ext         // Extension for read files (e.g., fastq.gz or fq)
    val forward_suffix   // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
    val reverse_suffix   // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

    output:
    stdout

    script:

    if(file(dir).isDirectory()){
        """
        $params.load_python_module
        python ${findReads} --read_dir ${dir} --read_filetype ${read_ext} --forward_suffix ${forward_suffix} --reverse_suffix ${reverse_suffix} --trim_name ${params.trim_name}
        """
    } else{
        error "$dir is not a valid directory..."
    }
}
rliterman@0 427
rliterman@0 428 // Fetch reference IDs //
// Parse the comma-separated --ref_id list into a channel of trimmed IDs.
//
// FIX: the original emitted a bare Groovy List (tokenize/collect/flatten),
// unlike sibling processExclude which wraps its list in Channel.from;
// downstream (all_ref_ids) concats this output into a channel and expects
// one item per ID. Channel.from(list) emits each ID as its own item, so the
// redundant .flatten() on the already-flat list is no longer needed.
workflow processRefIDs{

    emit:
    ref_ids

    main:
    def trim_this = "${params.trim_name}"

    ref_ids = Channel.from(params.ref_id
    .tokenize(',')
    .unique()
    .collect { it ->
        "${it}".replaceAll(trim_this, "")})
}
rliterman@0 444
rliterman@0 445 // Fetch reference IDs //
// Parse the comma-separated --exclude list into (ID, "Exclude") tuples
workflow processExclude{

    emit:
    exclude_ids

    main:
    def trim_this = "${params.trim_name}"

    // Tokenize, trim each ID, tag with "Exclude", and deduplicate on ID
    exclude_ids = Channel.from(params.exclude
    .tokenize(',')
    .collect { raw -> "${raw}".replaceAll(trim_this, "")})
    .map{id -> tuple(id.toString(),"Exclude")}
    .unique{entry -> entry[0]}
}
rliterman@0 460
// Append the collected reference IDs to ref_id_file. The append happens at
// task-creation time (in the script block, before the shell runs); the shell
// script itself is intentionally a no-op.
process saveRefIDs{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(ref_ids)   // collected list of reference isolate IDs

    script:
    ref_id_file.append(ref_ids.join('\n') + '\n')
    """
    """
}
rliterman@0 474
rliterman@0 475 // Assembly //
// Assemble read sets with SKESA and emit (Isolate_ID, Assembly_Path) tuples
workflow assembleReads{

    take:
    to_assemble

    emit:
    assembled_data

    main:

    // Run SKESA per read set and parse the CSV summary each task prints
    skesa_results = skesaAssemble(to_assemble).splitCsv()

    // Record every assembly in the tab-separated assembly log
    skesa_results.map {row -> row.join("\t")}.collect() | saveAssemblyLog

    // Emit 2-tuples of (Isolate_ID, Assembly_Path)
    assembled_data = skesa_results.map{row -> tuple(row[0],row[3])}
}
// Assemble one read set with SKESA into ${assembly_directory}/${sample_name}.fasta
// and echo a CSV record: sample_name,read_type,read_location,assembly_file
process skesaAssemble{

    // Capped at 4 by the skesa_cpus setting at the top of the file
    cpus = skesa_cpus

    input:
    tuple val(sample_name),val(read_type),val(read_location)

    output:
    stdout

    script:
    assembly_file = file("${assembly_directory}/${sample_name}.fasta")

    // Ensure folder exists and file doesn't (never overwrite an assembly)
    if(!assembly_directory.isDirectory()){
        error "$assembly_directory is not a valid directory..."
    } else if(assembly_file.isFile()){
        error "$assembly_file already exists..."
    } else if(read_type == "Paired"){
        // Paired reads arrive as a single 'forward;reverse' path string
        forward_reverse = read_location.split(";")
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --use_paired_ends --fastq ${forward_reverse[0]} ${forward_reverse[1]} --contigs_out ${assembly_file}
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else if(read_type == "Single"){
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --fastq ${read_location} --contigs_out ${assembly_file}
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else{
        error "read_type should be Paired or Single, not $read_type..."
    }
}
// Write the assembly log TSV (header + one row per assembly). The writes
// happen at task-creation time; the shell script itself is a no-op.
process saveAssemblyLog{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(assembly_data)   // collected tab-joined assembly records

    script:
    assembly_log.write(assembly_header)
    assembly_log.append(assembly_data.join('\n') + '\n')
    """
    """
}