jpayne@68
|
1 package prok;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.io.File;
|
jpayne@68
|
4 import java.io.PrintStream;
|
jpayne@68
|
5 import java.util.ArrayList;
|
jpayne@68
|
6 import java.util.PriorityQueue;
|
jpayne@68
|
7
|
jpayne@68
|
8 import aligner.Alignment;
|
jpayne@68
|
9 import dna.AminoAcid;
|
jpayne@68
|
10 import fileIO.ByteFile;
|
jpayne@68
|
11 import fileIO.FileFormat;
|
jpayne@68
|
12 import fileIO.ReadWrite;
|
jpayne@68
|
13 import shared.Parse;
|
jpayne@68
|
14 import shared.Parser;
|
jpayne@68
|
15 import shared.PreParser;
|
jpayne@68
|
16 import shared.ReadStats;
|
jpayne@68
|
17 import shared.Shared;
|
jpayne@68
|
18 import shared.Timer;
|
jpayne@68
|
19 import shared.Tools;
|
jpayne@68
|
20 import stream.ConcurrentReadInputStream;
|
jpayne@68
|
21 import stream.ConcurrentReadOutputStream;
|
jpayne@68
|
22 import stream.FASTQ;
|
jpayne@68
|
23 import stream.FastaReadInputStream;
|
jpayne@68
|
24 import stream.Read;
|
jpayne@68
|
25 import stream.ReadInputStream;
|
jpayne@68
|
26 import structures.ListNum;
|
jpayne@68
|
27 import structures.LongHashSet;
|
jpayne@68
|
28 import template.Accumulator;
|
jpayne@68
|
29 import template.ThreadWaiter;
|
jpayne@68
|
30
|
jpayne@68
|
31 /**
|
jpayne@68
|
32 * Makes a consensus ribosomal sequence using raw reads as input.
|
jpayne@68
|
33 *
|
jpayne@68
|
34 * @author Brian Bushnell
|
jpayne@68
|
35 * @date October 10, 2019
|
jpayne@68
|
36 *
|
jpayne@68
|
37 */
|
jpayne@68
|
38 public class RiboMaker implements Accumulator<RiboMaker.ProcessThread> {
|
jpayne@68
|
39
|
jpayne@68
|
40 /*--------------------------------------------------------------*/
|
jpayne@68
|
41 /*---------------- Initialization ----------------*/
|
jpayne@68
|
42 /*--------------------------------------------------------------*/
|
jpayne@68
|
43
|
jpayne@68
|
44 /**
|
jpayne@68
|
45 * Code entrance from the command line.
|
jpayne@68
|
46 * @param args Command line arguments
|
jpayne@68
|
47 */
|
jpayne@68
|
48 public static void main(String[] args){
|
jpayne@68
|
49 assert(false) : "TODO";
|
jpayne@68
|
50
|
jpayne@68
|
51 //Start a timer immediately upon code entrance.
|
jpayne@68
|
52 Timer t=new Timer();
|
jpayne@68
|
53
|
jpayne@68
|
54 //Create an instance of this class
|
jpayne@68
|
55 RiboMaker x=new RiboMaker(args);
|
jpayne@68
|
56
|
jpayne@68
|
57 //Run the object
|
jpayne@68
|
58 x.process(t);
|
jpayne@68
|
59
|
jpayne@68
|
60 //Close the print stream if it was redirected
|
jpayne@68
|
61 Shared.closeStream(x.outstream);
|
jpayne@68
|
62 }
|
jpayne@68
|
63
|
jpayne@68
|
64 /**
|
jpayne@68
|
65 * Constructor.
|
jpayne@68
|
66 * @param args Command line arguments
|
jpayne@68
|
67 */
|
jpayne@68
|
68 public RiboMaker(String[] args){
|
jpayne@68
|
69
|
jpayne@68
|
70 {//Preparse block for help, config files, and outstream
|
jpayne@68
|
71 PreParser pp=new PreParser(args, getClass(), false);
|
jpayne@68
|
72 args=pp.args;
|
jpayne@68
|
73 outstream=pp.outstream;
|
jpayne@68
|
74 }
|
jpayne@68
|
75
|
jpayne@68
|
76 //Set shared static variables prior to parsing
|
jpayne@68
|
77 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
|
jpayne@68
|
78 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
|
jpayne@68
|
79
|
jpayne@68
|
80 {//Parse the arguments
|
jpayne@68
|
81 final Parser parser=parse(args);
|
jpayne@68
|
82 Parser.processQuality();
|
jpayne@68
|
83
|
jpayne@68
|
84 maxReads=parser.maxReads;
|
jpayne@68
|
85 overwrite=ReadStats.overwrite=parser.overwrite;
|
jpayne@68
|
86 append=ReadStats.append=parser.append;
|
jpayne@68
|
87 setInterleaved=parser.setInterleaved;
|
jpayne@68
|
88
|
jpayne@68
|
89 in1=parser.in1;
|
jpayne@68
|
90 in2=parser.in2;
|
jpayne@68
|
91 qfin1=parser.qfin1;
|
jpayne@68
|
92 qfin2=parser.qfin2;
|
jpayne@68
|
93 extin=parser.extin;
|
jpayne@68
|
94
|
jpayne@68
|
95 out1=parser.out1;
|
jpayne@68
|
96 qfout1=parser.qfout1;
|
jpayne@68
|
97 extout=parser.extout;
|
jpayne@68
|
98 }
|
jpayne@68
|
99
|
jpayne@68
|
100 validateParams();
|
jpayne@68
|
101 doPoundReplacement(); //Replace # with 1 and 2
|
jpayne@68
|
102 adjustInterleaving(); //Make sure interleaving agrees with number of input and output files
|
jpayne@68
|
103 fixExtensions(); //Add or remove .gz or .bz2 as needed
|
jpayne@68
|
104 checkFileExistence(); //Ensure files can be read and written
|
jpayne@68
|
105 checkStatics(); //Adjust file-related static fields as needed for this program
|
jpayne@68
|
106
|
jpayne@68
|
107 //Create output FileFormat objects
|
jpayne@68
|
108 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
|
jpayne@68
|
109
|
jpayne@68
|
110 fffilter=FileFormat.testInput(filterFile, FileFormat.FASTA, null, true, true);
|
jpayne@68
|
111 ffref=FileFormat.testInput(refFile, FileFormat.FASTA, null, true, true);
|
jpayne@68
|
112
|
jpayne@68
|
113 //Create input FileFormat objects
|
jpayne@68
|
114 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
115 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
116
|
jpayne@68
|
117 if(fffilter==null){
|
jpayne@68
|
118 filter=null;
|
jpayne@68
|
119 }else{
|
jpayne@68
|
120 filter=loadFilter(fffilter, k);
|
jpayne@68
|
121 }
|
jpayne@68
|
122 loadRef();
|
jpayne@68
|
123 }
|
jpayne@68
|
124
|
jpayne@68
|
125 /*--------------------------------------------------------------*/
|
jpayne@68
|
126 /*---------------- Initialization Helpers ----------------*/
|
jpayne@68
|
127 /*--------------------------------------------------------------*/
|
jpayne@68
|
128
|
jpayne@68
|
129 /** Parse arguments from the command line */
|
jpayne@68
|
130 private Parser parse(String[] args){
|
jpayne@68
|
131
|
jpayne@68
|
132 //Create a parser object
|
jpayne@68
|
133 Parser parser=new Parser();
|
jpayne@68
|
134
|
jpayne@68
|
135 //Set any necessary Parser defaults here
|
jpayne@68
|
136 //parser.foo=bar;
|
jpayne@68
|
137
|
jpayne@68
|
138 //Parse each argument
|
jpayne@68
|
139 for(int i=0; i<args.length; i++){
|
jpayne@68
|
140 String arg=args[i];
|
jpayne@68
|
141
|
jpayne@68
|
142 //Break arguments into their constituent parts, in the form of "a=b"
|
jpayne@68
|
143 String[] split=arg.split("=");
|
jpayne@68
|
144 String a=split[0].toLowerCase();
|
jpayne@68
|
145 String b=split.length>1 ? split[1] : null;
|
jpayne@68
|
146 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
|
jpayne@68
|
147
|
jpayne@68
|
148 if(a.equals("verbose")){
|
jpayne@68
|
149 verbose=Parse.parseBoolean(b);
|
jpayne@68
|
150 }else if(a.equals("ordered")){
|
jpayne@68
|
151 ordered=Parse.parseBoolean(b);
|
jpayne@68
|
152 }else if(a.equals("filter")){
|
jpayne@68
|
153 filterFile=b;
|
jpayne@68
|
154 }else if(a.equals("ref")){
|
jpayne@68
|
155 refFile=b;
|
jpayne@68
|
156 }else if(a.equals("parse_flag_goes_here")){
|
jpayne@68
|
157 long fake_variable=Parse.parseKMG(b);
|
jpayne@68
|
158 //Set a variable here
|
jpayne@68
|
159 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
|
jpayne@68
|
160 //do nothing
|
jpayne@68
|
161 }else{
|
jpayne@68
|
162 outstream.println("Unknown parameter "+args[i]);
|
jpayne@68
|
163 assert(false) : "Unknown parameter "+args[i];
|
jpayne@68
|
164 }
|
jpayne@68
|
165 }
|
jpayne@68
|
166
|
jpayne@68
|
167 return parser;
|
jpayne@68
|
168 }
|
jpayne@68
|
169
|
jpayne@68
|
170 /** Replace # with 1 and 2 in headers */
|
jpayne@68
|
171 private void doPoundReplacement(){
|
jpayne@68
|
172 //Do input file # replacement
|
jpayne@68
|
173 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
|
jpayne@68
|
174 in2=in1.replace("#", "2");
|
jpayne@68
|
175 in1=in1.replace("#", "1");
|
jpayne@68
|
176 }
|
jpayne@68
|
177
|
jpayne@68
|
178 //Ensure there is an input file
|
jpayne@68
|
179 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
|
jpayne@68
|
180 }
|
jpayne@68
|
181
|
jpayne@68
|
182 /** Add or remove .gz or .bz2 as needed */
|
jpayne@68
|
183 private void fixExtensions(){
|
jpayne@68
|
184 in1=Tools.fixExtension(in1);
|
jpayne@68
|
185 in2=Tools.fixExtension(in2);
|
jpayne@68
|
186 qfin1=Tools.fixExtension(qfin1);
|
jpayne@68
|
187 qfin2=Tools.fixExtension(qfin2);
|
jpayne@68
|
188 }
|
jpayne@68
|
189
|
jpayne@68
|
190 /** Ensure files can be read and written */
|
jpayne@68
|
191 private void checkFileExistence(){
|
jpayne@68
|
192 //Ensure output files can be written
|
jpayne@68
|
193 if(!Tools.testOutputFiles(overwrite, append, false, out1)){
|
jpayne@68
|
194 outstream.println((out1==null)+", "+out1);
|
jpayne@68
|
195 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+"\n");
|
jpayne@68
|
196 }
|
jpayne@68
|
197
|
jpayne@68
|
198 //Ensure input files can be read
|
jpayne@68
|
199 if(!Tools.testInputFiles(false, true, in1, in2, filterFile, refFile)){
|
jpayne@68
|
200 throw new RuntimeException("\nCan't read some input files.\n");
|
jpayne@68
|
201 }
|
jpayne@68
|
202
|
jpayne@68
|
203 //Ensure that no file was specified multiple times
|
jpayne@68
|
204 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, filterFile, refFile)){
|
jpayne@68
|
205 throw new RuntimeException("\nSome file names were specified multiple times.\n");
|
jpayne@68
|
206 }
|
jpayne@68
|
207
|
jpayne@68
|
208 assert(refFile!=null);
|
jpayne@68
|
209 }
|
jpayne@68
|
210
|
jpayne@68
|
211 /** Make sure interleaving agrees with number of input and output files */
|
jpayne@68
|
212 private void adjustInterleaving(){
|
jpayne@68
|
213 //Adjust interleaved detection based on the number of input files
|
jpayne@68
|
214 if(in2!=null){
|
jpayne@68
|
215 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
|
jpayne@68
|
216 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
217 }
|
jpayne@68
|
218
|
jpayne@68
|
219 //Adjust interleaved settings based on number of output files
|
jpayne@68
|
220 if(!setInterleaved){
|
jpayne@68
|
221 assert(in1!=null) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\n";
|
jpayne@68
|
222 if(in2!=null){ //If there are 2 input streams.
|
jpayne@68
|
223 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
224 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
|
jpayne@68
|
225 }
|
jpayne@68
|
226 }
|
jpayne@68
|
227 }
|
jpayne@68
|
228
|
jpayne@68
|
229 /** Adjust file-related static fields as needed for this program */
|
jpayne@68
|
230 private static void checkStatics(){
|
jpayne@68
|
231 //Adjust the number of threads for input file reading
|
jpayne@68
|
232 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
|
jpayne@68
|
233 ByteFile.FORCE_MODE_BF2=true;
|
jpayne@68
|
234 }
|
jpayne@68
|
235
|
jpayne@68
|
236 assert(FastaReadInputStream.settingsOK());
|
jpayne@68
|
237 }
|
jpayne@68
|
238
|
jpayne@68
|
239 /** Ensure parameter ranges are within bounds and required parameters are set */
|
jpayne@68
|
240 private boolean validateParams(){
|
jpayne@68
|
241 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
|
jpayne@68
|
242 assert(false) : "TODO";
|
jpayne@68
|
243 return true;
|
jpayne@68
|
244 }
|
jpayne@68
|
245
|
jpayne@68
|
246 /*--------------------------------------------------------------*/
|
jpayne@68
|
247 /*---------------- Outer Methods ----------------*/
|
jpayne@68
|
248 /*--------------------------------------------------------------*/
|
jpayne@68
|
249
|
jpayne@68
|
250 /** Create read streams and process all data */
|
jpayne@68
|
251 void process(Timer t){
|
jpayne@68
|
252
|
jpayne@68
|
253 //Turn off read validation in the input threads to increase speed
|
jpayne@68
|
254 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
|
jpayne@68
|
255 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
|
jpayne@68
|
256
|
jpayne@68
|
257 //Create a read input stream
|
jpayne@68
|
258 final ConcurrentReadInputStream cris=makeCris();
|
jpayne@68
|
259
|
jpayne@68
|
260 //Optionally create a read output stream
|
jpayne@68
|
261 final ConcurrentReadOutputStream ros=makeCros(cris.paired());
|
jpayne@68
|
262
|
jpayne@68
|
263 //Reset counters
|
jpayne@68
|
264 readsProcessed=readsOut=0;
|
jpayne@68
|
265 basesProcessed=basesOut=0;
|
jpayne@68
|
266
|
jpayne@68
|
267 //Process the reads in separate threads
|
jpayne@68
|
268 spawnThreads(cris, ros);
|
jpayne@68
|
269
|
jpayne@68
|
270 if(verbose){outstream.println("Finished; closing streams.");}
|
jpayne@68
|
271
|
jpayne@68
|
272 //Write anything that was accumulated by ReadStats
|
jpayne@68
|
273 errorState|=ReadStats.writeAll();
|
jpayne@68
|
274 //Close the read streams
|
jpayne@68
|
275 errorState|=ReadWrite.closeStreams(cris, ros);
|
jpayne@68
|
276
|
jpayne@68
|
277 //Reset read validation
|
jpayne@68
|
278 Read.VALIDATE_IN_CONSTRUCTOR=vic;
|
jpayne@68
|
279
|
jpayne@68
|
280 //Report timing and results
|
jpayne@68
|
281 t.stop();
|
jpayne@68
|
282 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
|
jpayne@68
|
283 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
|
jpayne@68
|
284
|
jpayne@68
|
285 //Throw an exception of there was an error in a thread
|
jpayne@68
|
286 if(errorState){
|
jpayne@68
|
287 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
|
jpayne@68
|
288 }
|
jpayne@68
|
289 }
|
jpayne@68
|
290
|
jpayne@68
|
291 private void loadRef(){
|
jpayne@68
|
292 ArrayList<Read> reads=ReadInputStream.toReads(ffref, -1);
|
jpayne@68
|
293 ref0=reads.get(0).bases;
|
jpayne@68
|
294 ref=new byte[ref0.length+2*padding];
|
jpayne@68
|
295 for(int i=0, j=-padding; i<ref.length; i++, j++){
|
jpayne@68
|
296 byte b=(j>=0 && j<ref0.length ? ref0[j] : (byte)'N');
|
jpayne@68
|
297 ref[i]=b;
|
jpayne@68
|
298 }
|
jpayne@68
|
299
|
jpayne@68
|
300 queues=new PriorityQueue[1+ref.length/queueWidth];
|
jpayne@68
|
301 for(int i=0; i<queues.length; i++){
|
jpayne@68
|
302 queues[i]=new PriorityQueue<Alignment>(queueLen);
|
jpayne@68
|
303 }
|
jpayne@68
|
304 }
|
jpayne@68
|
305
|
jpayne@68
|
306 public static LongHashSet loadFilter(FileFormat ff, int k){
|
jpayne@68
|
307 if(ff==null){return null;}
|
jpayne@68
|
308 ArrayList<Read> reads=ReadInputStream.toReads(ff, -1);
|
jpayne@68
|
309 if(reads==null || reads.size()==0){return null;}
|
jpayne@68
|
310 LongHashSet set=new LongHashSet(4096);
|
jpayne@68
|
311
|
jpayne@68
|
312 final int shift=2*k;
|
jpayne@68
|
313 final int shift2=shift-2;
|
jpayne@68
|
314 final long mask=(shift>63 ? -1L : ~((-1L)<<shift));
|
jpayne@68
|
315 int len=0;
|
jpayne@68
|
316
|
jpayne@68
|
317 long kmer=0, rkmer=0;
|
jpayne@68
|
318 for(Read r : reads){
|
jpayne@68
|
319 final byte[] bases=r.bases;
|
jpayne@68
|
320 for(byte b : bases) {
|
jpayne@68
|
321 long x=AminoAcid.baseToNumber[b];
|
jpayne@68
|
322 long x2=AminoAcid.baseToComplementNumber[b];
|
jpayne@68
|
323 kmer=((kmer<<2)|x)&mask;
|
jpayne@68
|
324 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
|
jpayne@68
|
325
|
jpayne@68
|
326 if(x>=0){
|
jpayne@68
|
327 len++;
|
jpayne@68
|
328 if(len>=k){
|
jpayne@68
|
329 set.add(Tools.max(kmer, rkmer));
|
jpayne@68
|
330 }
|
jpayne@68
|
331 }else{
|
jpayne@68
|
332 len=0;
|
jpayne@68
|
333 kmer=rkmer=0;
|
jpayne@68
|
334 }
|
jpayne@68
|
335 }
|
jpayne@68
|
336 }
|
jpayne@68
|
337 return set;
|
jpayne@68
|
338 }
|
jpayne@68
|
339
|
jpayne@68
|
340 public static boolean passesFilter(Read r, int k, LongHashSet set){
|
jpayne@68
|
341 if(r==null) {return false;}
|
jpayne@68
|
342 if(set==null){return true;}
|
jpayne@68
|
343
|
jpayne@68
|
344 final int shift=2*k;
|
jpayne@68
|
345 final int shift2=shift-2;
|
jpayne@68
|
346 final long mask=(shift>63 ? -1L : ~((-1L)<<shift));
|
jpayne@68
|
347 int len=0;
|
jpayne@68
|
348 long kmer=0, rkmer=0;
|
jpayne@68
|
349
|
jpayne@68
|
350 final byte[] bases=r.bases;
|
jpayne@68
|
351 for(byte b : bases) {
|
jpayne@68
|
352 long x=AminoAcid.baseToNumber[b];
|
jpayne@68
|
353 long x2=AminoAcid.baseToComplementNumber[b];
|
jpayne@68
|
354 kmer=((kmer<<2)|x)&mask;
|
jpayne@68
|
355 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
|
jpayne@68
|
356
|
jpayne@68
|
357 if(x>=0){
|
jpayne@68
|
358 len++;
|
jpayne@68
|
359 if(len>=k){
|
jpayne@68
|
360 long key=Tools.max(kmer, rkmer);
|
jpayne@68
|
361 if(set.contains(key)){return true;}
|
jpayne@68
|
362 }
|
jpayne@68
|
363 }else{
|
jpayne@68
|
364 len=0;
|
jpayne@68
|
365 kmer=rkmer=0;
|
jpayne@68
|
366 }
|
jpayne@68
|
367 }
|
jpayne@68
|
368 return false;
|
jpayne@68
|
369 }
|
jpayne@68
|
370
|
jpayne@68
|
371 private ConcurrentReadInputStream makeCris(){
|
jpayne@68
|
372 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
|
jpayne@68
|
373 cris.start(); //Start the stream
|
jpayne@68
|
374 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
375 boolean paired=cris.paired();
|
jpayne@68
|
376 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
|
jpayne@68
|
377 return cris;
|
jpayne@68
|
378 }
|
jpayne@68
|
379
|
jpayne@68
|
380 private ConcurrentReadOutputStream makeCros(boolean pairedInput){
|
jpayne@68
|
381 if(ffout1==null){return null;}
|
jpayne@68
|
382
|
jpayne@68
|
383 //Select output buffer size based on whether it needs to be ordered
|
jpayne@68
|
384 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
|
jpayne@68
|
385
|
jpayne@68
|
386 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, null, qfout1, null, buff, null, false);
|
jpayne@68
|
387 ros.start(); //Start the stream
|
jpayne@68
|
388 return ros;
|
jpayne@68
|
389 }
|
jpayne@68
|
390
|
jpayne@68
|
391 /*--------------------------------------------------------------*/
|
jpayne@68
|
392 /*---------------- Thread Management ----------------*/
|
jpayne@68
|
393 /*--------------------------------------------------------------*/
|
jpayne@68
|
394
|
jpayne@68
|
395 /** Spawn process threads */
|
jpayne@68
|
396 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
|
jpayne@68
|
397
|
jpayne@68
|
398 //Do anything necessary prior to processing
|
jpayne@68
|
399
|
jpayne@68
|
400 //Determine how many threads may be used
|
jpayne@68
|
401 final int threads=Shared.threads();
|
jpayne@68
|
402
|
jpayne@68
|
403 //Fill a list with ProcessThreads
|
jpayne@68
|
404 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
|
jpayne@68
|
405 for(int i=0; i<threads; i++){
|
jpayne@68
|
406 alpt.add(new ProcessThread(cris, i));
|
jpayne@68
|
407 }
|
jpayne@68
|
408
|
jpayne@68
|
409 //Start the threads and wait for them to finish
|
jpayne@68
|
410 boolean success=ThreadWaiter.startAndWait(alpt, this);
|
jpayne@68
|
411 errorState&=!success;
|
jpayne@68
|
412
|
jpayne@68
|
413 //Do anything necessary after processing
|
jpayne@68
|
414 assert(false) : "TODO: Make consensus and write it?";
|
jpayne@68
|
415 }
|
jpayne@68
|
416
|
jpayne@68
|
417 @Override
|
jpayne@68
|
418 public final void accumulate(ProcessThread pt){
|
jpayne@68
|
419 readsProcessed+=pt.readsProcessedT;
|
jpayne@68
|
420 basesProcessed+=pt.basesProcessedT;
|
jpayne@68
|
421 readsOut+=pt.readsOutT;
|
jpayne@68
|
422 basesOut+=pt.basesOutT;
|
jpayne@68
|
423 errorState|=(!pt.success);
|
jpayne@68
|
424
|
jpayne@68
|
425 for(int i=0; i<queues.length; i++){
|
jpayne@68
|
426 PriorityQueue<Alignment> q=queues[i];
|
jpayne@68
|
427 PriorityQueue<Alignment> qt=pt.queuesT[i];
|
jpayne@68
|
428 for(Alignment a : qt){
|
jpayne@68
|
429 addToQueue(a, q);
|
jpayne@68
|
430 }
|
jpayne@68
|
431 }
|
jpayne@68
|
432 }
|
jpayne@68
|
433
|
jpayne@68
|
434 @Override
|
jpayne@68
|
435 public final boolean success(){return !errorState;}
|
jpayne@68
|
436
|
jpayne@68
|
437 /*--------------------------------------------------------------*/
|
jpayne@68
|
438 /*---------------- Inner Methods ----------------*/
|
jpayne@68
|
439 /*--------------------------------------------------------------*/
|
jpayne@68
|
440
|
jpayne@68
|
441 boolean addToQueue(Alignment best, PriorityQueue<Alignment>[] queues){
|
jpayne@68
|
442 int start=best.start;
|
jpayne@68
|
443 int qnum=start/queueWidth;
|
jpayne@68
|
444 PriorityQueue<Alignment> queue=queues[qnum];
|
jpayne@68
|
445 return addToQueue(best, queue);
|
jpayne@68
|
446 }
|
jpayne@68
|
447
|
jpayne@68
|
448 boolean addToQueue(Alignment best, PriorityQueue<Alignment> queue){
|
jpayne@68
|
449 if(queue.size()<queueLen){queue.add(best);}
|
jpayne@68
|
450 else{
|
jpayne@68
|
451 Alignment bottom=queue.peek();
|
jpayne@68
|
452 if(bottom.compareTo(best)>=0){return false;}
|
jpayne@68
|
453 queue.poll();
|
jpayne@68
|
454 queue.add(best);
|
jpayne@68
|
455 }
|
jpayne@68
|
456 return true;
|
jpayne@68
|
457 }
|
jpayne@68
|
458
|
jpayne@68
|
459 /*--------------------------------------------------------------*/
|
jpayne@68
|
460 /*---------------- Inner Classes ----------------*/
|
jpayne@68
|
461 /*--------------------------------------------------------------*/
|
jpayne@68
|
462
|
jpayne@68
|
463 /** This class is static to prevent accidental writing to shared variables.
|
jpayne@68
|
464 * It is safe to remove the static modifier. */
|
jpayne@68
|
465 class ProcessThread extends Thread {
|
jpayne@68
|
466
|
jpayne@68
|
467 //Constructor
|
jpayne@68
|
468 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){
|
jpayne@68
|
469 cris=cris_;
|
jpayne@68
|
470 tid=tid_;
|
jpayne@68
|
471
|
jpayne@68
|
472 queuesT=new PriorityQueue[1+ref.length/queueWidth];
|
jpayne@68
|
473 for(int i=0; i<queuesT.length; i++){
|
jpayne@68
|
474 queuesT[i]=new PriorityQueue<Alignment>(queueLen);
|
jpayne@68
|
475 }
|
jpayne@68
|
476 }
|
jpayne@68
|
477
|
jpayne@68
|
478 //Called by start()
|
jpayne@68
|
479 @Override
|
jpayne@68
|
480 public void run(){
|
jpayne@68
|
481 //Do anything necessary prior to processing
|
jpayne@68
|
482
|
jpayne@68
|
483 //Process the reads
|
jpayne@68
|
484 processInner();
|
jpayne@68
|
485
|
jpayne@68
|
486 //Do anything necessary after processing
|
jpayne@68
|
487
|
jpayne@68
|
488 //Indicate successful exit status
|
jpayne@68
|
489 success=true;
|
jpayne@68
|
490 }
|
jpayne@68
|
491
|
jpayne@68
|
492 /** Iterate through the reads */
|
jpayne@68
|
493 void processInner(){
|
jpayne@68
|
494
|
jpayne@68
|
495 //Grab the first ListNum of reads
|
jpayne@68
|
496 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
497
|
jpayne@68
|
498 //Check to ensure pairing is as expected
|
jpayne@68
|
499 if(ln!=null && !ln.isEmpty()){
|
jpayne@68
|
500 Read r=ln.get(0);
|
jpayne@68
|
501 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
|
jpayne@68
|
502 }
|
jpayne@68
|
503
|
jpayne@68
|
504 //As long as there is a nonempty read list...
|
jpayne@68
|
505 while(ln!=null && ln.size()>0){
|
jpayne@68
|
506 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
|
jpayne@68
|
507
|
jpayne@68
|
508 processList(ln);
|
jpayne@68
|
509
|
jpayne@68
|
510 //Notify the input stream that the list was used
|
jpayne@68
|
511 cris.returnList(ln);
|
jpayne@68
|
512 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
|
jpayne@68
|
513
|
jpayne@68
|
514 //Fetch a new list
|
jpayne@68
|
515 ln=cris.nextList();
|
jpayne@68
|
516 }
|
jpayne@68
|
517
|
jpayne@68
|
518 //Notify the input stream that the final list was used
|
jpayne@68
|
519 if(ln!=null){
|
jpayne@68
|
520 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
|
jpayne@68
|
521 }
|
jpayne@68
|
522 }
|
jpayne@68
|
523
|
jpayne@68
|
524 void processList(ListNum<Read> ln){
|
jpayne@68
|
525
|
jpayne@68
|
526 //Grab the actual read list from the ListNum
|
jpayne@68
|
527 final ArrayList<Read> reads=ln.list;
|
jpayne@68
|
528
|
jpayne@68
|
529 //Loop through each read in the list
|
jpayne@68
|
530 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
531 final Read r1=reads.get(idx);
|
jpayne@68
|
532 final Read r2=r1.mate;
|
jpayne@68
|
533
|
jpayne@68
|
534 //Validate reads in worker threads
|
jpayne@68
|
535 if(!r1.validated()){r1.validate(true);}
|
jpayne@68
|
536 if(r2!=null && !r2.validated()){r2.validate(true);}
|
jpayne@68
|
537
|
jpayne@68
|
538 //Track the initial length for statistics
|
jpayne@68
|
539 final int initialLength1=r1.length();
|
jpayne@68
|
540 final int initialLength2=r1.mateLength();
|
jpayne@68
|
541
|
jpayne@68
|
542 //Increment counters
|
jpayne@68
|
543 readsProcessedT+=r1.pairCount();
|
jpayne@68
|
544 basesProcessedT+=initialLength1+initialLength2;
|
jpayne@68
|
545
|
jpayne@68
|
546 {
|
jpayne@68
|
547 //Reads are processed in this block.
|
jpayne@68
|
548 processReadPair(r1, r2);
|
jpayne@68
|
549
|
jpayne@68
|
550 // if(!keep){reads.set(idx, null);}
|
jpayne@68
|
551 // else{
|
jpayne@68
|
552 // readsOutT+=r1.pairCount();
|
jpayne@68
|
553 // basesOutT+=r1.pairLength();
|
jpayne@68
|
554 // }
|
jpayne@68
|
555 }
|
jpayne@68
|
556 }
|
jpayne@68
|
557
|
jpayne@68
|
558 //Output reads to the output stream
|
jpayne@68
|
559 // if(ros!=null){ros.add(reads, ln.id);}
|
jpayne@68
|
560 }
|
jpayne@68
|
561
|
jpayne@68
|
562 /**
|
jpayne@68
|
563 * Process a read or a read pair.
|
jpayne@68
|
564 * @param r1 Read 1
|
jpayne@68
|
565 * @param r2 Read 2 (may be null)
|
jpayne@68
|
566 * @return True if the reads should be kept, false if they should be discarded.
|
jpayne@68
|
567 */
|
jpayne@68
|
568 void processReadPair(final Read r1, final Read r2){
|
jpayne@68
|
569 boolean pass=passesFilter(r1, k, filter) || passesFilter(r2, k, filter);
|
jpayne@68
|
570 if(!pass){return;}
|
jpayne@68
|
571 processRead(r1);
|
jpayne@68
|
572 processRead(r2);
|
jpayne@68
|
573 }
|
jpayne@68
|
574
|
jpayne@68
|
575 void processRead(final Read r){
|
jpayne@68
|
576 Alignment plus=new Alignment(r);
|
jpayne@68
|
577 plus.align(ref);
|
jpayne@68
|
578
|
jpayne@68
|
579 r.reverseComplement();
|
jpayne@68
|
580 Alignment minus=new Alignment(r);
|
jpayne@68
|
581 minus.align(ref);
|
jpayne@68
|
582
|
jpayne@68
|
583 Alignment best=null;
|
jpayne@68
|
584 if(plus.id>=minus.id){
|
jpayne@68
|
585 r.reverseComplement();
|
jpayne@68
|
586 best=plus;
|
jpayne@68
|
587 }else{
|
jpayne@68
|
588 best=minus;
|
jpayne@68
|
589 }
|
jpayne@68
|
590 if(best.id<minID) {return;}
|
jpayne@68
|
591
|
jpayne@68
|
592 addToQueue(best, queuesT);
|
jpayne@68
|
593 }
|
jpayne@68
|
594
|
jpayne@68
|
595 /** Number of reads processed by this thread */
|
jpayne@68
|
596 protected long readsProcessedT=0;
|
jpayne@68
|
597 /** Number of bases processed by this thread */
|
jpayne@68
|
598 protected long basesProcessedT=0;
|
jpayne@68
|
599
|
jpayne@68
|
600 /** Number of reads retained by this thread */
|
jpayne@68
|
601 protected long readsOutT=0;
|
jpayne@68
|
602 /** Number of bases retained by this thread */
|
jpayne@68
|
603 protected long basesOutT=0;
|
jpayne@68
|
604
|
jpayne@68
|
605 /** True only if this thread has completed successfully */
|
jpayne@68
|
606 boolean success=false;
|
jpayne@68
|
607
|
jpayne@68
|
608
|
jpayne@68
|
609 private PriorityQueue<Alignment>[] queuesT;
|
jpayne@68
|
610
|
jpayne@68
|
611 /** Shared input stream */
|
jpayne@68
|
612 private final ConcurrentReadInputStream cris;
|
jpayne@68
|
613 /** Thread ID */
|
jpayne@68
|
614 final int tid;
|
jpayne@68
|
615 }
|
jpayne@68
|
616
|
jpayne@68
|
617 /*--------------------------------------------------------------*/
|
jpayne@68
|
618 /*---------------- Fields ----------------*/
|
jpayne@68
|
619 /*--------------------------------------------------------------*/
|
jpayne@68
|
620
|
jpayne@68
|
621 /** Primary input file path */
|
jpayne@68
|
622 private String in1=null;
|
jpayne@68
|
623 /** Secondary input file path */
|
jpayne@68
|
624 private String in2=null;
|
jpayne@68
|
625
|
jpayne@68
|
626 private String qfin1=null;
|
jpayne@68
|
627 private String qfin2=null;
|
jpayne@68
|
628
|
jpayne@68
|
629 /** Primary output file path */
|
jpayne@68
|
630 private String out1=null;
|
jpayne@68
|
631
|
jpayne@68
|
632 private String qfout1=null;
|
jpayne@68
|
633
|
jpayne@68
|
634 private String filterFile;
|
jpayne@68
|
635 private String refFile;
|
jpayne@68
|
636
|
jpayne@68
|
637 /** Override input file extension */
|
jpayne@68
|
638 private String extin=null;
|
jpayne@68
|
639 /** Override output file extension */
|
jpayne@68
|
640 private String extout=null;
|
jpayne@68
|
641
|
jpayne@68
|
642 /** Whether interleaved was explicitly set. */
|
jpayne@68
|
643 private boolean setInterleaved=false;
|
jpayne@68
|
644
|
jpayne@68
|
645 /** Original ref */
|
jpayne@68
|
646 private byte[] ref0;
|
jpayne@68
|
647 /** Padded ref */
|
jpayne@68
|
648 private byte[] ref;
|
jpayne@68
|
649
|
jpayne@68
|
650 private int padding=100;
|
jpayne@68
|
651 private int queueLen=20;
|
jpayne@68
|
652 private int queueWidth=20;
|
jpayne@68
|
653 private float minID=0.4f;
|
jpayne@68
|
654
|
jpayne@68
|
655 private PriorityQueue<Alignment>[] queues;
|
jpayne@68
|
656
|
jpayne@68
|
657 /*--------------------------------------------------------------*/
|
jpayne@68
|
658
|
jpayne@68
|
659 /** Number of reads processed */
|
jpayne@68
|
660 protected long readsProcessed=0;
|
jpayne@68
|
661 /** Number of bases processed */
|
jpayne@68
|
662 protected long basesProcessed=0;
|
jpayne@68
|
663
|
jpayne@68
|
664 /** Number of reads retained */
|
jpayne@68
|
665 protected long readsOut=0;
|
jpayne@68
|
666 /** Number of bases retained */
|
jpayne@68
|
667 protected long basesOut=0;
|
jpayne@68
|
668
|
jpayne@68
|
669 /** Quit after processing this many input reads; -1 means no limit */
|
jpayne@68
|
670 private long maxReads=-1;
|
jpayne@68
|
671
|
jpayne@68
|
672 /*--------------------------------------------------------------*/
|
jpayne@68
|
673 /*---------------- Final Fields ----------------*/
|
jpayne@68
|
674 /*--------------------------------------------------------------*/
|
jpayne@68
|
675
|
jpayne@68
|
676 /** Primary input file */
|
jpayne@68
|
677 private final FileFormat ffin1;
|
jpayne@68
|
678 /** Secondary input file */
|
jpayne@68
|
679 private final FileFormat ffin2;
|
jpayne@68
|
680 /** Filter input file */
|
jpayne@68
|
681 private final FileFormat fffilter;
|
jpayne@68
|
682 /** Ref input file */
|
jpayne@68
|
683 private final FileFormat ffref;
|
jpayne@68
|
684
|
jpayne@68
|
685 /** Primary output file */
|
jpayne@68
|
686 private final FileFormat ffout1;
|
jpayne@68
|
687
|
jpayne@68
|
688 private final LongHashSet filter;
|
jpayne@68
|
689 private final int k=31;
|
jpayne@68
|
690
|
jpayne@68
|
691
|
jpayne@68
|
692 /*--------------------------------------------------------------*/
|
jpayne@68
|
693 /*---------------- Common Fields ----------------*/
|
jpayne@68
|
694 /*--------------------------------------------------------------*/
|
jpayne@68
|
695
|
jpayne@68
|
696 /** Print status messages to this output stream */
|
jpayne@68
|
697 private PrintStream outstream=System.err;
|
jpayne@68
|
698 /** Print verbose messages */
|
jpayne@68
|
699 public static boolean verbose=false;
|
jpayne@68
|
700 /** True if an error was encountered */
|
jpayne@68
|
701 public boolean errorState=false;
|
jpayne@68
|
702 /** Overwrite existing output files */
|
jpayne@68
|
703 private boolean overwrite=false;
|
jpayne@68
|
704 /** Append to existing output files */
|
jpayne@68
|
705 private boolean append=false;
|
jpayne@68
|
706 /** Reads are output in input order */
|
jpayne@68
|
707 private boolean ordered=false;
|
jpayne@68
|
708
|
jpayne@68
|
709 }
|