comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/KmerLimit2.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package sketch;
2
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.Arrays;
7 import java.util.Locale;
8 import java.util.Random;
9
10 import dna.AminoAcid;
11 import fileIO.ByteFile;
12 import fileIO.FileFormat;
13 import fileIO.ReadWrite;
14 import shared.Parse;
15 import shared.Parser;
16 import shared.PreParser;
17 import shared.ReadStats;
18 import shared.Shared;
19 import shared.Timer;
20 import shared.Tools;
21 import stream.ConcurrentReadInputStream;
22 import stream.ConcurrentReadOutputStream;
23 import stream.FASTQ;
24 import stream.FastaReadInputStream;
25 import stream.Read;
26 import structures.IntMap;
27 import structures.ListNum;
28
29 /**
30 *
31 * @author Brian Bushnell
32 * @date July 30, 2018
33 *
34 */
35 public class KmerLimit2 extends SketchObject {
36
37 /*--------------------------------------------------------------*/
38 /*---------------- Initialization ----------------*/
39 /*--------------------------------------------------------------*/
40
41 /**
42 * Code entrance from the command line.
43 * @param args Command line arguments
44 */
45 public static void main(String[] args){
46 //Start a timer immediately upon code entrance.
47 Timer t=new Timer();
48
49 //Create an instance of this class
50 KmerLimit2 x=new KmerLimit2(args);
51
52 //Run the object
53 x.process(t);
54
55 //Close the print stream if it was redirected
56 Shared.closeStream(x.outstream);
57 }
58
59 /**
60 * Constructor.
61 * @param args Command line arguments
62 */
63 public KmerLimit2(String[] args){
64
65 {//Preparse block for help, config files, and outstream
66 PreParser pp=new PreParser(args, getClass(), false);
67 args=pp.args;
68 outstream=pp.outstream;
69 }
70
71 boolean setInterleaved=false; //Whether interleaved was explicitly set.
72
73 //Set shared static variables
74 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
75 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
76 SketchObject.setKeyFraction(0.1);
77 defaultParams.minEntropy=0;
78 defaultParams.minProb=0.2f;
79
80 boolean setHeapSize=false;
81 int heapSize_=8091;
82 long targetKmers_=0;
83 int k_=32;
84 int minCount_=1;
85
86 //Create a parser object
87 Parser parser=new Parser();
88 parser.overwrite=true;
89
90 //Parse each argument
91 for(int i=0; i<args.length; i++){
92 String arg=args[i];
93
94 //Break arguments into their constituent parts, in the form of "a=b"
95 String[] split=arg.split("=");
96 String a=split[0].toLowerCase();
97 String b=split.length>1 ? split[1] : null;
98 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
99
100 if(a.equals("verbose")){
101 verbose=Parse.parseBoolean(b);
102 }else if(a.equals("ordered")){
103 ordered=Parse.parseBoolean(b);
104 }else if(a.equals("size") || a.equals("heapsize")){
105 heapSize_=Parse.parseIntKMG(b);
106 setHeapSize=true;
107 }else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){
108 targetKmers_=Parse.parseKMG(b);
109 }else if(a.equals("mincount")){
110 minCount_=Parse.parseIntKMG(b);
111 }else if(a.equals("maxexpandedlength") || a.equals("maxlength") || a.equals("maxlen")){
112 maxExpandedLength=Parse.parseIntKMG(b);
113 }else if(a.equals("seed")){
114 seed=Parse.parseKMG(b);
115 }else if(a.equals("trials")){
116 trials=Parse.parseIntKMG(b);
117 }else if(parseSketchFlags(arg, a, b)){
118 parser.parse(arg, a, b);
119 }else if(defaultParams.parse(arg, a, b)){
120 parser.parse(arg, a, b);
121 }else if(a.equals("parse_flag_goes_here")){
122 long fake_variable=Parse.parseKMG(b);
123 //Set a variable here
124 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
125 //do nothing
126 }else{
127 outstream.println("Unknown parameter "+args[i]);
128 assert(false) : "Unknown parameter "+args[i];
129 }
130 }
131
132 if(!setHeapSize && minCount_>1){heapSize_=32000;}
133 heapSize=heapSize_;
134 targetKmers=targetKmers_;
135 k=k_;
136 minCount=minCount_;
137 assert(targetKmers>0) : "Must set a kmer limit.";
138 assert(heapSize>0) : "Heap size must be positive.";
139 assert(k>0 && k<=32) : "0<k<33; k="+k;
140 postParse();
141
142 // if(minCount>1){
143 // Shared.setBufferLen(800);
144 // }
145
146 {//Process parser fields
147 Parser.processQuality();
148
149 maxReads=parser.maxReads;
150
151 overwrite=ReadStats.overwrite=parser.overwrite;
152 append=ReadStats.append=parser.append;
153 setInterleaved=parser.setInterleaved;
154
155 in1=parser.in1;
156 in2=parser.in2;
157 qfin1=parser.qfin1;
158 qfin2=parser.qfin2;
159
160 out1=parser.out1;
161 out2=parser.out2;
162 qfout1=parser.qfout1;
163 qfout2=parser.qfout2;
164
165 extin=parser.extin;
166 extout=parser.extout;
167 }
168
169 //Do input file # replacement
170 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
171 in2=in1.replace("#", "2");
172 in1=in1.replace("#", "1");
173 }
174
175 //Do output file # replacement
176 if(out1!=null && out2==null && out1.indexOf('#')>-1){
177 out2=out1.replace("#", "2");
178 out1=out1.replace("#", "1");
179 }
180
181 //Adjust interleaved detection based on the number of input files
182 if(in2!=null){
183 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
184 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
185 }
186
187 assert(FastaReadInputStream.settingsOK());
188
189 //Ensure there is an input file
190 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
191
192 //Adjust the number of threads for input file reading
193 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
194 ByteFile.FORCE_MODE_BF2=true;
195 }
196
197 //Ensure out2 is not set without out1
198 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}
199
200 //Adjust interleaved settings based on number of output files
201 if(!setInterleaved){
202 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n";
203 if(in2!=null){ //If there are 2 input streams.
204 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
205 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
206 }else{ //There is one input stream.
207 if(out2!=null){
208 FASTQ.FORCE_INTERLEAVED=true;
209 FASTQ.TEST_INTERLEAVED=false;
210 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
211 }
212 }
213 }
214
215 //Ensure output files can be written
216 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
217 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
218 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
219 }
220
221 //Ensure input files can be read
222 if(!Tools.testInputFiles(false, true, in1, in2)){
223 throw new RuntimeException("\nCan't read some input files.\n");
224 }
225
226 //Ensure that no file was specified multiple times
227 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
228 throw new RuntimeException("\nSome file names were specified multiple times.\n");
229 }
230
231 //Create output FileFormat objects
232 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
233 ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
234
235 //Create input FileFormat objects
236 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
237 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
238
239 minProb=defaultParams.minProb;
240 minQual=defaultParams.minQual;
241
242 shift=2*k;
243 shift2=shift-2;
244 mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32
245 sharedHeap=new SketchHeap(heapSize, 0, true);
246 }
247
248 /*--------------------------------------------------------------*/
249 /*---------------- Outer Methods ----------------*/
250 /*--------------------------------------------------------------*/
251
252 /** Create read streams and process all data */
253 void process(Timer t){
254
255 //Turn off read validation in the input threads to increase speed
256 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
257 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
258
259 // //Optionally create a read output stream
260 // final ConcurrentReadOutputStream ros;
261 // if(ffout1!=null){
262 // //Select output buffer size based on whether it needs to be ordered
263 // final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
264 //
265 // //Notify user of output mode
266 // if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
267 // outstream.println("Writing interleaved.");
268 // }
269 //
270 // ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
271 // ros.start(); //Start the stream
272 // }else{ros=null;}
273
274 //Reset counters
275 readsProcessed=readsOut=0;
276 basesProcessed=basesOut=0;
277
278 //Process the reads in separate threads
279 spawnThreads0();
280
281 // if(verbose){outstream.println("Finished; closing streams.");}
282
283 //Reset read validation
284 Read.VALIDATE_IN_CONSTRUCTOR=vic;
285
286
287 Sketch sketch=new Sketch(sharedHeap, true, true, null);
288 sketch=capLengthAtCountSum(sketch, maxExpandedLength);
289 final long reads=Tools.max(1, sketch.genomeSequences);
290 final long targetReads=calcTargetReads(sketch, targetKmers, minCount, trials, seed);
291 final double targetRate=Tools.min(1, targetReads/(double)reads);
292 final String targetRateS=String.format(Locale.ROOT, "%.4f%%",targetRate*100);
293
294 //Report timing and results
295 t.stop();
296 outstream.println("Finished counting kmers.");
297 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
298
299 String kstring0=Tools.padKM(sketch.genomeSizeEstimate(minCount), 8);
300 String rstring0=Tools.padKM(targetReads, 8);
301 outstream.println("Unique Kmers: "+kstring0);
302 outstream.println("Target Reads: "+rstring0+"\t"+targetRateS);
303
304 // outstream.println("Reads: \t"+reads);
305 // outstream.println("Unique Kmers: \t"+sketch.genomeSizeEstimate(minCount));
306 // outstream.println("Target Reads: \t"+targetReads);
307 // outstream.println("Sample Rate: \t"+targetRateS);
308 // outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
309
310 t.start();
311 outstream.println("\nSubsampling reads.");
312
313 // String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
314 // outstream.println("Unique Kmers Out: "+kstring);
315
316
317 // ArrayList<String> args=new ArrayList<String>();
318 // args.add("in="+in1);
319 // if(in2!=null){args.add("in2="+in2);}
320 // args.add("out="+out1);
321 // if(out2!=null){args.add("out2="+out2);}
322 // args.add("ordered="+ordered);
323 // args.add("ow="+(overwrite ? "t" : "f"));
324 // if(targetRate<1){args.add("samplerate="+targetRateS);}
325 // args.add("loglogout");
326 // args.add("loglogk="+k);
327 // args.add("loglogminprob="+minProb);
328 // BBDukF.main(args.toArray(new String[0]));
329
330 // Sketch sk=new Sketch(sharedHeap, true, true, null);
331 // outstream.println(sk.genomeSizeEstimate());
332 spawnThreads2(targetRate);
333 t.stop();
334 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
335
336 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
337 String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
338 outstream.println("Unique Kmers Out: "+kstring);
339
340 //Throw an exception of there was an error in a thread
341 if(errorState){
342 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
343 }
344 }
345
346 /** Spawn process threads */
347 private void spawnThreads0(){
348
349 //Create a read input stream
350 final ConcurrentReadInputStream cris;
351 {
352 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
353 cris.start(); //Start the stream
354 if(verbose){outstream.println("Started cris");}
355 }
356 paired=cris.paired();
357 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
358
359 //Determine how many threads may be used
360 final int threads=Tools.min(10, Shared.threads());
361
362 //Fill a list with ProcessThreads
363 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
364 for(int i=0; i<threads; i++){
365 alpt.add(new ProcessThread(cris, null, i, heapSize));
366 }
367
368 //Start the threads
369 for(ProcessThread pt : alpt){
370 pt.start();
371 }
372
373 //Wait for completion of all threads
374 boolean success=true;
375 for(ProcessThread pt : alpt){
376
377 //Wait until this thread has terminated
378 while(pt.getState()!=Thread.State.TERMINATED){
379 try {
380 //Attempt a join operation
381 pt.join();
382 } catch (InterruptedException e) {
383 //Potentially handle this, if it is expected to occur
384 e.printStackTrace();
385 }
386 }
387
388 //Accumulate per-thread statistics
389 readsProcessed+=pt.readsProcessedT;
390 basesProcessed+=pt.basesProcessedT;
391 readsOut+=pt.readsOutT;
392 basesOut+=pt.basesOutT;
393 success&=pt.success;
394 }
395
396 //Track whether any threads failed
397 if(!success){errorState=true;}
398
399 //Do anything necessary after processing
400
401 //Close the read streams
402 errorState|=ReadWrite.closeStreams(cris);
403
404 }
405
406 /** Spawn process threads */
407 private void spawnThreads2(double rate){
408
409 //Create a read input stream
410 final ConcurrentReadInputStream cris;
411 {
412 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
413 cris.setSampleRate((float)rate, seed);
414 cris.start(); //Start the stream
415 if(verbose){outstream.println("Started cris");}
416 }
417 // paired=cris.paired();
418 // if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
419
420 //Optionally create a read output stream
421 final ConcurrentReadOutputStream ros;
422 if(ffout1!=null){
423 //Select output buffer size based on whether it needs to be ordered
424 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
425
426 //Notify user of output mode
427 if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
428 outstream.println("Writing interleaved.");
429 }
430
431 ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
432 ros.start(); //Start the stream
433 }else{ros=null;}
434
435 //Determine how many threads may be used
436 final int threads=Tools.min(10, Shared.threads());
437
438 sharedHeap.clear();
439 // readsProcessed=0;
440 // basesProcessed=0;
441 readsOut=0;
442 basesOut=0;
443
444 //Fill a list with ProcessThreads
445 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
446 for(int i=0; i<threads; i++){
447 alpt.add(new ProcessThread(cris, ros, i, heapSize));
448 }
449
450 //Start the threads
451 for(ProcessThread pt : alpt){
452 pt.start();
453 }
454
455 //Wait for completion of all threads
456 boolean success=true;
457 for(ProcessThread pt : alpt){
458
459 //Wait until this thread has terminated
460 while(pt.getState()!=Thread.State.TERMINATED){
461 try {
462 //Attempt a join operation
463 pt.join();
464 } catch (InterruptedException e) {
465 //Potentially handle this, if it is expected to occur
466 e.printStackTrace();
467 }
468 }
469
470 //Accumulate per-thread statistics
471 // readsProcessed+=pt.readsProcessedT;
472 // basesProcessed+=pt.basesProcessedT;
473 readsOut+=pt.readsOutT;
474 basesOut+=pt.basesOutT;
475 success&=pt.success;
476 }
477
478 //Track whether any threads failed
479 if(!success){errorState=true;}
480
481 //Do anything necessary after processing
482
483 //Write anything that was accumulated by ReadStats
484 errorState|=ReadStats.writeAll();
485 //Close the read streams
486 errorState|=ReadWrite.closeStreams(cris, ros);
487
488 }
489
490 /*--------------------------------------------------------------*/
491 /*---------------- Inner Methods ----------------*/
492 /*--------------------------------------------------------------*/
493
494 public static Sketch capLengthAtCountSum(Sketch sketch0, int max) {
495 int len=0;
496 long sum=0;
497 for(; len<sketch0.keyCounts.length; len++){
498 sum=sum+sketch0.keyCounts[len];
499 if(sum>max){break;}
500 }
501 if(len>=sketch0.length()){return sketch0;}
502
503 long[] keys=Arrays.copyOf(sketch0.keys, len);
504 int[] counts=Arrays.copyOf(sketch0.keyCounts, len);
505
506 // long[] array_, int[] counts_, int taxID_, long imgID_, long gSizeBases_, long gSizeKmers_, long gSequences_, double probCorrect_,
507 // String taxName_, String name0_, String fname_, ArrayList<String> meta_
508
509 Sketch sk=new Sketch(keys, counts, null, null, null, -1, -1,
510 sketch0.genomeSizeBases, sketch0.genomeSizeKmers, sketch0.genomeSequences, sketch0.probCorrect,
511 null, null, null, null);
512
513 return sk;
514 }
515
516 public static long calcTargetReads(Sketch sketch, long targetKmers, int minCount, int trials, long seed){
517 final int[] counts0=sketch.keyCounts;
518 final int[] counts=Arrays.copyOf(counts0, counts0.length);
519 final long size=sketch.genomeSizeEstimate(minCount);
520 final long reads=sketch.genomeSequences;
521 final double targetKmerFraction=targetKmers/(double)size;
522 if(targetKmerFraction>=1){return reads;}
523
524 final int targetKeys=(int)(targetKmerFraction*counts.length);
525 final long countSum=Tools.sum(counts0);
526 assert(countSum<Shared.MAX_ARRAY_LEN) : countSum;
527 // System.err.println("countsum: "+countSum);
528
529 final IntMap map=new IntMap(0, counts0.length);
530 final int[] expanded=new int[(int)countSum];
531
532 long roundSum=0;
533 final Random randy=Shared.threadLocalRandom(seed);
534 for(int i=0; i<trials; i++){
535 Tools.fill(counts, counts0);
536 // long rounds=reduceRounds(counts0, counts, minCount, targetKeys, randy);
537 long rounds=reduceRoundsIM(counts0, expanded, minCount, targetKeys, randy, map);
538 roundSum+=rounds;
539 }
540 double avgRounds=roundSum/(double)trials;
541 // System.err.println("avgRounds: "+avgRounds);
542 double targetCountFraction=1-(avgRounds/countSum);
543 // System.err.println("targetFraction: "+targetCountFraction);
544 return (long)(targetCountFraction*reads);
545 }
546
547 // public static int reduceRoundsOld(final int[] counts, final int minCount, final int targetKeys, final Random randy){
548 // assert(minCount>=0) : minCount;
549 // int rounds=0;
550 // int valid=0;
551 // for(int x : counts){
552 // if(x>=minCount){valid++;}
553 // }
554 //
555 // int len=counts.length;
556 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
557 // for(; valid>targetKeys; rounds++){
558 // int pos=randy.nextInt(len);
559 //// assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
560 // if(counts[pos]==minCount){valid--;}
561 // counts[pos]--;
562 // if(counts[pos]==0){
563 // len--;//shrink the array
564 // System.err.println("len="+len+", counts[len]="+counts[len]);
565 // System.err.println("pos="+pos+", counts[pos]="+counts[pos]);
566 // counts[pos]=counts[len];//move the last element to the empty slot
567 // counts[len]=0;
568 // if(pos!=len && len>0){
569 // assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
570 // }
571 // }
572 // System.err.println(len+", "+pos+": "+Arrays.toString(counts));
573 // }
574 //
575 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
576 //
577 // return rounds;
578 // }
579
580 //This can be done faster with bins.
581 //Each bin contains all kmers with count x. When a bin is hit, one kmer moves to the next bin lower.
582 //Alternately, expand the array into one physical kmer per count. Store the current counts in an IntMap. Remove key each time.
583 public static long reduceRounds(final int[] counts0, final int[] counts, final int minCount, final int targetKeys, final Random randy){
584 assert(minCount>=0) : minCount;
585 long rounds=0;
586 int valid=0;
587 for(int x : counts){
588 if(x>=minCount){valid++;}
589 }
590
591 int len=counts.length;
592 final long sum0=Tools.sum(counts);
593 long sum=sum0;
594 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
595 for(; valid>targetKeys; rounds++){
596 long posNum=(Long.MAX_VALUE&randy.nextLong())%sum;
597 long sum2=0;
598 int pos=0;
599
600 for(int i=0; i<counts.length; i++){
601 int x=counts[i];
602 if(x>0){
603 sum2+=x;
604 if(sum2>=posNum){
605 pos=i;
606 break;
607 }
608 }
609 }
610
611 // for(int i=0; i<counts0.length; i++){
612 // int x=counts0[i];
613 // if(x>0){
614 // sum2+=x;
615 // if(sum2>=posNum){
616 // pos=i;
617 // break;
618 // }
619 // }
620 // }
621
622 sum--;
623
624 assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
625 if(counts[pos]==minCount){valid--;}
626 counts[pos]--;
627 if(counts[pos]==0){
628 len--;//shrink the array
629 }
630 // System.err.println(len+", "+pos+": "+Arrays.toString(counts));
631 }
632
633 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
634
635 return rounds;
636 }
637
638 //This can be done faster with bins.
639 //Each bin contains all kmers with count x. When a bin is hit, one kmer moves to the next bin lower.
640 //Alternately, expand the array into one physical kmer per count. Store the current counts in an IntMap. Remove key each time.
641 public static long reduceRoundsIM(final int[] counts0, final int[] expanded, final int minCount, final int targetKeys, final Random randy, final IntMap map){
642 assert(minCount>=0) : minCount;
643 long rounds=0;
644 int valid=0;
645 map.clear();
646 for(int i=0, k=0; i<counts0.length; i++){
647 int x=counts0[i];
648 // counts[i]=counts0[i];
649 if(x>=minCount){valid++;}
650 map.put(i, x);
651 for(int j=0; j<x; j++, k++){
652 expanded[k]=i;
653 }
654 }
655 assert(expanded.length==Tools.sum(counts0));
656
657 int len=expanded.length;
658 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
659 for(; valid>targetKeys; rounds++){
660 final int pos=randy.nextInt(len);
661 final int key=expanded[pos];
662 final int x=map.get(key);
663 assert(x>0);
664
665
666 if(x==minCount){valid--;}
667 map.put(key, x-1);
668
669 len--;//shrink the array
670 // System.err.println("len="+len+", counts[len]="+counts[len]);
671 // System.err.println("pos="+pos+", counts[pos]="+counts[pos]);
672 expanded[pos]=expanded[len];//move the last element to the empty slot
673 expanded[len]=0;
674
675 // System.err.println(len+", "+pos+": "+Arrays.toString(counts));
676 }
677
678 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
679
680 return rounds;
681 }
682
683 /*--------------------------------------------------------------*/
684 /*---------------- Inner Classes ----------------*/
685 /*--------------------------------------------------------------*/
686
687 /** This class is static to prevent accidental writing to shared variables.
688 * It is safe to remove the static modifier. */
689 private class ProcessThread extends Thread {
690
691 //Constructor
692 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){
693 cris=cris_;
694 ros=ros_;
695 tid=tid_;
696 localHeap=new SketchHeap(size, 0, true);
697 }
698
699 //Called by start()
700 @Override
701 public void run(){
702 //Do anything necessary prior to processing
703
704 //Process the reads
705 processInner();
706
707 //Do anything necessary after processing
708 dumpHeap();
709
710 //Indicate successful exit status
711 success=true;
712 }
713
714 /** Iterate through the reads */
715 void processInner(){
716
717 //Grab the first ListNum of reads
718 ListNum<Read> ln=cris.nextList();
719 //Grab the actual read list from the ListNum
720 ArrayList<Read> reads=(ln!=null ? ln.list : null);
721
722 //Check to ensure pairing is as expected
723 if(reads!=null && !reads.isEmpty()){
724 Read r=reads.get(0);
725 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
726 }
727
728 //As long as there is a nonempty read list...
729 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
730 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
731
732 //Loop through each read in the list
733 for(int idx=0; idx<reads.size(); idx++){
734 final Read r1=reads.get(idx);
735 final Read r2=r1.mate;
736
737 //Validate reads in worker threads
738 if(!r1.validated()){r1.validate(true);}
739 if(r2!=null && !r2.validated()){r2.validate(true);}
740
741 //Track the initial length for statistics
742 final int initialLength1=r1.length();
743 final int initialLength2=r1.mateLength();
744
745 //Increment counters
746 readsProcessedT+=r1.pairCount();
747 basesProcessedT+=initialLength1+initialLength2;
748
749 //Reads are processed in this block.
750 processReadPair(r1, r2);
751 }
752
753 if(ros!=null){
754 for(Read r1 : reads){
755 readsOutT+=r1.pairCount();
756 basesOutT+=r1.pairLength();
757 }
758
759 //Output reads to the output stream
760 if(ros!=null){ros.add(reads, ln.id);}
761 }
762
763 //Notify the input stream that the list was used
764 cris.returnList(ln);
765 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
766
767 //Fetch a new list
768 ln=cris.nextList();
769 reads=(ln!=null ? ln.list : null);
770 }
771
772 //Notify the input stream that the final list was used
773 if(ln!=null){
774 if(ln.list!=null){ln.list.clear();}
775 cris.returnList(ln.id, true);
776 }
777 }
778
779 /**
780 * Process a read or a read pair.
781 * @param r1 Read 1
782 * @param r2 Read 2 (may be null)
783 */
784 void processReadPair(final Read r1, final Read r2){
785 processReadNucleotide(r1);
786 if(r2!=null){processReadNucleotide(r2);}
787 }
788
789 void processReadNucleotide(final Read r){
790 final byte[] bases=r.bases;
791 final byte[] quals=r.quality;
792 long kmer=0;
793 long rkmer=0;
794 int len=0;
795 assert(!r.aminoacid());
796
797 final long min=minHashValue;
798 localHeap.genomeSizeBases+=r.length();
799 localHeap.genomeSequences++;
800
801 if(quals==null || (minProb<=0 && minQual<2)){
802 for(int i=0; i<bases.length; i++){
803 byte b=bases[i];
804 long x=AminoAcid.baseToNumber[b];
805 long x2=AminoAcid.baseToComplementNumber[b];
806
807 kmer=((kmer<<2)|x)&mask;
808 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
809
810 if(x<0){len=0; rkmer=0;}else{len++;}
811 if(len>=k){
812 localHeap.genomeSizeKmers++;
813 final long hashcode=hash(kmer, rkmer);
814 if(hashcode>min){localHeap.add(hashcode);}
815 }
816 }
817 }else{
818 float prob=1;
819 for(int i=0; i<bases.length; i++){
820 final byte b=bases[i];
821 final long x=AminoAcid.baseToNumber[b];
822 final long x2=AminoAcid.baseToComplementNumber[b];
823
824 {//Quality-related stuff
825 final byte q=quals[i];
826 assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual;
827 prob=prob*align2.QualityTools.PROB_CORRECT[q];
828 if(len>k){
829 byte oldq=quals[i-k];
830 prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq];
831 }
832 if(x<0 || q<minQual){
833 len=0;
834 kmer=rkmer=0;
835 prob=1;
836 }else{
837 len++;
838 }
839 }
840
841 kmer=((kmer<<2)|x)&mask;
842 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
843
844 if(len>=k && prob>=minProb){
845 localHeap.genomeSizeKmers++;
846 localHeap.probSum+=prob;
847 final long hashcode=hash(kmer, rkmer);
848 if(hashcode>min){localHeap.checkAndAdd(hashcode);}
849 }
850 }
851 }
852 }
853
854 private void dumpHeap(){
855 synchronized(sharedHeap){
856 sharedHeap.add(localHeap);
857 }
858 }
859
860 /** Number of reads processed by this thread */
861 protected long readsProcessedT=0;
862 /** Number of bases processed by this thread */
863 protected long basesProcessedT=0;
864
865 /** Number of reads retained by this thread */
866 protected long readsOutT=0;
867 /** Number of bases retained by this thread */
868 protected long basesOutT=0;
869
870 /** True only if this thread has completed successfully */
871 boolean success=false;
872
873 /** Shared input stream */
874 private final ConcurrentReadInputStream cris;
875 /** Shared output stream */
876 private final ConcurrentReadOutputStream ros;
877 /** Thread ID */
878 final int tid;
879
880 final SketchHeap localHeap;
881 }
882
883 /*--------------------------------------------------------------*/
884 /*---------------- Fields ----------------*/
885 /*--------------------------------------------------------------*/
886
887 /** Primary input file path */
888 private String in1=null;
889 /** Secondary input file path */
890 private String in2=null;
891
892 private String qfin1=null;
893 private String qfin2=null;
894
895 /** Primary output file path */
896 private String out1=null;
897 /** Secondary output file path */
898 private String out2=null;
899
900 private String qfout1=null;
901 private String qfout2=null;
902
903 /** Override input file extension */
904 private String extin=null;
905 /** Override output file extension */
906 private String extout=null;
907
908 /*--------------------------------------------------------------*/
909
910 /** Number of reads processed */
911 protected long readsProcessed=0;
912 /** Number of bases processed */
913 protected long basesProcessed=0;
914
915 /** Number of reads retained */
916 protected long readsOut=0;
917 /** Number of bases retained */
918 protected long basesOut=0;
919
920 /** Quit after processing this many input reads; -1 means no limit */
921 private long maxReads=-1;
922
923 private boolean paired=false;
924 private int trials=25;
925 private long seed=-1;
926 private int maxExpandedLength=50000000;
927
928 /*--------------------------------------------------------------*/
929 /*---------------- Final Fields ----------------*/
930 /*--------------------------------------------------------------*/
931
932 /** Primary input file */
933 private final FileFormat ffin1;
934 /** Secondary input file */
935 private final FileFormat ffin2;
936
937 /** Primary output file */
938 private final FileFormat ffout1;
939 /** Secondary output file */
940 private final FileFormat ffout2;
941
942 private final SketchHeap sharedHeap;
943 private final int heapSize;
944 private final long targetKmers;
945 private final int minCount;
946
947 final int shift;
948 final int shift2;
949 final long mask;
950
951 final float minProb;
952 final byte minQual;
953
954 /*--------------------------------------------------------------*/
955 /*---------------- Common Fields ----------------*/
956 /*--------------------------------------------------------------*/
957
958 /** Print status messages to this output stream */
959 private PrintStream outstream=System.err;
960 /** Print verbose messages */
961 public static boolean verbose=false;
962 /** True if an error was encountered */
963 public boolean errorState=false;
964 /** Overwrite existing output files */
965 private boolean overwrite=true;
966 /** Append to existing output files */
967 private boolean append=false;
968 /** Reads are output in input order (not enabled) */
969 private boolean ordered=true;
970
971 }