comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/KmerLimit.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package sketch;
2
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.Arrays;
7
8 import dna.AminoAcid;
9 import fileIO.ByteFile;
10 import fileIO.FileFormat;
11 import fileIO.ReadWrite;
12 import shared.Parse;
13 import shared.Parser;
14 import shared.PreParser;
15 import shared.ReadStats;
16 import shared.Shared;
17 import shared.Timer;
18 import shared.Tools;
19 import stream.ConcurrentReadInputStream;
20 import stream.ConcurrentReadOutputStream;
21 import stream.FASTQ;
22 import stream.FastaReadInputStream;
23 import stream.Read;
24 import structures.ListNum;
25
26 /**
27 *
28 * @author Brian Bushnell
29 * @date July 25, 2018
30 *
31 */
32 public class KmerLimit extends SketchObject {
33
34 /*--------------------------------------------------------------*/
35 /*---------------- Initialization ----------------*/
36 /*--------------------------------------------------------------*/
37
38 /**
39 * Code entrance from the command line.
40 * @param args Command line arguments
41 */
42 public static void main(String[] args){
43 //Start a timer immediately upon code entrance.
44 Timer t=new Timer();
45
46 //Create an instance of this class
47 KmerLimit x=new KmerLimit(args);
48
49 //Run the object
50 x.process(t);
51
52 //Close the print stream if it was redirected
53 Shared.closeStream(x.outstream);
54 }
55
56 /**
57 * Constructor.
58 * @param args Command line arguments
59 */
60 public KmerLimit(String[] args){
61
62 {//Preparse block for help, config files, and outstream
63 PreParser pp=new PreParser(args, getClass(), false);
64 args=pp.args;
65 outstream=pp.outstream;
66 }
67
68 boolean setInterleaved=false; //Whether interleaved was explicitly set.
69
70 //Set shared static variables
71 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
72 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
73 SketchObject.setKeyFraction(0.1);
74 defaultParams.minEntropy=0;
75 defaultParams.minProb=0.2f;
76
77 boolean setHeapSize=false;
78 int heapSize_=4095;
79 long targetKmers_=0;
80 int k_=32;
81 int minCount_=1;
82
83 //Create a parser object
84 Parser parser=new Parser();
85 parser.overwrite=true;
86
87 //Parse each argument
88 for(int i=0; i<args.length; i++){
89 String arg=args[i];
90
91 //Break arguments into their constituent parts, in the form of "a=b"
92 String[] split=arg.split("=");
93 String a=split[0].toLowerCase();
94 String b=split.length>1 ? split[1] : null;
95 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
96
97 if(a.equals("verbose")){
98 verbose=Parse.parseBoolean(b);
99 }else if(a.equals("ordered")){
100 ordered=Parse.parseBoolean(b);
101 }else if(a.equals("shuffle")){
102 shuffle=Parse.parseBoolean(b);
103 }else if(a.equals("size") || a.equals("heapsize")){
104 heapSize_=Parse.parseIntKMG(b);
105 setHeapSize=true;
106 }else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){
107 targetKmers_=Parse.parseKMG(b);
108 }else if(a.equals("mincount")){
109 minCount_=Parse.parseIntKMG(b);
110 }else if(parseSketchFlags(arg, a, b)){
111 parser.parse(arg, a, b);
112 }else if(defaultParams.parse(arg, a, b)){
113 parser.parse(arg, a, b);
114 }else if(a.equals("parse_flag_goes_here")){
115 long fake_variable=Parse.parseKMG(b);
116 //Set a variable here
117 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
118 //do nothing
119 }else{
120 outstream.println("Unknown parameter "+args[i]);
121 assert(false) : "Unknown parameter "+args[i];
122 }
123 }
124
125 if(!setHeapSize && minCount_>1){heapSize_=32000;}
126 heapSize=heapSize_;
127 targetKmers=targetKmers_;
128 k=k_;
129 minCount=minCount_;
130 assert(targetKmers>0) : "Must set a kmer limit.";
131 assert(heapSize>0) : "Heap size must be positive.";
132 assert(k>0 && k<=32) : "0<k<33; k="+k;
133 postParse();
134
135 if(minCount>1){
136 Shared.setBufferLen(800);
137 }
138
139 {//Process parser fields
140 Parser.processQuality();
141
142 maxReads=parser.maxReads;
143
144 overwrite=ReadStats.overwrite=parser.overwrite;
145 append=ReadStats.append=parser.append;
146 setInterleaved=parser.setInterleaved;
147
148 in1=parser.in1;
149 in2=parser.in2;
150 qfin1=parser.qfin1;
151 qfin2=parser.qfin2;
152
153 out1=parser.out1;
154 out2=parser.out2;
155 qfout1=parser.qfout1;
156 qfout2=parser.qfout2;
157
158 extin=parser.extin;
159 extout=parser.extout;
160 }
161
162 //Do input file # replacement
163 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
164 in2=in1.replace("#", "2");
165 in1=in1.replace("#", "1");
166 }
167
168 //Do output file # replacement
169 if(out1!=null && out2==null && out1.indexOf('#')>-1){
170 out2=out1.replace("#", "2");
171 out1=out1.replace("#", "1");
172 }
173
174 //Adjust interleaved detection based on the number of input files
175 if(in2!=null){
176 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
177 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
178 }
179
180 assert(FastaReadInputStream.settingsOK());
181
182 //Ensure there is an input file
183 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
184
185 //Adjust the number of threads for input file reading
186 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
187 ByteFile.FORCE_MODE_BF2=true;
188 }
189
190 //Ensure out2 is not set without out1
191 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}
192
193 //Adjust interleaved settings based on number of output files
194 if(!setInterleaved){
195 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n";
196 if(in2!=null){ //If there are 2 input streams.
197 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
198 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
199 }else{ //There is one input stream.
200 if(out2!=null){
201 FASTQ.FORCE_INTERLEAVED=true;
202 FASTQ.TEST_INTERLEAVED=false;
203 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
204 }
205 }
206 }
207
208 //Ensure output files can be written
209 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
210 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
211 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
212 }
213
214 //Ensure input files can be read
215 if(!Tools.testInputFiles(false, true, in1, in2)){
216 throw new RuntimeException("\nCan't read some input files.\n");
217 }
218
219 //Ensure that no file was specified multiple times
220 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
221 throw new RuntimeException("\nSome file names were specified multiple times.\n");
222 }
223
224 //Create output FileFormat objects
225 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
226 ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
227
228 //Create input FileFormat objects
229 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
230 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
231
232 minProb=defaultParams.minProb;
233 minQual=defaultParams.minQual;
234
235 shift=2*k;
236 shift2=shift-2;
237 mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32
238 sharedHeap=new SketchHeap(heapSize, 0, minCount>1);
239 }
240
241 /*--------------------------------------------------------------*/
242 /*---------------- Outer Methods ----------------*/
243 /*--------------------------------------------------------------*/
244
245 /** Create read streams and process all data */
246 void process(Timer t){
247
248 //Turn off read validation in the input threads to increase speed
249 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
250 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
251
252 //Create a read input stream
253 final ConcurrentReadInputStream cris;
254 {
255 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
256 cris.start(); //Start the stream
257 if(verbose){outstream.println("Started cris");}
258 }
259 boolean paired=cris.paired();
260 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
261
262 //Optionally create a read output stream
263 final ConcurrentReadOutputStream ros;
264 if(ffout1!=null){
265 //Select output buffer size based on whether it needs to be ordered
266 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
267
268 //Notify user of output mode
269 if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
270 outstream.println("Writing interleaved.");
271 }
272
273 ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
274 ros.start(); //Start the stream
275 }else{ros=null;}
276
277 //Reset counters
278 readsProcessed=readsOut=0;
279 basesProcessed=basesOut=0;
280
281 //Process the reads in separate threads
282 spawnThreads(cris, ros);
283
284 if(verbose){outstream.println("Finished; closing streams.");}
285
286 //Write anything that was accumulated by ReadStats
287 errorState|=ReadStats.writeAll();
288 //Close the read streams
289 errorState|=ReadWrite.closeStreams(cris, ros);
290
291 //Reset read validation
292 Read.VALIDATE_IN_CONSTRUCTOR=vic;
293
294 //Report timing and results
295 t.stop();
296 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
297 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
298 String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
299 outstream.println("Unique Kmers Out: "+kstring);
300
301 // Sketch sk=new Sketch(sharedHeap, true, true, null);
302 // outstream.println(sk.genomeSizeEstimate());
303
304 //Throw an exception of there was an error in a thread
305 if(errorState){
306 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
307 }
308 }
309
310 /** Spawn process threads */
311 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
312
313 //Do anything necessary prior to processing
314
315 //Determine how many threads may be used
316 final int threads=Tools.min(8, Shared.threads());
317
318 //Fill a list with ProcessThreads
319 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
320 int tSize=heapSize/2;
321 for(int i=0; i<threads; i++){
322 alpt.add(new ProcessThread(cris, ros, i, tSize));
323 }
324
325 //Start the threads
326 for(ProcessThread pt : alpt){
327 pt.start();
328 }
329
330 //Wait for completion of all threads
331 boolean success=true;
332 for(ProcessThread pt : alpt){
333
334 //Wait until this thread has terminated
335 while(pt.getState()!=Thread.State.TERMINATED){
336 try {
337 //Attempt a join operation
338 pt.join();
339 } catch (InterruptedException e) {
340 //Potentially handle this, if it is expected to occur
341 e.printStackTrace();
342 }
343 }
344
345 //Accumulate per-thread statistics
346 readsProcessed+=pt.readsProcessedT;
347 basesProcessed+=pt.basesProcessedT;
348 readsOut+=pt.readsOutT;
349 basesOut+=pt.basesOutT;
350 success&=pt.success;
351 }
352
353 //Track whether any threads failed
354 if(!success){errorState=true;}
355
356 //Do anything necessary after processing
357
358 }
359
360 /*--------------------------------------------------------------*/
361 /*---------------- Inner Methods ----------------*/
362 /*--------------------------------------------------------------*/
363
364 /*--------------------------------------------------------------*/
365 /*---------------- Inner Classes ----------------*/
366 /*--------------------------------------------------------------*/
367
368 /** This class is static to prevent accidental writing to shared variables.
369 * It is safe to remove the static modifier. */
370 private class ProcessThread extends Thread {
371
372 //Constructor
373 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){
374 cris=cris_;
375 ros=ros_;
376 tid=tid_;
377 localHeap=new SketchHeap(size, 0, minCount>1);
378 }
379
380 //Called by start()
381 @Override
382 public void run(){
383 //Do anything necessary prior to processing
384
385 //Process the reads
386 processInner();
387
388 //Do anything necessary after processing
389
390 //Indicate successful exit status
391 success=true;
392 }
393
394 /** Iterate through the reads */
395 void processInner(){
396
397 //Grab the first ListNum of reads
398 ListNum<Read> ln=cris.nextList();
399 //Grab the actual read list from the ListNum
400 ArrayList<Read> reads=(ln!=null ? ln.list : null);
401
402 //Check to ensure pairing is as expected
403 if(reads!=null && !reads.isEmpty()){
404 Read r=reads.get(0);
405 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
406 }
407
408 //As long as there is a nonempty read list...
409 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
410 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
411
412 //Loop through each read in the list
413 for(int idx=0; idx<reads.size(); idx++){
414 final Read r1=reads.get(idx);
415 final Read r2=r1.mate;
416
417 //Validate reads in worker threads
418 if(!r1.validated()){r1.validate(true);}
419 if(r2!=null && !r2.validated()){r2.validate(true);}
420
421 //Track the initial length for statistics
422 final int initialLength1=r1.length();
423 final int initialLength2=r1.mateLength();
424
425 //Increment counters
426 readsProcessedT+=r1.pairCount();
427 basesProcessedT+=initialLength1+initialLength2;
428
429 //Reads are processed in this block.
430 processReadPair(r1, r2);
431 }
432
433 long count=dumpHeap();
434 if(count>=targetKmers){break;}
435
436 for(Read r1 : reads){
437 readsOutT+=r1.pairCount();
438 basesOutT+=r1.pairLength();
439 }
440
441 //Output reads to the output stream
442 if(ros!=null){ros.add(reads, ln.id);}
443
444 //Notify the input stream that the list was used
445 cris.returnList(ln);
446 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
447
448 //Fetch a new list
449 ln=cris.nextList();
450 reads=(ln!=null ? ln.list : null);
451 }
452
453 //Notify the input stream that the final list was used
454 if(ln!=null){
455 if(ln.list!=null){ln.list.clear();}
456 cris.returnList(ln.id, true);
457 }
458 }
459
460 /**
461 * Process a read or a read pair.
462 * @param r1 Read 1
463 * @param r2 Read 2 (may be null)
464 */
465 void processReadPair(final Read r1, final Read r2){
466 processReadNucleotide(r1);
467 if(r2!=null){processReadNucleotide(r2);}
468 }
469
470 void processReadNucleotide(final Read r){
471 final byte[] bases=r.bases;
472 final byte[] quals=r.quality;
473 long kmer=0;
474 long rkmer=0;
475 int len=0;
476 assert(!r.aminoacid());
477
478 final long min=minHashValue;
479 localHeap.genomeSizeBases+=r.length();
480 localHeap.genomeSequences++;
481
482 if(quals==null || (minProb<=0 && minQual<2)){
483 for(int i=0; i<bases.length; i++){
484 byte b=bases[i];
485 long x=AminoAcid.baseToNumber[b];
486 long x2=AminoAcid.baseToComplementNumber[b];
487
488 kmer=((kmer<<2)|x)&mask;
489 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
490
491 if(x<0){len=0; rkmer=0;}else{len++;}
492 if(len>=k){
493 localHeap.genomeSizeKmers++;
494 final long hashcode=hash(kmer, rkmer);
495 if(hashcode>min){localHeap.add(hashcode);}
496 }
497 }
498 }else{
499 float prob=1;
500 for(int i=0; i<bases.length; i++){
501 final byte b=bases[i];
502 final long x=AminoAcid.baseToNumber[b];
503 final long x2=AminoAcid.baseToComplementNumber[b];
504
505 kmer=((kmer<<2)|x)&mask;
506 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
507
508 {//Quality-related stuff
509 final byte q=quals[i];
510 assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual;
511 prob=prob*align2.QualityTools.PROB_CORRECT[q];
512 if(len>k){
513 byte oldq=quals[i-k];
514 prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq];
515 }
516 if(x<0 || q<minQual){
517 len=0;
518 kmer=rkmer=0;
519 prob=1;
520 }else{
521 len++;
522 }
523 }
524
525 if(len>=k && prob>=minProb){
526 localHeap.genomeSizeKmers++;
527 localHeap.probSum+=prob;
528 final long hashcode=hash(kmer, rkmer);
529 if(hashcode>min){localHeap.checkAndAdd(hashcode);}
530 }
531 }
532 }
533 }
534
535 private long dumpHeap(){
536 long count=0;
537 synchronized(sharedHeap){
538 count=sharedHeap.genomeSizeEstimate(minCount);
539 if(count<targetKmers){
540 sharedHeap.add(localHeap);
541 localHeap.clear();
542 }
543 }
544 return count;
545 }
546
547 /** Number of reads processed by this thread */
548 protected long readsProcessedT=0;
549 /** Number of bases processed by this thread */
550 protected long basesProcessedT=0;
551
552 /** Number of reads retained by this thread */
553 protected long readsOutT=0;
554 /** Number of bases retained by this thread */
555 protected long basesOutT=0;
556
557 /** True only if this thread has completed successfully */
558 boolean success=false;
559
560 /** Shared input stream */
561 private final ConcurrentReadInputStream cris;
562 /** Shared output stream */
563 private final ConcurrentReadOutputStream ros;
564 /** Thread ID */
565 final int tid;
566
567 final SketchHeap localHeap;
568 }
569
570 /*--------------------------------------------------------------*/
571 /*---------------- Fields ----------------*/
572 /*--------------------------------------------------------------*/
573
574 /** Primary input file path */
575 private String in1=null;
576 /** Secondary input file path */
577 private String in2=null;
578
579 private String qfin1=null;
580 private String qfin2=null;
581
582 /** Primary output file path */
583 private String out1=null;
584 /** Secondary output file path */
585 private String out2=null;
586
587 private String qfout1=null;
588 private String qfout2=null;
589
590 /** Override input file extension */
591 private String extin=null;
592 /** Override output file extension */
593 private String extout=null;
594
595 /*--------------------------------------------------------------*/
596
597 /** Number of reads processed */
598 protected long readsProcessed=0;
599 /** Number of bases processed */
600 protected long basesProcessed=0;
601
602 /** Number of reads retained */
603 protected long readsOut=0;
604 /** Number of bases retained */
605 protected long basesOut=0;
606
607 /** Quit after processing this many input reads; -1 means no limit */
608 private long maxReads=-1;
609
610 /*--------------------------------------------------------------*/
611 /*---------------- Final Fields ----------------*/
612 /*--------------------------------------------------------------*/
613
614 /** Primary input file */
615 private final FileFormat ffin1;
616 /** Secondary input file */
617 private final FileFormat ffin2;
618
619 /** Primary output file */
620 private final FileFormat ffout1;
621 /** Secondary output file */
622 private final FileFormat ffout2;
623
624 private final SketchHeap sharedHeap;
625 private final int heapSize;
626 private final long targetKmers;
627 private final int minCount;
628
629 final int shift;
630 final int shift2;
631 final long mask;
632
633 final float minProb;
634 final byte minQual;
635
636 /*--------------------------------------------------------------*/
637 /*---------------- Common Fields ----------------*/
638 /*--------------------------------------------------------------*/
639
640 /** Print status messages to this output stream */
641 private PrintStream outstream=System.err;
642 /** Print verbose messages */
643 public static boolean verbose=false;
644 /** True if an error was encountered */
645 public boolean errorState=false;
646 /** Overwrite existing output files */
647 private boolean overwrite=true;
648 /** Append to existing output files */
649 private boolean append=false;
650 /** Reads are output in input order (not enabled) */
651 private boolean ordered=false;
652 /** Shuffle input */
653 private boolean shuffle=false;
654
655 }