jpayne@68
|
1 package sketch;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.io.File;
|
jpayne@68
|
4 import java.io.PrintStream;
|
jpayne@68
|
5 import java.util.ArrayList;
|
jpayne@68
|
6 import java.util.Arrays;
|
jpayne@68
|
7 import java.util.Locale;
|
jpayne@68
|
8 import java.util.Random;
|
jpayne@68
|
9
|
jpayne@68
|
10 import dna.AminoAcid;
|
jpayne@68
|
11 import fileIO.ByteFile;
|
jpayne@68
|
12 import fileIO.FileFormat;
|
jpayne@68
|
13 import fileIO.ReadWrite;
|
jpayne@68
|
14 import shared.Parse;
|
jpayne@68
|
15 import shared.Parser;
|
jpayne@68
|
16 import shared.PreParser;
|
jpayne@68
|
17 import shared.ReadStats;
|
jpayne@68
|
18 import shared.Shared;
|
jpayne@68
|
19 import shared.Timer;
|
jpayne@68
|
20 import shared.Tools;
|
jpayne@68
|
21 import stream.ConcurrentReadInputStream;
|
jpayne@68
|
22 import stream.ConcurrentReadOutputStream;
|
jpayne@68
|
23 import stream.FASTQ;
|
jpayne@68
|
24 import stream.FastaReadInputStream;
|
jpayne@68
|
25 import stream.Read;
|
jpayne@68
|
26 import structures.IntMap;
|
jpayne@68
|
27 import structures.ListNum;
|
jpayne@68
|
28
|
jpayne@68
|
29 /**
|
jpayne@68
|
30 *
|
jpayne@68
|
31 * @author Brian Bushnell
|
jpayne@68
|
32 * @date July 30, 2018
|
jpayne@68
|
33 *
|
jpayne@68
|
34 */
|
jpayne@68
|
35 public class KmerLimit2 extends SketchObject {
|
jpayne@68
|
36
|
jpayne@68
|
37 /*--------------------------------------------------------------*/
|
jpayne@68
|
38 /*---------------- Initialization ----------------*/
|
jpayne@68
|
39 /*--------------------------------------------------------------*/
|
jpayne@68
|
40
|
jpayne@68
|
41 /**
|
jpayne@68
|
42 * Code entrance from the command line.
|
jpayne@68
|
43 * @param args Command line arguments
|
jpayne@68
|
44 */
|
jpayne@68
|
45 public static void main(String[] args){
|
jpayne@68
|
46 //Start a timer immediately upon code entrance.
|
jpayne@68
|
47 Timer t=new Timer();
|
jpayne@68
|
48
|
jpayne@68
|
49 //Create an instance of this class
|
jpayne@68
|
50 KmerLimit2 x=new KmerLimit2(args);
|
jpayne@68
|
51
|
jpayne@68
|
52 //Run the object
|
jpayne@68
|
53 x.process(t);
|
jpayne@68
|
54
|
jpayne@68
|
55 //Close the print stream if it was redirected
|
jpayne@68
|
56 Shared.closeStream(x.outstream);
|
jpayne@68
|
57 }
|
jpayne@68
|
58
|
jpayne@68
|
59 /**
|
jpayne@68
|
60 * Constructor.
|
jpayne@68
|
61 * @param args Command line arguments
|
jpayne@68
|
62 */
|
jpayne@68
|
63 public KmerLimit2(String[] args){
|
jpayne@68
|
64
|
jpayne@68
|
65 {//Preparse block for help, config files, and outstream
|
jpayne@68
|
66 PreParser pp=new PreParser(args, getClass(), false);
|
jpayne@68
|
67 args=pp.args;
|
jpayne@68
|
68 outstream=pp.outstream;
|
jpayne@68
|
69 }
|
jpayne@68
|
70
|
jpayne@68
|
71 boolean setInterleaved=false; //Whether interleaved was explicitly set.
|
jpayne@68
|
72
|
jpayne@68
|
73 //Set shared static variables
|
jpayne@68
|
74 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
|
jpayne@68
|
75 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
|
jpayne@68
|
76 SketchObject.setKeyFraction(0.1);
|
jpayne@68
|
77 defaultParams.minEntropy=0;
|
jpayne@68
|
78 defaultParams.minProb=0.2f;
|
jpayne@68
|
79
|
jpayne@68
|
80 boolean setHeapSize=false;
|
jpayne@68
|
81 int heapSize_=8091;
|
jpayne@68
|
82 long targetKmers_=0;
|
jpayne@68
|
83 int k_=32;
|
jpayne@68
|
84 int minCount_=1;
|
jpayne@68
|
85
|
jpayne@68
|
86 //Create a parser object
|
jpayne@68
|
87 Parser parser=new Parser();
|
jpayne@68
|
88 parser.overwrite=true;
|
jpayne@68
|
89
|
jpayne@68
|
90 //Parse each argument
|
jpayne@68
|
91 for(int i=0; i<args.length; i++){
|
jpayne@68
|
92 String arg=args[i];
|
jpayne@68
|
93
|
jpayne@68
|
94 //Break arguments into their constituent parts, in the form of "a=b"
|
jpayne@68
|
95 String[] split=arg.split("=");
|
jpayne@68
|
96 String a=split[0].toLowerCase();
|
jpayne@68
|
97 String b=split.length>1 ? split[1] : null;
|
jpayne@68
|
98 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
|
jpayne@68
|
99
|
jpayne@68
|
100 if(a.equals("verbose")){
|
jpayne@68
|
101 verbose=Parse.parseBoolean(b);
|
jpayne@68
|
102 }else if(a.equals("ordered")){
|
jpayne@68
|
103 ordered=Parse.parseBoolean(b);
|
jpayne@68
|
104 }else if(a.equals("size") || a.equals("heapsize")){
|
jpayne@68
|
105 heapSize_=Parse.parseIntKMG(b);
|
jpayne@68
|
106 setHeapSize=true;
|
jpayne@68
|
107 }else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){
|
jpayne@68
|
108 targetKmers_=Parse.parseKMG(b);
|
jpayne@68
|
109 }else if(a.equals("mincount")){
|
jpayne@68
|
110 minCount_=Parse.parseIntKMG(b);
|
jpayne@68
|
111 }else if(a.equals("maxexpandedlength") || a.equals("maxlength") || a.equals("maxlen")){
|
jpayne@68
|
112 maxExpandedLength=Parse.parseIntKMG(b);
|
jpayne@68
|
113 }else if(a.equals("seed")){
|
jpayne@68
|
114 seed=Parse.parseKMG(b);
|
jpayne@68
|
115 }else if(a.equals("trials")){
|
jpayne@68
|
116 trials=Parse.parseIntKMG(b);
|
jpayne@68
|
117 }else if(parseSketchFlags(arg, a, b)){
|
jpayne@68
|
118 parser.parse(arg, a, b);
|
jpayne@68
|
119 }else if(defaultParams.parse(arg, a, b)){
|
jpayne@68
|
120 parser.parse(arg, a, b);
|
jpayne@68
|
121 }else if(a.equals("parse_flag_goes_here")){
|
jpayne@68
|
122 long fake_variable=Parse.parseKMG(b);
|
jpayne@68
|
123 //Set a variable here
|
jpayne@68
|
124 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
|
jpayne@68
|
125 //do nothing
|
jpayne@68
|
126 }else{
|
jpayne@68
|
127 outstream.println("Unknown parameter "+args[i]);
|
jpayne@68
|
128 assert(false) : "Unknown parameter "+args[i];
|
jpayne@68
|
129 }
|
jpayne@68
|
130 }
|
jpayne@68
|
131
|
jpayne@68
|
132 if(!setHeapSize && minCount_>1){heapSize_=32000;}
|
jpayne@68
|
133 heapSize=heapSize_;
|
jpayne@68
|
134 targetKmers=targetKmers_;
|
jpayne@68
|
135 k=k_;
|
jpayne@68
|
136 minCount=minCount_;
|
jpayne@68
|
137 assert(targetKmers>0) : "Must set a kmer limit.";
|
jpayne@68
|
138 assert(heapSize>0) : "Heap size must be positive.";
|
jpayne@68
|
139 assert(k>0 && k<=32) : "0<k<33; k="+k;
|
jpayne@68
|
140 postParse();
|
jpayne@68
|
141
|
jpayne@68
|
142 // if(minCount>1){
|
jpayne@68
|
143 // Shared.setBufferLen(800);
|
jpayne@68
|
144 // }
|
jpayne@68
|
145
|
jpayne@68
|
146 {//Process parser fields
|
jpayne@68
|
147 Parser.processQuality();
|
jpayne@68
|
148
|
jpayne@68
|
149 maxReads=parser.maxReads;
|
jpayne@68
|
150
|
jpayne@68
|
151 overwrite=ReadStats.overwrite=parser.overwrite;
|
jpayne@68
|
152 append=ReadStats.append=parser.append;
|
jpayne@68
|
153 setInterleaved=parser.setInterleaved;
|
jpayne@68
|
154
|
jpayne@68
|
155 in1=parser.in1;
|
jpayne@68
|
156 in2=parser.in2;
|
jpayne@68
|
157 qfin1=parser.qfin1;
|
jpayne@68
|
158 qfin2=parser.qfin2;
|
jpayne@68
|
159
|
jpayne@68
|
160 out1=parser.out1;
|
jpayne@68
|
161 out2=parser.out2;
|
jpayne@68
|
162 qfout1=parser.qfout1;
|
jpayne@68
|
163 qfout2=parser.qfout2;
|
jpayne@68
|
164
|
jpayne@68
|
165 extin=parser.extin;
|
jpayne@68
|
166 extout=parser.extout;
|
jpayne@68
|
167 }
|
jpayne@68
|
168
|
jpayne@68
|
169 //Do input file # replacement
|
jpayne@68
|
170 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
|
jpayne@68
|
171 in2=in1.replace("#", "2");
|
jpayne@68
|
172 in1=in1.replace("#", "1");
|
jpayne@68
|
173 }
|
jpayne@68
|
174
|
jpayne@68
|
175 //Do output file # replacement
|
jpayne@68
|
176 if(out1!=null && out2==null && out1.indexOf('#')>-1){
|
jpayne@68
|
177 out2=out1.replace("#", "2");
|
jpayne@68
|
178 out1=out1.replace("#", "1");
|
jpayne@68
|
179 }
|
jpayne@68
|
180
|
jpayne@68
|
181 //Adjust interleaved detection based on the number of input files
|
jpayne@68
|
182 if(in2!=null){
|
jpayne@68
|
183 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
|
jpayne@68
|
184 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
185 }
|
jpayne@68
|
186
|
jpayne@68
|
187 assert(FastaReadInputStream.settingsOK());
|
jpayne@68
|
188
|
jpayne@68
|
189 //Ensure there is an input file
|
jpayne@68
|
190 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
|
jpayne@68
|
191
|
jpayne@68
|
192 //Adjust the number of threads for input file reading
|
jpayne@68
|
193 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
|
jpayne@68
|
194 ByteFile.FORCE_MODE_BF2=true;
|
jpayne@68
|
195 }
|
jpayne@68
|
196
|
jpayne@68
|
197 //Ensure out2 is not set without out1
|
jpayne@68
|
198 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}
|
jpayne@68
|
199
|
jpayne@68
|
200 //Adjust interleaved settings based on number of output files
|
jpayne@68
|
201 if(!setInterleaved){
|
jpayne@68
|
202 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n";
|
jpayne@68
|
203 if(in2!=null){ //If there are 2 input streams.
|
jpayne@68
|
204 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
205 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
|
jpayne@68
|
206 }else{ //There is one input stream.
|
jpayne@68
|
207 if(out2!=null){
|
jpayne@68
|
208 FASTQ.FORCE_INTERLEAVED=true;
|
jpayne@68
|
209 FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
210 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
|
jpayne@68
|
211 }
|
jpayne@68
|
212 }
|
jpayne@68
|
213 }
|
jpayne@68
|
214
|
jpayne@68
|
215 //Ensure output files can be written
|
jpayne@68
|
216 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
|
jpayne@68
|
217 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
|
jpayne@68
|
218 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
|
jpayne@68
|
219 }
|
jpayne@68
|
220
|
jpayne@68
|
221 //Ensure input files can be read
|
jpayne@68
|
222 if(!Tools.testInputFiles(false, true, in1, in2)){
|
jpayne@68
|
223 throw new RuntimeException("\nCan't read some input files.\n");
|
jpayne@68
|
224 }
|
jpayne@68
|
225
|
jpayne@68
|
226 //Ensure that no file was specified multiple times
|
jpayne@68
|
227 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
|
jpayne@68
|
228 throw new RuntimeException("\nSome file names were specified multiple times.\n");
|
jpayne@68
|
229 }
|
jpayne@68
|
230
|
jpayne@68
|
231 //Create output FileFormat objects
|
jpayne@68
|
232 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
|
jpayne@68
|
233 ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
|
jpayne@68
|
234
|
jpayne@68
|
235 //Create input FileFormat objects
|
jpayne@68
|
236 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
237 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
238
|
jpayne@68
|
239 minProb=defaultParams.minProb;
|
jpayne@68
|
240 minQual=defaultParams.minQual;
|
jpayne@68
|
241
|
jpayne@68
|
242 shift=2*k;
|
jpayne@68
|
243 shift2=shift-2;
|
jpayne@68
|
244 mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32
|
jpayne@68
|
245 sharedHeap=new SketchHeap(heapSize, 0, true);
|
jpayne@68
|
246 }
|
jpayne@68
|
247
|
jpayne@68
|
248 /*--------------------------------------------------------------*/
|
jpayne@68
|
249 /*---------------- Outer Methods ----------------*/
|
jpayne@68
|
250 /*--------------------------------------------------------------*/
|
jpayne@68
|
251
|
jpayne@68
|
252 /** Create read streams and process all data */
|
jpayne@68
|
253 void process(Timer t){
|
jpayne@68
|
254
|
jpayne@68
|
255 //Turn off read validation in the input threads to increase speed
|
jpayne@68
|
256 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
|
jpayne@68
|
257 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
|
jpayne@68
|
258
|
jpayne@68
|
259 // //Optionally create a read output stream
|
jpayne@68
|
260 // final ConcurrentReadOutputStream ros;
|
jpayne@68
|
261 // if(ffout1!=null){
|
jpayne@68
|
262 // //Select output buffer size based on whether it needs to be ordered
|
jpayne@68
|
263 // final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
|
jpayne@68
|
264 //
|
jpayne@68
|
265 // //Notify user of output mode
|
jpayne@68
|
266 // if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
|
jpayne@68
|
267 // outstream.println("Writing interleaved.");
|
jpayne@68
|
268 // }
|
jpayne@68
|
269 //
|
jpayne@68
|
270 // ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
|
jpayne@68
|
271 // ros.start(); //Start the stream
|
jpayne@68
|
272 // }else{ros=null;}
|
jpayne@68
|
273
|
jpayne@68
|
274 //Reset counters
|
jpayne@68
|
275 readsProcessed=readsOut=0;
|
jpayne@68
|
276 basesProcessed=basesOut=0;
|
jpayne@68
|
277
|
jpayne@68
|
278 //Process the reads in separate threads
|
jpayne@68
|
279 spawnThreads0();
|
jpayne@68
|
280
|
jpayne@68
|
281 // if(verbose){outstream.println("Finished; closing streams.");}
|
jpayne@68
|
282
|
jpayne@68
|
283 //Reset read validation
|
jpayne@68
|
284 Read.VALIDATE_IN_CONSTRUCTOR=vic;
|
jpayne@68
|
285
|
jpayne@68
|
286
|
jpayne@68
|
287 Sketch sketch=new Sketch(sharedHeap, true, true, null);
|
jpayne@68
|
288 sketch=capLengthAtCountSum(sketch, maxExpandedLength);
|
jpayne@68
|
289 final long reads=Tools.max(1, sketch.genomeSequences);
|
jpayne@68
|
290 final long targetReads=calcTargetReads(sketch, targetKmers, minCount, trials, seed);
|
jpayne@68
|
291 final double targetRate=Tools.min(1, targetReads/(double)reads);
|
jpayne@68
|
292 final String targetRateS=String.format(Locale.ROOT, "%.4f%%",targetRate*100);
|
jpayne@68
|
293
|
jpayne@68
|
294 //Report timing and results
|
jpayne@68
|
295 t.stop();
|
jpayne@68
|
296 outstream.println("Finished counting kmers.");
|
jpayne@68
|
297 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
|
jpayne@68
|
298
|
jpayne@68
|
299 String kstring0=Tools.padKM(sketch.genomeSizeEstimate(minCount), 8);
|
jpayne@68
|
300 String rstring0=Tools.padKM(targetReads, 8);
|
jpayne@68
|
301 outstream.println("Unique Kmers: "+kstring0);
|
jpayne@68
|
302 outstream.println("Target Reads: "+rstring0+"\t"+targetRateS);
|
jpayne@68
|
303
|
jpayne@68
|
304 // outstream.println("Reads: \t"+reads);
|
jpayne@68
|
305 // outstream.println("Unique Kmers: \t"+sketch.genomeSizeEstimate(minCount));
|
jpayne@68
|
306 // outstream.println("Target Reads: \t"+targetReads);
|
jpayne@68
|
307 // outstream.println("Sample Rate: \t"+targetRateS);
|
jpayne@68
|
308 // outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
|
jpayne@68
|
309
|
jpayne@68
|
310 t.start();
|
jpayne@68
|
311 outstream.println("\nSubsampling reads.");
|
jpayne@68
|
312
|
jpayne@68
|
313 // String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
|
jpayne@68
|
314 // outstream.println("Unique Kmers Out: "+kstring);
|
jpayne@68
|
315
|
jpayne@68
|
316
|
jpayne@68
|
317 // ArrayList<String> args=new ArrayList<String>();
|
jpayne@68
|
318 // args.add("in="+in1);
|
jpayne@68
|
319 // if(in2!=null){args.add("in2="+in2);}
|
jpayne@68
|
320 // args.add("out="+out1);
|
jpayne@68
|
321 // if(out2!=null){args.add("out2="+out2);}
|
jpayne@68
|
322 // args.add("ordered="+ordered);
|
jpayne@68
|
323 // args.add("ow="+(overwrite ? "t" : "f"));
|
jpayne@68
|
324 // if(targetRate<1){args.add("samplerate="+targetRateS);}
|
jpayne@68
|
325 // args.add("loglogout");
|
jpayne@68
|
326 // args.add("loglogk="+k);
|
jpayne@68
|
327 // args.add("loglogminprob="+minProb);
|
jpayne@68
|
328 // BBDukF.main(args.toArray(new String[0]));
|
jpayne@68
|
329
|
jpayne@68
|
330 // Sketch sk=new Sketch(sharedHeap, true, true, null);
|
jpayne@68
|
331 // outstream.println(sk.genomeSizeEstimate());
|
jpayne@68
|
332 spawnThreads2(targetRate);
|
jpayne@68
|
333 t.stop();
|
jpayne@68
|
334 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
|
jpayne@68
|
335
|
jpayne@68
|
336 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
|
jpayne@68
|
337 String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
|
jpayne@68
|
338 outstream.println("Unique Kmers Out: "+kstring);
|
jpayne@68
|
339
|
jpayne@68
|
340 //Throw an exception of there was an error in a thread
|
jpayne@68
|
341 if(errorState){
|
jpayne@68
|
342 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
|
jpayne@68
|
343 }
|
jpayne@68
|
344 }
|
jpayne@68
|
345
|
jpayne@68
|
346 /** Spawn process threads */
|
jpayne@68
|
347 private void spawnThreads0(){
|
jpayne@68
|
348
|
jpayne@68
|
349 //Create a read input stream
|
jpayne@68
|
350 final ConcurrentReadInputStream cris;
|
jpayne@68
|
351 {
|
jpayne@68
|
352 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
|
jpayne@68
|
353 cris.start(); //Start the stream
|
jpayne@68
|
354 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
355 }
|
jpayne@68
|
356 paired=cris.paired();
|
jpayne@68
|
357 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
|
jpayne@68
|
358
|
jpayne@68
|
359 //Determine how many threads may be used
|
jpayne@68
|
360 final int threads=Tools.min(10, Shared.threads());
|
jpayne@68
|
361
|
jpayne@68
|
362 //Fill a list with ProcessThreads
|
jpayne@68
|
363 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
|
jpayne@68
|
364 for(int i=0; i<threads; i++){
|
jpayne@68
|
365 alpt.add(new ProcessThread(cris, null, i, heapSize));
|
jpayne@68
|
366 }
|
jpayne@68
|
367
|
jpayne@68
|
368 //Start the threads
|
jpayne@68
|
369 for(ProcessThread pt : alpt){
|
jpayne@68
|
370 pt.start();
|
jpayne@68
|
371 }
|
jpayne@68
|
372
|
jpayne@68
|
373 //Wait for completion of all threads
|
jpayne@68
|
374 boolean success=true;
|
jpayne@68
|
375 for(ProcessThread pt : alpt){
|
jpayne@68
|
376
|
jpayne@68
|
377 //Wait until this thread has terminated
|
jpayne@68
|
378 while(pt.getState()!=Thread.State.TERMINATED){
|
jpayne@68
|
379 try {
|
jpayne@68
|
380 //Attempt a join operation
|
jpayne@68
|
381 pt.join();
|
jpayne@68
|
382 } catch (InterruptedException e) {
|
jpayne@68
|
383 //Potentially handle this, if it is expected to occur
|
jpayne@68
|
384 e.printStackTrace();
|
jpayne@68
|
385 }
|
jpayne@68
|
386 }
|
jpayne@68
|
387
|
jpayne@68
|
388 //Accumulate per-thread statistics
|
jpayne@68
|
389 readsProcessed+=pt.readsProcessedT;
|
jpayne@68
|
390 basesProcessed+=pt.basesProcessedT;
|
jpayne@68
|
391 readsOut+=pt.readsOutT;
|
jpayne@68
|
392 basesOut+=pt.basesOutT;
|
jpayne@68
|
393 success&=pt.success;
|
jpayne@68
|
394 }
|
jpayne@68
|
395
|
jpayne@68
|
396 //Track whether any threads failed
|
jpayne@68
|
397 if(!success){errorState=true;}
|
jpayne@68
|
398
|
jpayne@68
|
399 //Do anything necessary after processing
|
jpayne@68
|
400
|
jpayne@68
|
401 //Close the read streams
|
jpayne@68
|
402 errorState|=ReadWrite.closeStreams(cris);
|
jpayne@68
|
403
|
jpayne@68
|
404 }
|
jpayne@68
|
405
|
jpayne@68
|
406 /** Spawn process threads */
|
jpayne@68
|
407 private void spawnThreads2(double rate){
|
jpayne@68
|
408
|
jpayne@68
|
409 //Create a read input stream
|
jpayne@68
|
410 final ConcurrentReadInputStream cris;
|
jpayne@68
|
411 {
|
jpayne@68
|
412 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
|
jpayne@68
|
413 cris.setSampleRate((float)rate, seed);
|
jpayne@68
|
414 cris.start(); //Start the stream
|
jpayne@68
|
415 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
416 }
|
jpayne@68
|
417 // paired=cris.paired();
|
jpayne@68
|
418 // if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
|
jpayne@68
|
419
|
jpayne@68
|
420 //Optionally create a read output stream
|
jpayne@68
|
421 final ConcurrentReadOutputStream ros;
|
jpayne@68
|
422 if(ffout1!=null){
|
jpayne@68
|
423 //Select output buffer size based on whether it needs to be ordered
|
jpayne@68
|
424 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
|
jpayne@68
|
425
|
jpayne@68
|
426 //Notify user of output mode
|
jpayne@68
|
427 if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
|
jpayne@68
|
428 outstream.println("Writing interleaved.");
|
jpayne@68
|
429 }
|
jpayne@68
|
430
|
jpayne@68
|
431 ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
|
jpayne@68
|
432 ros.start(); //Start the stream
|
jpayne@68
|
433 }else{ros=null;}
|
jpayne@68
|
434
|
jpayne@68
|
435 //Determine how many threads may be used
|
jpayne@68
|
436 final int threads=Tools.min(10, Shared.threads());
|
jpayne@68
|
437
|
jpayne@68
|
438 sharedHeap.clear();
|
jpayne@68
|
439 // readsProcessed=0;
|
jpayne@68
|
440 // basesProcessed=0;
|
jpayne@68
|
441 readsOut=0;
|
jpayne@68
|
442 basesOut=0;
|
jpayne@68
|
443
|
jpayne@68
|
444 //Fill a list with ProcessThreads
|
jpayne@68
|
445 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
|
jpayne@68
|
446 for(int i=0; i<threads; i++){
|
jpayne@68
|
447 alpt.add(new ProcessThread(cris, ros, i, heapSize));
|
jpayne@68
|
448 }
|
jpayne@68
|
449
|
jpayne@68
|
450 //Start the threads
|
jpayne@68
|
451 for(ProcessThread pt : alpt){
|
jpayne@68
|
452 pt.start();
|
jpayne@68
|
453 }
|
jpayne@68
|
454
|
jpayne@68
|
455 //Wait for completion of all threads
|
jpayne@68
|
456 boolean success=true;
|
jpayne@68
|
457 for(ProcessThread pt : alpt){
|
jpayne@68
|
458
|
jpayne@68
|
459 //Wait until this thread has terminated
|
jpayne@68
|
460 while(pt.getState()!=Thread.State.TERMINATED){
|
jpayne@68
|
461 try {
|
jpayne@68
|
462 //Attempt a join operation
|
jpayne@68
|
463 pt.join();
|
jpayne@68
|
464 } catch (InterruptedException e) {
|
jpayne@68
|
465 //Potentially handle this, if it is expected to occur
|
jpayne@68
|
466 e.printStackTrace();
|
jpayne@68
|
467 }
|
jpayne@68
|
468 }
|
jpayne@68
|
469
|
jpayne@68
|
470 //Accumulate per-thread statistics
|
jpayne@68
|
471 // readsProcessed+=pt.readsProcessedT;
|
jpayne@68
|
472 // basesProcessed+=pt.basesProcessedT;
|
jpayne@68
|
473 readsOut+=pt.readsOutT;
|
jpayne@68
|
474 basesOut+=pt.basesOutT;
|
jpayne@68
|
475 success&=pt.success;
|
jpayne@68
|
476 }
|
jpayne@68
|
477
|
jpayne@68
|
478 //Track whether any threads failed
|
jpayne@68
|
479 if(!success){errorState=true;}
|
jpayne@68
|
480
|
jpayne@68
|
481 //Do anything necessary after processing
|
jpayne@68
|
482
|
jpayne@68
|
483 //Write anything that was accumulated by ReadStats
|
jpayne@68
|
484 errorState|=ReadStats.writeAll();
|
jpayne@68
|
485 //Close the read streams
|
jpayne@68
|
486 errorState|=ReadWrite.closeStreams(cris, ros);
|
jpayne@68
|
487
|
jpayne@68
|
488 }
|
jpayne@68
|
489
|
jpayne@68
|
490 /*--------------------------------------------------------------*/
|
jpayne@68
|
491 /*---------------- Inner Methods ----------------*/
|
jpayne@68
|
492 /*--------------------------------------------------------------*/
|
jpayne@68
|
493
|
jpayne@68
|
494 public static Sketch capLengthAtCountSum(Sketch sketch0, int max) {
|
jpayne@68
|
495 int len=0;
|
jpayne@68
|
496 long sum=0;
|
jpayne@68
|
497 for(; len<sketch0.keyCounts.length; len++){
|
jpayne@68
|
498 sum=sum+sketch0.keyCounts[len];
|
jpayne@68
|
499 if(sum>max){break;}
|
jpayne@68
|
500 }
|
jpayne@68
|
501 if(len>=sketch0.length()){return sketch0;}
|
jpayne@68
|
502
|
jpayne@68
|
503 long[] keys=Arrays.copyOf(sketch0.keys, len);
|
jpayne@68
|
504 int[] counts=Arrays.copyOf(sketch0.keyCounts, len);
|
jpayne@68
|
505
|
jpayne@68
|
506 // long[] array_, int[] counts_, int taxID_, long imgID_, long gSizeBases_, long gSizeKmers_, long gSequences_, double probCorrect_,
|
jpayne@68
|
507 // String taxName_, String name0_, String fname_, ArrayList<String> meta_
|
jpayne@68
|
508
|
jpayne@68
|
509 Sketch sk=new Sketch(keys, counts, null, null, null, -1, -1,
|
jpayne@68
|
510 sketch0.genomeSizeBases, sketch0.genomeSizeKmers, sketch0.genomeSequences, sketch0.probCorrect,
|
jpayne@68
|
511 null, null, null, null);
|
jpayne@68
|
512
|
jpayne@68
|
513 return sk;
|
jpayne@68
|
514 }
|
jpayne@68
|
515
|
jpayne@68
|
516 public static long calcTargetReads(Sketch sketch, long targetKmers, int minCount, int trials, long seed){
|
jpayne@68
|
517 final int[] counts0=sketch.keyCounts;
|
jpayne@68
|
518 final int[] counts=Arrays.copyOf(counts0, counts0.length);
|
jpayne@68
|
519 final long size=sketch.genomeSizeEstimate(minCount);
|
jpayne@68
|
520 final long reads=sketch.genomeSequences;
|
jpayne@68
|
521 final double targetKmerFraction=targetKmers/(double)size;
|
jpayne@68
|
522 if(targetKmerFraction>=1){return reads;}
|
jpayne@68
|
523
|
jpayne@68
|
524 final int targetKeys=(int)(targetKmerFraction*counts.length);
|
jpayne@68
|
525 final long countSum=Tools.sum(counts0);
|
jpayne@68
|
526 assert(countSum<Shared.MAX_ARRAY_LEN) : countSum;
|
jpayne@68
|
527 // System.err.println("countsum: "+countSum);
|
jpayne@68
|
528
|
jpayne@68
|
529 final IntMap map=new IntMap(0, counts0.length);
|
jpayne@68
|
530 final int[] expanded=new int[(int)countSum];
|
jpayne@68
|
531
|
jpayne@68
|
532 long roundSum=0;
|
jpayne@68
|
533 final Random randy=Shared.threadLocalRandom(seed);
|
jpayne@68
|
534 for(int i=0; i<trials; i++){
|
jpayne@68
|
535 Tools.fill(counts, counts0);
|
jpayne@68
|
536 // long rounds=reduceRounds(counts0, counts, minCount, targetKeys, randy);
|
jpayne@68
|
537 long rounds=reduceRoundsIM(counts0, expanded, minCount, targetKeys, randy, map);
|
jpayne@68
|
538 roundSum+=rounds;
|
jpayne@68
|
539 }
|
jpayne@68
|
540 double avgRounds=roundSum/(double)trials;
|
jpayne@68
|
541 // System.err.println("avgRounds: "+avgRounds);
|
jpayne@68
|
542 double targetCountFraction=1-(avgRounds/countSum);
|
jpayne@68
|
543 // System.err.println("targetFraction: "+targetCountFraction);
|
jpayne@68
|
544 return (long)(targetCountFraction*reads);
|
jpayne@68
|
545 }
|
jpayne@68
|
546
|
jpayne@68
|
547 // public static int reduceRoundsOld(final int[] counts, final int minCount, final int targetKeys, final Random randy){
|
jpayne@68
|
548 // assert(minCount>=0) : minCount;
|
jpayne@68
|
549 // int rounds=0;
|
jpayne@68
|
550 // int valid=0;
|
jpayne@68
|
551 // for(int x : counts){
|
jpayne@68
|
552 // if(x>=minCount){valid++;}
|
jpayne@68
|
553 // }
|
jpayne@68
|
554 //
|
jpayne@68
|
555 // int len=counts.length;
|
jpayne@68
|
556 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
|
jpayne@68
|
557 // for(; valid>targetKeys; rounds++){
|
jpayne@68
|
558 // int pos=randy.nextInt(len);
|
jpayne@68
|
559 //// assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
|
jpayne@68
|
560 // if(counts[pos]==minCount){valid--;}
|
jpayne@68
|
561 // counts[pos]--;
|
jpayne@68
|
562 // if(counts[pos]==0){
|
jpayne@68
|
563 // len--;//shrink the array
|
jpayne@68
|
564 // System.err.println("len="+len+", counts[len]="+counts[len]);
|
jpayne@68
|
565 // System.err.println("pos="+pos+", counts[pos]="+counts[pos]);
|
jpayne@68
|
566 // counts[pos]=counts[len];//move the last element to the empty slot
|
jpayne@68
|
567 // counts[len]=0;
|
jpayne@68
|
568 // if(pos!=len && len>0){
|
jpayne@68
|
569 // assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
|
jpayne@68
|
570 // }
|
jpayne@68
|
571 // }
|
jpayne@68
|
572 // System.err.println(len+", "+pos+": "+Arrays.toString(counts));
|
jpayne@68
|
573 // }
|
jpayne@68
|
574 //
|
jpayne@68
|
575 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
|
jpayne@68
|
576 //
|
jpayne@68
|
577 // return rounds;
|
jpayne@68
|
578 // }
|
jpayne@68
|
579
|
jpayne@68
|
580 //This can be done faster with bins.
|
jpayne@68
|
581 //Each bin contains all kmers with count x. When a bin is hit, one kmer moves to the next bin lower.
|
jpayne@68
|
582 //Alternately, expand the array into one physical kmer per count. Store the current counts in an IntMap. Remove key each time.
|
jpayne@68
|
583 public static long reduceRounds(final int[] counts0, final int[] counts, final int minCount, final int targetKeys, final Random randy){
|
jpayne@68
|
584 assert(minCount>=0) : minCount;
|
jpayne@68
|
585 long rounds=0;
|
jpayne@68
|
586 int valid=0;
|
jpayne@68
|
587 for(int x : counts){
|
jpayne@68
|
588 if(x>=minCount){valid++;}
|
jpayne@68
|
589 }
|
jpayne@68
|
590
|
jpayne@68
|
591 int len=counts.length;
|
jpayne@68
|
592 final long sum0=Tools.sum(counts);
|
jpayne@68
|
593 long sum=sum0;
|
jpayne@68
|
594 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
|
jpayne@68
|
595 for(; valid>targetKeys; rounds++){
|
jpayne@68
|
596 long posNum=(Long.MAX_VALUE&randy.nextLong())%sum;
|
jpayne@68
|
597 long sum2=0;
|
jpayne@68
|
598 int pos=0;
|
jpayne@68
|
599
|
jpayne@68
|
600 for(int i=0; i<counts.length; i++){
|
jpayne@68
|
601 int x=counts[i];
|
jpayne@68
|
602 if(x>0){
|
jpayne@68
|
603 sum2+=x;
|
jpayne@68
|
604 if(sum2>=posNum){
|
jpayne@68
|
605 pos=i;
|
jpayne@68
|
606 break;
|
jpayne@68
|
607 }
|
jpayne@68
|
608 }
|
jpayne@68
|
609 }
|
jpayne@68
|
610
|
jpayne@68
|
611 // for(int i=0; i<counts0.length; i++){
|
jpayne@68
|
612 // int x=counts0[i];
|
jpayne@68
|
613 // if(x>0){
|
jpayne@68
|
614 // sum2+=x;
|
jpayne@68
|
615 // if(sum2>=posNum){
|
jpayne@68
|
616 // pos=i;
|
jpayne@68
|
617 // break;
|
jpayne@68
|
618 // }
|
jpayne@68
|
619 // }
|
jpayne@68
|
620 // }
|
jpayne@68
|
621
|
jpayne@68
|
622 sum--;
|
jpayne@68
|
623
|
jpayne@68
|
624 assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
|
jpayne@68
|
625 if(counts[pos]==minCount){valid--;}
|
jpayne@68
|
626 counts[pos]--;
|
jpayne@68
|
627 if(counts[pos]==0){
|
jpayne@68
|
628 len--;//shrink the array
|
jpayne@68
|
629 }
|
jpayne@68
|
630 // System.err.println(len+", "+pos+": "+Arrays.toString(counts));
|
jpayne@68
|
631 }
|
jpayne@68
|
632
|
jpayne@68
|
633 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
|
jpayne@68
|
634
|
jpayne@68
|
635 return rounds;
|
jpayne@68
|
636 }
|
jpayne@68
|
637
|
jpayne@68
|
638 //This can be done faster with bins.
|
jpayne@68
|
639 //Each bin contains all kmers with count x. When a bin is hit, one kmer moves to the next bin lower.
|
jpayne@68
|
640 //Alternately, expand the array into one physical kmer per count. Store the current counts in an IntMap. Remove key each time.
|
jpayne@68
|
641 public static long reduceRoundsIM(final int[] counts0, final int[] expanded, final int minCount, final int targetKeys, final Random randy, final IntMap map){
|
jpayne@68
|
642 assert(minCount>=0) : minCount;
|
jpayne@68
|
643 long rounds=0;
|
jpayne@68
|
644 int valid=0;
|
jpayne@68
|
645 map.clear();
|
jpayne@68
|
646 for(int i=0, k=0; i<counts0.length; i++){
|
jpayne@68
|
647 int x=counts0[i];
|
jpayne@68
|
648 // counts[i]=counts0[i];
|
jpayne@68
|
649 if(x>=minCount){valid++;}
|
jpayne@68
|
650 map.put(i, x);
|
jpayne@68
|
651 for(int j=0; j<x; j++, k++){
|
jpayne@68
|
652 expanded[k]=i;
|
jpayne@68
|
653 }
|
jpayne@68
|
654 }
|
jpayne@68
|
655 assert(expanded.length==Tools.sum(counts0));
|
jpayne@68
|
656
|
jpayne@68
|
657 int len=expanded.length;
|
jpayne@68
|
658 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
|
jpayne@68
|
659 for(; valid>targetKeys; rounds++){
|
jpayne@68
|
660 final int pos=randy.nextInt(len);
|
jpayne@68
|
661 final int key=expanded[pos];
|
jpayne@68
|
662 final int x=map.get(key);
|
jpayne@68
|
663 assert(x>0);
|
jpayne@68
|
664
|
jpayne@68
|
665
|
jpayne@68
|
666 if(x==minCount){valid--;}
|
jpayne@68
|
667 map.put(key, x-1);
|
jpayne@68
|
668
|
jpayne@68
|
669 len--;//shrink the array
|
jpayne@68
|
670 // System.err.println("len="+len+", counts[len]="+counts[len]);
|
jpayne@68
|
671 // System.err.println("pos="+pos+", counts[pos]="+counts[pos]);
|
jpayne@68
|
672 expanded[pos]=expanded[len];//move the last element to the empty slot
|
jpayne@68
|
673 expanded[len]=0;
|
jpayne@68
|
674
|
jpayne@68
|
675 // System.err.println(len+", "+pos+": "+Arrays.toString(counts));
|
jpayne@68
|
676 }
|
jpayne@68
|
677
|
jpayne@68
|
678 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
|
jpayne@68
|
679
|
jpayne@68
|
680 return rounds;
|
jpayne@68
|
681 }
|
jpayne@68
|
682
|
jpayne@68
|
683 /*--------------------------------------------------------------*/
|
jpayne@68
|
684 /*---------------- Inner Classes ----------------*/
|
jpayne@68
|
685 /*--------------------------------------------------------------*/
|
jpayne@68
|
686
|
jpayne@68
|
687 /** This class is static to prevent accidental writing to shared variables.
|
jpayne@68
|
688 * It is safe to remove the static modifier. */
|
jpayne@68
|
689 private class ProcessThread extends Thread {
|
jpayne@68
|
690
|
jpayne@68
|
691 //Constructor
|
jpayne@68
|
692 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){
|
jpayne@68
|
693 cris=cris_;
|
jpayne@68
|
694 ros=ros_;
|
jpayne@68
|
695 tid=tid_;
|
jpayne@68
|
696 localHeap=new SketchHeap(size, 0, true);
|
jpayne@68
|
697 }
|
jpayne@68
|
698
|
jpayne@68
|
699 //Called by start()
|
jpayne@68
|
700 @Override
|
jpayne@68
|
701 public void run(){
|
jpayne@68
|
702 //Do anything necessary prior to processing
|
jpayne@68
|
703
|
jpayne@68
|
704 //Process the reads
|
jpayne@68
|
705 processInner();
|
jpayne@68
|
706
|
jpayne@68
|
707 //Do anything necessary after processing
|
jpayne@68
|
708 dumpHeap();
|
jpayne@68
|
709
|
jpayne@68
|
710 //Indicate successful exit status
|
jpayne@68
|
711 success=true;
|
jpayne@68
|
712 }
|
jpayne@68
|
713
|
jpayne@68
|
714 /** Iterate through the reads */
|
jpayne@68
|
715 void processInner(){
|
jpayne@68
|
716
|
jpayne@68
|
717 //Grab the first ListNum of reads
|
jpayne@68
|
718 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
719 //Grab the actual read list from the ListNum
|
jpayne@68
|
720 ArrayList<Read> reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
721
|
jpayne@68
|
722 //Check to ensure pairing is as expected
|
jpayne@68
|
723 if(reads!=null && !reads.isEmpty()){
|
jpayne@68
|
724 Read r=reads.get(0);
|
jpayne@68
|
725 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
|
jpayne@68
|
726 }
|
jpayne@68
|
727
|
jpayne@68
|
728 //As long as there is a nonempty read list...
|
jpayne@68
|
729 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
|
jpayne@68
|
730 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
|
jpayne@68
|
731
|
jpayne@68
|
732 //Loop through each read in the list
|
jpayne@68
|
733 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
734 final Read r1=reads.get(idx);
|
jpayne@68
|
735 final Read r2=r1.mate;
|
jpayne@68
|
736
|
jpayne@68
|
737 //Validate reads in worker threads
|
jpayne@68
|
738 if(!r1.validated()){r1.validate(true);}
|
jpayne@68
|
739 if(r2!=null && !r2.validated()){r2.validate(true);}
|
jpayne@68
|
740
|
jpayne@68
|
741 //Track the initial length for statistics
|
jpayne@68
|
742 final int initialLength1=r1.length();
|
jpayne@68
|
743 final int initialLength2=r1.mateLength();
|
jpayne@68
|
744
|
jpayne@68
|
745 //Increment counters
|
jpayne@68
|
746 readsProcessedT+=r1.pairCount();
|
jpayne@68
|
747 basesProcessedT+=initialLength1+initialLength2;
|
jpayne@68
|
748
|
jpayne@68
|
749 //Reads are processed in this block.
|
jpayne@68
|
750 processReadPair(r1, r2);
|
jpayne@68
|
751 }
|
jpayne@68
|
752
|
jpayne@68
|
753 if(ros!=null){
|
jpayne@68
|
754 for(Read r1 : reads){
|
jpayne@68
|
755 readsOutT+=r1.pairCount();
|
jpayne@68
|
756 basesOutT+=r1.pairLength();
|
jpayne@68
|
757 }
|
jpayne@68
|
758
|
jpayne@68
|
759 //Output reads to the output stream
|
jpayne@68
|
760 if(ros!=null){ros.add(reads, ln.id);}
|
jpayne@68
|
761 }
|
jpayne@68
|
762
|
jpayne@68
|
763 //Notify the input stream that the list was used
|
jpayne@68
|
764 cris.returnList(ln);
|
jpayne@68
|
765 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
|
jpayne@68
|
766
|
jpayne@68
|
767 //Fetch a new list
|
jpayne@68
|
768 ln=cris.nextList();
|
jpayne@68
|
769 reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
770 }
|
jpayne@68
|
771
|
jpayne@68
|
772 //Notify the input stream that the final list was used
|
jpayne@68
|
773 if(ln!=null){
|
jpayne@68
|
774 if(ln.list!=null){ln.list.clear();}
|
jpayne@68
|
775 cris.returnList(ln.id, true);
|
jpayne@68
|
776 }
|
jpayne@68
|
777 }
|
jpayne@68
|
778
|
jpayne@68
|
779 /**
|
jpayne@68
|
780 * Process a read or a read pair.
|
jpayne@68
|
781 * @param r1 Read 1
|
jpayne@68
|
782 * @param r2 Read 2 (may be null)
|
jpayne@68
|
783 */
|
jpayne@68
|
784 void processReadPair(final Read r1, final Read r2){
|
jpayne@68
|
785 processReadNucleotide(r1);
|
jpayne@68
|
786 if(r2!=null){processReadNucleotide(r2);}
|
jpayne@68
|
787 }
|
jpayne@68
|
788
|
jpayne@68
|
789 void processReadNucleotide(final Read r){
|
jpayne@68
|
790 final byte[] bases=r.bases;
|
jpayne@68
|
791 final byte[] quals=r.quality;
|
jpayne@68
|
792 long kmer=0;
|
jpayne@68
|
793 long rkmer=0;
|
jpayne@68
|
794 int len=0;
|
jpayne@68
|
795 assert(!r.aminoacid());
|
jpayne@68
|
796
|
jpayne@68
|
797 final long min=minHashValue;
|
jpayne@68
|
798 localHeap.genomeSizeBases+=r.length();
|
jpayne@68
|
799 localHeap.genomeSequences++;
|
jpayne@68
|
800
|
jpayne@68
|
801 if(quals==null || (minProb<=0 && minQual<2)){
|
jpayne@68
|
802 for(int i=0; i<bases.length; i++){
|
jpayne@68
|
803 byte b=bases[i];
|
jpayne@68
|
804 long x=AminoAcid.baseToNumber[b];
|
jpayne@68
|
805 long x2=AminoAcid.baseToComplementNumber[b];
|
jpayne@68
|
806
|
jpayne@68
|
807 kmer=((kmer<<2)|x)&mask;
|
jpayne@68
|
808 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
|
jpayne@68
|
809
|
jpayne@68
|
810 if(x<0){len=0; rkmer=0;}else{len++;}
|
jpayne@68
|
811 if(len>=k){
|
jpayne@68
|
812 localHeap.genomeSizeKmers++;
|
jpayne@68
|
813 final long hashcode=hash(kmer, rkmer);
|
jpayne@68
|
814 if(hashcode>min){localHeap.add(hashcode);}
|
jpayne@68
|
815 }
|
jpayne@68
|
816 }
|
jpayne@68
|
817 }else{
|
jpayne@68
|
818 float prob=1;
|
jpayne@68
|
819 for(int i=0; i<bases.length; i++){
|
jpayne@68
|
820 final byte b=bases[i];
|
jpayne@68
|
821 final long x=AminoAcid.baseToNumber[b];
|
jpayne@68
|
822 final long x2=AminoAcid.baseToComplementNumber[b];
|
jpayne@68
|
823
|
jpayne@68
|
824 {//Quality-related stuff
|
jpayne@68
|
825 final byte q=quals[i];
|
jpayne@68
|
826 assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual;
|
jpayne@68
|
827 prob=prob*align2.QualityTools.PROB_CORRECT[q];
|
jpayne@68
|
828 if(len>k){
|
jpayne@68
|
829 byte oldq=quals[i-k];
|
jpayne@68
|
830 prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq];
|
jpayne@68
|
831 }
|
jpayne@68
|
832 if(x<0 || q<minQual){
|
jpayne@68
|
833 len=0;
|
jpayne@68
|
834 kmer=rkmer=0;
|
jpayne@68
|
835 prob=1;
|
jpayne@68
|
836 }else{
|
jpayne@68
|
837 len++;
|
jpayne@68
|
838 }
|
jpayne@68
|
839 }
|
jpayne@68
|
840
|
jpayne@68
|
841 kmer=((kmer<<2)|x)&mask;
|
jpayne@68
|
842 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
|
jpayne@68
|
843
|
jpayne@68
|
844 if(len>=k && prob>=minProb){
|
jpayne@68
|
845 localHeap.genomeSizeKmers++;
|
jpayne@68
|
846 localHeap.probSum+=prob;
|
jpayne@68
|
847 final long hashcode=hash(kmer, rkmer);
|
jpayne@68
|
848 if(hashcode>min){localHeap.checkAndAdd(hashcode);}
|
jpayne@68
|
849 }
|
jpayne@68
|
850 }
|
jpayne@68
|
851 }
|
jpayne@68
|
852 }
|
jpayne@68
|
853
|
jpayne@68
|
854 private void dumpHeap(){
|
jpayne@68
|
855 synchronized(sharedHeap){
|
jpayne@68
|
856 sharedHeap.add(localHeap);
|
jpayne@68
|
857 }
|
jpayne@68
|
858 }
|
jpayne@68
|
859
|
jpayne@68
|
860 /** Number of reads processed by this thread */
|
jpayne@68
|
861 protected long readsProcessedT=0;
|
jpayne@68
|
862 /** Number of bases processed by this thread */
|
jpayne@68
|
863 protected long basesProcessedT=0;
|
jpayne@68
|
864
|
jpayne@68
|
865 /** Number of reads retained by this thread */
|
jpayne@68
|
866 protected long readsOutT=0;
|
jpayne@68
|
867 /** Number of bases retained by this thread */
|
jpayne@68
|
868 protected long basesOutT=0;
|
jpayne@68
|
869
|
jpayne@68
|
870 /** True only if this thread has completed successfully */
|
jpayne@68
|
871 boolean success=false;
|
jpayne@68
|
872
|
jpayne@68
|
873 /** Shared input stream */
|
jpayne@68
|
874 private final ConcurrentReadInputStream cris;
|
jpayne@68
|
875 /** Shared output stream */
|
jpayne@68
|
876 private final ConcurrentReadOutputStream ros;
|
jpayne@68
|
877 /** Thread ID */
|
jpayne@68
|
878 final int tid;
|
jpayne@68
|
879
|
jpayne@68
|
880 final SketchHeap localHeap;
|
jpayne@68
|
881 }
|
jpayne@68
|
882
|
jpayne@68
|
883 /*--------------------------------------------------------------*/
|
jpayne@68
|
884 /*---------------- Fields ----------------*/
|
jpayne@68
|
885 /*--------------------------------------------------------------*/
|
jpayne@68
|
886
|
jpayne@68
|
887 /** Primary input file path */
|
jpayne@68
|
888 private String in1=null;
|
jpayne@68
|
889 /** Secondary input file path */
|
jpayne@68
|
890 private String in2=null;
|
jpayne@68
|
891
|
jpayne@68
|
892 private String qfin1=null;
|
jpayne@68
|
893 private String qfin2=null;
|
jpayne@68
|
894
|
jpayne@68
|
895 /** Primary output file path */
|
jpayne@68
|
896 private String out1=null;
|
jpayne@68
|
897 /** Secondary output file path */
|
jpayne@68
|
898 private String out2=null;
|
jpayne@68
|
899
|
jpayne@68
|
900 private String qfout1=null;
|
jpayne@68
|
901 private String qfout2=null;
|
jpayne@68
|
902
|
jpayne@68
|
903 /** Override input file extension */
|
jpayne@68
|
904 private String extin=null;
|
jpayne@68
|
905 /** Override output file extension */
|
jpayne@68
|
906 private String extout=null;
|
jpayne@68
|
907
|
jpayne@68
|
908 /*--------------------------------------------------------------*/
|
jpayne@68
|
909
|
jpayne@68
|
910 /** Number of reads processed */
|
jpayne@68
|
911 protected long readsProcessed=0;
|
jpayne@68
|
912 /** Number of bases processed */
|
jpayne@68
|
913 protected long basesProcessed=0;
|
jpayne@68
|
914
|
jpayne@68
|
915 /** Number of reads retained */
|
jpayne@68
|
916 protected long readsOut=0;
|
jpayne@68
|
917 /** Number of bases retained */
|
jpayne@68
|
918 protected long basesOut=0;
|
jpayne@68
|
919
|
jpayne@68
|
920 /** Quit after processing this many input reads; -1 means no limit */
|
jpayne@68
|
921 private long maxReads=-1;
|
jpayne@68
|
922
|
jpayne@68
|
923 private boolean paired=false;
|
jpayne@68
|
924 private int trials=25;
|
jpayne@68
|
925 private long seed=-1;
|
jpayne@68
|
926 private int maxExpandedLength=50000000;
|
jpayne@68
|
927
|
jpayne@68
|
928 /*--------------------------------------------------------------*/
|
jpayne@68
|
929 /*---------------- Final Fields ----------------*/
|
jpayne@68
|
930 /*--------------------------------------------------------------*/
|
jpayne@68
|
931
|
jpayne@68
|
932 /** Primary input file */
|
jpayne@68
|
933 private final FileFormat ffin1;
|
jpayne@68
|
934 /** Secondary input file */
|
jpayne@68
|
935 private final FileFormat ffin2;
|
jpayne@68
|
936
|
jpayne@68
|
937 /** Primary output file */
|
jpayne@68
|
938 private final FileFormat ffout1;
|
jpayne@68
|
939 /** Secondary output file */
|
jpayne@68
|
940 private final FileFormat ffout2;
|
jpayne@68
|
941
|
jpayne@68
|
942 private final SketchHeap sharedHeap;
|
jpayne@68
|
943 private final int heapSize;
|
jpayne@68
|
944 private final long targetKmers;
|
jpayne@68
|
945 private final int minCount;
|
jpayne@68
|
946
|
jpayne@68
|
947 final int shift;
|
jpayne@68
|
948 final int shift2;
|
jpayne@68
|
949 final long mask;
|
jpayne@68
|
950
|
jpayne@68
|
951 final float minProb;
|
jpayne@68
|
952 final byte minQual;
|
jpayne@68
|
953
|
jpayne@68
|
954 /*--------------------------------------------------------------*/
|
jpayne@68
|
955 /*---------------- Common Fields ----------------*/
|
jpayne@68
|
956 /*--------------------------------------------------------------*/
|
jpayne@68
|
957
|
jpayne@68
|
958 /** Print status messages to this output stream */
|
jpayne@68
|
959 private PrintStream outstream=System.err;
|
jpayne@68
|
960 /** Print verbose messages */
|
jpayne@68
|
961 public static boolean verbose=false;
|
jpayne@68
|
962 /** True if an error was encountered */
|
jpayne@68
|
963 public boolean errorState=false;
|
jpayne@68
|
964 /** Overwrite existing output files */
|
jpayne@68
|
965 private boolean overwrite=true;
|
jpayne@68
|
966 /** Append to existing output files */
|
jpayne@68
|
967 private boolean append=false;
|
jpayne@68
|
968 /** Reads are output in input order (not enabled) */
|
jpayne@68
|
969 private boolean ordered=true;
|
jpayne@68
|
970
|
jpayne@68
|
971 }
|