rliterman@0
|
// Subworkflow to fetch sample and reference data from --fasta/--reads/--ref_fasta/--ref_reads/--snpdiffs

// Directory/file locations. These are plain script-binding variables (no `def`) so that the
// workflows and processes defined below can read them.
output_directory   = file(params.output_directory)
assembly_directory = file(params.assembly_directory)
log_directory      = file(params.log_directory)
ref_id_file        = file(params.ref_id_file)

// Reference-mode flag, checked in fetchData
ref_mode = params.ref_mode

// Header row written at the top of the assembly log by saveAssemblyLog
assembly_header = "Isolate_ID\tRead_Type\tRead_Location\tAssembly_Path\n"

// Log files and helper scripts
assembly_log       = file("${log_directory}/Assembly_Data.tsv")
user_snpdiffs_list = file("${log_directory}/Imported_SNPDiffs.txt")
findReads          = file("${projectDir}/bin/fetchReads.py")
userSNPDiffs       = file("${projectDir}/bin/userSNPDiffs.py")

// SKESA is given --cores, capped at 5
skesa_cpus = (params.cores as Integer) < 5 ? (params.cores as Integer) : 5

// Entry workflow: runs fetchData and publishes its three output channels.
workflow {
    main:
    input_data     = fetchData()
    query_data     = input_data.query_data
    reference_data = input_data.reference_data
    // Must use the same name as the publish: entry below (was assigned as 'snpdiffs_data',
    // leaving 'snpdiff_data' undefined at publish time)
    snpdiff_data   = input_data.snpdiff_data

    publish:
    query_data >> 'query_data.tsv'
    reference_data >> 'reference_data.tsv'
    snpdiff_data >> 'snpdiff_data.tsv'
}

// Top-level workflow //
// Gathers samples from --snpdiffs, --fasta/--ref_fasta and --reads/--ref_reads, removes isolates listed
// in --exclude, assembles isolates that only have reads (via assembleReads), and emits:
//   query_data     - (Isolate_ID, Assembly_Path) for query isolates (empty when runmode is 'assemble')
//   reference_data - (Isolate_ID, Assembly_Path) for reference isolates (empty unless ref_mode is set
//                    and runmode is not 'assemble')
//   snpdiff_data   - (Query_ID, Reference_ID, SNPDiff_Path) for user snpdiffs that involve no excluded isolate
// Isolates seen only in snpdiffs whose assembly could not be located carry Assembly_Path 'null'.
workflow fetchData{

    emit:
    query_data
    reference_data
    snpdiff_data

    main:
    // Get any excluded IDs as (Isolate_ID, "Exclude") tuples
    ("${params.exclude}" != "" ? processExclude() : Channel.empty()).set{exclude_ids}

    // Process snpdiffs alignments (19-item rows; see processSNPDiffs for the column layout)
    // If assembly file cannot be found, it will be 'null'
    ("${params.snpdiffs}" != "" ? processSNPDiffs() : Channel.empty()).set{user_snpdiffs}

    // Mark every snpdiffs file that involves an excluded isolate as (SNPDiff_Path, "Exclude").
    // An isolate usually appears in several snpdiffs files. combine(by:) pairs each of those items with
    // the isolate's exclude entry, whereas join() consumes that single entry on the first match and
    // lets the isolate's remaining snpdiffs through.
    excluded_snpdiffs = user_snpdiffs.map{it -> tuple(it[1],it[0])}
        .concat(user_snpdiffs.map{it -> tuple(it[10],it[0])})
        .combine(exclude_ids,by:0)
        .unique{it -> it[1]}
        .map{it -> tuple(it[1],"Exclude")}

    // Generate return channel: 3-item tuple (Query_ID, Reference_ID, SNPDiff_Path)
    // Deduplicate on SNPDiff_Path before the join so each file meets its "Exclude" mark exactly once
    snpdiff_data = user_snpdiffs
        .map{it -> tuple(it[0],it[1],it[10])}
        .unique{it -> it[0]}
        .join(excluded_snpdiffs,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .map{it -> tuple(it[1],it[2],it[0])}
        .collect().flatten().collate(3)

    // (Isolate_ID, Assembly_Path) for both sides of every snpdiffs file (may repeat an isolate)
    snpdiff_assemblies = user_snpdiffs.map{it-> tuple(it[1],it[2])}
        .concat(user_snpdiffs.map{it-> tuple(it[10],it[11])})
        .filter{it -> it[0].toString() != "null"}

    // Each subset below is deduplicated by Isolate_ID *before* joining with exclude_ids. With repeated
    // keys, join() only pairs the first copy with the exclude entry and the others escape the filter.

    // Isolates whose assembly was located: (Isolate_ID, Assembly_Path, 'SNPDiff')
    assembled_snpdiffs = snpdiff_assemblies
        .filter{it -> it[1].toString() != "null"}
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it -> tuple(it[0],it[1],'SNPDiff')}
        .collect().flatten().collate(3)

    // Isolates whose assembly could not be located: (Isolate_ID, Assembly_Path = 'null')
    unlocated_snpdiffs = snpdiff_assemblies
        .filter{it -> it[1].toString() == "null"}
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it -> tuple(it[0],it[1])}
        .collect().flatten().collate(2)

    // Process any data provided as assemblies
    // Returns 2-item tuples with the following format: (Isolate_ID, Assembly_Path)
    ("${params.fasta}" != "" ? fetchQueryFasta() : Channel.empty()).set{query_fasta}
    ("${params.ref_fasta}" != "" ? fetchRefFasta() : Channel.empty()).set{ref_fasta}

    pre_assembled = assembled_snpdiffs
        .map{it -> tuple(it[0],it[1])}
        .concat(query_fasta)
        .concat(ref_fasta)
        .unique{it -> it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[2].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1])}
        .collect().flatten().collate(2)

    // Process any data provided as reads
    // Returns 3-item tuples with the following format: (Isolate_ID, Read_Type, Read_Path)
    ("${params.reads}" != "" ? fetchQueryReads() : Channel.empty()).set{query_reads}
    ("${params.ref_reads}" != "" ? fetchRefReads() : Channel.empty()).set{ref_reads}

    all_reads = query_reads
        .concat(ref_reads)
        .unique{it->it[0]}
        .join(exclude_ids,by:0,remainder:true)
        .filter{it -> it[0].toString() != "null"}
        .filter{it -> it[3].toString() != "Exclude"}
        .map{it->tuple(it[0],it[1],it[2])}
        .collect().flatten().collate(3)

    // Figure out if any assembly is necessary:
    //   assembly - assembly only             -> (Isolate_ID, Assembly_Path)
    //   read     - reads only (to assemble)  -> (Isolate_ID, Read_Type, Read_Path)
    //   combo    - reads and an assembly     -> (Isolate_ID, Assembly_Path)
    fasta_read_combo = all_reads.join(pre_assembled,by:0,remainder: true) |
        branch{it ->
            assembly: it[1].toString() == "null"
                return(tuple(it[0],it[2]))
            read: it[3].toString() == "null"
                return(tuple(it[0],it[1],it[2]))
            combo: true
                return(tuple(it[0],it[3]))}

    // Assemble reads if necessary
    assembled_reads = fasta_read_combo.read
        .collect().flatten().collate(3) | assembleReads

    // If runmode is 'assemble', tasks are complete
    if(params.runmode == "assemble"){
        query_data = Channel.empty()
        reference_data = Channel.empty()
    } else{

        // If FASTAs are provided via data and snpdiffs, use snpdiffs (as it's already been used)
        user_fastas = query_fasta
            .concat(ref_fasta)
            .concat(assembled_reads)
            .unique{it -> it[0]}
            .join(exclude_ids,by:0,remainder:true)
            .filter{it -> it[0].toString() != "null"}
            .filter{it -> it[2].toString() != "Exclude"}
            .map{it->tuple(it[0],it[1],'User')}
            .collect().flatten().collate(3)
            .join(assembled_snpdiffs,by:0,remainder:true)
            .filter{it -> it[3].toString() == "null"}
            .map{it->tuple(it[0],it[1])}

        // Get all assemblies
        all_assembled = assembled_snpdiffs
            .map{it -> tuple(it[0],it[1])}
            .concat(user_fastas)
            .unique{it->it[0]}.collect().flatten().collate(2)

        // Get data for isolates where a SNPDiff was provided, but no FASTA could be located
        no_assembly = unlocated_snpdiffs
            .join(all_assembled,by:0,remainder:true)
            .filter{it -> it[2].toString() == "null"}
            .map{it->tuple(it[0],it[1])}
            .collect().flatten().collate(2)

        // Compile all samples
        all_samples = all_assembled
            .concat(no_assembly)
            .unique{it-> it[0]}.collect().flatten().collate(2)

        // If no reference data is provided return a blank channel
        if(!ref_mode){
            reference_data = Channel.empty()

            query_data = all_samples
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

        } else{

            // Process additional reference IDs
            ("${params.ref_id}" != "" ? processRefIDs() : Channel.empty()).set{user_ref_ids}

            // processRefIDs emits all of its IDs as one List item. flatten() splits that into one
            // (Isolate_ID) tuple per ID, so unique{} keys on every ID instead of only the list's
            // first element (which could otherwise drop the whole list).
            all_ref_ids = ref_fasta.map{it->tuple(it[0])}
                .concat(ref_reads.map{it->tuple(it[0])})
                .concat(user_ref_ids.flatten().map{it -> tuple(it)})
                .unique{it-> it[0]}.collect().flatten().collate(1)
                .map{it -> tuple(it[0],"Reference")}
                .join(exclude_ids,by:0,remainder:true)
                .filter{it -> it[0].toString() != "null"}
                .filter{it -> it[2].toString() != "Exclude"}
                .map{it -> tuple(it[0],it[1])}

            reference_data = all_samples
                .join(all_ref_ids,by:0,remainder:true)
                .filter{it -> it[2].toString() == "Reference"}
                .map{it->tuple(it[0],it[1])}
                .unique{it -> it[0]}
                .collect().flatten().collate(2)

            // Save reference data to file
            reference_data
                .collect{it -> it[0]}
                | saveRefIDs

            if(params.runmode == "screen" || params.runmode == "align"){
                query_data = all_samples
                    .join(all_ref_ids,by:0,remainder:true)
                    .filter{it -> it[2].toString() != "Reference"}
                    .map{it->tuple(it[0],it[1])}
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            } else if(params.runmode == "snp"){
                query_data = all_samples
                    .unique{it -> it[0]}
                    .collect().flatten().collate(2)
            } else{
                // Without this, query_data stays unassigned and the workflow fails with an opaque missing-output error
                error "Unsupported runmode '${params.runmode}' when reference data is provided (expected screen, align, or snp)"
            }
        }
    }
}

// Fetching preassembled data //
// Query assemblies from --fasta as (Isolate_ID, Assembly_Path) tuples; an empty channel when --fasta is unset.
workflow fetchQueryFasta{

    emit:
    query_fasta

    main:

    if("${params.fasta}" == ""){
        Channel.empty().set{query_fasta}
    } else{
        getAssemblies(params.fasta).set{query_fasta}
    }
}

// Reference assemblies from --ref_fasta as (Isolate_ID, Assembly_Path) tuples; an empty channel when
// --ref_fasta is unset.
workflow fetchRefFasta{

    emit:
    ref_fasta

    main:

    if("${params.ref_fasta}" == ""){
        Channel.empty().set{ref_fasta}
    } else{
        getAssemblies(params.ref_fasta).set{ref_fasta}
    }
}

// Resolve FASTA assemblies and emit (Isolate_ID, Assembly_Path) tuples.
// fasta_loc may be:
//   - a directory: every *.fa / *.fasta / *.fna inside it is used;
//   - a single .fa/.fasta/.fna file;
//   - any other file: treated as a list of FASTA paths, one per line (whitespace-trimmed; lines not ending
//     in .fa/.fasta/.fna are ignored).
// Isolate_ID is the file's base name with --trim_name removed (applied as a regex via replaceAll).
// Paths that do not exist are skipped with a warning.
workflow getAssemblies{

    take:
    fasta_loc

    emit:
    fasta_data

    main:
    def trim_this = "${params.trim_name}"
    def fasta_extensions = ['fa','fasta','fna']

    if(fasta_loc == ""){
        error "No assembly data provided via --fasta/--ref_fasta"
    } else{

        fasta_dir = file(fasta_loc)

        // If --fasta is a directory...
        if(fasta_dir.isDirectory()){
            ch_fasta = Channel.fromPath(["${fasta_dir}/*.fa","${fasta_dir}/*.fasta","${fasta_dir}/*.fna"])
        }
        // If --fasta is a file...
        else if(fasta_dir.isFile()){

            // ...it is either a single FASTA file...
            if(fasta_dir.getExtension() in fasta_extensions){
                ch_fasta = Channel.from(fasta_dir).map{it-> file(it)}
            }
            // ...or a file with paths to FASTAs. Lines are trimmed first so trailing spaces/tabs don't
            // make the extension check silently discard valid entries.
            else{
                def listed_fastas = fasta_dir.readLines()
                    .collect{ line -> line.trim() }
                    .findAll{ line -> line =~ /\.(fa|fasta|fna)$/ }
                ch_fasta = Channel.fromList(listed_fastas).map{it-> file(it)}
            }
        } else{
            error "$fasta_dir is not a valid directory or file..."
        }

        fasta_data = ch_fasta
            .filter { fasta_path ->
                def found = file(fasta_path).exists()
                if(!found){
                    // Previously dropped silently; surface it so a typo in a path list is noticed
                    log.warn "Skipping missing assembly: ${fasta_path}"
                }
                found
            }
            .map { filePath ->
                def fileName = file(filePath).getBaseName()
                def sampleName = fileName.replaceAll(trim_this, "")
                tuple(sampleName, filePath)}
    }
}
// Resolve user-supplied .snpdiffs files from --snpdiffs and parse them with getSNPDiffsData.
// --snpdiffs may be:
//   - a directory: every *.snpdiffs inside it is used;
//   - a single .snpdiffs file;
//   - any other file: treated as a list of .snpdiffs paths, one per line (whitespace-trimmed; lines not
//     ending in .snpdiffs are ignored).
// Paths that do not exist are skipped with a warning. Emits one 19-item tuple per parsed row (layout below).
workflow processSNPDiffs{

    emit:
    snpdiffs_data

    main:

    if("${params.snpdiffs}" == ""){
        error "No assembly data provided via --snpdiffs"
    } else{

        snpdiffs_dir = file("${params.snpdiffs}")

        // If --snpdiffs is a directory...
        if(snpdiffs_dir.isDirectory()){
            ch_snpdiffs = Channel.fromPath("${snpdiffs_dir}/*.snpdiffs")
        }
        // If --snpdiffs is a file...
        else if(snpdiffs_dir.isFile()){

            // ...it is either a single .snpdiffs file...
            if(snpdiffs_dir.getExtension() == "snpdiffs"){
                ch_snpdiffs = Channel.from(snpdiffs_dir)
            }
            // ...or a file with paths to SNPDiffs. Lines are trimmed first so trailing whitespace
            // doesn't make endsWith() silently discard valid entries.
            else{
                def listed_snpdiffs = snpdiffs_dir.readLines()
                    .collect{ line -> line.trim() }
                    .findAll{ line -> line.endsWith('.snpdiffs') }
                ch_snpdiffs = Channel.fromList(listed_snpdiffs)
            }
        } else{
            error "$snpdiffs_dir is not a valid directory or file..."
        }

        snpdiffs_data = ch_snpdiffs
            .filter { snpdiffs_path ->
                def found = file(snpdiffs_path).exists()
                if(!found){
                    log.warn "Skipping missing snpdiffs file: ${snpdiffs_path}"
                }
                found
            }
            .collect() | getSNPDiffsData | splitCsv | collect | flatten | collate(19)

        // (1) SNPDiffs_File, (2) Query_ID, (3) Query_Assembly, (4) Query_Contig_Count, (5) Query_Assembly_Bases,
        // (6) Query_N50, (7) Query_N90, (8) Query_L50, (9) Query_L90, (10) Query_SHA256,
        // (11) Reference_ID, (12) Reference_Assembly, (13) Reference_Contig_Count, (14) Reference_Assembly_Bases,
        // (15) Reference_N50, (16) Reference_N90, (17) Reference_L50, (18) Reference_L90, (19) Reference_SHA256
    }
}
// Record the resolved .snpdiffs paths in Imported_SNPDiffs.txt (one per line, written while the task
// script is built), then run userSNPDiffs.py on that list. The script's stdout is returned for the caller
// to parse as CSV.
process getSNPDiffsData{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(snpdiffs_paths)

    output:
    stdout

    script:
    def path_listing = snpdiffs_paths.join('\n') + "\n"
    user_snpdiffs_list.write(path_listing)
    """
    ${params.load_python_module}
    python ${userSNPDiffs} --snpdiffs_file "${user_snpdiffs_list}" --trim_name "${params.trim_name}"
    """
}


// Fetching read data //
// Query reads from --reads (with --readext/--forward/--reverse) as (Isolate_ID, Read_Type, Read_Path) rows;
// an empty channel when --reads is unset.
workflow fetchQueryReads{

    emit:
    query_reads

    main:

    if("${params.reads}" == ""){
        Channel.empty().set{query_reads}
    } else{
        processReads(params.reads,params.readext,params.forward,params.reverse).set{query_reads}
    }
}
// Reference reads from --ref_reads (with --ref_readext/--ref_forward/--ref_reverse) as
// (Isolate_ID, Read_Type, Read_Path) rows; an empty channel when --ref_reads is unset.
workflow fetchRefReads{

    emit:
    ref_reads

    main:

    if("${params.ref_reads}" == ""){
        Channel.empty().set{ref_reads}
    } else{
        processReads(params.ref_reads,params.ref_readext,params.ref_forward,params.ref_reverse).set{ref_reads}
    }
}
// Resolve read data for --reads/--ref_reads. read_loc is either a directory of reads, or a file listing
// one read directory per line. Each directory is scanned by fetchReads, and its CSV stdout is parsed into
// rows (per fetchData: Isolate_ID, Read_Type, Read_Path).
workflow processReads{

    take:
    read_loc
    read_ext
    forward
    reverse

    emit:
    read_info

    main:

    if(read_loc == ""){
        error "No data provided to --reads/--ref_reads"
    } else{

        read_dir = file(read_loc)

        // If --reads is a single directory, get all reads from that directory
        if(read_dir.isDirectory()){
            read_info = fetchReads(read_dir,read_ext,forward,reverse) | splitCsv
        }

        // If --reads is a file including paths to many directories, process reads from all directories.
        // Lines are trimmed and blank lines skipped; otherwise a stray blank line or trailing space makes
        // fetchReads fail with "is not a valid directory".
        else if(read_dir.isFile()){
            def listed_dirs = read_dir.readLines()
                .collect{ line -> line.trim() }
                .findAll{ line -> line != "" }
            read_info = fetchReads(Channel.fromList(listed_dirs),read_ext,forward,reverse) | splitCsv
        }
        // Error if --reads doesn't point to a valid file or directory
        else{
            error "$read_dir is neither a valid file or directory..."
        }
    }
}
// Run fetchReads.py on one read directory and return its stdout (CSV rows that processReads parses).
// Every argument is quoted, so an empty value (e.g. an empty --trim_name) still reaches the script as an
// explicit "" instead of leaving the flag without a value. Paths containing spaces are also kept intact.
process fetchReads{

    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val dir // Directory containing read files
    val read_ext // Extension for read files (e.g., fastq.gz or fq)
    val forward_suffix // Identifier for forward reads (e.g., _1.fastq or _R1_001.fq.gz)
    val reverse_suffix // Identifier for reverse reads (e.g., _2.fastq or _R2_001.fq.gz)

    output:
    stdout

    script:

    if(!file(dir).isDirectory()){
        error "$dir is not a valid directory..."
    } else{
        """
        $params.load_python_module
        python "${findReads}" --read_dir "${dir}" --read_filetype "${read_ext}" --forward_suffix "${forward_suffix}" --reverse_suffix "${reverse_suffix}" --trim_name "${params.trim_name}"
        """
    }
}

// Fetch reference IDs //
// Parse the comma-separated --ref_id value into reference isolate IDs. For each entry:
//   - surrounding whitespace is trimmed;
//   - --trim_name is stripped (applied as a regex via replaceAll);
//   - blank results are dropped;
//   - duplicates are removed after trimming (first occurrence kept).
// The param is stringified first, because a purely numeric --ref_id may reach here as a number.
// NOTE: ref_ids is a plain Groovy List of Strings, not a channel.
workflow processRefIDs{

    emit:
    ref_ids

    main:
    def trim_this = "${params.trim_name}"

    ref_ids = params.ref_id.toString()
        .tokenize(',')
        .collect { it -> it.trim().replaceAll(trim_this, "") }
        .findAll { it -> it != "" }
        .unique()
}

// Fetch excluded IDs //
// Parse the comma-separated --exclude value into a channel of (Isolate_ID, "Exclude") tuples. Each entry
// is trimmed of whitespace and has --trim_name stripped (applied as a regex via replaceAll). Blank results
// are dropped and one tuple is emitted per distinct ID. The param is stringified first, because a purely
// numeric --exclude may reach here as a number.
workflow processExclude{

    emit:
    exclude_ids

    main:
    def trim_this = "${params.trim_name}"

    def excluded = params.exclude.toString()
        .tokenize(',')
        .collect { it -> it.trim().replaceAll(trim_this, "") }
        .findAll { it -> it != "" }

    exclude_ids = Channel.fromList(excluded)
        .map{it -> tuple(it.toString(),"Exclude")}
        .unique{it -> it[0]}
}

// Append the final reference isolate IDs, one per line, to ref_id_file. The append happens while the task
// script is being built; the shell script itself is empty.
process saveRefIDs{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(ref_ids)

    script:
    def id_lines = ref_ids.join('\n') + '\n'
    ref_id_file.append(id_lines)
    """
    """
}

// Assembly //
// Assemble each (Isolate_ID, Read_Type, Read_Location) tuple with skesaAssemble, write every result to the
// assembly log, and emit (Isolate_ID, Assembly_Path) tuples.
workflow assembleReads{

    take:
    to_assemble

    emit:
    assembled_data

    main:

    // skesaAssemble prints one CSV row per sample: Isolate_ID, Read_Type, Read_Location, Assembly_Path
    assembly_output = skesaAssemble(to_assemble).splitCsv()

    // Gather every row, tab-joined, and hand the whole batch to the log writer
    log_rows = assembly_output
        .map{ row -> row.join("\t") }
        .collect()
    saveAssemblyLog(log_rows)

    // Keep only the isolate ID and the assembly path
    assembled_data = assembly_output.map{ row -> tuple(row[0], row[3]) }
}
// Assemble one sample's reads with SKESA into <assembly_directory>/<sample_name>.fasta, then print the CSV
// row "sample_name,read_type,read_location,assembly_file" on stdout for the caller to parse.
// read_type must be "Paired" (read_location = "forward;reverse") or "Single" (read_location = one path).
// Errors if assembly_directory is missing or the output FASTA already exists.
process skesaAssemble{
    memory '12 GB' // Add readcount/memory check?
    cpus skesa_cpus // SKESA is run with --cores skesa_cpus, so request the same number of CPUs from the executor

    input:
    tuple val(sample_name),val(read_type),val(read_location)

    output:
    stdout

    script:
    assembly_file = file("${assembly_directory}/${sample_name}.fasta")

    // Ensure folder exists and file doesn't
    if(!assembly_directory.isDirectory()){
        error "$assembly_directory is not a valid directory..."
    } else if(assembly_file.isFile()){
        error "$assembly_file already exists..."
    } else if(read_type == "Paired"){
        forward_reverse = read_location.split(";")
        // Fail with a clear message rather than an index error on a malformed paired entry
        if(forward_reverse.size() != 2){
            error "Paired reads for ${sample_name} should be two paths separated by ';', got: $read_location"
        }
        // NOTE(review): SKESA's usage docs describe mates in two files as a single comma-separated
        // --reads/--fastq value, and --use_paired_ends as meaning one interleaved file. Confirm that this
        // invocation pairs the mates as intended.
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --use_paired_ends --fastq "${forward_reverse[0]}" "${forward_reverse[1]}" --contigs_out "${assembly_file}"
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else if(read_type == "Single"){
        """
        $params.load_skesa_module
        skesa --cores ${skesa_cpus} --fastq "${read_location}" --contigs_out "${assembly_file}"
        echo "${sample_name},${read_type},${read_location},${assembly_file}"
        """
    } else{
        error "read_type should be Paired or Single, not $read_type..."
    }
}
// Rewrite the assembly log: assembly_header first, then each tab-separated row, one per line.
// The file work happens while the task script is being built; the shell script itself is empty.
process saveAssemblyLog{
    executor = 'local'
    cpus = 1
    maxForks = 1

    input:
    val(assembly_data)

    script:
    def log_body = assembly_data.join('\n') + '\n'
    assembly_log.write(assembly_header)
    assembly_log.append(log_body)
    """
    """
}