annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/prok/MergeRibo.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package prok;
jpayne@68 2
jpayne@68 3 import java.io.File;
jpayne@68 4 import java.io.PrintStream;
jpayne@68 5 import java.util.ArrayList;
jpayne@68 6 import java.util.Collections;
jpayne@68 7 import java.util.Comparator;
jpayne@68 8 import java.util.HashMap;
jpayne@68 9 import java.util.Map.Entry;
jpayne@68 10 import java.util.concurrent.ConcurrentLinkedQueue;
jpayne@68 11
jpayne@68 12 import aligner.SingleStateAlignerFlat2;
jpayne@68 13 import consensus.BaseGraph;
jpayne@68 14 import fileIO.ByteFile;
jpayne@68 15 import fileIO.FileFormat;
jpayne@68 16 import fileIO.ReadWrite;
jpayne@68 17 import shared.Parse;
jpayne@68 18 import shared.Parser;
jpayne@68 19 import shared.PreParser;
jpayne@68 20 import shared.ReadStats;
jpayne@68 21 import shared.Shared;
jpayne@68 22 import shared.Timer;
jpayne@68 23 import shared.Tools;
jpayne@68 24 import stream.ConcurrentReadInputStream;
jpayne@68 25 import stream.ConcurrentReadOutputStream;
jpayne@68 26 import stream.FASTQ;
jpayne@68 27 import stream.FastaReadInputStream;
jpayne@68 28 import stream.Read;
jpayne@68 29 import structures.IntHashSet;
jpayne@68 30 import structures.ListNum;
jpayne@68 31 import tax.GiToTaxid;
jpayne@68 32 import template.Accumulator;
jpayne@68 33 import template.ThreadWaiter;
jpayne@68 34
jpayne@68 35 /**
jpayne@68 36 * Picks one ribosomal (16S) sequence per taxID.
jpayne@68 37 *
jpayne@68 38 * @author Brian Bushnell
jpayne@68 39 * @date November 19, 2015
jpayne@68 40 *
jpayne@68 41 */
jpayne@68 42 public class MergeRibo implements Accumulator<MergeRibo.ProcessThread> {
jpayne@68 43
jpayne@68 44 /*--------------------------------------------------------------*/
jpayne@68 45 /*---------------- Initialization ----------------*/
jpayne@68 46 /*--------------------------------------------------------------*/
jpayne@68 47
jpayne@68 48 /**
jpayne@68 49 * Code entrance from the command line.
jpayne@68 50 * @param args Command line arguments
jpayne@68 51 */
jpayne@68 52 public static void main(String[] args){
jpayne@68 53 //Start a timer immediately upon code entrance.
jpayne@68 54 Timer t=new Timer();
jpayne@68 55
jpayne@68 56 //Create an instance of this class
jpayne@68 57 MergeRibo x=new MergeRibo(args);
jpayne@68 58
jpayne@68 59 //Run the object
jpayne@68 60 x.process(t);
jpayne@68 61
jpayne@68 62 //Close the print stream if it was redirected
jpayne@68 63 Shared.closeStream(x.outstream);
jpayne@68 64 }
jpayne@68 65
jpayne@68 66 /**
jpayne@68 67 * Constructor.
jpayne@68 68 * @param args Command line arguments
jpayne@68 69 */
jpayne@68 70 public MergeRibo(String[] args){
jpayne@68 71
jpayne@68 72 {//Preparse block for help, config files, and outstream
jpayne@68 73 PreParser pp=new PreParser(args, getClass(), false);
jpayne@68 74 args=pp.args;
jpayne@68 75 outstream=pp.outstream;
jpayne@68 76 }
jpayne@68 77
jpayne@68 78 //Set shared static variables prior to parsing
jpayne@68 79 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
jpayne@68 80 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
jpayne@68 81 // Shared.capBufferLen(40);//This does not help; the slowness comes from unevenness in list length during pickBest.
jpayne@68 82 //To fix it, long lists should be sorted to be first.
jpayne@68 83
jpayne@68 84 BaseGraph.MAF_sub=0.251f;
jpayne@68 85 BaseGraph.MAF_del=0.0f;
jpayne@68 86 BaseGraph.MAF_ins=0.0f;
jpayne@68 87 BaseGraph.MAF_noref=0.0f;
jpayne@68 88 BaseGraph.trimDepthFraction=0.3f;
jpayne@68 89 BaseGraph.trimNs=true;
jpayne@68 90
jpayne@68 91 {//Parse the arguments
jpayne@68 92 final Parser parser=parse(args);
jpayne@68 93 Parser.processQuality();
jpayne@68 94
jpayne@68 95 maxReads=parser.maxReads;
jpayne@68 96 overwrite=ReadStats.overwrite=parser.overwrite;
jpayne@68 97 append=ReadStats.append=parser.append;
jpayne@68 98
jpayne@68 99 extin=parser.extin;
jpayne@68 100
jpayne@68 101 out1=parser.out1;
jpayne@68 102 extout=parser.extout;
jpayne@68 103 }
jpayne@68 104
jpayne@68 105 validateParams();
jpayne@68 106 adjustInterleaving(); //Make sure interleaving agrees with number of input and output files
jpayne@68 107 checkFileExistence(); //Ensure files can be read and written
jpayne@68 108 checkStatics(); //Adjust file-related static fields as needed for this program
jpayne@68 109
jpayne@68 110 //Create output FileFormat objects
jpayne@68 111 ffout1=FileFormat.testOutput(out1, FileFormat.FASTA, extout, true, overwrite, append, ordered);
jpayne@68 112
jpayne@68 113 //Create input FileFormat objects
jpayne@68 114 ffin=new ArrayList<FileFormat>(in.size());
jpayne@68 115 ffalt=FileFormat.testInput(alt, FileFormat.FASTA, extin, true, true);
jpayne@68 116 for(String s : in){
jpayne@68 117 FileFormat ff=FileFormat.testInput(s, FileFormat.FASTA, extin, true, true);
jpayne@68 118 ffin.add(ff);
jpayne@68 119 }
jpayne@68 120
jpayne@68 121 //Determine how many threads may be used
jpayne@68 122 threads=Shared.threads();
jpayne@68 123 }
jpayne@68 124
jpayne@68 125 /*--------------------------------------------------------------*/
jpayne@68 126 /*---------------- Initialization Helpers ----------------*/
jpayne@68 127 /*--------------------------------------------------------------*/
jpayne@68 128
jpayne@68 129 /** Parse arguments from the command line */
jpayne@68 130 private Parser parse(String[] args){
jpayne@68 131
jpayne@68 132 //Create a parser object
jpayne@68 133 Parser parser=new Parser();
jpayne@68 134
jpayne@68 135 //Set any necessary Parser defaults here
jpayne@68 136 //parser.foo=bar;
jpayne@68 137
jpayne@68 138 //Parse each argument
jpayne@68 139 for(int i=0; i<args.length; i++){
jpayne@68 140 String arg=args[i];
jpayne@68 141
jpayne@68 142 //Break arguments into their constituent parts, in the form of "a=b"
jpayne@68 143 String[] split=arg.split("=");
jpayne@68 144 String a=split[0].toLowerCase();
jpayne@68 145 String b=split.length>1 ? split[1] : null;
jpayne@68 146 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
jpayne@68 147
jpayne@68 148 if(a.equals("verbose")){
jpayne@68 149 verbose=Parse.parseBoolean(b);
jpayne@68 150 }else if(a.equals("ordered")){
jpayne@68 151 ordered=Parse.parseBoolean(b);
jpayne@68 152 }else if(a.equals("consensus")){
jpayne@68 153 useConsensus=Parse.parseBoolean(b);
jpayne@68 154 }else if(a.equals("best")){
jpayne@68 155 useConsensus=!Parse.parseBoolean(b);
jpayne@68 156 }else if(a.equals("fast")){
jpayne@68 157 fast=Parse.parseBoolean(b);
jpayne@68 158 }else if(a.equals("minid")){
jpayne@68 159 minID=Float.parseFloat(b);
jpayne@68 160 }else if(a.equals("maxns")){
jpayne@68 161 maxns=Integer.parseInt(b);
jpayne@68 162 }else if(a.equals("minlen")){
jpayne@68 163 minlen=Integer.parseInt(b);
jpayne@68 164 }else if(a.equals("maxlen")){
jpayne@68 165 maxlen=Integer.parseInt(b);
jpayne@68 166 }else if(a.equals("in")){
jpayne@68 167 Tools.addFiles(b, in);
jpayne@68 168 }else if(a.equals("alt")){
jpayne@68 169 alt=b;
jpayne@68 170 }else if(a.equalsIgnoreCase("process16S") || a.equalsIgnoreCase("16S")){
jpayne@68 171 process16S=Parse.parseBoolean(b);
jpayne@68 172 process18S=!process16S;
jpayne@68 173 }else if(a.equalsIgnoreCase("process18S") || a.equalsIgnoreCase("18S")){
jpayne@68 174 process18S=Parse.parseBoolean(b);
jpayne@68 175 process16S=!process18S;
jpayne@68 176 }else if(a.equals("parse_flag_goes_here")){
jpayne@68 177 long fake_variable=Parse.parseKMG(b);
jpayne@68 178 //Set a variable here
jpayne@68 179 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
jpayne@68 180 //do nothing
jpayne@68 181 }else if(b==null && new File(arg).exists()){
jpayne@68 182 in.add(arg);
jpayne@68 183 }else{
jpayne@68 184 outstream.println("Unknown parameter "+args[i]);
jpayne@68 185 assert(false) : "Unknown parameter "+args[i];
jpayne@68 186 }
jpayne@68 187 }
jpayne@68 188 assert(!in.isEmpty()) : "No input file.";
jpayne@68 189 return parser;
jpayne@68 190 }
jpayne@68 191
jpayne@68 192 /** Ensure files can be read and written */
jpayne@68 193 private void checkFileExistence(){
jpayne@68 194 //Ensure output files can be written
jpayne@68 195 if(!Tools.testOutputFiles(overwrite, append, false, out1)){
jpayne@68 196 outstream.println((out1==null)+", "+out1);
jpayne@68 197 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+"\n");
jpayne@68 198 }
jpayne@68 199
jpayne@68 200 //Ensure input files can be read
jpayne@68 201 if(!Tools.testInputFiles(false, true, in)){
jpayne@68 202 throw new RuntimeException("\nCan't read some input files.\n");
jpayne@68 203 }
jpayne@68 204
jpayne@68 205 // //Ensure that no file was specified multiple times
jpayne@68 206 // if(!Tools.testForDuplicateFiles(true, out1, in.toArray(new String[0]))){
jpayne@68 207 // throw new RuntimeException("\nSome file names were specified multiple times.\n");
jpayne@68 208 // }
jpayne@68 209 }
jpayne@68 210
jpayne@68 211 /** Make sure interleaving agrees with number of input and output files */
jpayne@68 212 private void adjustInterleaving(){
jpayne@68 213 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
jpayne@68 214 }
jpayne@68 215
jpayne@68 216 /** Adjust file-related static fields as needed for this program */
jpayne@68 217 private static void checkStatics(){
jpayne@68 218 //Adjust the number of threads for input file reading
jpayne@68 219 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
jpayne@68 220 ByteFile.FORCE_MODE_BF2=true;
jpayne@68 221 }
jpayne@68 222
jpayne@68 223 assert(FastaReadInputStream.settingsOK());
jpayne@68 224 }
jpayne@68 225
jpayne@68 226 /** Ensure parameter ranges are within bounds and required parameters are set */
jpayne@68 227 private boolean validateParams(){
jpayne@68 228 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
jpayne@68 229 // assert(false) : "TODO";
jpayne@68 230 assert(process16S || process18S) : "16S or 18S must be selected.";
jpayne@68 231 assert(!process16S || !process18S) : "16S or 18S are both selected; only one may be active.";
jpayne@68 232 return true;
jpayne@68 233 }
jpayne@68 234
jpayne@68 235 /*--------------------------------------------------------------*/
jpayne@68 236 /*---------------- Outer Methods ----------------*/
jpayne@68 237 /*--------------------------------------------------------------*/
jpayne@68 238
jpayne@68 239 /** Create read streams and process all data */
jpayne@68 240 void process(Timer t){
jpayne@68 241
jpayne@68 242 if(process16S){
jpayne@68 243 Read[] data=ProkObject.loadConsensusSequenceType("16S", true, true);
jpayne@68 244 consensus16S=data[0].bases;
jpayne@68 245 if(verbose){System.err.println("process16S: Loaded 16S consensus, length "+consensus16S.length+": "+new String(consensus16S));}
jpayne@68 246 }
jpayne@68 247 if(process18S){
jpayne@68 248 Read[] data=ProkObject.loadConsensusSequenceType("18S", true, true);
jpayne@68 249 consensus18S=data[0].bases;
jpayne@68 250 if(verbose){System.err.println("process18S: Loaded 18S consensus, length "+consensus18S.length+": "+new String(consensus18S));}
jpayne@68 251 }
jpayne@68 252
jpayne@68 253 //Turn off read validation in the input threads to increase speed
jpayne@68 254 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
jpayne@68 255 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
jpayne@68 256
jpayne@68 257 //Reset counters
jpayne@68 258 readsProcessed=readsOut=0;
jpayne@68 259 basesProcessed=basesOut=0;
jpayne@68 260
jpayne@68 261 //Align everything to global consensus
jpayne@68 262 for(FileFormat ff : ffin) {
jpayne@68 263 //Create a read input stream
jpayne@68 264 final ConcurrentReadInputStream cris=makeCris(ff);
jpayne@68 265
jpayne@68 266 //Process the reads in separate threads
jpayne@68 267 spawnThreads(cris, false);
jpayne@68 268 errorState|=ReadWrite.closeStream(cris);
jpayne@68 269 }
jpayne@68 270
jpayne@68 271 if(ffalt!=null){
jpayne@68 272 //Create a read input stream
jpayne@68 273 final ConcurrentReadInputStream cris=makeCris(ffalt);
jpayne@68 274
jpayne@68 275 //Process the reads in separate threads
jpayne@68 276 spawnThreads(cris, true);
jpayne@68 277 errorState|=ReadWrite.closeStream(cris);
jpayne@68 278 }
jpayne@68 279
jpayne@68 280 // queue=new ConcurrentLinkedQueue<ArrayList<Ribo>>();
jpayne@68 281 // for(Entry<Integer, ArrayList<Ribo>> e : listMap.entrySet()){
jpayne@68 282 // queue.add(e.getValue());
jpayne@68 283 // }
jpayne@68 284 // listMap=null;
jpayne@68 285 queue=makeQueue();
jpayne@68 286
jpayne@68 287 //Run a second pass to pick the best SSU per taxID
jpayne@68 288 spawnThreads(null, false);
jpayne@68 289
jpayne@68 290 //Do anything necessary after processing
jpayne@68 291 if(ffout1!=null){
jpayne@68 292 //Optionally create a read output stream
jpayne@68 293 final ConcurrentReadOutputStream ros=makeCros();
jpayne@68 294 long num=0;
jpayne@68 295 for(Ribo ribo : bestList){
jpayne@68 296 Read r=ribo.r;
jpayne@68 297 readsOut++;
jpayne@68 298 basesOut+=r.length();
jpayne@68 299 ArrayList<Read> list=new ArrayList<Read>(1);
jpayne@68 300 list.add(r);
jpayne@68 301 ros.add(list, num);
jpayne@68 302 num++;
jpayne@68 303 }
jpayne@68 304 //Close the read streams
jpayne@68 305 errorState|=ReadWrite.closeStream(ros);
jpayne@68 306 }
jpayne@68 307
jpayne@68 308 if(verbose){outstream.println("Finished; closing streams.");}
jpayne@68 309
jpayne@68 310 //Write anything that was accumulated by ReadStats
jpayne@68 311 errorState|=ReadStats.writeAll();
jpayne@68 312
jpayne@68 313 //Reset read validation
jpayne@68 314 Read.VALIDATE_IN_CONSTRUCTOR=vic;
jpayne@68 315
jpayne@68 316 //Report timing and results
jpayne@68 317 t.stop();
jpayne@68 318 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
jpayne@68 319 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
jpayne@68 320
jpayne@68 321 //Throw an exception of there was an error in a thread
jpayne@68 322 if(errorState){
jpayne@68 323 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
jpayne@68 324 }
jpayne@68 325 }
jpayne@68 326
jpayne@68 327 private ConcurrentLinkedQueue<ArrayList<Ribo>> makeQueue(){
jpayne@68 328 ArrayList<ArrayList<Ribo>> listList=new ArrayList<ArrayList<Ribo>>(listMap.size());
jpayne@68 329 for(Entry<Integer, ArrayList<Ribo>> e : listMap.entrySet()){
jpayne@68 330 listList.add(e.getValue());
jpayne@68 331 }
jpayne@68 332 listMap=null;
jpayne@68 333 Collections.sort(listList, new ListComparator());
jpayne@68 334 assert(listList.isEmpty() || listList.get(0).size()>=listList.get(listList.size()-1).size());
jpayne@68 335 ConcurrentLinkedQueue<ArrayList<Ribo>> q=new ConcurrentLinkedQueue<ArrayList<Ribo>>();
jpayne@68 336 for(ArrayList<Ribo> x : listList){
jpayne@68 337 q.add(x);
jpayne@68 338 }
jpayne@68 339 return q;
jpayne@68 340 }
jpayne@68 341
jpayne@68 342 private ConcurrentReadInputStream makeCris(FileFormat ff){
jpayne@68 343 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null);
jpayne@68 344 cris.start(); //Start the stream
jpayne@68 345 if(verbose){outstream.println("Started cris");}
jpayne@68 346 boolean paired=cris.paired();
jpayne@68 347 assert(!paired) : "This should not be paired input.";
jpayne@68 348 return cris;
jpayne@68 349 }
jpayne@68 350
jpayne@68 351 private ConcurrentReadOutputStream makeCros(){
jpayne@68 352 if(ffout1==null){return null;}
jpayne@68 353
jpayne@68 354 //Select output buffer size based on whether it needs to be ordered
jpayne@68 355 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
jpayne@68 356
jpayne@68 357 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, null, buff, null, false);
jpayne@68 358 ros.start(); //Start the stream
jpayne@68 359 return ros;
jpayne@68 360 }
jpayne@68 361
jpayne@68 362 /*--------------------------------------------------------------*/
jpayne@68 363 /*---------------- Thread Management ----------------*/
jpayne@68 364 /*--------------------------------------------------------------*/
jpayne@68 365
jpayne@68 366 /** Spawn process threads */
jpayne@68 367 private void spawnThreads(final ConcurrentReadInputStream cris, boolean altData){
jpayne@68 368
jpayne@68 369 //Do anything necessary prior to processing
jpayne@68 370
jpayne@68 371 //Fill a list with ProcessThreads
jpayne@68 372 if(verbose){System.err.println("Spawning "+threads+" threads.");}
jpayne@68 373 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 374 for(int i=0; i<threads; i++){
jpayne@68 375 alpt.add(new ProcessThread(cris, i, altData));
jpayne@68 376 }
jpayne@68 377
jpayne@68 378 //Start the threads and wait for them to finish
jpayne@68 379 boolean success=ThreadWaiter.startAndWait(alpt, this);
jpayne@68 380 if(verbose){System.err.println("Threads finished with success="+success+".");}
jpayne@68 381 errorState&=!success;
jpayne@68 382 }
jpayne@68 383
jpayne@68 384 @Override
jpayne@68 385 public final void accumulate(ProcessThread pt){
jpayne@68 386 readsProcessed+=pt.readsProcessedT;
jpayne@68 387 basesProcessed+=pt.basesProcessedT;
jpayne@68 388 errorState|=(!pt.success);
jpayne@68 389 }
jpayne@68 390
jpayne@68 391 @Override
jpayne@68 392 public final boolean success(){return !errorState;}
jpayne@68 393
jpayne@68 394 /*--------------------------------------------------------------*/
jpayne@68 395 /*---------------- Inner Methods ----------------*/
jpayne@68 396 /*--------------------------------------------------------------*/
jpayne@68 397
jpayne@68 398 /*--------------------------------------------------------------*/
jpayne@68 399 /*---------------- Inner Classes ----------------*/
jpayne@68 400 /*--------------------------------------------------------------*/
jpayne@68 401
jpayne@68 402 /** This class is static to prevent accidental writing to shared variables.
jpayne@68 403 * It is safe to remove the static modifier. */
jpayne@68 404 class ProcessThread extends Thread {
jpayne@68 405
jpayne@68 406 //Constructor
jpayne@68 407 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_, boolean alt_){
jpayne@68 408 cris=cris_;
jpayne@68 409 tid=tid_;
jpayne@68 410 processInput=(cris!=null);
jpayne@68 411 altData=alt_;
jpayne@68 412 }
jpayne@68 413
jpayne@68 414 //Called by start()
jpayne@68 415 @Override
jpayne@68 416 public void run(){
jpayne@68 417 //Do anything necessary prior to processing
jpayne@68 418
jpayne@68 419 if(processInput){
jpayne@68 420 //Process the reads
jpayne@68 421 processInner();
jpayne@68 422 }else{
jpayne@68 423 pickBest();
jpayne@68 424 }
jpayne@68 425
jpayne@68 426 //Do anything necessary after processing
jpayne@68 427
jpayne@68 428 //Indicate successful exit status
jpayne@68 429 success=true;
jpayne@68 430 }
jpayne@68 431
jpayne@68 432 /** Iterate through the reads */
jpayne@68 433 void processInner(){
jpayne@68 434 if(verbose && tid==0){System.err.println("processInner() for tid="+tid);}
jpayne@68 435
jpayne@68 436 //Grab the first ListNum of reads
jpayne@68 437 ListNum<Read> ln=cris.nextList();
jpayne@68 438
jpayne@68 439 //Check to ensure pairing is as expected
jpayne@68 440 if(ln!=null && !ln.isEmpty()){
jpayne@68 441 Read r=ln.get(0);
jpayne@68 442 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
jpayne@68 443 }
jpayne@68 444
jpayne@68 445 //As long as there is a nonempty read list...
jpayne@68 446 while(ln!=null && ln.size()>0){
jpayne@68 447 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
jpayne@68 448
jpayne@68 449 processInput(ln);
jpayne@68 450
jpayne@68 451 //Notify the input stream that the list was used
jpayne@68 452 cris.returnList(ln);
jpayne@68 453 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
jpayne@68 454
jpayne@68 455 //Fetch a new list
jpayne@68 456 ln=cris.nextList();
jpayne@68 457 }
jpayne@68 458
jpayne@68 459 //Notify the input stream that the final list was used
jpayne@68 460 if(ln!=null){
jpayne@68 461 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
jpayne@68 462 }
jpayne@68 463 }
jpayne@68 464
jpayne@68 465 void processInput(ListNum<Read> ln){
jpayne@68 466 if(verbose && tid==0){System.err.println("processInput() for tid="+tid);}
jpayne@68 467
jpayne@68 468 //Grab the actual read list from the ListNum
jpayne@68 469 final ArrayList<Read> reads=ln.list;
jpayne@68 470
jpayne@68 471 //Loop through each read in the list
jpayne@68 472 for(int idx=0; idx<reads.size(); idx++){
jpayne@68 473 final Read r1=reads.get(idx);
jpayne@68 474
jpayne@68 475 //Validate reads in worker threads
jpayne@68 476 if(!r1.validated()){r1.validate(true);}
jpayne@68 477
jpayne@68 478 //Track the initial length for statistics
jpayne@68 479 final int initialLength1=r1.length();
jpayne@68 480
jpayne@68 481 //Increment counters
jpayne@68 482 readsProcessedT++;
jpayne@68 483 basesProcessedT+=initialLength1;
jpayne@68 484
jpayne@68 485 processRead(r1);
jpayne@68 486 }
jpayne@68 487 }
jpayne@68 488
jpayne@68 489 void pickBest(){
jpayne@68 490 if(verbose && tid==0){System.err.println("pickBest() for tid="+tid);}
jpayne@68 491 for(ArrayList<Ribo> list=queue.poll(); list!=null; list=queue.poll()){
jpayne@68 492 Ribo best=pickBest(list);
jpayne@68 493 list.clear();
jpayne@68 494 synchronized(bestList){
jpayne@68 495 bestList.add(best);
jpayne@68 496 }
jpayne@68 497 }
jpayne@68 498 }
jpayne@68 499
jpayne@68 500 Ribo pickBest(ArrayList<Ribo> list){
jpayne@68 501 if(verbose && tid==0){System.err.println("pickBest(list[="+list.size()+"]) for tid="+tid);}
jpayne@68 502 assert(list!=null && list.size()>0);
jpayne@68 503 if(list.size()==1){return list.get(0);}
jpayne@68 504 Collections.sort(list);
jpayne@68 505 Collections.reverse(list);
jpayne@68 506 assert(list.get(0).product>=list.get(1).product);
jpayne@68 507 if(list.size()<3 || fast){return list.get(0);}
jpayne@68 508
jpayne@68 509 Ribo base=list.get(0);
jpayne@68 510 int pad=Tools.max(10, (1600-base.r.length()));
jpayne@68 511 BaseGraph bg=new BaseGraph(base.r.name(), base.r.bases, base.r.quality, base.r.numericID, pad);
jpayne@68 512 for(Ribo r : list){
jpayne@68 513 bg.alignAndGenerateMatch(r.r, ssa);
jpayne@68 514 }
jpayne@68 515 Read consensus=bg.traverse();
jpayne@68 516 Ribo best;
jpayne@68 517 if(useConsensus){
jpayne@68 518 best=new Ribo(consensus, base.tid, 1);
jpayne@68 519 }else{
jpayne@68 520 for(Ribo r : list){
jpayne@68 521 float id=align(r.r.bases, consensus.bases);
jpayne@68 522 r.identity=id;
jpayne@68 523 r.product=score(r.length(), r.identity);
jpayne@68 524 }
jpayne@68 525 Collections.sort(list);
jpayne@68 526 Collections.reverse(list);
jpayne@68 527 assert(list.get(0).product>=list.get(1).product);
jpayne@68 528 best=list.get(0);
jpayne@68 529 }
jpayne@68 530 return best;
jpayne@68 531 }
jpayne@68 532
jpayne@68 533 /**
jpayne@68 534 * Process a read or a read pair.
jpayne@68 535 * @return True if the reads should be kept, false if they should be discarded.
jpayne@68 536 */
jpayne@68 537 void processRead(final Read r){
jpayne@68 538 if(verbose && tid==0){System.err.println("processRead()");}
jpayne@68 539 if(r.length()<minlen || r.length()>maxlen){return;}
jpayne@68 540 if(maxns>=0 && r.countNocalls()>maxns){return;}
jpayne@68 541 Integer key=GiToTaxid.parseTaxidNumber(r.id, '|');
jpayne@68 542 if(verbose && tid==0){System.err.println("key="+key);}
jpayne@68 543 if(key==null || key==-1 || (altData && seenTaxID.contains(key))){return;}
jpayne@68 544 float id=align(r);
jpayne@68 545 if(id<minID){return;}
jpayne@68 546 Ribo ribo=new Ribo(r, key, id);
jpayne@68 547
jpayne@68 548 synchronized(listMap){
jpayne@68 549 ArrayList<Ribo> list=listMap.get(key);
jpayne@68 550 if(list==null){
jpayne@68 551 list=new ArrayList<Ribo>(8);
jpayne@68 552 listMap.put(key, list);
jpayne@68 553 }
jpayne@68 554 list.add(ribo);
jpayne@68 555 if(!altData){seenTaxID.add(key);}
jpayne@68 556 }
jpayne@68 557 }
jpayne@68 558
jpayne@68 559 float align(Read r){
jpayne@68 560 float a=(process16S ? align(r.bases, consensus16S) : 0);
jpayne@68 561 float b=(process18S ? align(r.bases, consensus18S) : 0);
jpayne@68 562 if(verbose && tid==0){System.err.println("Aligned; a="+a+", b="+b);}
jpayne@68 563 return Tools.max(a, b);
jpayne@68 564 }
jpayne@68 565
jpayne@68 566 float align(byte[] query, byte[] ref){
jpayne@68 567 int a=0, b=ref.length-1;
jpayne@68 568 int[] max=ssa.fillUnlimited(query, ref, a, b, -9999);
jpayne@68 569 if(max==null){return 0;}
jpayne@68 570
jpayne@68 571 final int rows=max[0];
jpayne@68 572 final int maxCol=max[1];
jpayne@68 573 final int maxState=max[2];
jpayne@68 574 final float id=ssa.tracebackIdentity(query, ref, a, b, rows, maxCol, maxState, null);
jpayne@68 575 return id;
jpayne@68 576 }
jpayne@68 577
jpayne@68 578 SingleStateAlignerFlat2 ssa=new SingleStateAlignerFlat2();
jpayne@68 579
jpayne@68 580 /** Number of reads processed by this thread */
jpayne@68 581 protected long readsProcessedT=0;
jpayne@68 582 /** Number of bases processed by this thread */
jpayne@68 583 protected long basesProcessedT=0;
jpayne@68 584
jpayne@68 585 /** True only if this thread has completed successfully */
jpayne@68 586 boolean success=false;
jpayne@68 587
jpayne@68 588 /** Shared input stream */
jpayne@68 589 private final ConcurrentReadInputStream cris;
jpayne@68 590 /** Thread ID */
jpayne@68 591 final int tid;
jpayne@68 592
jpayne@68 593 //Run mode
jpayne@68 594 final boolean processInput;
jpayne@68 595 final boolean altData;
jpayne@68 596 }
jpayne@68 597
jpayne@68 598 private class Ribo implements Comparable<Ribo>{
jpayne@68 599 Ribo(Read r_, int tid_, float identity_){
jpayne@68 600 r=r_;
jpayne@68 601 tid=tid_;
jpayne@68 602 identity=identity_;
jpayne@68 603 product=score(r.length(), identity);
jpayne@68 604 }
jpayne@68 605
jpayne@68 606 @Override
jpayne@68 607 public int compareTo(Ribo b) {
jpayne@68 608 if(b.product>product){return -1;}
jpayne@68 609 else if(b.product<product){return 1;}
jpayne@68 610 else if(b.r.length()>r.length()){return -1;}
jpayne@68 611 else if(b.r.length()<r.length()){return 1;}
jpayne@68 612 return 0;
jpayne@68 613 }
jpayne@68 614
jpayne@68 615 int length(){return r.length();}
jpayne@68 616
jpayne@68 617 Read r;
jpayne@68 618 int tid;
jpayne@68 619 float identity;
jpayne@68 620 float product;
jpayne@68 621 }
jpayne@68 622
jpayne@68 623 private class ListComparator implements Comparator<ArrayList<Ribo>> {
jpayne@68 624
jpayne@68 625 @Override
jpayne@68 626 public int compare(ArrayList<Ribo> a, ArrayList<Ribo> b) {
jpayne@68 627 return a.size()>b.size() ? -1 : a.size()<b.size() ? 1 : 0;
jpayne@68 628 }
jpayne@68 629
jpayne@68 630 }
jpayne@68 631
jpayne@68 632 private float lengthMult(int len){
jpayne@68 633 int idealLength=idealLength();
jpayne@68 634 int max=Tools.max(len, idealLength, 1);
jpayne@68 635 int min=Tools.min(len, idealLength);
jpayne@68 636 return min/(float)max;
jpayne@68 637 }
jpayne@68 638
jpayne@68 639 private float score(int len, float identity){
jpayne@68 640 return lengthMult(len)*identity;
jpayne@68 641 }
jpayne@68 642
jpayne@68 643 /*--------------------------------------------------------------*/
jpayne@68 644 /*---------------- Fields ----------------*/
jpayne@68 645 /*--------------------------------------------------------------*/
jpayne@68 646
jpayne@68 647 /** Primary input file path */
jpayne@68 648 private ArrayList<String> in=new ArrayList<String>();
jpayne@68 649
jpayne@68 650 /** Alternate input file path */
jpayne@68 651 private String alt=null;
jpayne@68 652
jpayne@68 653 /** Primary output file path */
jpayne@68 654 private String out1=null;
jpayne@68 655
jpayne@68 656 /** Override input file extension */
jpayne@68 657 private String extin=null;
jpayne@68 658 /** Override output file extension */
jpayne@68 659 private String extout=null;
jpayne@68 660
jpayne@68 661 ArrayList<Ribo> bestList=new ArrayList<Ribo>();
jpayne@68 662 HashMap<Integer, ArrayList<Ribo>> listMap=new HashMap<Integer, ArrayList<Ribo>>(100000);
jpayne@68 663 ConcurrentLinkedQueue<ArrayList<Ribo>> queue;
jpayne@68 664
jpayne@68 665
jpayne@68 666 IntHashSet seenTaxID=new IntHashSet(1000000);
jpayne@68 667
jpayne@68 668 byte[] consensus16S;
jpayne@68 669 byte[] consensus18S;
jpayne@68 670
jpayne@68 671 int idealLength(){
jpayne@68 672 if(process16S){return consensus16S.length;}
jpayne@68 673 return consensus18S.length;
jpayne@68 674 }
jpayne@68 675
jpayne@68 676 boolean useConsensus=false;
jpayne@68 677 boolean fast=false;
jpayne@68 678 int maxns=-1;
jpayne@68 679 int minlen=1;
jpayne@68 680 int maxlen=4000;
jpayne@68 681
jpayne@68 682 /*--------------------------------------------------------------*/
jpayne@68 683
jpayne@68 684 /** Number of reads processed */
jpayne@68 685 protected long readsProcessed=0;
jpayne@68 686 /** Number of bases processed */
jpayne@68 687 protected long basesProcessed=0;
jpayne@68 688
jpayne@68 689 /** Number of reads retained */
jpayne@68 690 protected long readsOut=0;
jpayne@68 691 /** Number of bases retained */
jpayne@68 692 protected long basesOut=0;
jpayne@68 693
jpayne@68 694 /** Quit after processing this many input reads; -1 means no limit */
jpayne@68 695 private long maxReads=-1;
jpayne@68 696
jpayne@68 697 private float minID=0.62f;
jpayne@68 698
jpayne@68 699 private boolean process16S=true;
jpayne@68 700 private boolean process18S=false;
jpayne@68 701
jpayne@68 702 /*--------------------------------------------------------------*/
jpayne@68 703 /*---------------- Final Fields ----------------*/
jpayne@68 704 /*--------------------------------------------------------------*/
jpayne@68 705
jpayne@68 706 /** Primary input file */
jpayne@68 707 private final ArrayList<FileFormat> ffin;
jpayne@68 708 private final FileFormat ffalt;
jpayne@68 709
jpayne@68 710 /** Primary output file */
jpayne@68 711 private final FileFormat ffout1;
jpayne@68 712
jpayne@68 713 final int threads;
jpayne@68 714
jpayne@68 715 /*--------------------------------------------------------------*/
jpayne@68 716 /*---------------- Common Fields ----------------*/
jpayne@68 717 /*--------------------------------------------------------------*/
jpayne@68 718
jpayne@68 719 /** Print status messages to this output stream */
jpayne@68 720 private PrintStream outstream=System.err;
jpayne@68 721 /** Print verbose messages */
jpayne@68 722 public static boolean verbose=false;
jpayne@68 723 /** True if an error was encountered */
jpayne@68 724 public boolean errorState=false;
jpayne@68 725 /** Overwrite existing output files */
jpayne@68 726 private boolean overwrite=false;
jpayne@68 727 /** Append to existing output files */
jpayne@68 728 private boolean append=false;
jpayne@68 729 /** Reads are output in input order */
jpayne@68 730 private boolean ordered=false;
jpayne@68 731
jpayne@68 732 }