comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/prok/SplitRibo.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package prok;
2
3 import java.io.PrintStream;
4 import java.util.ArrayList;
5 import java.util.Arrays;
6
7 import aligner.SingleStateAlignerFlat2;
8 import fileIO.ByteFile;
9 import fileIO.FileFormat;
10 import fileIO.ReadWrite;
11 import shared.Parse;
12 import shared.Parser;
13 import shared.PreParser;
14 import shared.ReadStats;
15 import shared.Shared;
16 import shared.Timer;
17 import shared.Tools;
18 import stream.ConcurrentReadInputStream;
19 import stream.ConcurrentReadOutputStream;
20 import stream.FastaReadInputStream;
21 import stream.Read;
22 import structures.ListNum;
23 import template.Accumulator;
24 import template.ThreadWaiter;
25
26 /**
27 * Splits a mix of ribosomal sequences (such as Silva) into different files per type (16S, 18S, etc).
28 *
29 * @author Brian Bushnell
30 * @date November 19, 2015
31 *
32 */
33 public class SplitRibo implements Accumulator<SplitRibo.ProcessThread> {
34
35 /*--------------------------------------------------------------*/
36 /*---------------- Initialization ----------------*/
37 /*--------------------------------------------------------------*/
38
39 /**
40 * Code entrance from the command line.
41 * @param args Command line arguments
42 */
43 public static void main(String[] args){
44 //Start a timer immediately upon code entrance.
45 Timer t=new Timer();
46
47 //Create an instance of this class
48 SplitRibo x=new SplitRibo(args);
49
50 //Run the object
51 x.process(t);
52
53 //Close the print stream if it was redirected
54 Shared.closeStream(x.outstream);
55 }
56
57 /**
58 * Constructor.
59 * @param args Command line arguments
60 */
61 public SplitRibo(String[] args){
62
63 {//Preparse block for help, config files, and outstream
64 PreParser pp=new PreParser(args, getClass(), false);
65 args=pp.args;
66 outstream=pp.outstream;
67 }
68
69 //Set shared static variables prior to parsing
70 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
71 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
72 Shared.capBufferLen(50);
73 ReadWrite.ZIPLEVEL=9;
74
75 {//Parse the arguments
76 final Parser parser=parse(args);
77 Parser.processQuality();
78
79 maxReads=parser.maxReads;
80 overwrite=ReadStats.overwrite=parser.overwrite;
81 append=ReadStats.append=parser.append;
82
83 in1=parser.in1;
84 qfin1=parser.qfin1;
85 extin=parser.extin;
86
87 outPattern=parser.out1;
88 extout=parser.extout;
89 }
90
91 validateParams();
92 fixExtensions(); //Add or remove .gz or .bz2 as needed
93 checkFileExistence(); //Ensure files can be read and written
94 checkStatics(); //Adjust file-related static fields as needed for this program
95
96 //Create input FileFormat objects
97 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
98
99 numTypes=sequenceTypes.length;
100 readsOut=new long[numTypes];
101 basesOut=new long[numTypes];
102 consensusSequences=loadConsensusSequenceFromFile();
103 }
104
105 /*--------------------------------------------------------------*/
106 /*---------------- Initialization Helpers ----------------*/
107 /*--------------------------------------------------------------*/
108
109 /** Parse arguments from the command line */
110 private Parser parse(String[] args){
111
112 //Create a parser object
113 Parser parser=new Parser();
114
115 //Set any necessary Parser defaults here
116 //parser.foo=bar;
117
118 //Parse each argument
119 for(int i=0; i<args.length; i++){
120 String arg=args[i];
121
122 //Break arguments into their constituent parts, in the form of "a=b"
123 String[] split=arg.split("=");
124 String a=split[0].toLowerCase();
125 String b=split.length>1 ? split[1] : null;
126 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
127
128 if(a.equals("verbose")){
129 verbose=Parse.parseBoolean(b);
130 }else if(a.equals("ordered")){
131 ordered=Parse.parseBoolean(b);
132 }else if(a.equalsIgnoreCase("minid")){
133 minID=Float.parseFloat(b);
134 }else if(a.equalsIgnoreCase("minid2") || a.equalsIgnoreCase("refineid")){
135 refineID=Float.parseFloat(b);
136 }else if(a.equals("out") || a.equals("pattern") || a.equals("outpattern")){
137 parser.out1=b;
138 }else if(a.equals("type") || a.equals("types")){
139 parseTypes(b);
140 }else if(a.equals("parse_flag_goes_here")){
141 long fake_variable=Parse.parseKMG(b);
142 //Set a variable here
143 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
144 //do nothing
145 }else{
146 outstream.println("Unknown parameter "+args[i]);
147 assert(false) : "Unknown parameter "+args[i];
148 }
149 }
150
151 return parser;
152 }
153
154 private void parseTypes(String b){
155 sequenceTypes=null;
156 if(b==null){
157 assert(false) : "'types' flag requires a list of types, such as 'types=16S,18S'";
158 sequenceTypes=new String[] {"Other"};
159 }else{
160 String[] split=b.split(",");
161 sequenceTypes=new String[split.length+1];
162 sequenceTypes[0]="Other";
163 for(int i=0; i<split.length; i++){
164 String s=split[i].replace('s', 'S');
165 if(s.startsWith("its")){s=s.replaceFirst("its", "ITS");}
166 sequenceTypes[i+1]=s;
167 }
168 }
169 }
170
171 /** Add or remove .gz or .bz2 as needed */
172 private void fixExtensions(){
173 in1=Tools.fixExtension(in1);
174 qfin1=Tools.fixExtension(qfin1);
175 }
176
177 /** Ensure files can be read and written */
178 private void checkFileExistence(){
179
180 //Ensure input files can be read
181 if(!Tools.testInputFiles(false, true, in1)){
182 throw new RuntimeException("\nCan't read some input files.\n");
183 }
184
185 if(outPattern==null){return;}
186
187 if(!outPattern.contains("#")){
188 throw new RuntimeException("OutPattern must contain '#' symbol: "+outPattern);
189 }
190
191 for(String type : sequenceTypes) {
192 String out=outPattern.replaceFirst("#", type);
193
194 //Ensure output files can be written
195 if(!Tools.testOutputFiles(overwrite, append, false, out)){
196 outstream.println((outPattern==null)+", "+(out==null)+", "+outPattern+", "+out);
197 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n");
198 }
199
200 //Ensure that no file was specified multiple times
201 if(!Tools.testForDuplicateFiles(true, in1, out)){
202 throw new RuntimeException("\nSome file names were specified multiple times.\n");
203 }
204 }
205 }
206
207 /** Adjust file-related static fields as needed for this program */
208 private static void checkStatics(){
209 //Adjust the number of threads for input file reading
210 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
211 ByteFile.FORCE_MODE_BF2=true;
212 }
213
214 assert(FastaReadInputStream.settingsOK());
215 }
216
217 /** Ensure parameter ranges are within bounds and required parameters are set */
218 private boolean validateParams(){
219 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
220 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
221 return true;
222 }
223
224 private final Read[][] loadConsensusSequenceFromFile(){
225 Read[][] seqs=new Read[numTypes][];
226 m16S_index=Tools.find("m16S", sequenceTypes);
227 m18S_index=Tools.find("m18S", sequenceTypes);
228 p16S_index=Tools.find("p16S", sequenceTypes);
229 boolean stripM16S=(m16S_index>=0);
230 boolean stripM18S=(m18S_index>=0);
231 boolean stripP16S=(p16S_index>=0);
232 for(int st=1; st<numTypes; st++){
233 String name=sequenceTypes[st];
234 boolean is16S=name.equalsIgnoreCase("16S");
235 boolean is18S=name.equalsIgnoreCase("18S");
236 seqs[st]=ProkObject.loadConsensusSequenceType(name, ((is16S && stripM16S) || (is18S && stripM18S)), (is16S && stripP16S));
237 }
238 return seqs;
239 }
240
241 /*--------------------------------------------------------------*/
242 /*---------------- Outer Methods ----------------*/
243 /*--------------------------------------------------------------*/
244
245 /** Create read streams and process all data */
246 void process(Timer t){
247
248 //Turn off read validation in the input threads to increase speed
249 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
250 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
251
252 //Create a read input stream
253 final ConcurrentReadInputStream cris=makeCris();
254
255 //Optionally create a read output stream
256 final ConcurrentReadOutputStream[] rosa=makeCrosArray();
257
258 //Reset counters
259 readsProcessed=0;
260 basesProcessed=0;
261 Arrays.fill(readsOut, 0);
262 Arrays.fill(basesOut, 0);
263
264 //Process the reads in separate threads
265 spawnThreads(cris, rosa);
266
267 if(verbose){outstream.println("Finished; closing streams.");}
268
269 //Write anything that was accumulated by ReadStats
270 errorState|=ReadStats.writeAll();
271 //assert(!errorState);
272 //Close the read streams
273 errorState|=ReadWrite.closeStreams(cris, rosa);
274 //assert(!errorState);
275
276 //Reset read validation
277 Read.VALIDATE_IN_CONSTRUCTOR=vic;
278
279 long readsOut2=Tools.sum(readsOut)-readsOut[0];
280 long basesOut2=Tools.sum(basesOut)-basesOut[0];
281
282 //Report timing and results
283 t.stop();
284 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
285 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut2, basesOut2, 8, true));
286
287 outstream.println();
288 outstream.println(Tools.string("Type", "Count", 8));
289 for(int type=0; type<numTypes; type++){
290 outstream.println(Tools.number(sequenceTypes[type], readsOut[type], 8));
291 }
292
293 //Throw an exception of there was an error in a thread
294 if(errorState){
295 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
296 }
297 }
298
299 private ConcurrentReadInputStream makeCris(){
300 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, null, qfin1, null);
301 cris.start(); //Start the stream
302 if(verbose){outstream.println("Started cris");}
303 return cris;
304 }
305
306 private ConcurrentReadOutputStream[] makeCrosArray(){
307 ConcurrentReadOutputStream[] rosa=new ConcurrentReadOutputStream[numTypes];
308 for(int i=0; i<numTypes; i++){
309 String type=sequenceTypes[i];
310 final ConcurrentReadOutputStream ros=makeCros(type);
311 rosa[i]=ros;
312 }
313 return rosa;
314 }
315
316 private ConcurrentReadOutputStream makeCros(String type){
317 if(outPattern==null){return null;}
318
319 //Select output buffer size based on whether it needs to be ordered
320 final int buff=(ordered ? Tools.mid(2, 16, (Shared.threads()*2)/3) : 4);
321 final String fname=outPattern.replaceFirst("#", type);
322 FileFormat ff=FileFormat.testOutput(fname, FileFormat.FASTA, extout, true, overwrite, append, ordered);
323
324 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ff, null, buff, null, false);
325 ros.start(); //Start the stream
326 return ros;
327 }
328
329 /*--------------------------------------------------------------*/
330 /*---------------- Thread Management ----------------*/
331 /*--------------------------------------------------------------*/
332
333 /** Spawn process threads */
334 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream[] rosa){
335
336 //Do anything necessary prior to processing
337
338 //Determine how many threads may be used
339 final int threads=Shared.threads();
340
341 //Fill a list with ProcessThreads
342 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
343 for(int i=0; i<threads; i++){
344 alpt.add(new ProcessThread(cris, rosa, i));
345 }
346
347 //Start the threads and wait for them to finish
348 boolean success=ThreadWaiter.startAndWait(alpt, this);
349 errorState&=!success;
350 //assert(!errorState);
351
352 //Do anything necessary after processing
353
354 }
355
356 @Override
357 public final void accumulate(ProcessThread pt){
358 readsProcessed+=pt.readsProcessedT;
359 basesProcessed+=pt.basesProcessedT;
360 Tools.add(readsOut, pt.readsOutT);
361 Tools.add(basesOut, pt.basesOutT);
362 errorState|=(!pt.success);
363 //assert(!errorState);
364 }
365
366 @Override
367 public final boolean success(){return !errorState;}
368
369 /*--------------------------------------------------------------*/
370 /*---------------- Inner Methods ----------------*/
371 /*--------------------------------------------------------------*/
372
373 /*--------------------------------------------------------------*/
374 /*---------------- Inner Classes ----------------*/
375 /*--------------------------------------------------------------*/
376
377 /** This class is static to prevent accidental writing to shared variables.
378 * It is safe to remove the static modifier. */
379 class ProcessThread extends Thread {
380
381 //Constructor
382 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream[] rosa_, final int tid_){
383 cris=cris_;
384 rosa=rosa_;
385 tid=tid_;
386 }
387
388 //Called by start()
389 @Override
390 public void run(){
391 //Do anything necessary prior to processing
392
393 //Process the reads
394 processInner();
395
396 //Do anything necessary after processing
397
398 //Indicate successful exit status
399 success=true;
400 }
401
402 /** Iterate through the reads */
403 void processInner(){
404
405 //Grab the first ListNum of reads
406 ListNum<Read> ln=cris.nextList();
407
408 //Check to ensure pairing is as expected
409 if(ln!=null && !ln.isEmpty()){
410 Read r=ln.get(0);
411 assert(r.mate==null);
412 }
413
414 //As long as there is a nonempty read list...
415 while(ln!=null && ln.size()>0){
416 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
417
418 processList(ln);
419
420 //Notify the input stream that the list was used
421 cris.returnList(ln);
422 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
423
424 //Fetch a new list
425 ln=cris.nextList();
426 }
427
428 //Notify the input stream that the final list was used
429 if(ln!=null){
430 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
431 }
432 }
433
434 void processList(ListNum<Read> ln){
435
436 //Grab the actual read list from the ListNum
437 final ArrayList<Read> reads=ln.list;
438
439 @SuppressWarnings("unchecked")
440 final ArrayList<Read>[] out=new ArrayList[numTypes];
441 for(int i=0; i<numTypes; i++){
442 ArrayList<Read> list=new ArrayList<Read>(50);
443 out[i]=list;
444 }
445
446 //Loop through each read in the list
447 for(int idx=0; idx<reads.size(); idx++){
448 final Read r1=reads.get(idx);
449
450 //Validate reads in worker threads
451 if(!r1.validated()){r1.validate(true);}
452
453 //Track the initial length for statistics
454 final int initialLength1=r1.length();
455 final int initialLength2=r1.mateLength();
456
457 //Increment counters
458 readsProcessedT+=r1.pairCount();
459 basesProcessedT+=initialLength1+initialLength2;
460
461 {
462 //Reads are processed in this block.
463 final int type=processRead(r1);
464 readsOutT[type]+=r1.pairCount();
465 basesOutT[type]+=r1.pairLength();
466 out[type].add(r1);
467 }
468 }
469
470 //Output reads to the output stream
471 if(rosa!=null){
472 for(int type=0; type<numTypes; type++){
473 rosa[type].add(out[type], ln.id);
474 }
475 }
476 }
477
478 /**
479 * Process a read.
480 * @param r1 Read 1
481 * @return The best-matching type, or 0 for no matches.
482 */
483 private int processRead(final Read r){
484 int bestType=0;
485 float bestID=-1;
486 for(int type=1; type<numTypes; type++){//Align to only the overall consensus
487 Read[] refs=consensusSequences[type];
488 float id=align(r, refs, 0, 1);
489 if(id>bestID && id>=minID){
490 bestType=type;
491 bestID=id;
492 }
493 }
494 if(bestType<1 || bestID<refineID || bestType==p16S_index){//If nothing met minID, or if it matched chloro, align to clade-specific consensuses
495 for(int type=1; type<numTypes; type++){
496 Read[] refs=consensusSequences[type];
497 float id=align(r, refs, 1, refs.length);
498 if(id>bestID && id>=minID){
499 bestType=type;
500 bestID=id;
501 }
502 }
503 }
504 r.obj=bestID;//If desired... in actuality, more info might be useful, like alignment length
505 return bestID<minID ? 0 : bestType;
506 }
507
508 private float align(Read r, Read[] refs, int minRef, int maxRef){
509 float bestID=-1;
510 if(refs!=null){
511 for(int i=minRef; i<maxRef; i++){
512 Read ref=refs[i];
513 float id=align(r.bases, ref.bases);
514 bestID=Tools.max(id, bestID);
515 }
516 }
517 return bestID;
518 }
519
520 private float align(byte[] query, byte[] ref){
521 int a=0, b=ref.length-1;
522 int[] max=ssa.fillUnlimited(query, ref, a, b, -9999);
523 if(max==null){return 0;}
524
525 final int rows=max[0];
526 final int maxCol=max[1];
527 final int maxState=max[2];
528 final float id=ssa.tracebackIdentity(query, ref, a, b, rows, maxCol, maxState, null);
529 return id;
530 }
531
532 SingleStateAlignerFlat2 ssa=new SingleStateAlignerFlat2();
533
534 /** Number of reads processed by this thread */
535 protected long readsProcessedT=0;
536 /** Number of bases processed by this thread */
537 protected long basesProcessedT=0;
538
539 /** Number of reads retained by this thread */
540 protected long[] readsOutT=new long[numTypes];
541 /** Number of bases retained by this thread */
542 protected long[] basesOutT=new long[numTypes];
543
544 /** True only if this thread has completed successfully */
545 boolean success=false;
546
547 /** Shared input stream */
548 private final ConcurrentReadInputStream cris;
549 /** Shared output stream */
550 private final ConcurrentReadOutputStream[] rosa;
551 /** Thread ID */
552 final int tid;
553 }
554
555 /*--------------------------------------------------------------*/
556 /*---------------- Fields ----------------*/
557 /*--------------------------------------------------------------*/
558
559 /** Primary input file path */
560 private String in1=null;
561
562 private String qfin1=null;
563
564 /** Primary output file path */
565 private String outPattern=null;
566
567 /** Override input file extension */
568 private String extin=null;
569 /** Override output file extension */
570 private String extout=null;
571
572 float minID=0.59f; //This could be a per-type value
573 float refineID=0.70f; //Refine alignment if best is less than this
574
575 private int m16S_index=-2;
576 private int m18S_index=-2;
577 private int p16S_index=-2;
578
579 /*--------------------------------------------------------------*/
580
581 /** Number of reads processed */
582 protected long readsProcessed=0;
583 /** Number of bases processed */
584 protected long basesProcessed=0;
585
586 /** Quit after processing this many input reads; -1 means no limit */
587 private long maxReads=-1;
588
589 private String[] sequenceTypes=new String[] {"Other", "16S", "18S", "23S", "5S", "m16S", "m18S", "p16S"};
590 private final int numTypes;//=sequenceTypes.length;
591 final Read[][] consensusSequences;
592
593 /** Number of reads retained */
594 final long[] readsOut;
595 /** Number of bases retained */
596 final long[] basesOut;
597
598 /*--------------------------------------------------------------*/
599 /*---------------- Final Fields ----------------*/
600 /*--------------------------------------------------------------*/
601
602 /** Primary input file */
603 private final FileFormat ffin1;
604
605 /*--------------------------------------------------------------*/
606 /*---------------- Static Fields ----------------*/
607 /*--------------------------------------------------------------*/
608
609 /*--------------------------------------------------------------*/
610 /*---------------- Common Fields ----------------*/
611 /*--------------------------------------------------------------*/
612
613 /** Print status messages to this output stream */
614 private PrintStream outstream=System.err;
615 /** Print verbose messages */
616 public static boolean verbose=false;
617 /** True if an error was encountered */
618 public boolean errorState=false;
619 /** Overwrite existing output files */
620 private boolean overwrite=false;
621 /** Append to existing output files */
622 private boolean append=false;
623 /** Reads are output in input order */
624 private boolean ordered=true;
625
626 }