comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/prok/RiboMaker.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package prok;
2
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.PriorityQueue;
7
8 import aligner.Alignment;
9 import dna.AminoAcid;
10 import fileIO.ByteFile;
11 import fileIO.FileFormat;
12 import fileIO.ReadWrite;
13 import shared.Parse;
14 import shared.Parser;
15 import shared.PreParser;
16 import shared.ReadStats;
17 import shared.Shared;
18 import shared.Timer;
19 import shared.Tools;
20 import stream.ConcurrentReadInputStream;
21 import stream.ConcurrentReadOutputStream;
22 import stream.FASTQ;
23 import stream.FastaReadInputStream;
24 import stream.Read;
25 import stream.ReadInputStream;
26 import structures.ListNum;
27 import structures.LongHashSet;
28 import template.Accumulator;
29 import template.ThreadWaiter;
30
31 /**
32 * Makes a consensus ribosomal sequence using raw reads as input.
33 *
34 * @author Brian Bushnell
35 * @date October 10, 2019
36 *
37 */
38 public class RiboMaker implements Accumulator<RiboMaker.ProcessThread> {
39
40 /*--------------------------------------------------------------*/
41 /*---------------- Initialization ----------------*/
42 /*--------------------------------------------------------------*/
43
44 /**
45 * Code entrance from the command line.
46 * @param args Command line arguments
47 */
48 public static void main(String[] args){
49 assert(false) : "TODO";
50
51 //Start a timer immediately upon code entrance.
52 Timer t=new Timer();
53
54 //Create an instance of this class
55 RiboMaker x=new RiboMaker(args);
56
57 //Run the object
58 x.process(t);
59
60 //Close the print stream if it was redirected
61 Shared.closeStream(x.outstream);
62 }
63
64 /**
65 * Constructor.
66 * @param args Command line arguments
67 */
68 public RiboMaker(String[] args){
69
70 {//Preparse block for help, config files, and outstream
71 PreParser pp=new PreParser(args, getClass(), false);
72 args=pp.args;
73 outstream=pp.outstream;
74 }
75
76 //Set shared static variables prior to parsing
77 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
78 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
79
80 {//Parse the arguments
81 final Parser parser=parse(args);
82 Parser.processQuality();
83
84 maxReads=parser.maxReads;
85 overwrite=ReadStats.overwrite=parser.overwrite;
86 append=ReadStats.append=parser.append;
87 setInterleaved=parser.setInterleaved;
88
89 in1=parser.in1;
90 in2=parser.in2;
91 qfin1=parser.qfin1;
92 qfin2=parser.qfin2;
93 extin=parser.extin;
94
95 out1=parser.out1;
96 qfout1=parser.qfout1;
97 extout=parser.extout;
98 }
99
100 validateParams();
101 doPoundReplacement(); //Replace # with 1 and 2
102 adjustInterleaving(); //Make sure interleaving agrees with number of input and output files
103 fixExtensions(); //Add or remove .gz or .bz2 as needed
104 checkFileExistence(); //Ensure files can be read and written
105 checkStatics(); //Adjust file-related static fields as needed for this program
106
107 //Create output FileFormat objects
108 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
109
110 fffilter=FileFormat.testInput(filterFile, FileFormat.FASTA, null, true, true);
111 ffref=FileFormat.testInput(refFile, FileFormat.FASTA, null, true, true);
112
113 //Create input FileFormat objects
114 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
115 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
116
117 if(fffilter==null){
118 filter=null;
119 }else{
120 filter=loadFilter(fffilter, k);
121 }
122 loadRef();
123 }
124
125 /*--------------------------------------------------------------*/
126 /*---------------- Initialization Helpers ----------------*/
127 /*--------------------------------------------------------------*/
128
129 /** Parse arguments from the command line */
130 private Parser parse(String[] args){
131
132 //Create a parser object
133 Parser parser=new Parser();
134
135 //Set any necessary Parser defaults here
136 //parser.foo=bar;
137
138 //Parse each argument
139 for(int i=0; i<args.length; i++){
140 String arg=args[i];
141
142 //Break arguments into their constituent parts, in the form of "a=b"
143 String[] split=arg.split("=");
144 String a=split[0].toLowerCase();
145 String b=split.length>1 ? split[1] : null;
146 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
147
148 if(a.equals("verbose")){
149 verbose=Parse.parseBoolean(b);
150 }else if(a.equals("ordered")){
151 ordered=Parse.parseBoolean(b);
152 }else if(a.equals("filter")){
153 filterFile=b;
154 }else if(a.equals("ref")){
155 refFile=b;
156 }else if(a.equals("parse_flag_goes_here")){
157 long fake_variable=Parse.parseKMG(b);
158 //Set a variable here
159 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
160 //do nothing
161 }else{
162 outstream.println("Unknown parameter "+args[i]);
163 assert(false) : "Unknown parameter "+args[i];
164 }
165 }
166
167 return parser;
168 }
169
170 /** Replace # with 1 and 2 in headers */
171 private void doPoundReplacement(){
172 //Do input file # replacement
173 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
174 in2=in1.replace("#", "2");
175 in1=in1.replace("#", "1");
176 }
177
178 //Ensure there is an input file
179 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
180 }
181
182 /** Add or remove .gz or .bz2 as needed */
183 private void fixExtensions(){
184 in1=Tools.fixExtension(in1);
185 in2=Tools.fixExtension(in2);
186 qfin1=Tools.fixExtension(qfin1);
187 qfin2=Tools.fixExtension(qfin2);
188 }
189
190 /** Ensure files can be read and written */
191 private void checkFileExistence(){
192 //Ensure output files can be written
193 if(!Tools.testOutputFiles(overwrite, append, false, out1)){
194 outstream.println((out1==null)+", "+out1);
195 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+"\n");
196 }
197
198 //Ensure input files can be read
199 if(!Tools.testInputFiles(false, true, in1, in2, filterFile, refFile)){
200 throw new RuntimeException("\nCan't read some input files.\n");
201 }
202
203 //Ensure that no file was specified multiple times
204 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, filterFile, refFile)){
205 throw new RuntimeException("\nSome file names were specified multiple times.\n");
206 }
207
208 assert(refFile!=null);
209 }
210
211 /** Make sure interleaving agrees with number of input and output files */
212 private void adjustInterleaving(){
213 //Adjust interleaved detection based on the number of input files
214 if(in2!=null){
215 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
216 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
217 }
218
219 //Adjust interleaved settings based on number of output files
220 if(!setInterleaved){
221 assert(in1!=null) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\n";
222 if(in2!=null){ //If there are 2 input streams.
223 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
224 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
225 }
226 }
227 }
228
229 /** Adjust file-related static fields as needed for this program */
230 private static void checkStatics(){
231 //Adjust the number of threads for input file reading
232 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
233 ByteFile.FORCE_MODE_BF2=true;
234 }
235
236 assert(FastaReadInputStream.settingsOK());
237 }
238
239 /** Ensure parameter ranges are within bounds and required parameters are set */
240 private boolean validateParams(){
241 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
242 assert(false) : "TODO";
243 return true;
244 }
245
246 /*--------------------------------------------------------------*/
247 /*---------------- Outer Methods ----------------*/
248 /*--------------------------------------------------------------*/
249
250 /** Create read streams and process all data */
251 void process(Timer t){
252
253 //Turn off read validation in the input threads to increase speed
254 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
255 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
256
257 //Create a read input stream
258 final ConcurrentReadInputStream cris=makeCris();
259
260 //Optionally create a read output stream
261 final ConcurrentReadOutputStream ros=makeCros(cris.paired());
262
263 //Reset counters
264 readsProcessed=readsOut=0;
265 basesProcessed=basesOut=0;
266
267 //Process the reads in separate threads
268 spawnThreads(cris, ros);
269
270 if(verbose){outstream.println("Finished; closing streams.");}
271
272 //Write anything that was accumulated by ReadStats
273 errorState|=ReadStats.writeAll();
274 //Close the read streams
275 errorState|=ReadWrite.closeStreams(cris, ros);
276
277 //Reset read validation
278 Read.VALIDATE_IN_CONSTRUCTOR=vic;
279
280 //Report timing and results
281 t.stop();
282 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
283 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
284
285 //Throw an exception of there was an error in a thread
286 if(errorState){
287 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
288 }
289 }
290
291 private void loadRef(){
292 ArrayList<Read> reads=ReadInputStream.toReads(ffref, -1);
293 ref0=reads.get(0).bases;
294 ref=new byte[ref0.length+2*padding];
295 for(int i=0, j=-padding; i<ref.length; i++, j++){
296 byte b=(j>=0 && j<ref0.length ? ref0[j] : (byte)'N');
297 ref[i]=b;
298 }
299
300 queues=new PriorityQueue[1+ref.length/queueWidth];
301 for(int i=0; i<queues.length; i++){
302 queues[i]=new PriorityQueue<Alignment>(queueLen);
303 }
304 }
305
306 public static LongHashSet loadFilter(FileFormat ff, int k){
307 if(ff==null){return null;}
308 ArrayList<Read> reads=ReadInputStream.toReads(ff, -1);
309 if(reads==null || reads.size()==0){return null;}
310 LongHashSet set=new LongHashSet(4096);
311
312 final int shift=2*k;
313 final int shift2=shift-2;
314 final long mask=(shift>63 ? -1L : ~((-1L)<<shift));
315 int len=0;
316
317 long kmer=0, rkmer=0;
318 for(Read r : reads){
319 final byte[] bases=r.bases;
320 for(byte b : bases) {
321 long x=AminoAcid.baseToNumber[b];
322 long x2=AminoAcid.baseToComplementNumber[b];
323 kmer=((kmer<<2)|x)&mask;
324 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
325
326 if(x>=0){
327 len++;
328 if(len>=k){
329 set.add(Tools.max(kmer, rkmer));
330 }
331 }else{
332 len=0;
333 kmer=rkmer=0;
334 }
335 }
336 }
337 return set;
338 }
339
340 public static boolean passesFilter(Read r, int k, LongHashSet set){
341 if(r==null) {return false;}
342 if(set==null){return true;}
343
344 final int shift=2*k;
345 final int shift2=shift-2;
346 final long mask=(shift>63 ? -1L : ~((-1L)<<shift));
347 int len=0;
348 long kmer=0, rkmer=0;
349
350 final byte[] bases=r.bases;
351 for(byte b : bases) {
352 long x=AminoAcid.baseToNumber[b];
353 long x2=AminoAcid.baseToComplementNumber[b];
354 kmer=((kmer<<2)|x)&mask;
355 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
356
357 if(x>=0){
358 len++;
359 if(len>=k){
360 long key=Tools.max(kmer, rkmer);
361 if(set.contains(key)){return true;}
362 }
363 }else{
364 len=0;
365 kmer=rkmer=0;
366 }
367 }
368 return false;
369 }
370
371 private ConcurrentReadInputStream makeCris(){
372 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
373 cris.start(); //Start the stream
374 if(verbose){outstream.println("Started cris");}
375 boolean paired=cris.paired();
376 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
377 return cris;
378 }
379
380 private ConcurrentReadOutputStream makeCros(boolean pairedInput){
381 if(ffout1==null){return null;}
382
383 //Select output buffer size based on whether it needs to be ordered
384 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
385
386 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, null, qfout1, null, buff, null, false);
387 ros.start(); //Start the stream
388 return ros;
389 }
390
391 /*--------------------------------------------------------------*/
392 /*---------------- Thread Management ----------------*/
393 /*--------------------------------------------------------------*/
394
395 /** Spawn process threads */
396 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
397
398 //Do anything necessary prior to processing
399
400 //Determine how many threads may be used
401 final int threads=Shared.threads();
402
403 //Fill a list with ProcessThreads
404 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
405 for(int i=0; i<threads; i++){
406 alpt.add(new ProcessThread(cris, i));
407 }
408
409 //Start the threads and wait for them to finish
410 boolean success=ThreadWaiter.startAndWait(alpt, this);
411 errorState&=!success;
412
413 //Do anything necessary after processing
414 assert(false) : "TODO: Make consensus and write it?";
415 }
416
417 @Override
418 public final void accumulate(ProcessThread pt){
419 readsProcessed+=pt.readsProcessedT;
420 basesProcessed+=pt.basesProcessedT;
421 readsOut+=pt.readsOutT;
422 basesOut+=pt.basesOutT;
423 errorState|=(!pt.success);
424
425 for(int i=0; i<queues.length; i++){
426 PriorityQueue<Alignment> q=queues[i];
427 PriorityQueue<Alignment> qt=pt.queuesT[i];
428 for(Alignment a : qt){
429 addToQueue(a, q);
430 }
431 }
432 }
433
434 @Override
435 public final boolean success(){return !errorState;}
436
437 /*--------------------------------------------------------------*/
438 /*---------------- Inner Methods ----------------*/
439 /*--------------------------------------------------------------*/
440
441 boolean addToQueue(Alignment best, PriorityQueue<Alignment>[] queues){
442 int start=best.start;
443 int qnum=start/queueWidth;
444 PriorityQueue<Alignment> queue=queues[qnum];
445 return addToQueue(best, queue);
446 }
447
448 boolean addToQueue(Alignment best, PriorityQueue<Alignment> queue){
449 if(queue.size()<queueLen){queue.add(best);}
450 else{
451 Alignment bottom=queue.peek();
452 if(bottom.compareTo(best)>=0){return false;}
453 queue.poll();
454 queue.add(best);
455 }
456 return true;
457 }
458
459 /*--------------------------------------------------------------*/
460 /*---------------- Inner Classes ----------------*/
461 /*--------------------------------------------------------------*/
462
463 /** This class is static to prevent accidental writing to shared variables.
464 * It is safe to remove the static modifier. */
465 class ProcessThread extends Thread {
466
467 //Constructor
468 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){
469 cris=cris_;
470 tid=tid_;
471
472 queuesT=new PriorityQueue[1+ref.length/queueWidth];
473 for(int i=0; i<queuesT.length; i++){
474 queuesT[i]=new PriorityQueue<Alignment>(queueLen);
475 }
476 }
477
478 //Called by start()
479 @Override
480 public void run(){
481 //Do anything necessary prior to processing
482
483 //Process the reads
484 processInner();
485
486 //Do anything necessary after processing
487
488 //Indicate successful exit status
489 success=true;
490 }
491
492 /** Iterate through the reads */
493 void processInner(){
494
495 //Grab the first ListNum of reads
496 ListNum<Read> ln=cris.nextList();
497
498 //Check to ensure pairing is as expected
499 if(ln!=null && !ln.isEmpty()){
500 Read r=ln.get(0);
501 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
502 }
503
504 //As long as there is a nonempty read list...
505 while(ln!=null && ln.size()>0){
506 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
507
508 processList(ln);
509
510 //Notify the input stream that the list was used
511 cris.returnList(ln);
512 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
513
514 //Fetch a new list
515 ln=cris.nextList();
516 }
517
518 //Notify the input stream that the final list was used
519 if(ln!=null){
520 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
521 }
522 }
523
524 void processList(ListNum<Read> ln){
525
526 //Grab the actual read list from the ListNum
527 final ArrayList<Read> reads=ln.list;
528
529 //Loop through each read in the list
530 for(int idx=0; idx<reads.size(); idx++){
531 final Read r1=reads.get(idx);
532 final Read r2=r1.mate;
533
534 //Validate reads in worker threads
535 if(!r1.validated()){r1.validate(true);}
536 if(r2!=null && !r2.validated()){r2.validate(true);}
537
538 //Track the initial length for statistics
539 final int initialLength1=r1.length();
540 final int initialLength2=r1.mateLength();
541
542 //Increment counters
543 readsProcessedT+=r1.pairCount();
544 basesProcessedT+=initialLength1+initialLength2;
545
546 {
547 //Reads are processed in this block.
548 processReadPair(r1, r2);
549
550 // if(!keep){reads.set(idx, null);}
551 // else{
552 // readsOutT+=r1.pairCount();
553 // basesOutT+=r1.pairLength();
554 // }
555 }
556 }
557
558 //Output reads to the output stream
559 // if(ros!=null){ros.add(reads, ln.id);}
560 }
561
562 /**
563 * Process a read or a read pair.
564 * @param r1 Read 1
565 * @param r2 Read 2 (may be null)
566 * @return True if the reads should be kept, false if they should be discarded.
567 */
568 void processReadPair(final Read r1, final Read r2){
569 boolean pass=passesFilter(r1, k, filter) || passesFilter(r2, k, filter);
570 if(!pass){return;}
571 processRead(r1);
572 processRead(r2);
573 }
574
575 void processRead(final Read r){
576 Alignment plus=new Alignment(r);
577 plus.align(ref);
578
579 r.reverseComplement();
580 Alignment minus=new Alignment(r);
581 minus.align(ref);
582
583 Alignment best=null;
584 if(plus.id>=minus.id){
585 r.reverseComplement();
586 best=plus;
587 }else{
588 best=minus;
589 }
590 if(best.id<minID) {return;}
591
592 addToQueue(best, queuesT);
593 }
594
595 /** Number of reads processed by this thread */
596 protected long readsProcessedT=0;
597 /** Number of bases processed by this thread */
598 protected long basesProcessedT=0;
599
600 /** Number of reads retained by this thread */
601 protected long readsOutT=0;
602 /** Number of bases retained by this thread */
603 protected long basesOutT=0;
604
605 /** True only if this thread has completed successfully */
606 boolean success=false;
607
608
609 private PriorityQueue<Alignment>[] queuesT;
610
611 /** Shared input stream */
612 private final ConcurrentReadInputStream cris;
613 /** Thread ID */
614 final int tid;
615 }
616
617 /*--------------------------------------------------------------*/
618 /*---------------- Fields ----------------*/
619 /*--------------------------------------------------------------*/
620
621 /** Primary input file path */
622 private String in1=null;
623 /** Secondary input file path */
624 private String in2=null;
625
626 private String qfin1=null;
627 private String qfin2=null;
628
629 /** Primary output file path */
630 private String out1=null;
631
632 private String qfout1=null;
633
634 private String filterFile;
635 private String refFile;
636
637 /** Override input file extension */
638 private String extin=null;
639 /** Override output file extension */
640 private String extout=null;
641
642 /** Whether interleaved was explicitly set. */
643 private boolean setInterleaved=false;
644
645 /** Original ref */
646 private byte[] ref0;
647 /** Padded ref */
648 private byte[] ref;
649
650 private int padding=100;
651 private int queueLen=20;
652 private int queueWidth=20;
653 private float minID=0.4f;
654
655 private PriorityQueue<Alignment>[] queues;
656
657 /*--------------------------------------------------------------*/
658
659 /** Number of reads processed */
660 protected long readsProcessed=0;
661 /** Number of bases processed */
662 protected long basesProcessed=0;
663
664 /** Number of reads retained */
665 protected long readsOut=0;
666 /** Number of bases retained */
667 protected long basesOut=0;
668
669 /** Quit after processing this many input reads; -1 means no limit */
670 private long maxReads=-1;
671
672 /*--------------------------------------------------------------*/
673 /*---------------- Final Fields ----------------*/
674 /*--------------------------------------------------------------*/
675
676 /** Primary input file */
677 private final FileFormat ffin1;
678 /** Secondary input file */
679 private final FileFormat ffin2;
680 /** Filter input file */
681 private final FileFormat fffilter;
682 /** Ref input file */
683 private final FileFormat ffref;
684
685 /** Primary output file */
686 private final FileFormat ffout1;
687
688 private final LongHashSet filter;
689 private final int k=31;
690
691
692 /*--------------------------------------------------------------*/
693 /*---------------- Common Fields ----------------*/
694 /*--------------------------------------------------------------*/
695
696 /** Print status messages to this output stream */
697 private PrintStream outstream=System.err;
698 /** Print verbose messages */
699 public static boolean verbose=false;
700 /** True if an error was encountered */
701 public boolean errorState=false;
702 /** Overwrite existing output files */
703 private boolean overwrite=false;
704 /** Append to existing output files */
705 private boolean append=false;
706 /** Reads are output in input order */
707 private boolean ordered=false;
708
709 }