rliterman@0
|
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads

// Directories supplied through params, resolved to file objects
output_directory   = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory      = file(params.log_directory)

// File that saveRefIDs appends the reference IDs to
ref_id_file = file(params.ref_id_file)

// Reference mode flag (checked in fetchData)
ref_mode = params.ref_mode

// Header row written at the top of the assembly log
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Log files and helper scripts used by the processes below
assembly_log       = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads          = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs       = file("${projectDir}/bin/userSNPDiffs.py")

// SKESA gets the requested core count, capped at 4
def requested_cores = params.cores as Integer
skesa_cpus = requested_cores < 4 ? requested_cores : 4
println "params.cores: ${params.cores}"
println "skesa_cpus: ${skesa_cpus}"
|
rliterman@0
|
26
|
rliterman@0
|
// Entry workflow: run fetchData and publish its three output channels.
workflow {
    main:
    input_data = fetchData()
    query_data = input_data.query_data
    reference_data = input_data.reference_data
    // Name must match the target used in the publish section below
    snpdiff_data = input_data.snpdiff_data

    publish:
    query_data >> 'query_data.tsv'
    reference_data >> 'reference_data.tsv'
    snpdiff_data >> 'snpdiff_data.tsv'
}
|
rliterman@0
|
39
|
rliterman@0
|
40 // Top-level workflow //
|
rliterman@0
|
// Collects every isolate supplied via --snpdiffs/--fasta/--ref_fasta/--reads/--ref_reads,
// drops IDs listed in --exclude, assembles read-only isolates, and splits the result into
// query and reference channels.
//
// Emits:
//   query_data     - 2-item tuples (Isolate_ID, Assembly_Path); empty when runmode is 'assemble'
//   reference_data - 2-item tuples (Isolate_ID, Assembly_Path); empty when runmode is 'assemble' or ref_mode is unset
//   snpdiff_data   - 3-item tuples (Query_ID, Reference_ID, SNPDiff_Path)
workflow fetchData{

    main:
    // Get any excluded IDs as (Isolate_ID, "Exclude")
    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

    // Process snpdiffs alignments. Each row has 19 fields (see processSNPDiffs):
    // index 0 is the SNPDiffs file, 1/2 the query ID/assembly, 10/11 the reference ID/assembly.
    // If assembly file cannot be found, it will be 'null'
    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

    // SNPDiffs files whose query or reference ID is excluded: (SNPDiffs_Path, "Exclude")
    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
        .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
        .join(exclude_ids,by:0)
        .unique{it -> it[1]}
        .map{it -> tuple(it[1],"Exclude")}

    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
    snpdiff_data = user_snpdiffs
        .map{it -> tuple(it[0],it[1],it[10])}
        .join(excluded_snpdiffs,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .unique{it -> it[0]}
        .map{it -> tuple(it[1],it[2],it[0])}
        .collect().flatten().collate(3)

    // Get assembly data from snpdiffs: (Isolate_ID, Assembly_Path, 'SNPDiff')
    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
        .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it -> tuple(it[0],it[1],'SNPDiff')}
        .collect().flatten().collate(3)

    // SNPDiffs isolates whose assembly path was located
    assembled_snpdiffs = snpdiff_assemblies
        .filter{it -> it[1].toString() != "null"}
        .unique{it->it[0]}.collect().flatten().collate(3)

    // Process any data provided as assemblies
    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

    pre_assembled = assembled_snpdiffs
        .map{it -> tuple(it[0],it[1])}
        .concat(query_fasta)
        .concat(ref_fasta)
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1])}
        .collect().flatten().collate(2)

    // Process any data provided as reads
    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

    // The exclusion marker is tested on the LAST field rather than index 3: an excluded ID
    // that has no reads comes out of the remainder join as a 3-element row
    // (ID, null, "Exclude"), which an index-3 test lets through.
    all_reads = query_reads
        .concat(ref_reads)
        .unique{it->it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[-1].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1],it[2])}
        .collect().flatten().collate(3)

    // Figure out if any assembly is necessary
    //   assembly: isolate has an assembly but no reads
    //   read:     isolate has reads but no assembly (needs SKESA)
    //   combo:    isolate has both; the existing assembly is used
    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
    branch{it ->
        assembly: it[1].toString() == "null"
            return(tuple(it[0],it[2]))
        read: it[3].toString() == "null"
            return(tuple(it[0],it[1],it[2]))
        combo: true
            return(tuple(it[0],it[3]))}

    // Assemble reads if necessary
    assembled_reads = fasta_read_combo.read
        .collect().flatten().collate(3) | assembleReads

    // If runmode is 'assemble', tasks are complete
    if(params.runmode == "assemble"){
        query_data = Channel.empty()
        reference_data = Channel.empty()
    } else{

        // If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used)
        user_fastas = query_fasta
            .concat(ref_fasta)
            .concat(assembled_reads)
            .unique{it -> it[0]}
            .join(exclude_ids,by:0,remainder:true)
            .filter{it -> it[0].toString() != "null"}
            .filter{it -> it[2].toString() != "Exclude"}
            .map{it->tuple(it[0],it[1],'User')}
            .collect().flatten().collate(3)
            .join(assembled_snpdiffs,by:0,remainder:true)
            .filter{it -> it[3].toString() == "null"}
            .map{it->tuple(it[0],it[1])}

        // Get all assemblies
        all_assembled = assembled_snpdiffs
            .map{it -> tuple(it[0],it[1])}
            .concat(user_fastas)
            .unique{it->it[0]}.collect().flatten().collate(2)

        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
        no_assembly = snpdiff_assemblies
            .map{it -> tuple(it[0],it[1])}
            .filter{it -> it[1].toString() == "null"}
            .unique{it -> it[0]}
            .join(all_assembled,by:0,remainder:true)
            .filter{it -> it[2].toString() == "null"}
            .map{it->tuple(it[0],it[1])}
            .collect().flatten().collate(2)

        // Compile all samples
        all_samples = all_assembled
            .concat(no_assembly)
            .unique{it-> it[0]}.collect().flatten().collate(2)

        // If no reference data is provided return a blank channel
        if(!ref_mode){
            reference_data = Channel.empty()

            query_data = all_samples
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

        } else{

            // Process additional reference IDs
            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

            // processRefIDs hands back the whole ID list as ONE item, so everything is
            // flattened to individual IDs BEFORE de-duplicating. De-duplicating on it[0]
            // first would discard the entire list whenever its first ID had already been
            // seen via --ref_fasta/--ref_reads.
            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
                .concat(ref_reads.map{it->tuple(it[0])})
                .concat(user_ref_ids)
                .collect().flatten().unique().collate(1)
                .map{it -> tuple(it[0],"Reference")}
                .join(exclude_ids,by:0,remainder:true)
                .filter{it -> it[0].toString() != "null"}
                .filter{it -> it[2].toString() != "Exclude"}
                .map{it -> tuple(it[0],it[1])}

            reference_data = all_samples
                .join(all_ref_ids,by:0,remainder:true)
                .filter{it -> it[2].toString() == "Reference"}
                .map{it->tuple(it[0],it[1])}
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

            // Save reference data to file
            reference_data
                .collect{it -> it[0]}
                | saveRefIDs

            if(params.runmode == "screen" || params.runmode == "align"){
                // Queries are every sample that is not flagged as a reference
                query_data = all_samples
                    .join(all_ref_ids,by:0,remainder:true)
                    .filter{it -> it[2].toString() != "Reference"}
                    .map{it->tuple(it[0],it[1])}
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            } else if(params.runmode == "snp"){
                // In snp mode every sample (references included) is a query
                query_data = all_samples
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            }
        }
    }

    emit:
    query_data
    reference_data
    snpdiff_data
}
|
rliterman@0
|
220
|
rliterman@0
|
221 // Fetching preassembled data //
|
rliterman@0
|
// Emits the query assemblies found via --fasta as produced by getAssemblies,
// or an empty channel when --fasta is not set.
workflow fetchQueryFasta{

    main:
    if("${params.fasta}" != ""){
        getAssemblies(params.fasta).set{query_fasta}
    } else{
        Channel.empty().set{query_fasta}
    }

    emit:
    query_fasta
}
|
rliterman@0
|
// Emits the reference assemblies found via --ref_fasta as produced by getAssemblies,
// or an empty channel when --ref_fasta is not set.
workflow fetchRefFasta{

    main:
    if("${params.ref_fasta}" != ""){
        getAssemblies(params.ref_fasta).set{ref_fasta}
    } else{
        Channel.empty().set{ref_fasta}
    }

    emit:
    ref_fasta
}
|
rliterman@0
|
// Locates assembly FASTA files and derives an isolate ID for each one.
//
// Takes:
//   fasta_loc - a directory of *.fa/*.fasta/*.fna files, a single FASTA file,
//               or a text file listing one FASTA path per line
// Emits:
//   fasta_data - 2-item tuples (Isolate_ID, Assembly_Path); the ID is the file's base
//                name with every match of params.trim_name removed
// Errors when fasta_loc is empty or is neither a file nor a directory.
workflow getAssemblies{

    take:
    fasta_loc

    main:
    def trim_this = "${params.trim_name}"

    if(fasta_loc == ""){
        error "No assembly data provided via --fasta/--ref_fasta"
    } else{

        fasta_dir = file(fasta_loc)

        // If the location is a directory, glob the supported extensions
        if(fasta_dir.isDirectory()){
            ch_fasta = Channel.fromPath(["${fasta_dir}/*.fa","${fasta_dir}/*.fasta","${fasta_dir}/*.fna"])
        }
        // If the location is a file...
        else if(fasta_dir.isFile()){

            // ...it is either a single FASTA file...
            if(fasta_dir.getExtension() in ["fa","fna","fasta"]){
                ch_fasta = Channel.of(fasta_dir).map{it-> file(it)}
            }
            // ...or a file with paths to FASTAs, one per line.
            // Lines are trimmed first so trailing whitespace does not make the
            // extension check silently discard an otherwise valid path.
            else{
                ch_fasta = Channel.fromList(fasta_dir.readLines())
                    .map{ line -> line.trim() }
                    .filter{ line -> line =~ /\.(fa|fasta|fna)$/ }
                    .map{ line -> file(line) }
            }
        } else{
            error "$fasta_dir is not a valid directory or file..."
        }

        // Keep only files that exist, then pair each with its trimmed base name
        fasta_data = ch_fasta
            .filter { file(it).exists() }
            .map { filePath ->
                def fileName = file(filePath).getBaseName()
                def sampleName = fileName.replaceAll(trim_this, "")
                tuple(sampleName, filePath)}
    }

    emit:
    fasta_data
}
|
rliterman@0
|
// Locates the SNPDiffs files named by --snpdiffs and parses them with getSNPDiffsData.
//
// Emits:
//   snpdiffs_data - 19-item rows, in this order:
//     (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
//     (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
//     (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
//     (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
// Errors when --snpdiffs is empty or is neither a file nor a directory.
workflow processSNPDiffs{

    main:
    if("${params.snpdiffs}" == ""){
        error "No assembly data provided via --snpdiffs"
    }

    snpdiffs_dir = file("${params.snpdiffs}")

    if(snpdiffs_dir.isDirectory()){
        // --snpdiffs is a directory: take every *.snpdiffs file in it
        ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
    } else if(!snpdiffs_dir.isFile()){
        error "$snpdiffs_dir is not a valid directory or file..."
    } else if(snpdiffs_dir.getExtension() == "snpdiffs"){
        // --snpdiffs is a single SNPDiffs file
        ch_snpdiffs = Channel.from(snpdiffs_dir)
    } else{
        // Otherwise, assume a file with paths to SNPDiffs
        ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{it->it.endsWith('.snpdiffs') }
    }

    // Hand all existing files to a single getSNPDiffsData task, then regroup its
    // CSV output into 19-field rows
    existing_snpdiffs = ch_snpdiffs
        .filter { file(it).exists() }
        .collect()

    snpdiffs_data = getSNPDiffsData(existing_snpdiffs)
        .splitCsv()
        .collect()
        .flatten()
        .collate(19)

    emit:
    snpdiffs_data
}
|
rliterman@0
|
// Writes the list of SNPDiffs paths to the Imported_SNPDiffs log file and runs
// userSNPDiffs.py on that list; the script's stdout is the process output.
process getSNPDiffsData{
    executor 'local'
    cpus 1
    maxForks 1

    input:
    val(snpdiffs_paths)

    output:
    stdout

    script:
    // One path per line, newline-terminated; overwrites any previous listing
    def path_listing = snpdiffs_paths.join('\n') + "\n"
    user_snpdiffs_list.write(path_listing)
    """
    $params.load_python_module
    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
    """
}
|
rliterman@0
|
346
|
rliterman@0
|
347
|
rliterman@0
|
348 // Fetching read data //
|
rliterman@0
|
// Emits the query read sets found via --reads (using --readext/--forward/--reverse)
// as produced by processReads, or an empty channel when --reads is not set.
workflow fetchQueryReads{

    main:
    if("${params.reads}" != ""){
        processReads(params.reads,params.readext,params.forward,params.reverse).set{query_reads}
    } else{
        Channel.empty().set{query_reads}
    }

    emit:
    query_reads
}
|
rliterman@0
|
// Emits the reference read sets found via --ref_reads (using --ref_readext/--ref_forward/--ref_reverse)
// as produced by processReads, or an empty channel when --ref_reads is not set.
workflow fetchRefReads{

    main:
    if("${params.ref_reads}" != ""){
        processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse).set{ref_reads}
    } else{
        Channel.empty().set{ref_reads}
    }

    emit:
    ref_reads
}
|
rliterman@0
|
// Runs fetchReads over one read directory, or over every directory listed in a file.
//
// Takes:
//   read_loc - a directory of reads, or a file listing one read directory per line
//   read_ext - read file extension passed through to fetchReads
//   forward  - forward-read suffix passed through to fetchReads
//   reverse  - reverse-read suffix passed through to fetchReads
// Emits:
//   read_info - the CSV rows printed by fetchReads, split into lists
// Errors when read_loc is empty or is neither a file nor a directory.
workflow processReads{

    take:
    read_loc
    read_ext
    forward
    reverse

    main:
    if(read_loc == ""){
        error "No data provided to --reads/--ref_reads"
    }

    read_dir = file(read_loc)

    if(read_dir.isDirectory()){
        // A single directory: get all reads from that directory
        read_info = fetchReads(read_dir,read_ext,forward,reverse).splitCsv()
    } else if(read_dir.isFile()){
        // A file of directory paths: process reads from each listed directory
        listed_dirs = Channel.from(read_dir.readLines())
        read_info = fetchReads(listed_dirs,read_ext,forward,reverse).splitCsv()
    } else{
        error "$read_dir is neither a valid file or directory..."
    }

    emit:
    read_info
}
|
rliterman@0
|
// Runs fetchReads.py on one directory of reads; the script's stdout is the process output.
// Errors when `dir` is not a directory.
process fetchReads{

    executor 'local'
    cpus 1
    maxForks 1

    input:
    val dir // Directory containing read files
    val read_ext // Extension for read files (e.g., fastq.gz or fq)
    val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
    val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

    output:
    stdout

    script:

    if(!file(dir).isDirectory()){
        error "$dir is not a valid directory..."
    } else{
    // Every argument is quoted so that an empty --trim_name still reaches the script as
    // an (empty) value, matching getSNPDiffsData, and so paths containing spaces survive.
    """
    $params.load_python_module
    python ${findReads} --read_dir "${dir}" --read_filetype "${read_ext}" --forward_suffix "${forward_suffix}" --reverse_suffix "${reverse_suffix}" --trim_name "${params.trim_name}"
    """
    }
}
|
rliterman@0
|
429
|
rliterman@0
|
430 // Fetch reference IDs //
|
rliterman@0
|
// Turns the comma-separated --ref_id string into a list of reference IDs.
//
// Emits:
//   ref_ids - a Groovy list of the unique IDs, each with every match of
//             params.trim_name removed
workflow processRefIDs{

    main:
    def trim_this = "${params.trim_name}"

    // Split on commas and de-duplicate before trimming
    def requested_ids = params.ref_id.tokenize(',').unique()

    ref_ids = requested_ids
        .collect { it -> "${it}".replaceAll(trim_this, "") }
        .flatten()

    emit:
    ref_ids
}
|
rliterman@0
|
446
|
rliterman@0
|
// Fetch excluded IDs //
// Turns the comma-separated --exclude string into a channel of exclusion markers.
//
// Emits:
//   exclude_ids - 2-item tuples (Isolate_ID, "Exclude"), unique by ID, where each ID has
//                 every match of params.trim_name removed
workflow processExclude{

    main:
    def trim_this = "${params.trim_name}"

    // Split on commas and strip the trim pattern from each ID
    def trimmed_ids = params.exclude
        .tokenize(',')
        .collect { it -> "${it}".replaceAll(trim_this, "")}

    exclude_ids = Channel.from(trimmed_ids)
        .map{it -> tuple(it.toString(),"Exclude")}
        .unique{it -> it[0]}

    emit:
    exclude_ids
}
|
rliterman@0
|
462
|
rliterman@0
|
// Appends the given reference IDs, one per line, to ref_id_file.
// The shell script itself is empty; the file is written from Groovy.
process saveRefIDs{
    executor 'local'
    cpus 1
    maxForks 1

    input:
    val(ref_ids)

    script:
    def id_lines = ref_ids.join('\n') + '\n'
    ref_id_file.append(id_lines)
    """
    """
}
|
rliterman@0
|
476
|
rliterman@0
|
477 // Assembly //
|
rliterman@0
|
// Assembles each read set with SKESA and records the results in the assembly log.
//
// Takes:
//   to_assemble - 3-item tuples (sample_name, read_type, read_location) for skesaAssemble
// Emits:
//   assembled_data - 2-item tuples built from fields 0 and 3 of each skesaAssemble
//                    CSV row (sample name and assembly path)
workflow assembleReads{

    take:
    to_assemble

    main:
    // Run SKESA on each entry and split its CSV stdout into rows
    assembly_output = skesaAssemble(to_assemble).splitCsv()

    // Log every assembly as a tab-separated line
    assembly_log_rows = assembly_output
        .map {it -> it.join("\t")}
        .collect()
    saveAssemblyLog(assembly_log_rows)

    // Return assembly data
    assembled_data = assembly_output.map{it->tuple(it[0],it[3])}

    emit:
    assembled_data
}
|
rliterman@0
|
// Assembles one isolate's reads with SKESA into ${assembly_directory}/${sample_name}.fasta
// and prints a CSV line: sample_name,read_type,read_location,assembly_file.
//
// Input:
//   sample_name   - isolate ID, used as the output file's base name
//   read_type     - "Paired" or "Single"
//   read_location - read path; for "Paired", forward and reverse paths joined by ';'
// Errors when the assembly directory is missing, the output file already exists,
// or read_type is neither "Paired" nor "Single".
process skesaAssemble{
    // label 'skesaMem'

    cpus skesa_cpus

    input:
    tuple val(sample_name),val(read_type),val(read_location)

    output:
    stdout

    script:
    def assembly_file = file("${assembly_directory}/${sample_name}.fasta")

    // Ensure folder exists and file doesn't
    if(!assembly_directory.isDirectory()){
        error "$assembly_directory is not a valid directory..."
    } else if(assembly_file.isFile()){
        error "$assembly_file already exists..."
    } else if(read_type == "Paired"){
        def forward_reverse = read_location.split(";")
        // Paths are quoted so locations containing spaces reach skesa intact
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --use_paired_ends --fastq "${forward_reverse[0]}" "${forward_reverse[1]}" --contigs_out "${assembly_file}"
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else if(read_type == "Single"){
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --fastq "${read_location}" --contigs_out "${assembly_file}"
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else{
        error "read_type should be Paired or Single, not $read_type..."
    }
}
|
rliterman@0
|
// Rewrites the assembly log: the header row followed by one line per assembly.
// The shell script itself is empty; the file is written from Groovy.
process saveAssemblyLog{
    executor 'local'
    cpus 1
    maxForks 1

    input:
    val(assembly_data)

    script:
    // write() replaces any previous log; the rows are then appended below the header
    def log_rows = assembly_data.join('\n') + '\n'
    assembly_log.write(assembly_header)
    assembly_log.append(log_rows)
    """
    """
}
|