jpayne@68
|
1 package sketch;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.io.File;
|
jpayne@68
|
4 import java.io.PrintStream;
|
jpayne@68
|
5 import java.util.ArrayList;
|
jpayne@68
|
6 import java.util.Arrays;
|
jpayne@68
|
7
|
jpayne@68
|
8 import dna.AminoAcid;
|
jpayne@68
|
9 import fileIO.ByteFile;
|
jpayne@68
|
10 import fileIO.FileFormat;
|
jpayne@68
|
11 import fileIO.ReadWrite;
|
jpayne@68
|
12 import shared.Parse;
|
jpayne@68
|
13 import shared.Parser;
|
jpayne@68
|
14 import shared.PreParser;
|
jpayne@68
|
15 import shared.ReadStats;
|
jpayne@68
|
16 import shared.Shared;
|
jpayne@68
|
17 import shared.Timer;
|
jpayne@68
|
18 import shared.Tools;
|
jpayne@68
|
19 import stream.ConcurrentReadInputStream;
|
jpayne@68
|
20 import stream.ConcurrentReadOutputStream;
|
jpayne@68
|
21 import stream.FASTQ;
|
jpayne@68
|
22 import stream.FastaReadInputStream;
|
jpayne@68
|
23 import stream.Read;
|
jpayne@68
|
24 import structures.ListNum;
|
jpayne@68
|
25
|
jpayne@68
|
26 /**
|
jpayne@68
|
27 *
|
jpayne@68
|
28 * @author Brian Bushnell
|
jpayne@68
|
29 * @date July 25, 2018
|
jpayne@68
|
30 *
|
jpayne@68
|
31 */
|
jpayne@68
|
32 public class KmerLimit extends SketchObject {
|
jpayne@68
|
33
|
jpayne@68
|
34 /*--------------------------------------------------------------*/
|
jpayne@68
|
35 /*---------------- Initialization ----------------*/
|
jpayne@68
|
36 /*--------------------------------------------------------------*/
|
jpayne@68
|
37
|
jpayne@68
|
38 /**
|
jpayne@68
|
39 * Code entrance from the command line.
|
jpayne@68
|
40 * @param args Command line arguments
|
jpayne@68
|
41 */
|
jpayne@68
|
42 public static void main(String[] args){
|
jpayne@68
|
43 //Start a timer immediately upon code entrance.
|
jpayne@68
|
44 Timer t=new Timer();
|
jpayne@68
|
45
|
jpayne@68
|
46 //Create an instance of this class
|
jpayne@68
|
47 KmerLimit x=new KmerLimit(args);
|
jpayne@68
|
48
|
jpayne@68
|
49 //Run the object
|
jpayne@68
|
50 x.process(t);
|
jpayne@68
|
51
|
jpayne@68
|
52 //Close the print stream if it was redirected
|
jpayne@68
|
53 Shared.closeStream(x.outstream);
|
jpayne@68
|
54 }
|
jpayne@68
|
55
|
jpayne@68
|
56 /**
|
jpayne@68
|
57 * Constructor.
|
jpayne@68
|
58 * @param args Command line arguments
|
jpayne@68
|
59 */
|
jpayne@68
|
60 public KmerLimit(String[] args){
|
jpayne@68
|
61
|
jpayne@68
|
62 {//Preparse block for help, config files, and outstream
|
jpayne@68
|
63 PreParser pp=new PreParser(args, getClass(), false);
|
jpayne@68
|
64 args=pp.args;
|
jpayne@68
|
65 outstream=pp.outstream;
|
jpayne@68
|
66 }
|
jpayne@68
|
67
|
jpayne@68
|
68 boolean setInterleaved=false; //Whether interleaved was explicitly set.
|
jpayne@68
|
69
|
jpayne@68
|
70 //Set shared static variables
|
jpayne@68
|
71 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
|
jpayne@68
|
72 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
|
jpayne@68
|
73 SketchObject.setKeyFraction(0.1);
|
jpayne@68
|
74 defaultParams.minEntropy=0;
|
jpayne@68
|
75 defaultParams.minProb=0.2f;
|
jpayne@68
|
76
|
jpayne@68
|
77 boolean setHeapSize=false;
|
jpayne@68
|
78 int heapSize_=4095;
|
jpayne@68
|
79 long targetKmers_=0;
|
jpayne@68
|
80 int k_=32;
|
jpayne@68
|
81 int minCount_=1;
|
jpayne@68
|
82
|
jpayne@68
|
83 //Create a parser object
|
jpayne@68
|
84 Parser parser=new Parser();
|
jpayne@68
|
85 parser.overwrite=true;
|
jpayne@68
|
86
|
jpayne@68
|
87 //Parse each argument
|
jpayne@68
|
88 for(int i=0; i<args.length; i++){
|
jpayne@68
|
89 String arg=args[i];
|
jpayne@68
|
90
|
jpayne@68
|
91 //Break arguments into their constituent parts, in the form of "a=b"
|
jpayne@68
|
92 String[] split=arg.split("=");
|
jpayne@68
|
93 String a=split[0].toLowerCase();
|
jpayne@68
|
94 String b=split.length>1 ? split[1] : null;
|
jpayne@68
|
95 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
|
jpayne@68
|
96
|
jpayne@68
|
97 if(a.equals("verbose")){
|
jpayne@68
|
98 verbose=Parse.parseBoolean(b);
|
jpayne@68
|
99 }else if(a.equals("ordered")){
|
jpayne@68
|
100 ordered=Parse.parseBoolean(b);
|
jpayne@68
|
101 }else if(a.equals("shuffle")){
|
jpayne@68
|
102 shuffle=Parse.parseBoolean(b);
|
jpayne@68
|
103 }else if(a.equals("size") || a.equals("heapsize")){
|
jpayne@68
|
104 heapSize_=Parse.parseIntKMG(b);
|
jpayne@68
|
105 setHeapSize=true;
|
jpayne@68
|
106 }else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){
|
jpayne@68
|
107 targetKmers_=Parse.parseKMG(b);
|
jpayne@68
|
108 }else if(a.equals("mincount")){
|
jpayne@68
|
109 minCount_=Parse.parseIntKMG(b);
|
jpayne@68
|
110 }else if(parseSketchFlags(arg, a, b)){
|
jpayne@68
|
111 parser.parse(arg, a, b);
|
jpayne@68
|
112 }else if(defaultParams.parse(arg, a, b)){
|
jpayne@68
|
113 parser.parse(arg, a, b);
|
jpayne@68
|
114 }else if(a.equals("parse_flag_goes_here")){
|
jpayne@68
|
115 long fake_variable=Parse.parseKMG(b);
|
jpayne@68
|
116 //Set a variable here
|
jpayne@68
|
117 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
|
jpayne@68
|
118 //do nothing
|
jpayne@68
|
119 }else{
|
jpayne@68
|
120 outstream.println("Unknown parameter "+args[i]);
|
jpayne@68
|
121 assert(false) : "Unknown parameter "+args[i];
|
jpayne@68
|
122 }
|
jpayne@68
|
123 }
|
jpayne@68
|
124
|
jpayne@68
|
125 if(!setHeapSize && minCount_>1){heapSize_=32000;}
|
jpayne@68
|
126 heapSize=heapSize_;
|
jpayne@68
|
127 targetKmers=targetKmers_;
|
jpayne@68
|
128 k=k_;
|
jpayne@68
|
129 minCount=minCount_;
|
jpayne@68
|
130 assert(targetKmers>0) : "Must set a kmer limit.";
|
jpayne@68
|
131 assert(heapSize>0) : "Heap size must be positive.";
|
jpayne@68
|
132 assert(k>0 && k<=32) : "0<k<33; k="+k;
|
jpayne@68
|
133 postParse();
|
jpayne@68
|
134
|
jpayne@68
|
135 if(minCount>1){
|
jpayne@68
|
136 Shared.setBufferLen(800);
|
jpayne@68
|
137 }
|
jpayne@68
|
138
|
jpayne@68
|
139 {//Process parser fields
|
jpayne@68
|
140 Parser.processQuality();
|
jpayne@68
|
141
|
jpayne@68
|
142 maxReads=parser.maxReads;
|
jpayne@68
|
143
|
jpayne@68
|
144 overwrite=ReadStats.overwrite=parser.overwrite;
|
jpayne@68
|
145 append=ReadStats.append=parser.append;
|
jpayne@68
|
146 setInterleaved=parser.setInterleaved;
|
jpayne@68
|
147
|
jpayne@68
|
148 in1=parser.in1;
|
jpayne@68
|
149 in2=parser.in2;
|
jpayne@68
|
150 qfin1=parser.qfin1;
|
jpayne@68
|
151 qfin2=parser.qfin2;
|
jpayne@68
|
152
|
jpayne@68
|
153 out1=parser.out1;
|
jpayne@68
|
154 out2=parser.out2;
|
jpayne@68
|
155 qfout1=parser.qfout1;
|
jpayne@68
|
156 qfout2=parser.qfout2;
|
jpayne@68
|
157
|
jpayne@68
|
158 extin=parser.extin;
|
jpayne@68
|
159 extout=parser.extout;
|
jpayne@68
|
160 }
|
jpayne@68
|
161
|
jpayne@68
|
162 //Do input file # replacement
|
jpayne@68
|
163 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
|
jpayne@68
|
164 in2=in1.replace("#", "2");
|
jpayne@68
|
165 in1=in1.replace("#", "1");
|
jpayne@68
|
166 }
|
jpayne@68
|
167
|
jpayne@68
|
168 //Do output file # replacement
|
jpayne@68
|
169 if(out1!=null && out2==null && out1.indexOf('#')>-1){
|
jpayne@68
|
170 out2=out1.replace("#", "2");
|
jpayne@68
|
171 out1=out1.replace("#", "1");
|
jpayne@68
|
172 }
|
jpayne@68
|
173
|
jpayne@68
|
174 //Adjust interleaved detection based on the number of input files
|
jpayne@68
|
175 if(in2!=null){
|
jpayne@68
|
176 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
|
jpayne@68
|
177 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
178 }
|
jpayne@68
|
179
|
jpayne@68
|
180 assert(FastaReadInputStream.settingsOK());
|
jpayne@68
|
181
|
jpayne@68
|
182 //Ensure there is an input file
|
jpayne@68
|
183 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
|
jpayne@68
|
184
|
jpayne@68
|
185 //Adjust the number of threads for input file reading
|
jpayne@68
|
186 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
|
jpayne@68
|
187 ByteFile.FORCE_MODE_BF2=true;
|
jpayne@68
|
188 }
|
jpayne@68
|
189
|
jpayne@68
|
190 //Ensure out2 is not set without out1
|
jpayne@68
|
191 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}
|
jpayne@68
|
192
|
jpayne@68
|
193 //Adjust interleaved settings based on number of output files
|
jpayne@68
|
194 if(!setInterleaved){
|
jpayne@68
|
195 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n";
|
jpayne@68
|
196 if(in2!=null){ //If there are 2 input streams.
|
jpayne@68
|
197 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
198 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
|
jpayne@68
|
199 }else{ //There is one input stream.
|
jpayne@68
|
200 if(out2!=null){
|
jpayne@68
|
201 FASTQ.FORCE_INTERLEAVED=true;
|
jpayne@68
|
202 FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
203 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
|
jpayne@68
|
204 }
|
jpayne@68
|
205 }
|
jpayne@68
|
206 }
|
jpayne@68
|
207
|
jpayne@68
|
208 //Ensure output files can be written
|
jpayne@68
|
209 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
|
jpayne@68
|
210 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
|
jpayne@68
|
211 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
|
jpayne@68
|
212 }
|
jpayne@68
|
213
|
jpayne@68
|
214 //Ensure input files can be read
|
jpayne@68
|
215 if(!Tools.testInputFiles(false, true, in1, in2)){
|
jpayne@68
|
216 throw new RuntimeException("\nCan't read some input files.\n");
|
jpayne@68
|
217 }
|
jpayne@68
|
218
|
jpayne@68
|
219 //Ensure that no file was specified multiple times
|
jpayne@68
|
220 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
|
jpayne@68
|
221 throw new RuntimeException("\nSome file names were specified multiple times.\n");
|
jpayne@68
|
222 }
|
jpayne@68
|
223
|
jpayne@68
|
224 //Create output FileFormat objects
|
jpayne@68
|
225 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
|
jpayne@68
|
226 ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
|
jpayne@68
|
227
|
jpayne@68
|
228 //Create input FileFormat objects
|
jpayne@68
|
229 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
230 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
231
|
jpayne@68
|
232 minProb=defaultParams.minProb;
|
jpayne@68
|
233 minQual=defaultParams.minQual;
|
jpayne@68
|
234
|
jpayne@68
|
235 shift=2*k;
|
jpayne@68
|
236 shift2=shift-2;
|
jpayne@68
|
237 mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32
|
jpayne@68
|
238 sharedHeap=new SketchHeap(heapSize, 0, minCount>1);
|
jpayne@68
|
239 }
|
jpayne@68
|
240
|
jpayne@68
|
241 /*--------------------------------------------------------------*/
|
jpayne@68
|
242 /*---------------- Outer Methods ----------------*/
|
jpayne@68
|
243 /*--------------------------------------------------------------*/
|
jpayne@68
|
244
|
jpayne@68
|
245 /** Create read streams and process all data */
|
jpayne@68
|
246 void process(Timer t){
|
jpayne@68
|
247
|
jpayne@68
|
248 //Turn off read validation in the input threads to increase speed
|
jpayne@68
|
249 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
|
jpayne@68
|
250 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
|
jpayne@68
|
251
|
jpayne@68
|
252 //Create a read input stream
|
jpayne@68
|
253 final ConcurrentReadInputStream cris;
|
jpayne@68
|
254 {
|
jpayne@68
|
255 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
|
jpayne@68
|
256 cris.start(); //Start the stream
|
jpayne@68
|
257 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
258 }
|
jpayne@68
|
259 boolean paired=cris.paired();
|
jpayne@68
|
260 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
|
jpayne@68
|
261
|
jpayne@68
|
262 //Optionally create a read output stream
|
jpayne@68
|
263 final ConcurrentReadOutputStream ros;
|
jpayne@68
|
264 if(ffout1!=null){
|
jpayne@68
|
265 //Select output buffer size based on whether it needs to be ordered
|
jpayne@68
|
266 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
|
jpayne@68
|
267
|
jpayne@68
|
268 //Notify user of output mode
|
jpayne@68
|
269 if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
|
jpayne@68
|
270 outstream.println("Writing interleaved.");
|
jpayne@68
|
271 }
|
jpayne@68
|
272
|
jpayne@68
|
273 ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
|
jpayne@68
|
274 ros.start(); //Start the stream
|
jpayne@68
|
275 }else{ros=null;}
|
jpayne@68
|
276
|
jpayne@68
|
277 //Reset counters
|
jpayne@68
|
278 readsProcessed=readsOut=0;
|
jpayne@68
|
279 basesProcessed=basesOut=0;
|
jpayne@68
|
280
|
jpayne@68
|
281 //Process the reads in separate threads
|
jpayne@68
|
282 spawnThreads(cris, ros);
|
jpayne@68
|
283
|
jpayne@68
|
284 if(verbose){outstream.println("Finished; closing streams.");}
|
jpayne@68
|
285
|
jpayne@68
|
286 //Write anything that was accumulated by ReadStats
|
jpayne@68
|
287 errorState|=ReadStats.writeAll();
|
jpayne@68
|
288 //Close the read streams
|
jpayne@68
|
289 errorState|=ReadWrite.closeStreams(cris, ros);
|
jpayne@68
|
290
|
jpayne@68
|
291 //Reset read validation
|
jpayne@68
|
292 Read.VALIDATE_IN_CONSTRUCTOR=vic;
|
jpayne@68
|
293
|
jpayne@68
|
294 //Report timing and results
|
jpayne@68
|
295 t.stop();
|
jpayne@68
|
296 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
|
jpayne@68
|
297 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
|
jpayne@68
|
298 String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
|
jpayne@68
|
299 outstream.println("Unique Kmers Out: "+kstring);
|
jpayne@68
|
300
|
jpayne@68
|
301 // Sketch sk=new Sketch(sharedHeap, true, true, null);
|
jpayne@68
|
302 // outstream.println(sk.genomeSizeEstimate());
|
jpayne@68
|
303
|
jpayne@68
|
304 //Throw an exception of there was an error in a thread
|
jpayne@68
|
305 if(errorState){
|
jpayne@68
|
306 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
|
jpayne@68
|
307 }
|
jpayne@68
|
308 }
|
jpayne@68
|
309
|
jpayne@68
|
310 /** Spawn process threads */
|
jpayne@68
|
311 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
|
jpayne@68
|
312
|
jpayne@68
|
313 //Do anything necessary prior to processing
|
jpayne@68
|
314
|
jpayne@68
|
315 //Determine how many threads may be used
|
jpayne@68
|
316 final int threads=Tools.min(8, Shared.threads());
|
jpayne@68
|
317
|
jpayne@68
|
318 //Fill a list with ProcessThreads
|
jpayne@68
|
319 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
|
jpayne@68
|
320 int tSize=heapSize/2;
|
jpayne@68
|
321 for(int i=0; i<threads; i++){
|
jpayne@68
|
322 alpt.add(new ProcessThread(cris, ros, i, tSize));
|
jpayne@68
|
323 }
|
jpayne@68
|
324
|
jpayne@68
|
325 //Start the threads
|
jpayne@68
|
326 for(ProcessThread pt : alpt){
|
jpayne@68
|
327 pt.start();
|
jpayne@68
|
328 }
|
jpayne@68
|
329
|
jpayne@68
|
330 //Wait for completion of all threads
|
jpayne@68
|
331 boolean success=true;
|
jpayne@68
|
332 for(ProcessThread pt : alpt){
|
jpayne@68
|
333
|
jpayne@68
|
334 //Wait until this thread has terminated
|
jpayne@68
|
335 while(pt.getState()!=Thread.State.TERMINATED){
|
jpayne@68
|
336 try {
|
jpayne@68
|
337 //Attempt a join operation
|
jpayne@68
|
338 pt.join();
|
jpayne@68
|
339 } catch (InterruptedException e) {
|
jpayne@68
|
340 //Potentially handle this, if it is expected to occur
|
jpayne@68
|
341 e.printStackTrace();
|
jpayne@68
|
342 }
|
jpayne@68
|
343 }
|
jpayne@68
|
344
|
jpayne@68
|
345 //Accumulate per-thread statistics
|
jpayne@68
|
346 readsProcessed+=pt.readsProcessedT;
|
jpayne@68
|
347 basesProcessed+=pt.basesProcessedT;
|
jpayne@68
|
348 readsOut+=pt.readsOutT;
|
jpayne@68
|
349 basesOut+=pt.basesOutT;
|
jpayne@68
|
350 success&=pt.success;
|
jpayne@68
|
351 }
|
jpayne@68
|
352
|
jpayne@68
|
353 //Track whether any threads failed
|
jpayne@68
|
354 if(!success){errorState=true;}
|
jpayne@68
|
355
|
jpayne@68
|
356 //Do anything necessary after processing
|
jpayne@68
|
357
|
jpayne@68
|
358 }
|
jpayne@68
|
359
|
jpayne@68
|
360 /*--------------------------------------------------------------*/
|
jpayne@68
|
361 /*---------------- Inner Methods ----------------*/
|
jpayne@68
|
362 /*--------------------------------------------------------------*/
|
jpayne@68
|
363
|
jpayne@68
|
364 /*--------------------------------------------------------------*/
|
jpayne@68
|
365 /*---------------- Inner Classes ----------------*/
|
jpayne@68
|
366 /*--------------------------------------------------------------*/
|
jpayne@68
|
367
|
jpayne@68
|
368 /** This class is static to prevent accidental writing to shared variables.
|
jpayne@68
|
369 * It is safe to remove the static modifier. */
|
jpayne@68
|
370 private class ProcessThread extends Thread {
|
jpayne@68
|
371
|
jpayne@68
|
372 //Constructor
|
jpayne@68
|
373 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){
|
jpayne@68
|
374 cris=cris_;
|
jpayne@68
|
375 ros=ros_;
|
jpayne@68
|
376 tid=tid_;
|
jpayne@68
|
377 localHeap=new SketchHeap(size, 0, minCount>1);
|
jpayne@68
|
378 }
|
jpayne@68
|
379
|
jpayne@68
|
380 //Called by start()
|
jpayne@68
|
381 @Override
|
jpayne@68
|
382 public void run(){
|
jpayne@68
|
383 //Do anything necessary prior to processing
|
jpayne@68
|
384
|
jpayne@68
|
385 //Process the reads
|
jpayne@68
|
386 processInner();
|
jpayne@68
|
387
|
jpayne@68
|
388 //Do anything necessary after processing
|
jpayne@68
|
389
|
jpayne@68
|
390 //Indicate successful exit status
|
jpayne@68
|
391 success=true;
|
jpayne@68
|
392 }
|
jpayne@68
|
393
|
jpayne@68
|
394 /** Iterate through the reads */
|
jpayne@68
|
395 void processInner(){
|
jpayne@68
|
396
|
jpayne@68
|
397 //Grab the first ListNum of reads
|
jpayne@68
|
398 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
399 //Grab the actual read list from the ListNum
|
jpayne@68
|
400 ArrayList<Read> reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
401
|
jpayne@68
|
402 //Check to ensure pairing is as expected
|
jpayne@68
|
403 if(reads!=null && !reads.isEmpty()){
|
jpayne@68
|
404 Read r=reads.get(0);
|
jpayne@68
|
405 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
|
jpayne@68
|
406 }
|
jpayne@68
|
407
|
jpayne@68
|
408 //As long as there is a nonempty read list...
|
jpayne@68
|
409 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
|
jpayne@68
|
410 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
|
jpayne@68
|
411
|
jpayne@68
|
412 //Loop through each read in the list
|
jpayne@68
|
413 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
414 final Read r1=reads.get(idx);
|
jpayne@68
|
415 final Read r2=r1.mate;
|
jpayne@68
|
416
|
jpayne@68
|
417 //Validate reads in worker threads
|
jpayne@68
|
418 if(!r1.validated()){r1.validate(true);}
|
jpayne@68
|
419 if(r2!=null && !r2.validated()){r2.validate(true);}
|
jpayne@68
|
420
|
jpayne@68
|
421 //Track the initial length for statistics
|
jpayne@68
|
422 final int initialLength1=r1.length();
|
jpayne@68
|
423 final int initialLength2=r1.mateLength();
|
jpayne@68
|
424
|
jpayne@68
|
425 //Increment counters
|
jpayne@68
|
426 readsProcessedT+=r1.pairCount();
|
jpayne@68
|
427 basesProcessedT+=initialLength1+initialLength2;
|
jpayne@68
|
428
|
jpayne@68
|
429 //Reads are processed in this block.
|
jpayne@68
|
430 processReadPair(r1, r2);
|
jpayne@68
|
431 }
|
jpayne@68
|
432
|
jpayne@68
|
433 long count=dumpHeap();
|
jpayne@68
|
434 if(count>=targetKmers){break;}
|
jpayne@68
|
435
|
jpayne@68
|
436 for(Read r1 : reads){
|
jpayne@68
|
437 readsOutT+=r1.pairCount();
|
jpayne@68
|
438 basesOutT+=r1.pairLength();
|
jpayne@68
|
439 }
|
jpayne@68
|
440
|
jpayne@68
|
441 //Output reads to the output stream
|
jpayne@68
|
442 if(ros!=null){ros.add(reads, ln.id);}
|
jpayne@68
|
443
|
jpayne@68
|
444 //Notify the input stream that the list was used
|
jpayne@68
|
445 cris.returnList(ln);
|
jpayne@68
|
446 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
|
jpayne@68
|
447
|
jpayne@68
|
448 //Fetch a new list
|
jpayne@68
|
449 ln=cris.nextList();
|
jpayne@68
|
450 reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
451 }
|
jpayne@68
|
452
|
jpayne@68
|
453 //Notify the input stream that the final list was used
|
jpayne@68
|
454 if(ln!=null){
|
jpayne@68
|
455 if(ln.list!=null){ln.list.clear();}
|
jpayne@68
|
456 cris.returnList(ln.id, true);
|
jpayne@68
|
457 }
|
jpayne@68
|
458 }
|
jpayne@68
|
459
|
jpayne@68
|
460 /**
|
jpayne@68
|
461 * Process a read or a read pair.
|
jpayne@68
|
462 * @param r1 Read 1
|
jpayne@68
|
463 * @param r2 Read 2 (may be null)
|
jpayne@68
|
464 */
|
jpayne@68
|
465 void processReadPair(final Read r1, final Read r2){
|
jpayne@68
|
466 processReadNucleotide(r1);
|
jpayne@68
|
467 if(r2!=null){processReadNucleotide(r2);}
|
jpayne@68
|
468 }
|
jpayne@68
|
469
|
jpayne@68
|
470 void processReadNucleotide(final Read r){
|
jpayne@68
|
471 final byte[] bases=r.bases;
|
jpayne@68
|
472 final byte[] quals=r.quality;
|
jpayne@68
|
473 long kmer=0;
|
jpayne@68
|
474 long rkmer=0;
|
jpayne@68
|
475 int len=0;
|
jpayne@68
|
476 assert(!r.aminoacid());
|
jpayne@68
|
477
|
jpayne@68
|
478 final long min=minHashValue;
|
jpayne@68
|
479 localHeap.genomeSizeBases+=r.length();
|
jpayne@68
|
480 localHeap.genomeSequences++;
|
jpayne@68
|
481
|
jpayne@68
|
482 if(quals==null || (minProb<=0 && minQual<2)){
|
jpayne@68
|
483 for(int i=0; i<bases.length; i++){
|
jpayne@68
|
484 byte b=bases[i];
|
jpayne@68
|
485 long x=AminoAcid.baseToNumber[b];
|
jpayne@68
|
486 long x2=AminoAcid.baseToComplementNumber[b];
|
jpayne@68
|
487
|
jpayne@68
|
488 kmer=((kmer<<2)|x)&mask;
|
jpayne@68
|
489 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
|
jpayne@68
|
490
|
jpayne@68
|
491 if(x<0){len=0; rkmer=0;}else{len++;}
|
jpayne@68
|
492 if(len>=k){
|
jpayne@68
|
493 localHeap.genomeSizeKmers++;
|
jpayne@68
|
494 final long hashcode=hash(kmer, rkmer);
|
jpayne@68
|
495 if(hashcode>min){localHeap.add(hashcode);}
|
jpayne@68
|
496 }
|
jpayne@68
|
497 }
|
jpayne@68
|
498 }else{
|
jpayne@68
|
499 float prob=1;
|
jpayne@68
|
500 for(int i=0; i<bases.length; i++){
|
jpayne@68
|
501 final byte b=bases[i];
|
jpayne@68
|
502 final long x=AminoAcid.baseToNumber[b];
|
jpayne@68
|
503 final long x2=AminoAcid.baseToComplementNumber[b];
|
jpayne@68
|
504
|
jpayne@68
|
505 kmer=((kmer<<2)|x)&mask;
|
jpayne@68
|
506 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
|
jpayne@68
|
507
|
jpayne@68
|
508 {//Quality-related stuff
|
jpayne@68
|
509 final byte q=quals[i];
|
jpayne@68
|
510 assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual;
|
jpayne@68
|
511 prob=prob*align2.QualityTools.PROB_CORRECT[q];
|
jpayne@68
|
512 if(len>k){
|
jpayne@68
|
513 byte oldq=quals[i-k];
|
jpayne@68
|
514 prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq];
|
jpayne@68
|
515 }
|
jpayne@68
|
516 if(x<0 || q<minQual){
|
jpayne@68
|
517 len=0;
|
jpayne@68
|
518 kmer=rkmer=0;
|
jpayne@68
|
519 prob=1;
|
jpayne@68
|
520 }else{
|
jpayne@68
|
521 len++;
|
jpayne@68
|
522 }
|
jpayne@68
|
523 }
|
jpayne@68
|
524
|
jpayne@68
|
525 if(len>=k && prob>=minProb){
|
jpayne@68
|
526 localHeap.genomeSizeKmers++;
|
jpayne@68
|
527 localHeap.probSum+=prob;
|
jpayne@68
|
528 final long hashcode=hash(kmer, rkmer);
|
jpayne@68
|
529 if(hashcode>min){localHeap.checkAndAdd(hashcode);}
|
jpayne@68
|
530 }
|
jpayne@68
|
531 }
|
jpayne@68
|
532 }
|
jpayne@68
|
533 }
|
jpayne@68
|
534
|
jpayne@68
|
535 private long dumpHeap(){
|
jpayne@68
|
536 long count=0;
|
jpayne@68
|
537 synchronized(sharedHeap){
|
jpayne@68
|
538 count=sharedHeap.genomeSizeEstimate(minCount);
|
jpayne@68
|
539 if(count<targetKmers){
|
jpayne@68
|
540 sharedHeap.add(localHeap);
|
jpayne@68
|
541 localHeap.clear();
|
jpayne@68
|
542 }
|
jpayne@68
|
543 }
|
jpayne@68
|
544 return count;
|
jpayne@68
|
545 }
|
jpayne@68
|
546
|
jpayne@68
|
547 /** Number of reads processed by this thread */
|
jpayne@68
|
548 protected long readsProcessedT=0;
|
jpayne@68
|
549 /** Number of bases processed by this thread */
|
jpayne@68
|
550 protected long basesProcessedT=0;
|
jpayne@68
|
551
|
jpayne@68
|
552 /** Number of reads retained by this thread */
|
jpayne@68
|
553 protected long readsOutT=0;
|
jpayne@68
|
554 /** Number of bases retained by this thread */
|
jpayne@68
|
555 protected long basesOutT=0;
|
jpayne@68
|
556
|
jpayne@68
|
557 /** True only if this thread has completed successfully */
|
jpayne@68
|
558 boolean success=false;
|
jpayne@68
|
559
|
jpayne@68
|
560 /** Shared input stream */
|
jpayne@68
|
561 private final ConcurrentReadInputStream cris;
|
jpayne@68
|
562 /** Shared output stream */
|
jpayne@68
|
563 private final ConcurrentReadOutputStream ros;
|
jpayne@68
|
564 /** Thread ID */
|
jpayne@68
|
565 final int tid;
|
jpayne@68
|
566
|
jpayne@68
|
567 final SketchHeap localHeap;
|
jpayne@68
|
568 }
|
jpayne@68
|
569
|
jpayne@68
|
570 /*--------------------------------------------------------------*/
|
jpayne@68
|
571 /*---------------- Fields ----------------*/
|
jpayne@68
|
572 /*--------------------------------------------------------------*/
|
jpayne@68
|
573
|
jpayne@68
|
574 /** Primary input file path */
|
jpayne@68
|
575 private String in1=null;
|
jpayne@68
|
576 /** Secondary input file path */
|
jpayne@68
|
577 private String in2=null;
|
jpayne@68
|
578
|
jpayne@68
|
579 private String qfin1=null;
|
jpayne@68
|
580 private String qfin2=null;
|
jpayne@68
|
581
|
jpayne@68
|
582 /** Primary output file path */
|
jpayne@68
|
583 private String out1=null;
|
jpayne@68
|
584 /** Secondary output file path */
|
jpayne@68
|
585 private String out2=null;
|
jpayne@68
|
586
|
jpayne@68
|
587 private String qfout1=null;
|
jpayne@68
|
588 private String qfout2=null;
|
jpayne@68
|
589
|
jpayne@68
|
590 /** Override input file extension */
|
jpayne@68
|
591 private String extin=null;
|
jpayne@68
|
592 /** Override output file extension */
|
jpayne@68
|
593 private String extout=null;
|
jpayne@68
|
594
|
jpayne@68
|
595 /*--------------------------------------------------------------*/
|
jpayne@68
|
596
|
jpayne@68
|
597 /** Number of reads processed */
|
jpayne@68
|
598 protected long readsProcessed=0;
|
jpayne@68
|
599 /** Number of bases processed */
|
jpayne@68
|
600 protected long basesProcessed=0;
|
jpayne@68
|
601
|
jpayne@68
|
602 /** Number of reads retained */
|
jpayne@68
|
603 protected long readsOut=0;
|
jpayne@68
|
604 /** Number of bases retained */
|
jpayne@68
|
605 protected long basesOut=0;
|
jpayne@68
|
606
|
jpayne@68
|
607 /** Quit after processing this many input reads; -1 means no limit */
|
jpayne@68
|
608 private long maxReads=-1;
|
jpayne@68
|
609
|
jpayne@68
|
610 /*--------------------------------------------------------------*/
|
jpayne@68
|
611 /*---------------- Final Fields ----------------*/
|
jpayne@68
|
612 /*--------------------------------------------------------------*/
|
jpayne@68
|
613
|
jpayne@68
|
614 /** Primary input file */
|
jpayne@68
|
615 private final FileFormat ffin1;
|
jpayne@68
|
616 /** Secondary input file */
|
jpayne@68
|
617 private final FileFormat ffin2;
|
jpayne@68
|
618
|
jpayne@68
|
619 /** Primary output file */
|
jpayne@68
|
620 private final FileFormat ffout1;
|
jpayne@68
|
621 /** Secondary output file */
|
jpayne@68
|
622 private final FileFormat ffout2;
|
jpayne@68
|
623
|
jpayne@68
|
624 private final SketchHeap sharedHeap;
|
jpayne@68
|
625 private final int heapSize;
|
jpayne@68
|
626 private final long targetKmers;
|
jpayne@68
|
627 private final int minCount;
|
jpayne@68
|
628
|
jpayne@68
|
629 final int shift;
|
jpayne@68
|
630 final int shift2;
|
jpayne@68
|
631 final long mask;
|
jpayne@68
|
632
|
jpayne@68
|
633 final float minProb;
|
jpayne@68
|
634 final byte minQual;
|
jpayne@68
|
635
|
jpayne@68
|
636 /*--------------------------------------------------------------*/
|
jpayne@68
|
637 /*---------------- Common Fields ----------------*/
|
jpayne@68
|
638 /*--------------------------------------------------------------*/
|
jpayne@68
|
639
|
jpayne@68
|
640 /** Print status messages to this output stream */
|
jpayne@68
|
641 private PrintStream outstream=System.err;
|
jpayne@68
|
642 /** Print verbose messages */
|
jpayne@68
|
643 public static boolean verbose=false;
|
jpayne@68
|
644 /** True if an error was encountered */
|
jpayne@68
|
645 public boolean errorState=false;
|
jpayne@68
|
646 /** Overwrite existing output files */
|
jpayne@68
|
647 private boolean overwrite=true;
|
jpayne@68
|
648 /** Append to existing output files */
|
jpayne@68
|
649 private boolean append=false;
|
jpayne@68
|
650 /** Reads are output in input order (not enabled) */
|
jpayne@68
|
651 private boolean ordered=false;
|
jpayne@68
|
652 /** Shuffle input */
|
jpayne@68
|
653 private boolean shuffle=false;
|
jpayne@68
|
654
|
jpayne@68
|
655 }
|