annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/prok/SplitRibo.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package prok;
jpayne@68 2
jpayne@68 3 import java.io.PrintStream;
jpayne@68 4 import java.util.ArrayList;
jpayne@68 5 import java.util.Arrays;
jpayne@68 6
jpayne@68 7 import aligner.SingleStateAlignerFlat2;
jpayne@68 8 import fileIO.ByteFile;
jpayne@68 9 import fileIO.FileFormat;
jpayne@68 10 import fileIO.ReadWrite;
jpayne@68 11 import shared.Parse;
jpayne@68 12 import shared.Parser;
jpayne@68 13 import shared.PreParser;
jpayne@68 14 import shared.ReadStats;
jpayne@68 15 import shared.Shared;
jpayne@68 16 import shared.Timer;
jpayne@68 17 import shared.Tools;
jpayne@68 18 import stream.ConcurrentReadInputStream;
jpayne@68 19 import stream.ConcurrentReadOutputStream;
jpayne@68 20 import stream.FastaReadInputStream;
jpayne@68 21 import stream.Read;
jpayne@68 22 import structures.ListNum;
jpayne@68 23 import template.Accumulator;
jpayne@68 24 import template.ThreadWaiter;
jpayne@68 25
jpayne@68 26 /**
jpayne@68 27 * Splits a mix of ribosomal sequences (such as Silva) into different files per type (16S, 18S, etc).
jpayne@68 28 *
jpayne@68 29 * @author Brian Bushnell
jpayne@68 30 * @date November 19, 2015
jpayne@68 31 *
jpayne@68 32 */
jpayne@68 33 public class SplitRibo implements Accumulator<SplitRibo.ProcessThread> {
jpayne@68 34
jpayne@68 35 /*--------------------------------------------------------------*/
jpayne@68 36 /*---------------- Initialization ----------------*/
jpayne@68 37 /*--------------------------------------------------------------*/
jpayne@68 38
jpayne@68 39 /**
jpayne@68 40 * Code entrance from the command line.
jpayne@68 41 * @param args Command line arguments
jpayne@68 42 */
jpayne@68 43 public static void main(String[] args){
jpayne@68 44 //Start a timer immediately upon code entrance.
jpayne@68 45 Timer t=new Timer();
jpayne@68 46
jpayne@68 47 //Create an instance of this class
jpayne@68 48 SplitRibo x=new SplitRibo(args);
jpayne@68 49
jpayne@68 50 //Run the object
jpayne@68 51 x.process(t);
jpayne@68 52
jpayne@68 53 //Close the print stream if it was redirected
jpayne@68 54 Shared.closeStream(x.outstream);
jpayne@68 55 }
jpayne@68 56
jpayne@68 57 /**
jpayne@68 58 * Constructor.
jpayne@68 59 * @param args Command line arguments
jpayne@68 60 */
jpayne@68 61 public SplitRibo(String[] args){
jpayne@68 62
jpayne@68 63 {//Preparse block for help, config files, and outstream
jpayne@68 64 PreParser pp=new PreParser(args, getClass(), false);
jpayne@68 65 args=pp.args;
jpayne@68 66 outstream=pp.outstream;
jpayne@68 67 }
jpayne@68 68
jpayne@68 69 //Set shared static variables prior to parsing
jpayne@68 70 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
jpayne@68 71 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
jpayne@68 72 Shared.capBufferLen(50);
jpayne@68 73 ReadWrite.ZIPLEVEL=9;
jpayne@68 74
jpayne@68 75 {//Parse the arguments
jpayne@68 76 final Parser parser=parse(args);
jpayne@68 77 Parser.processQuality();
jpayne@68 78
jpayne@68 79 maxReads=parser.maxReads;
jpayne@68 80 overwrite=ReadStats.overwrite=parser.overwrite;
jpayne@68 81 append=ReadStats.append=parser.append;
jpayne@68 82
jpayne@68 83 in1=parser.in1;
jpayne@68 84 qfin1=parser.qfin1;
jpayne@68 85 extin=parser.extin;
jpayne@68 86
jpayne@68 87 outPattern=parser.out1;
jpayne@68 88 extout=parser.extout;
jpayne@68 89 }
jpayne@68 90
jpayne@68 91 validateParams();
jpayne@68 92 fixExtensions(); //Add or remove .gz or .bz2 as needed
jpayne@68 93 checkFileExistence(); //Ensure files can be read and written
jpayne@68 94 checkStatics(); //Adjust file-related static fields as needed for this program
jpayne@68 95
jpayne@68 96 //Create input FileFormat objects
jpayne@68 97 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
jpayne@68 98
jpayne@68 99 numTypes=sequenceTypes.length;
jpayne@68 100 readsOut=new long[numTypes];
jpayne@68 101 basesOut=new long[numTypes];
jpayne@68 102 consensusSequences=loadConsensusSequenceFromFile();
jpayne@68 103 }
jpayne@68 104
jpayne@68 105 /*--------------------------------------------------------------*/
jpayne@68 106 /*---------------- Initialization Helpers ----------------*/
jpayne@68 107 /*--------------------------------------------------------------*/
jpayne@68 108
jpayne@68 109 /** Parse arguments from the command line */
jpayne@68 110 private Parser parse(String[] args){
jpayne@68 111
jpayne@68 112 //Create a parser object
jpayne@68 113 Parser parser=new Parser();
jpayne@68 114
jpayne@68 115 //Set any necessary Parser defaults here
jpayne@68 116 //parser.foo=bar;
jpayne@68 117
jpayne@68 118 //Parse each argument
jpayne@68 119 for(int i=0; i<args.length; i++){
jpayne@68 120 String arg=args[i];
jpayne@68 121
jpayne@68 122 //Break arguments into their constituent parts, in the form of "a=b"
jpayne@68 123 String[] split=arg.split("=");
jpayne@68 124 String a=split[0].toLowerCase();
jpayne@68 125 String b=split.length>1 ? split[1] : null;
jpayne@68 126 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
jpayne@68 127
jpayne@68 128 if(a.equals("verbose")){
jpayne@68 129 verbose=Parse.parseBoolean(b);
jpayne@68 130 }else if(a.equals("ordered")){
jpayne@68 131 ordered=Parse.parseBoolean(b);
jpayne@68 132 }else if(a.equalsIgnoreCase("minid")){
jpayne@68 133 minID=Float.parseFloat(b);
jpayne@68 134 }else if(a.equalsIgnoreCase("minid2") || a.equalsIgnoreCase("refineid")){
jpayne@68 135 refineID=Float.parseFloat(b);
jpayne@68 136 }else if(a.equals("out") || a.equals("pattern") || a.equals("outpattern")){
jpayne@68 137 parser.out1=b;
jpayne@68 138 }else if(a.equals("type") || a.equals("types")){
jpayne@68 139 parseTypes(b);
jpayne@68 140 }else if(a.equals("parse_flag_goes_here")){
jpayne@68 141 long fake_variable=Parse.parseKMG(b);
jpayne@68 142 //Set a variable here
jpayne@68 143 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
jpayne@68 144 //do nothing
jpayne@68 145 }else{
jpayne@68 146 outstream.println("Unknown parameter "+args[i]);
jpayne@68 147 assert(false) : "Unknown parameter "+args[i];
jpayne@68 148 }
jpayne@68 149 }
jpayne@68 150
jpayne@68 151 return parser;
jpayne@68 152 }
jpayne@68 153
jpayne@68 154 private void parseTypes(String b){
jpayne@68 155 sequenceTypes=null;
jpayne@68 156 if(b==null){
jpayne@68 157 assert(false) : "'types' flag requires a list of types, such as 'types=16S,18S'";
jpayne@68 158 sequenceTypes=new String[] {"Other"};
jpayne@68 159 }else{
jpayne@68 160 String[] split=b.split(",");
jpayne@68 161 sequenceTypes=new String[split.length+1];
jpayne@68 162 sequenceTypes[0]="Other";
jpayne@68 163 for(int i=0; i<split.length; i++){
jpayne@68 164 String s=split[i].replace('s', 'S');
jpayne@68 165 if(s.startsWith("its")){s=s.replaceFirst("its", "ITS");}
jpayne@68 166 sequenceTypes[i+1]=s;
jpayne@68 167 }
jpayne@68 168 }
jpayne@68 169 }
jpayne@68 170
jpayne@68 171 /** Add or remove .gz or .bz2 as needed */
jpayne@68 172 private void fixExtensions(){
jpayne@68 173 in1=Tools.fixExtension(in1);
jpayne@68 174 qfin1=Tools.fixExtension(qfin1);
jpayne@68 175 }
jpayne@68 176
jpayne@68 177 /** Ensure files can be read and written */
jpayne@68 178 private void checkFileExistence(){
jpayne@68 179
jpayne@68 180 //Ensure input files can be read
jpayne@68 181 if(!Tools.testInputFiles(false, true, in1)){
jpayne@68 182 throw new RuntimeException("\nCan't read some input files.\n");
jpayne@68 183 }
jpayne@68 184
jpayne@68 185 if(outPattern==null){return;}
jpayne@68 186
jpayne@68 187 if(!outPattern.contains("#")){
jpayne@68 188 throw new RuntimeException("OutPattern must contain '#' symbol: "+outPattern);
jpayne@68 189 }
jpayne@68 190
jpayne@68 191 for(String type : sequenceTypes) {
jpayne@68 192 String out=outPattern.replaceFirst("#", type);
jpayne@68 193
jpayne@68 194 //Ensure output files can be written
jpayne@68 195 if(!Tools.testOutputFiles(overwrite, append, false, out)){
jpayne@68 196 outstream.println((outPattern==null)+", "+(out==null)+", "+outPattern+", "+out);
jpayne@68 197 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n");
jpayne@68 198 }
jpayne@68 199
jpayne@68 200 //Ensure that no file was specified multiple times
jpayne@68 201 if(!Tools.testForDuplicateFiles(true, in1, out)){
jpayne@68 202 throw new RuntimeException("\nSome file names were specified multiple times.\n");
jpayne@68 203 }
jpayne@68 204 }
jpayne@68 205 }
jpayne@68 206
jpayne@68 207 /** Adjust file-related static fields as needed for this program */
jpayne@68 208 private static void checkStatics(){
jpayne@68 209 //Adjust the number of threads for input file reading
jpayne@68 210 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
jpayne@68 211 ByteFile.FORCE_MODE_BF2=true;
jpayne@68 212 }
jpayne@68 213
jpayne@68 214 assert(FastaReadInputStream.settingsOK());
jpayne@68 215 }
jpayne@68 216
jpayne@68 217 /** Ensure parameter ranges are within bounds and required parameters are set */
jpayne@68 218 private boolean validateParams(){
jpayne@68 219 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
jpayne@68 220 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
jpayne@68 221 return true;
jpayne@68 222 }
jpayne@68 223
jpayne@68 224 private final Read[][] loadConsensusSequenceFromFile(){
jpayne@68 225 Read[][] seqs=new Read[numTypes][];
jpayne@68 226 m16S_index=Tools.find("m16S", sequenceTypes);
jpayne@68 227 m18S_index=Tools.find("m18S", sequenceTypes);
jpayne@68 228 p16S_index=Tools.find("p16S", sequenceTypes);
jpayne@68 229 boolean stripM16S=(m16S_index>=0);
jpayne@68 230 boolean stripM18S=(m18S_index>=0);
jpayne@68 231 boolean stripP16S=(p16S_index>=0);
jpayne@68 232 for(int st=1; st<numTypes; st++){
jpayne@68 233 String name=sequenceTypes[st];
jpayne@68 234 boolean is16S=name.equalsIgnoreCase("16S");
jpayne@68 235 boolean is18S=name.equalsIgnoreCase("18S");
jpayne@68 236 seqs[st]=ProkObject.loadConsensusSequenceType(name, ((is16S && stripM16S) || (is18S && stripM18S)), (is16S && stripP16S));
jpayne@68 237 }
jpayne@68 238 return seqs;
jpayne@68 239 }
jpayne@68 240
jpayne@68 241 /*--------------------------------------------------------------*/
jpayne@68 242 /*---------------- Outer Methods ----------------*/
jpayne@68 243 /*--------------------------------------------------------------*/
jpayne@68 244
jpayne@68 245 /** Create read streams and process all data */
jpayne@68 246 void process(Timer t){
jpayne@68 247
jpayne@68 248 //Turn off read validation in the input threads to increase speed
jpayne@68 249 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
jpayne@68 250 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
jpayne@68 251
jpayne@68 252 //Create a read input stream
jpayne@68 253 final ConcurrentReadInputStream cris=makeCris();
jpayne@68 254
jpayne@68 255 //Optionally create a read output stream
jpayne@68 256 final ConcurrentReadOutputStream[] rosa=makeCrosArray();
jpayne@68 257
jpayne@68 258 //Reset counters
jpayne@68 259 readsProcessed=0;
jpayne@68 260 basesProcessed=0;
jpayne@68 261 Arrays.fill(readsOut, 0);
jpayne@68 262 Arrays.fill(basesOut, 0);
jpayne@68 263
jpayne@68 264 //Process the reads in separate threads
jpayne@68 265 spawnThreads(cris, rosa);
jpayne@68 266
jpayne@68 267 if(verbose){outstream.println("Finished; closing streams.");}
jpayne@68 268
jpayne@68 269 //Write anything that was accumulated by ReadStats
jpayne@68 270 errorState|=ReadStats.writeAll();
jpayne@68 271 //assert(!errorState);
jpayne@68 272 //Close the read streams
jpayne@68 273 errorState|=ReadWrite.closeStreams(cris, rosa);
jpayne@68 274 //assert(!errorState);
jpayne@68 275
jpayne@68 276 //Reset read validation
jpayne@68 277 Read.VALIDATE_IN_CONSTRUCTOR=vic;
jpayne@68 278
jpayne@68 279 long readsOut2=Tools.sum(readsOut)-readsOut[0];
jpayne@68 280 long basesOut2=Tools.sum(basesOut)-basesOut[0];
jpayne@68 281
jpayne@68 282 //Report timing and results
jpayne@68 283 t.stop();
jpayne@68 284 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
jpayne@68 285 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut2, basesOut2, 8, true));
jpayne@68 286
jpayne@68 287 outstream.println();
jpayne@68 288 outstream.println(Tools.string("Type", "Count", 8));
jpayne@68 289 for(int type=0; type<numTypes; type++){
jpayne@68 290 outstream.println(Tools.number(sequenceTypes[type], readsOut[type], 8));
jpayne@68 291 }
jpayne@68 292
jpayne@68 293 //Throw an exception of there was an error in a thread
jpayne@68 294 if(errorState){
jpayne@68 295 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
jpayne@68 296 }
jpayne@68 297 }
jpayne@68 298
jpayne@68 299 private ConcurrentReadInputStream makeCris(){
jpayne@68 300 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, null, qfin1, null);
jpayne@68 301 cris.start(); //Start the stream
jpayne@68 302 if(verbose){outstream.println("Started cris");}
jpayne@68 303 return cris;
jpayne@68 304 }
jpayne@68 305
jpayne@68 306 private ConcurrentReadOutputStream[] makeCrosArray(){
jpayne@68 307 ConcurrentReadOutputStream[] rosa=new ConcurrentReadOutputStream[numTypes];
jpayne@68 308 for(int i=0; i<numTypes; i++){
jpayne@68 309 String type=sequenceTypes[i];
jpayne@68 310 final ConcurrentReadOutputStream ros=makeCros(type);
jpayne@68 311 rosa[i]=ros;
jpayne@68 312 }
jpayne@68 313 return rosa;
jpayne@68 314 }
jpayne@68 315
jpayne@68 316 private ConcurrentReadOutputStream makeCros(String type){
jpayne@68 317 if(outPattern==null){return null;}
jpayne@68 318
jpayne@68 319 //Select output buffer size based on whether it needs to be ordered
jpayne@68 320 final int buff=(ordered ? Tools.mid(2, 16, (Shared.threads()*2)/3) : 4);
jpayne@68 321 final String fname=outPattern.replaceFirst("#", type);
jpayne@68 322 FileFormat ff=FileFormat.testOutput(fname, FileFormat.FASTA, extout, true, overwrite, append, ordered);
jpayne@68 323
jpayne@68 324 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ff, null, buff, null, false);
jpayne@68 325 ros.start(); //Start the stream
jpayne@68 326 return ros;
jpayne@68 327 }
jpayne@68 328
jpayne@68 329 /*--------------------------------------------------------------*/
jpayne@68 330 /*---------------- Thread Management ----------------*/
jpayne@68 331 /*--------------------------------------------------------------*/
jpayne@68 332
jpayne@68 333 /** Spawn process threads */
jpayne@68 334 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream[] rosa){
jpayne@68 335
jpayne@68 336 //Do anything necessary prior to processing
jpayne@68 337
jpayne@68 338 //Determine how many threads may be used
jpayne@68 339 final int threads=Shared.threads();
jpayne@68 340
jpayne@68 341 //Fill a list with ProcessThreads
jpayne@68 342 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 343 for(int i=0; i<threads; i++){
jpayne@68 344 alpt.add(new ProcessThread(cris, rosa, i));
jpayne@68 345 }
jpayne@68 346
jpayne@68 347 //Start the threads and wait for them to finish
jpayne@68 348 boolean success=ThreadWaiter.startAndWait(alpt, this);
jpayne@68 349 errorState&=!success;
jpayne@68 350 //assert(!errorState);
jpayne@68 351
jpayne@68 352 //Do anything necessary after processing
jpayne@68 353
jpayne@68 354 }
jpayne@68 355
jpayne@68 356 @Override
jpayne@68 357 public final void accumulate(ProcessThread pt){
jpayne@68 358 readsProcessed+=pt.readsProcessedT;
jpayne@68 359 basesProcessed+=pt.basesProcessedT;
jpayne@68 360 Tools.add(readsOut, pt.readsOutT);
jpayne@68 361 Tools.add(basesOut, pt.basesOutT);
jpayne@68 362 errorState|=(!pt.success);
jpayne@68 363 //assert(!errorState);
jpayne@68 364 }
jpayne@68 365
jpayne@68 366 @Override
jpayne@68 367 public final boolean success(){return !errorState;}
jpayne@68 368
jpayne@68 369 /*--------------------------------------------------------------*/
jpayne@68 370 /*---------------- Inner Methods ----------------*/
jpayne@68 371 /*--------------------------------------------------------------*/
jpayne@68 372
jpayne@68 373 /*--------------------------------------------------------------*/
jpayne@68 374 /*---------------- Inner Classes ----------------*/
jpayne@68 375 /*--------------------------------------------------------------*/
jpayne@68 376
jpayne@68 377 /** This class is static to prevent accidental writing to shared variables.
jpayne@68 378 * It is safe to remove the static modifier. */
jpayne@68 379 class ProcessThread extends Thread {
jpayne@68 380
jpayne@68 381 //Constructor
jpayne@68 382 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream[] rosa_, final int tid_){
jpayne@68 383 cris=cris_;
jpayne@68 384 rosa=rosa_;
jpayne@68 385 tid=tid_;
jpayne@68 386 }
jpayne@68 387
jpayne@68 388 //Called by start()
jpayne@68 389 @Override
jpayne@68 390 public void run(){
jpayne@68 391 //Do anything necessary prior to processing
jpayne@68 392
jpayne@68 393 //Process the reads
jpayne@68 394 processInner();
jpayne@68 395
jpayne@68 396 //Do anything necessary after processing
jpayne@68 397
jpayne@68 398 //Indicate successful exit status
jpayne@68 399 success=true;
jpayne@68 400 }
jpayne@68 401
jpayne@68 402 /** Iterate through the reads */
jpayne@68 403 void processInner(){
jpayne@68 404
jpayne@68 405 //Grab the first ListNum of reads
jpayne@68 406 ListNum<Read> ln=cris.nextList();
jpayne@68 407
jpayne@68 408 //Check to ensure pairing is as expected
jpayne@68 409 if(ln!=null && !ln.isEmpty()){
jpayne@68 410 Read r=ln.get(0);
jpayne@68 411 assert(r.mate==null);
jpayne@68 412 }
jpayne@68 413
jpayne@68 414 //As long as there is a nonempty read list...
jpayne@68 415 while(ln!=null && ln.size()>0){
jpayne@68 416 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
jpayne@68 417
jpayne@68 418 processList(ln);
jpayne@68 419
jpayne@68 420 //Notify the input stream that the list was used
jpayne@68 421 cris.returnList(ln);
jpayne@68 422 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
jpayne@68 423
jpayne@68 424 //Fetch a new list
jpayne@68 425 ln=cris.nextList();
jpayne@68 426 }
jpayne@68 427
jpayne@68 428 //Notify the input stream that the final list was used
jpayne@68 429 if(ln!=null){
jpayne@68 430 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
jpayne@68 431 }
jpayne@68 432 }
jpayne@68 433
jpayne@68 434 void processList(ListNum<Read> ln){
jpayne@68 435
jpayne@68 436 //Grab the actual read list from the ListNum
jpayne@68 437 final ArrayList<Read> reads=ln.list;
jpayne@68 438
jpayne@68 439 @SuppressWarnings("unchecked")
jpayne@68 440 final ArrayList<Read>[] out=new ArrayList[numTypes];
jpayne@68 441 for(int i=0; i<numTypes; i++){
jpayne@68 442 ArrayList<Read> list=new ArrayList<Read>(50);
jpayne@68 443 out[i]=list;
jpayne@68 444 }
jpayne@68 445
jpayne@68 446 //Loop through each read in the list
jpayne@68 447 for(int idx=0; idx<reads.size(); idx++){
jpayne@68 448 final Read r1=reads.get(idx);
jpayne@68 449
jpayne@68 450 //Validate reads in worker threads
jpayne@68 451 if(!r1.validated()){r1.validate(true);}
jpayne@68 452
jpayne@68 453 //Track the initial length for statistics
jpayne@68 454 final int initialLength1=r1.length();
jpayne@68 455 final int initialLength2=r1.mateLength();
jpayne@68 456
jpayne@68 457 //Increment counters
jpayne@68 458 readsProcessedT+=r1.pairCount();
jpayne@68 459 basesProcessedT+=initialLength1+initialLength2;
jpayne@68 460
jpayne@68 461 {
jpayne@68 462 //Reads are processed in this block.
jpayne@68 463 final int type=processRead(r1);
jpayne@68 464 readsOutT[type]+=r1.pairCount();
jpayne@68 465 basesOutT[type]+=r1.pairLength();
jpayne@68 466 out[type].add(r1);
jpayne@68 467 }
jpayne@68 468 }
jpayne@68 469
jpayne@68 470 //Output reads to the output stream
jpayne@68 471 if(rosa!=null){
jpayne@68 472 for(int type=0; type<numTypes; type++){
jpayne@68 473 rosa[type].add(out[type], ln.id);
jpayne@68 474 }
jpayne@68 475 }
jpayne@68 476 }
jpayne@68 477
jpayne@68 478 /**
jpayne@68 479 * Process a read.
jpayne@68 480 * @param r1 Read 1
jpayne@68 481 * @return The best-matching type, or 0 for no matches.
jpayne@68 482 */
jpayne@68 483 private int processRead(final Read r){
jpayne@68 484 int bestType=0;
jpayne@68 485 float bestID=-1;
jpayne@68 486 for(int type=1; type<numTypes; type++){//Align to only the overall consensus
jpayne@68 487 Read[] refs=consensusSequences[type];
jpayne@68 488 float id=align(r, refs, 0, 1);
jpayne@68 489 if(id>bestID && id>=minID){
jpayne@68 490 bestType=type;
jpayne@68 491 bestID=id;
jpayne@68 492 }
jpayne@68 493 }
jpayne@68 494 if(bestType<1 || bestID<refineID || bestType==p16S_index){//If nothing met minID, or if it matched chloro, align to clade-specific consensuses
jpayne@68 495 for(int type=1; type<numTypes; type++){
jpayne@68 496 Read[] refs=consensusSequences[type];
jpayne@68 497 float id=align(r, refs, 1, refs.length);
jpayne@68 498 if(id>bestID && id>=minID){
jpayne@68 499 bestType=type;
jpayne@68 500 bestID=id;
jpayne@68 501 }
jpayne@68 502 }
jpayne@68 503 }
jpayne@68 504 r.obj=bestID;//If desired... in actuality, more info might be useful, like alignment length
jpayne@68 505 return bestID<minID ? 0 : bestType;
jpayne@68 506 }
jpayne@68 507
jpayne@68 508 private float align(Read r, Read[] refs, int minRef, int maxRef){
jpayne@68 509 float bestID=-1;
jpayne@68 510 if(refs!=null){
jpayne@68 511 for(int i=minRef; i<maxRef; i++){
jpayne@68 512 Read ref=refs[i];
jpayne@68 513 float id=align(r.bases, ref.bases);
jpayne@68 514 bestID=Tools.max(id, bestID);
jpayne@68 515 }
jpayne@68 516 }
jpayne@68 517 return bestID;
jpayne@68 518 }
jpayne@68 519
jpayne@68 520 private float align(byte[] query, byte[] ref){
jpayne@68 521 int a=0, b=ref.length-1;
jpayne@68 522 int[] max=ssa.fillUnlimited(query, ref, a, b, -9999);
jpayne@68 523 if(max==null){return 0;}
jpayne@68 524
jpayne@68 525 final int rows=max[0];
jpayne@68 526 final int maxCol=max[1];
jpayne@68 527 final int maxState=max[2];
jpayne@68 528 final float id=ssa.tracebackIdentity(query, ref, a, b, rows, maxCol, maxState, null);
jpayne@68 529 return id;
jpayne@68 530 }
jpayne@68 531
jpayne@68 532 SingleStateAlignerFlat2 ssa=new SingleStateAlignerFlat2();
jpayne@68 533
jpayne@68 534 /** Number of reads processed by this thread */
jpayne@68 535 protected long readsProcessedT=0;
jpayne@68 536 /** Number of bases processed by this thread */
jpayne@68 537 protected long basesProcessedT=0;
jpayne@68 538
jpayne@68 539 /** Number of reads retained by this thread */
jpayne@68 540 protected long[] readsOutT=new long[numTypes];
jpayne@68 541 /** Number of bases retained by this thread */
jpayne@68 542 protected long[] basesOutT=new long[numTypes];
jpayne@68 543
jpayne@68 544 /** True only if this thread has completed successfully */
jpayne@68 545 boolean success=false;
jpayne@68 546
jpayne@68 547 /** Shared input stream */
jpayne@68 548 private final ConcurrentReadInputStream cris;
jpayne@68 549 /** Shared output stream */
jpayne@68 550 private final ConcurrentReadOutputStream[] rosa;
jpayne@68 551 /** Thread ID */
jpayne@68 552 final int tid;
jpayne@68 553 }
jpayne@68 554
jpayne@68 555 /*--------------------------------------------------------------*/
jpayne@68 556 /*---------------- Fields ----------------*/
jpayne@68 557 /*--------------------------------------------------------------*/
jpayne@68 558
jpayne@68 559 /** Primary input file path */
jpayne@68 560 private String in1=null;
jpayne@68 561
jpayne@68 562 private String qfin1=null;
jpayne@68 563
jpayne@68 564 /** Primary output file path */
jpayne@68 565 private String outPattern=null;
jpayne@68 566
jpayne@68 567 /** Override input file extension */
jpayne@68 568 private String extin=null;
jpayne@68 569 /** Override output file extension */
jpayne@68 570 private String extout=null;
jpayne@68 571
jpayne@68 572 float minID=0.59f; //This could be a per-type value
jpayne@68 573 float refineID=0.70f; //Refine alignment if best is less than this
jpayne@68 574
jpayne@68 575 private int m16S_index=-2;
jpayne@68 576 private int m18S_index=-2;
jpayne@68 577 private int p16S_index=-2;
jpayne@68 578
jpayne@68 579 /*--------------------------------------------------------------*/
jpayne@68 580
jpayne@68 581 /** Number of reads processed */
jpayne@68 582 protected long readsProcessed=0;
jpayne@68 583 /** Number of bases processed */
jpayne@68 584 protected long basesProcessed=0;
jpayne@68 585
jpayne@68 586 /** Quit after processing this many input reads; -1 means no limit */
jpayne@68 587 private long maxReads=-1;
jpayne@68 588
jpayne@68 589 private String[] sequenceTypes=new String[] {"Other", "16S", "18S", "23S", "5S", "m16S", "m18S", "p16S"};
jpayne@68 590 private final int numTypes;//=sequenceTypes.length;
jpayne@68 591 final Read[][] consensusSequences;
jpayne@68 592
jpayne@68 593 /** Number of reads retained */
jpayne@68 594 final long[] readsOut;
jpayne@68 595 /** Number of bases retained */
jpayne@68 596 final long[] basesOut;
jpayne@68 597
jpayne@68 598 /*--------------------------------------------------------------*/
jpayne@68 599 /*---------------- Final Fields ----------------*/
jpayne@68 600 /*--------------------------------------------------------------*/
jpayne@68 601
jpayne@68 602 /** Primary input file */
jpayne@68 603 private final FileFormat ffin1;
jpayne@68 604
jpayne@68 605 /*--------------------------------------------------------------*/
jpayne@68 606 /*---------------- Static Fields ----------------*/
jpayne@68 607 /*--------------------------------------------------------------*/
jpayne@68 608
jpayne@68 609 /*--------------------------------------------------------------*/
jpayne@68 610 /*---------------- Common Fields ----------------*/
jpayne@68 611 /*--------------------------------------------------------------*/
jpayne@68 612
jpayne@68 613 /** Print status messages to this output stream */
jpayne@68 614 private PrintStream outstream=System.err;
jpayne@68 615 /** Print verbose messages */
jpayne@68 616 public static boolean verbose=false;
jpayne@68 617 /** True if an error was encountered */
jpayne@68 618 public boolean errorState=false;
jpayne@68 619 /** Overwrite existing output files */
jpayne@68 620 private boolean overwrite=false;
jpayne@68 621 /** Append to existing output files */
jpayne@68 622 private boolean append=false;
jpayne@68 623 /** Reads are output in input order */
jpayne@68 624 private boolean ordered=true;
jpayne@68 625
jpayne@68 626 }