rliterman@0
|
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads

// Set path variables (all supplied via pipeline params; assumed to exist — TODO confirm caller creates them)
output_directory = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory = file(params.log_directory)

// File that saveRefIDs appends reference isolate IDs to
ref_id_file = file(params.ref_id_file)

// Set ref_mode: when falsy, fetchData emits an empty reference channel
ref_mode = params.ref_mode

// Set file headers
// Header row for the assembly log TSV written by saveAssemblyLog
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Set paths to accessory files/scripts
assembly_log = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs = file("${projectDir}/bin/userSNPDiffs.py")

// SKESA core count: currently pinned to 1.
// (Former capped-at-4 logic retained below for reference.)
//skesa_cpus = (params.cores as Integer) >= 4 ? 4 : params.cores as Integer
skesa_cpus = 1
|
rliterman@0
|
// Entry workflow: fetch all query/reference/snpdiffs data and publish the
// resulting channels as TSV targets.
workflow {
    main:
    // fetchData emits named channels: query_data, reference_data, snpdiff_data
    input_data = fetchData()
    query_data = input_data.query_data
    reference_data = input_data.reference_data
    // BUG FIX: was assigned to `snpdiffs_data` while the publish section
    // referenced the undefined name `snpdiff_data`; names now agree.
    snpdiff_data = input_data.snpdiff_data

    publish:
    query_data >> 'query_data.tsv'
    reference_data >> 'reference_data.tsv'
    snpdiff_data >> 'snpdiff_data.tsv'
}
|
rliterman@0
|
38
|
rliterman@0
|
// Top-level workflow //
// Gathers all inputs (snpdiffs, FASTAs, reads), removes excluded IDs,
// assembles reads where no assembly exists, and emits:
//   query_data:     (Isolate_ID, Assembly_Path) tuples for query isolates
//   reference_data: (Isolate_ID, Assembly_Path) tuples for reference isolates
//   snpdiff_data:   (Query_ID, Reference_ID, SNPDiff_Path) tuples
workflow fetchData{

    emit:
    query_data
    reference_data
    snpdiff_data

    main:
    // Get any excluded IDs
    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

    // Process snpdiffs alignments
    // If assembly file cannot be found, it will be 'null'
    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

    // Mark any snpdiffs file whose query ID (it[1]) or reference ID (it[10])
    // appears in the exclude list; keyed by snpdiffs path (it[0])
    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
    .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
    .join(exclude_ids,by:0)
    .unique{it -> it[1]}
    .map{it -> tuple(it[1],"Exclude")}

    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
    snpdiff_data = user_snpdiffs
    .map{it -> tuple(it[0],it[1],it[10])}
    .join(excluded_snpdiffs,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[3].toString() != "Exclude"}
    .unique{it -> it[0]}
    .map{it -> tuple(it[1],it[2],it[0])}
    .collect().flatten().collate(3)

    // Get assembly data from snpdiffs
    // (Isolate_ID, Assembly_Path-or-null, 'SNPDiff') for query and reference sides
    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
    .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[2].toString() != "Exclude"}
    .map{it -> tuple(it[0],it[1],'SNPDiff')}
    .collect().flatten().collate(3)

    // Subset of snpdiffs isolates whose assembly path was actually located
    assembled_snpdiffs = snpdiff_assemblies
    .filter{it -> it[1].toString() != "null"}
    .unique{it->it[0]}.collect().flatten().collate(3)

    // Process any data provided as assemblies
    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

    // Everything that already has an assembly, minus exclusions
    pre_assembled = assembled_snpdiffs
    .map{it -> tuple(it[0],it[1])}
    .concat(query_fasta)
    .concat(ref_fasta)
    .unique{it -> it[0]}
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[2].toString() != "Exclude"}
    .map{it->tuple(it[0],it[1])}
    .collect().flatten().collate(2)

    // Process any data provided as reads
    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

    all_reads = query_reads
    .concat(ref_reads)
    .unique{it->it[0]}
    .join(exclude_ids,by:0,remainder:true)
    .filter{it -> it[0].toString() != "null"}
    .filter{it -> it[3].toString() != "Exclude"}
    .map{it->tuple(it[0],it[1],it[2])}
    .collect().flatten().collate(3)

    // Figure out if any assembly is necessary
    // branch on the outer-joined (reads x assemblies) rows:
    //   assembly: no reads      -> already assembled, pass assembly through
    //   read:     no assembly   -> needs SKESA assembly
    //   combo:    both present  -> prefer the existing assembly
    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
    branch{it ->
        assembly: it[1].toString() == "null"
        return(tuple(it[0],it[2]))
        read: it[3].toString() == "null"
        return(tuple(it[0],it[1],it[2]))
        combo: true
        return(tuple(it[0],it[3]))}

    // Assemble reads if necessary
    assembled_reads = fasta_read_combo.read
    .collect().flatten().collate(3) | assembleReads

    // If runmode is 'assemble', tasks are complete
    if(params.runmode == "assemble"){
        query_data = Channel.empty()
        reference_data = Channel.empty()
    } else{

        // If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used)
        user_fastas = query_fasta
        .concat(ref_fasta)
        .concat(assembled_reads)
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1],'User')}
        .collect().flatten().collate(3)
        .join(assembled_snpdiffs,by:0,remainder:true)
        .filter{it -> it[3].toString() == "null"}
        .map{it->tuple(it[0],it[1])}

        // Get all assemblies
        all_assembled = assembled_snpdiffs
        .map{it -> tuple(it[0],it[1])}
        .concat(user_fastas)
        .unique{it->it[0]}.collect().flatten().collate(2)

        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
        no_assembly = snpdiff_assemblies
        .map{it -> tuple(it[0],it[1])}
        .filter{it -> it[1].toString() == "null"}
        .unique{it -> it[0]}
        .join(all_assembled,by:0,remainder:true)
        .filter{it -> it[2].toString() == "null"}
        .map{it->tuple(it[0],it[1])}
        .collect().flatten().collate(2)

        // Compile all samples
        all_samples = all_assembled
        .concat(no_assembly)
        .unique{it-> it[0]}.collect().flatten().collate(2)

        // If no reference data is provided return a blank channel
        if(!ref_mode){
            reference_data = Channel.empty()

            query_data = all_samples
            .unique{it -> it[0]}
            .collect().flatten().collate(2)

        } else{

            // Process additional reference IDs
            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

            // All isolate IDs flagged as references (from --ref_fasta,
            // --ref_reads, or --ref_id), minus exclusions
            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
            .concat(ref_reads.map{it->tuple(it[0])})
            .concat(user_ref_ids)
            .unique{it-> it[0]}.collect().flatten().collate(1)
            .map{it -> tuple(it[0],"Reference")}
            .join(exclude_ids,by:0,remainder:true)
            .filter{it -> it[0].toString() != "null"}
            .filter{it -> it[2].toString() != "Exclude"}
            .map{it -> tuple(it[0],it[1])}

            reference_data = all_samples
            .join(all_ref_ids,by:0,remainder:true)
            .filter{it -> it[2].toString() == "Reference"}
            .map{it->tuple(it[0],it[1])}
            .unique{it -> it[0]}
            .collect().flatten().collate(2)

            // Save reference data to file
            reference_data
            .collect{it -> it[0]}
            | saveRefIDs

            // screen/align: queries are all non-reference samples;
            // snp: queries are all samples (references included)
            if(params.runmode == "screen" || params.runmode == "align"){
                query_data = all_samples
                .join(all_ref_ids,by:0,remainder:true)
                .filter{it -> it[2].toString() != "Reference"}
                .map{it->tuple(it[0],it[1])}
                .unique{it -> it[0]}
                .collect().flatten().collate(2)
            } else if(params.runmode == "snp"){
                query_data = all_samples
                .unique{it -> it[0]}
                .collect().flatten().collate(2)
            }
        }
    }
}
|
rliterman@0
|
219
|
rliterman@0
|
// Fetching preassembled data //
workflow fetchQueryFasta{

    emit:
    query_fasta

    main:

    // When --fasta is provided, collect and characterize those assemblies;
    // otherwise emit an empty channel
    if("${params.fasta}" != ""){
        query_fasta = getAssemblies(params.fasta)
    } else{
        query_fasta = Channel.empty()
    }
}
|
rliterman@0
|
workflow fetchRefFasta{

    emit:
    ref_fasta

    main:

    // When --ref_fasta is provided, collect and characterize those assemblies;
    // otherwise emit an empty channel
    if("${params.ref_fasta}" != ""){
        ref_fasta = getAssemblies(params.ref_fasta)
    } else{
        ref_fasta = Channel.empty()
    }
}
|
rliterman@0
|
// Resolve --fasta/--ref_fasta input (directory, single FASTA, or list file)
// into (sample_name, fasta_path) tuples, trimming params.trim_name from names.
workflow getAssemblies{

    take:
    fasta_loc

    emit:
    fasta_data

    main:
    def name_trim = "${params.trim_name}"

    if(fasta_loc == ""){
        error "No assembly data provided via --fasta/--ref_fasta"
    } else{

        def fasta_source = file(fasta_loc)

        if(fasta_source.isDirectory()){
            // Directory: pull every FASTA-like file it contains
            ch_fasta = Channel.fromPath(["${fasta_source}/*.fa","${fasta_source}/*.fasta","${fasta_source}/*.fna"])
        } else if(fasta_source.isFile()){
            if(["fa","fna","fasta"].contains(fasta_source.getExtension())){
                // A single FASTA file
                ch_fasta = Channel.from(fasta_source).map{ entry -> file(entry) }
            } else{
                // Otherwise treat it as a text file listing FASTA paths
                ch_fasta = Channel.from(fasta_source.readLines())
                    .filter{ line -> line =~ /\.(fa|fasta|fna)$/ }
                    .map{ line -> file(line) }
            }
        } else{
            error "$fasta_source is not a valid directory or file..."
        }

        // Verify existence, then derive the sample name from the basename
        fasta_data = ch_fasta
            .map { fastaPath ->
                if (!file(fastaPath).exists()) { error "$fastaPath is not a valid directory or file..." }
                def sample_id = file(fastaPath).getBaseName().replaceAll(name_trim, "")
                tuple(sample_id, fastaPath)
            }
    }
}
|
rliterman@0
|
// Resolve --snpdiffs input (directory, single .snpdiffs file, or list file)
// and emit one 19-column row of stats per snpdiffs file.
workflow processSNPDiffs{

    emit:
    snpdiffs_data

    main:

    if("${params.snpdiffs}" == ""){
        error "No assembly data provided via --snpdiffs"
    } else{

        snpdiffs_dir = file("${params.snpdiffs}")

        // If --snpdiffs is a directory, take every .snpdiffs file it contains
        if(snpdiffs_dir.isDirectory()){
            ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
        }
        // If --snpdiffs is a file...
        else if(snpdiffs_dir.isFile()){

            // ...it is either a single .snpdiffs file...
            if(snpdiffs_dir.getExtension() == "snpdiffs"){
                ch_snpdiffs = Channel.from(snpdiffs_dir)
            }
            // ...or a text file listing paths to SNPDiffs
            else{
                ch_snpdiffs = Channel.from(snpdiffs_dir.readLines()).filter{it->it.endsWith('.snpdiffs') }
            }
        } else{
            error "$snpdiffs_dir is not a valid directory or file..."
        }

        // Fail fast on missing paths, then batch everything through
        // getSNPDiffsData; its CSV stdout is reshaped into 19-column rows
        snpdiffs_data = ch_snpdiffs
        .map { filePath ->
            if (!file(filePath).exists()) { error "$filePath is not a valid directory or file..." }
            return filePath }
        .collect() | getSNPDiffsData | splitCsv | collect | flatten | collate(19)

        // (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
        // (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
        // (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
        // (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
    }
}
|
rliterman@0
|
// Local, single-fork task: record all snpdiffs paths to a list file and run
// bin/userSNPDiffs.py over the list; the script's CSV stdout is the output.
process getSNPDiffsData{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(snpdiffs_paths)

    output:
    stdout

    script:

    // Side effect at task-script construction: write one path per line to
    // user_snpdiffs_list before the Python script reads it
    user_snpdiffs_list.write(snpdiffs_paths.join('\n') + "\n")
    """
    $params.load_python_module
    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
    """
}
|
rliterman@0
|
351
|
rliterman@0
|
352
|
rliterman@0
|
// Fetching read data //
workflow fetchQueryReads{

    emit:
    query_reads

    main:

    // When --reads is provided, locate and pair read files using the
    // query-side extension/suffix params; otherwise emit an empty channel
    if("${params.reads}" != ""){
        query_reads = processReads(params.reads, params.readext, params.forward, params.reverse)
    } else{
        query_reads = Channel.empty()
    }
}
|
rliterman@0
|
workflow fetchRefReads{

    emit:
    ref_reads

    main:

    // When --ref_reads is provided, locate and pair read files using the
    // reference-side extension/suffix params; otherwise emit an empty channel
    if("${params.ref_reads}" != ""){
        ref_reads = processReads(params.ref_reads, params.ref_readext, params.ref_forward, params.ref_reverse)
    } else{
        ref_reads = Channel.empty()
    }
}
|
rliterman@0
|
// Shared read-location logic for --reads/--ref_reads.
// Emits 3-item tuples: (Isolate_ID, Read_Type, Read_Path)
workflow processReads{

    take:
    read_loc
    read_ext
    forward
    reverse

    emit:
    read_info

    main:

    if(read_loc == ""){
        error "No data provided to --reads/--ref_reads"
    } else{

        def read_source = file(read_loc)

        if(read_source.isDirectory()){
            // A single directory: scan it directly for read files
            read_info = fetchReads(read_source, read_ext, forward, reverse).splitCsv()
        } else if(read_source.isFile()){
            // A file of directory paths: scan every listed directory
            read_info = fetchReads(Channel.from(read_source.readLines()), read_ext, forward, reverse).splitCsv()
        } else{
            // Neither a usable file nor a directory
            error "$read_source is neither a valid file or directory..."
        }
    }
}
|
rliterman@0
|
// Local, single-fork task: delegate read discovery/pairing in one directory
// to bin/fetchReads.py, which prints CSV rows consumed via splitCsv upstream.
process fetchReads{

    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val dir // Directory containing read files
    val read_ext // Extension for read files (e.g., fastq.gz or fq)
    val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
    val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

    output:
    stdout

    script:

    // Fail fast if the supplied path is not a directory
    if(!file(dir).isDirectory()){
        error "$dir is not a valid directory..."
    } else{
        """
        $params.load_python_module
        python ${findReads} --read_dir ${dir} --read_filetype ${read_ext} --forward_suffix ${forward_suffix} --reverse_suffix ${reverse_suffix} --trim_name ${params.trim_name}
        """
    }
}
|
rliterman@0
|
434
|
rliterman@0
|
// Fetch reference IDs //
// Parse the comma-delimited --ref_id string into de-duplicated, trimmed IDs.
workflow processRefIDs{

    emit:
    ref_ids

    main:
    def trim_this = "${params.trim_name}"

    // NOTE(review): this builds a Groovy List rather than a channel; the
    // downstream `.concat(user_ref_ids)` in fetchData appears to rely on
    // implicit conversion to a value source — confirm this works as intended.
    ref_ids = params.ref_id
    .tokenize(',')
    .unique()
    .collect { it ->
        "${it}".replaceAll(trim_this, "")}
    .flatten()
}
|
rliterman@0
|
451
|
rliterman@0
|
452 // Fetch excluded IDs //
|
rliterman@0
|
// Parse the comma-delimited --exclude string into a channel of
// (Isolate_ID, "Exclude") tuples with trimmed, de-duplicated IDs.
workflow processExclude{

    emit:
    exclude_ids

    main:
    def trim_pattern = "${params.trim_name}"

    // Split, trim each raw ID, then tag every unique ID for exclusion
    def trimmed_ids = params.exclude
        .tokenize(',')
        .collect { raw_id -> "${raw_id}".replaceAll(trim_pattern, "") }

    exclude_ids = Channel.from(trimmed_ids)
        .map{ id -> tuple(id.toString(), "Exclude") }
        .unique{ entry -> entry[0] }
}
|
rliterman@0
|
467
|
rliterman@0
|
// Local, single-fork, side-effect-only task: append all reference IDs
// (one per line) to ref_id_file. The shell block is intentionally empty.
process saveRefIDs{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(ref_ids)

    script:
    // Append happens at task-script construction, not in the shell
    ref_id_file.append(ref_ids.join('\n') + '\n')
    """
    """
}
|
rliterman@0
|
481
|
rliterman@0
|
// Assembly //
// Assemble each (Isolate_ID, Read_Type, Read_Path) entry with SKESA,
// log every assembly, and emit (Isolate_ID, Assembly_Path) tuples.
workflow assembleReads{

    take:
    to_assemble

    emit:
    assembled_data

    main:

    // Run SKESA per entry; each task prints one CSV row on stdout
    skesa_results = skesaAssemble(to_assemble).splitCsv()

    // Record a TSV log of all assemblies performed
    skesa_results
        .map{ row -> row.join("\t") }
        .collect() | saveAssemblyLog

    // Keep only (Isolate_ID, Assembly_Path) for downstream use
    assembled_data = skesa_results.map{ row -> tuple(row[0], row[3]) }
}
|
rliterman@0
|
// Assemble one isolate's reads with SKESA into a fixed per-sample FASTA path,
// then echo a CSV row: sample,read_type,read_location,assembly_path.
process skesaAssemble{
    label 'skesaMem'

    cpus = skesa_cpus

    input:
    tuple val(sample_name),val(read_type),val(read_location)

    output:
    stdout

    script:
    // Target assembly path is derived from the sample name
    assembly_file = file("${assembly_directory}/${sample_name}.fasta")

    // Ensure folder exists and file doesn't
    if(!assembly_directory.isDirectory()){
        error "$assembly_directory is not a valid directory..."
    } else if(assembly_file.isFile()){
        error "$assembly_file already exists..."
    } else if(read_type == "Paired"){
        // Paired reads arrive as a single 'forward;reverse' string
        forward_reverse = read_location.split(";")
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --use_paired_ends --fastq ${forward_reverse[0]} ${forward_reverse[1]} --contigs_out ${assembly_file}
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else if(read_type == "Single"){
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --fastq ${read_location} --contigs_out ${assembly_file}
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else{
        error "read_type should be Paired or Single, not $read_type..."
    }
}
|
rliterman@0
|
// Local, single-fork, side-effect-only task: (re)write the assembly log
// header, then append one pre-joined TSV row per assembled isolate.
// The shell block is intentionally empty.
process saveAssemblyLog{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(assembly_data)

    script:
    // Writes happen at task-script construction, not in the shell
    assembly_log.write(assembly_header)
    assembly_log.append(assembly_data.join('\n') + '\n')
    """
    """
}
|