annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/prok/RiboMaker.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package prok;
jpayne@68 2
jpayne@68 3 import java.io.File;
jpayne@68 4 import java.io.PrintStream;
jpayne@68 5 import java.util.ArrayList;
jpayne@68 6 import java.util.PriorityQueue;
jpayne@68 7
jpayne@68 8 import aligner.Alignment;
jpayne@68 9 import dna.AminoAcid;
jpayne@68 10 import fileIO.ByteFile;
jpayne@68 11 import fileIO.FileFormat;
jpayne@68 12 import fileIO.ReadWrite;
jpayne@68 13 import shared.Parse;
jpayne@68 14 import shared.Parser;
jpayne@68 15 import shared.PreParser;
jpayne@68 16 import shared.ReadStats;
jpayne@68 17 import shared.Shared;
jpayne@68 18 import shared.Timer;
jpayne@68 19 import shared.Tools;
jpayne@68 20 import stream.ConcurrentReadInputStream;
jpayne@68 21 import stream.ConcurrentReadOutputStream;
jpayne@68 22 import stream.FASTQ;
jpayne@68 23 import stream.FastaReadInputStream;
jpayne@68 24 import stream.Read;
jpayne@68 25 import stream.ReadInputStream;
jpayne@68 26 import structures.ListNum;
jpayne@68 27 import structures.LongHashSet;
jpayne@68 28 import template.Accumulator;
jpayne@68 29 import template.ThreadWaiter;
jpayne@68 30
jpayne@68 31 /**
jpayne@68 32 * Makes a consensus ribosomal sequence using raw reads as input.
jpayne@68 33 *
jpayne@68 34 * @author Brian Bushnell
jpayne@68 35 * @date October 10, 2019
jpayne@68 36 *
jpayne@68 37 */
jpayne@68 38 public class RiboMaker implements Accumulator<RiboMaker.ProcessThread> {
jpayne@68 39
jpayne@68 40 /*--------------------------------------------------------------*/
jpayne@68 41 /*---------------- Initialization ----------------*/
jpayne@68 42 /*--------------------------------------------------------------*/
jpayne@68 43
jpayne@68 44 /**
jpayne@68 45 * Code entrance from the command line.
jpayne@68 46 * @param args Command line arguments
jpayne@68 47 */
jpayne@68 48 public static void main(String[] args){
jpayne@68 49 assert(false) : "TODO";
jpayne@68 50
jpayne@68 51 //Start a timer immediately upon code entrance.
jpayne@68 52 Timer t=new Timer();
jpayne@68 53
jpayne@68 54 //Create an instance of this class
jpayne@68 55 RiboMaker x=new RiboMaker(args);
jpayne@68 56
jpayne@68 57 //Run the object
jpayne@68 58 x.process(t);
jpayne@68 59
jpayne@68 60 //Close the print stream if it was redirected
jpayne@68 61 Shared.closeStream(x.outstream);
jpayne@68 62 }
jpayne@68 63
jpayne@68 64 /**
jpayne@68 65 * Constructor.
jpayne@68 66 * @param args Command line arguments
jpayne@68 67 */
jpayne@68 68 public RiboMaker(String[] args){
jpayne@68 69
jpayne@68 70 {//Preparse block for help, config files, and outstream
jpayne@68 71 PreParser pp=new PreParser(args, getClass(), false);
jpayne@68 72 args=pp.args;
jpayne@68 73 outstream=pp.outstream;
jpayne@68 74 }
jpayne@68 75
jpayne@68 76 //Set shared static variables prior to parsing
jpayne@68 77 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
jpayne@68 78 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
jpayne@68 79
jpayne@68 80 {//Parse the arguments
jpayne@68 81 final Parser parser=parse(args);
jpayne@68 82 Parser.processQuality();
jpayne@68 83
jpayne@68 84 maxReads=parser.maxReads;
jpayne@68 85 overwrite=ReadStats.overwrite=parser.overwrite;
jpayne@68 86 append=ReadStats.append=parser.append;
jpayne@68 87 setInterleaved=parser.setInterleaved;
jpayne@68 88
jpayne@68 89 in1=parser.in1;
jpayne@68 90 in2=parser.in2;
jpayne@68 91 qfin1=parser.qfin1;
jpayne@68 92 qfin2=parser.qfin2;
jpayne@68 93 extin=parser.extin;
jpayne@68 94
jpayne@68 95 out1=parser.out1;
jpayne@68 96 qfout1=parser.qfout1;
jpayne@68 97 extout=parser.extout;
jpayne@68 98 }
jpayne@68 99
jpayne@68 100 validateParams();
jpayne@68 101 doPoundReplacement(); //Replace # with 1 and 2
jpayne@68 102 adjustInterleaving(); //Make sure interleaving agrees with number of input and output files
jpayne@68 103 fixExtensions(); //Add or remove .gz or .bz2 as needed
jpayne@68 104 checkFileExistence(); //Ensure files can be read and written
jpayne@68 105 checkStatics(); //Adjust file-related static fields as needed for this program
jpayne@68 106
jpayne@68 107 //Create output FileFormat objects
jpayne@68 108 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
jpayne@68 109
jpayne@68 110 fffilter=FileFormat.testInput(filterFile, FileFormat.FASTA, null, true, true);
jpayne@68 111 ffref=FileFormat.testInput(refFile, FileFormat.FASTA, null, true, true);
jpayne@68 112
jpayne@68 113 //Create input FileFormat objects
jpayne@68 114 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
jpayne@68 115 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
jpayne@68 116
jpayne@68 117 if(fffilter==null){
jpayne@68 118 filter=null;
jpayne@68 119 }else{
jpayne@68 120 filter=loadFilter(fffilter, k);
jpayne@68 121 }
jpayne@68 122 loadRef();
jpayne@68 123 }
jpayne@68 124
jpayne@68 125 /*--------------------------------------------------------------*/
jpayne@68 126 /*---------------- Initialization Helpers ----------------*/
jpayne@68 127 /*--------------------------------------------------------------*/
jpayne@68 128
jpayne@68 129 /** Parse arguments from the command line */
jpayne@68 130 private Parser parse(String[] args){
jpayne@68 131
jpayne@68 132 //Create a parser object
jpayne@68 133 Parser parser=new Parser();
jpayne@68 134
jpayne@68 135 //Set any necessary Parser defaults here
jpayne@68 136 //parser.foo=bar;
jpayne@68 137
jpayne@68 138 //Parse each argument
jpayne@68 139 for(int i=0; i<args.length; i++){
jpayne@68 140 String arg=args[i];
jpayne@68 141
jpayne@68 142 //Break arguments into their constituent parts, in the form of "a=b"
jpayne@68 143 String[] split=arg.split("=");
jpayne@68 144 String a=split[0].toLowerCase();
jpayne@68 145 String b=split.length>1 ? split[1] : null;
jpayne@68 146 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
jpayne@68 147
jpayne@68 148 if(a.equals("verbose")){
jpayne@68 149 verbose=Parse.parseBoolean(b);
jpayne@68 150 }else if(a.equals("ordered")){
jpayne@68 151 ordered=Parse.parseBoolean(b);
jpayne@68 152 }else if(a.equals("filter")){
jpayne@68 153 filterFile=b;
jpayne@68 154 }else if(a.equals("ref")){
jpayne@68 155 refFile=b;
jpayne@68 156 }else if(a.equals("parse_flag_goes_here")){
jpayne@68 157 long fake_variable=Parse.parseKMG(b);
jpayne@68 158 //Set a variable here
jpayne@68 159 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
jpayne@68 160 //do nothing
jpayne@68 161 }else{
jpayne@68 162 outstream.println("Unknown parameter "+args[i]);
jpayne@68 163 assert(false) : "Unknown parameter "+args[i];
jpayne@68 164 }
jpayne@68 165 }
jpayne@68 166
jpayne@68 167 return parser;
jpayne@68 168 }
jpayne@68 169
jpayne@68 170 /** Replace # with 1 and 2 in headers */
jpayne@68 171 private void doPoundReplacement(){
jpayne@68 172 //Do input file # replacement
jpayne@68 173 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
jpayne@68 174 in2=in1.replace("#", "2");
jpayne@68 175 in1=in1.replace("#", "1");
jpayne@68 176 }
jpayne@68 177
jpayne@68 178 //Ensure there is an input file
jpayne@68 179 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
jpayne@68 180 }
jpayne@68 181
jpayne@68 182 /** Add or remove .gz or .bz2 as needed */
jpayne@68 183 private void fixExtensions(){
jpayne@68 184 in1=Tools.fixExtension(in1);
jpayne@68 185 in2=Tools.fixExtension(in2);
jpayne@68 186 qfin1=Tools.fixExtension(qfin1);
jpayne@68 187 qfin2=Tools.fixExtension(qfin2);
jpayne@68 188 }
jpayne@68 189
jpayne@68 190 /** Ensure files can be read and written */
jpayne@68 191 private void checkFileExistence(){
jpayne@68 192 //Ensure output files can be written
jpayne@68 193 if(!Tools.testOutputFiles(overwrite, append, false, out1)){
jpayne@68 194 outstream.println((out1==null)+", "+out1);
jpayne@68 195 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+"\n");
jpayne@68 196 }
jpayne@68 197
jpayne@68 198 //Ensure input files can be read
jpayne@68 199 if(!Tools.testInputFiles(false, true, in1, in2, filterFile, refFile)){
jpayne@68 200 throw new RuntimeException("\nCan't read some input files.\n");
jpayne@68 201 }
jpayne@68 202
jpayne@68 203 //Ensure that no file was specified multiple times
jpayne@68 204 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, filterFile, refFile)){
jpayne@68 205 throw new RuntimeException("\nSome file names were specified multiple times.\n");
jpayne@68 206 }
jpayne@68 207
jpayne@68 208 assert(refFile!=null);
jpayne@68 209 }
jpayne@68 210
jpayne@68 211 /** Make sure interleaving agrees with number of input and output files */
jpayne@68 212 private void adjustInterleaving(){
jpayne@68 213 //Adjust interleaved detection based on the number of input files
jpayne@68 214 if(in2!=null){
jpayne@68 215 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
jpayne@68 216 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
jpayne@68 217 }
jpayne@68 218
jpayne@68 219 //Adjust interleaved settings based on number of output files
jpayne@68 220 if(!setInterleaved){
jpayne@68 221 assert(in1!=null) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\n";
jpayne@68 222 if(in2!=null){ //If there are 2 input streams.
jpayne@68 223 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
jpayne@68 224 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
jpayne@68 225 }
jpayne@68 226 }
jpayne@68 227 }
jpayne@68 228
jpayne@68 229 /** Adjust file-related static fields as needed for this program */
jpayne@68 230 private static void checkStatics(){
jpayne@68 231 //Adjust the number of threads for input file reading
jpayne@68 232 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
jpayne@68 233 ByteFile.FORCE_MODE_BF2=true;
jpayne@68 234 }
jpayne@68 235
jpayne@68 236 assert(FastaReadInputStream.settingsOK());
jpayne@68 237 }
jpayne@68 238
jpayne@68 239 /** Ensure parameter ranges are within bounds and required parameters are set */
jpayne@68 240 private boolean validateParams(){
jpayne@68 241 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
jpayne@68 242 assert(false) : "TODO";
jpayne@68 243 return true;
jpayne@68 244 }
jpayne@68 245
jpayne@68 246 /*--------------------------------------------------------------*/
jpayne@68 247 /*---------------- Outer Methods ----------------*/
jpayne@68 248 /*--------------------------------------------------------------*/
jpayne@68 249
jpayne@68 250 /** Create read streams and process all data */
jpayne@68 251 void process(Timer t){
jpayne@68 252
jpayne@68 253 //Turn off read validation in the input threads to increase speed
jpayne@68 254 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
jpayne@68 255 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
jpayne@68 256
jpayne@68 257 //Create a read input stream
jpayne@68 258 final ConcurrentReadInputStream cris=makeCris();
jpayne@68 259
jpayne@68 260 //Optionally create a read output stream
jpayne@68 261 final ConcurrentReadOutputStream ros=makeCros(cris.paired());
jpayne@68 262
jpayne@68 263 //Reset counters
jpayne@68 264 readsProcessed=readsOut=0;
jpayne@68 265 basesProcessed=basesOut=0;
jpayne@68 266
jpayne@68 267 //Process the reads in separate threads
jpayne@68 268 spawnThreads(cris, ros);
jpayne@68 269
jpayne@68 270 if(verbose){outstream.println("Finished; closing streams.");}
jpayne@68 271
jpayne@68 272 //Write anything that was accumulated by ReadStats
jpayne@68 273 errorState|=ReadStats.writeAll();
jpayne@68 274 //Close the read streams
jpayne@68 275 errorState|=ReadWrite.closeStreams(cris, ros);
jpayne@68 276
jpayne@68 277 //Reset read validation
jpayne@68 278 Read.VALIDATE_IN_CONSTRUCTOR=vic;
jpayne@68 279
jpayne@68 280 //Report timing and results
jpayne@68 281 t.stop();
jpayne@68 282 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
jpayne@68 283 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
jpayne@68 284
jpayne@68 285 //Throw an exception of there was an error in a thread
jpayne@68 286 if(errorState){
jpayne@68 287 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
jpayne@68 288 }
jpayne@68 289 }
jpayne@68 290
jpayne@68 291 private void loadRef(){
jpayne@68 292 ArrayList<Read> reads=ReadInputStream.toReads(ffref, -1);
jpayne@68 293 ref0=reads.get(0).bases;
jpayne@68 294 ref=new byte[ref0.length+2*padding];
jpayne@68 295 for(int i=0, j=-padding; i<ref.length; i++, j++){
jpayne@68 296 byte b=(j>=0 && j<ref0.length ? ref0[j] : (byte)'N');
jpayne@68 297 ref[i]=b;
jpayne@68 298 }
jpayne@68 299
jpayne@68 300 queues=new PriorityQueue[1+ref.length/queueWidth];
jpayne@68 301 for(int i=0; i<queues.length; i++){
jpayne@68 302 queues[i]=new PriorityQueue<Alignment>(queueLen);
jpayne@68 303 }
jpayne@68 304 }
jpayne@68 305
jpayne@68 306 public static LongHashSet loadFilter(FileFormat ff, int k){
jpayne@68 307 if(ff==null){return null;}
jpayne@68 308 ArrayList<Read> reads=ReadInputStream.toReads(ff, -1);
jpayne@68 309 if(reads==null || reads.size()==0){return null;}
jpayne@68 310 LongHashSet set=new LongHashSet(4096);
jpayne@68 311
jpayne@68 312 final int shift=2*k;
jpayne@68 313 final int shift2=shift-2;
jpayne@68 314 final long mask=(shift>63 ? -1L : ~((-1L)<<shift));
jpayne@68 315 int len=0;
jpayne@68 316
jpayne@68 317 long kmer=0, rkmer=0;
jpayne@68 318 for(Read r : reads){
jpayne@68 319 final byte[] bases=r.bases;
jpayne@68 320 for(byte b : bases) {
jpayne@68 321 long x=AminoAcid.baseToNumber[b];
jpayne@68 322 long x2=AminoAcid.baseToComplementNumber[b];
jpayne@68 323 kmer=((kmer<<2)|x)&mask;
jpayne@68 324 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
jpayne@68 325
jpayne@68 326 if(x>=0){
jpayne@68 327 len++;
jpayne@68 328 if(len>=k){
jpayne@68 329 set.add(Tools.max(kmer, rkmer));
jpayne@68 330 }
jpayne@68 331 }else{
jpayne@68 332 len=0;
jpayne@68 333 kmer=rkmer=0;
jpayne@68 334 }
jpayne@68 335 }
jpayne@68 336 }
jpayne@68 337 return set;
jpayne@68 338 }
jpayne@68 339
jpayne@68 340 public static boolean passesFilter(Read r, int k, LongHashSet set){
jpayne@68 341 if(r==null) {return false;}
jpayne@68 342 if(set==null){return true;}
jpayne@68 343
jpayne@68 344 final int shift=2*k;
jpayne@68 345 final int shift2=shift-2;
jpayne@68 346 final long mask=(shift>63 ? -1L : ~((-1L)<<shift));
jpayne@68 347 int len=0;
jpayne@68 348 long kmer=0, rkmer=0;
jpayne@68 349
jpayne@68 350 final byte[] bases=r.bases;
jpayne@68 351 for(byte b : bases) {
jpayne@68 352 long x=AminoAcid.baseToNumber[b];
jpayne@68 353 long x2=AminoAcid.baseToComplementNumber[b];
jpayne@68 354 kmer=((kmer<<2)|x)&mask;
jpayne@68 355 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
jpayne@68 356
jpayne@68 357 if(x>=0){
jpayne@68 358 len++;
jpayne@68 359 if(len>=k){
jpayne@68 360 long key=Tools.max(kmer, rkmer);
jpayne@68 361 if(set.contains(key)){return true;}
jpayne@68 362 }
jpayne@68 363 }else{
jpayne@68 364 len=0;
jpayne@68 365 kmer=rkmer=0;
jpayne@68 366 }
jpayne@68 367 }
jpayne@68 368 return false;
jpayne@68 369 }
jpayne@68 370
jpayne@68 371 private ConcurrentReadInputStream makeCris(){
jpayne@68 372 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
jpayne@68 373 cris.start(); //Start the stream
jpayne@68 374 if(verbose){outstream.println("Started cris");}
jpayne@68 375 boolean paired=cris.paired();
jpayne@68 376 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
jpayne@68 377 return cris;
jpayne@68 378 }
jpayne@68 379
jpayne@68 380 private ConcurrentReadOutputStream makeCros(boolean pairedInput){
jpayne@68 381 if(ffout1==null){return null;}
jpayne@68 382
jpayne@68 383 //Select output buffer size based on whether it needs to be ordered
jpayne@68 384 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
jpayne@68 385
jpayne@68 386 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, null, qfout1, null, buff, null, false);
jpayne@68 387 ros.start(); //Start the stream
jpayne@68 388 return ros;
jpayne@68 389 }
jpayne@68 390
jpayne@68 391 /*--------------------------------------------------------------*/
jpayne@68 392 /*---------------- Thread Management ----------------*/
jpayne@68 393 /*--------------------------------------------------------------*/
jpayne@68 394
jpayne@68 395 /** Spawn process threads */
jpayne@68 396 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
jpayne@68 397
jpayne@68 398 //Do anything necessary prior to processing
jpayne@68 399
jpayne@68 400 //Determine how many threads may be used
jpayne@68 401 final int threads=Shared.threads();
jpayne@68 402
jpayne@68 403 //Fill a list with ProcessThreads
jpayne@68 404 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 405 for(int i=0; i<threads; i++){
jpayne@68 406 alpt.add(new ProcessThread(cris, i));
jpayne@68 407 }
jpayne@68 408
jpayne@68 409 //Start the threads and wait for them to finish
jpayne@68 410 boolean success=ThreadWaiter.startAndWait(alpt, this);
jpayne@68 411 errorState&=!success;
jpayne@68 412
jpayne@68 413 //Do anything necessary after processing
jpayne@68 414 assert(false) : "TODO: Make consensus and write it?";
jpayne@68 415 }
jpayne@68 416
jpayne@68 417 @Override
jpayne@68 418 public final void accumulate(ProcessThread pt){
jpayne@68 419 readsProcessed+=pt.readsProcessedT;
jpayne@68 420 basesProcessed+=pt.basesProcessedT;
jpayne@68 421 readsOut+=pt.readsOutT;
jpayne@68 422 basesOut+=pt.basesOutT;
jpayne@68 423 errorState|=(!pt.success);
jpayne@68 424
jpayne@68 425 for(int i=0; i<queues.length; i++){
jpayne@68 426 PriorityQueue<Alignment> q=queues[i];
jpayne@68 427 PriorityQueue<Alignment> qt=pt.queuesT[i];
jpayne@68 428 for(Alignment a : qt){
jpayne@68 429 addToQueue(a, q);
jpayne@68 430 }
jpayne@68 431 }
jpayne@68 432 }
jpayne@68 433
jpayne@68 434 @Override
jpayne@68 435 public final boolean success(){return !errorState;}
jpayne@68 436
jpayne@68 437 /*--------------------------------------------------------------*/
jpayne@68 438 /*---------------- Inner Methods ----------------*/
jpayne@68 439 /*--------------------------------------------------------------*/
jpayne@68 440
jpayne@68 441 boolean addToQueue(Alignment best, PriorityQueue<Alignment>[] queues){
jpayne@68 442 int start=best.start;
jpayne@68 443 int qnum=start/queueWidth;
jpayne@68 444 PriorityQueue<Alignment> queue=queues[qnum];
jpayne@68 445 return addToQueue(best, queue);
jpayne@68 446 }
jpayne@68 447
jpayne@68 448 boolean addToQueue(Alignment best, PriorityQueue<Alignment> queue){
jpayne@68 449 if(queue.size()<queueLen){queue.add(best);}
jpayne@68 450 else{
jpayne@68 451 Alignment bottom=queue.peek();
jpayne@68 452 if(bottom.compareTo(best)>=0){return false;}
jpayne@68 453 queue.poll();
jpayne@68 454 queue.add(best);
jpayne@68 455 }
jpayne@68 456 return true;
jpayne@68 457 }
jpayne@68 458
jpayne@68 459 /*--------------------------------------------------------------*/
jpayne@68 460 /*---------------- Inner Classes ----------------*/
jpayne@68 461 /*--------------------------------------------------------------*/
jpayne@68 462
jpayne@68 463 /** This class is static to prevent accidental writing to shared variables.
jpayne@68 464 * It is safe to remove the static modifier. */
jpayne@68 465 class ProcessThread extends Thread {
jpayne@68 466
jpayne@68 467 //Constructor
jpayne@68 468 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){
jpayne@68 469 cris=cris_;
jpayne@68 470 tid=tid_;
jpayne@68 471
jpayne@68 472 queuesT=new PriorityQueue[1+ref.length/queueWidth];
jpayne@68 473 for(int i=0; i<queuesT.length; i++){
jpayne@68 474 queuesT[i]=new PriorityQueue<Alignment>(queueLen);
jpayne@68 475 }
jpayne@68 476 }
jpayne@68 477
jpayne@68 478 //Called by start()
jpayne@68 479 @Override
jpayne@68 480 public void run(){
jpayne@68 481 //Do anything necessary prior to processing
jpayne@68 482
jpayne@68 483 //Process the reads
jpayne@68 484 processInner();
jpayne@68 485
jpayne@68 486 //Do anything necessary after processing
jpayne@68 487
jpayne@68 488 //Indicate successful exit status
jpayne@68 489 success=true;
jpayne@68 490 }
jpayne@68 491
jpayne@68 492 /** Iterate through the reads */
jpayne@68 493 void processInner(){
jpayne@68 494
jpayne@68 495 //Grab the first ListNum of reads
jpayne@68 496 ListNum<Read> ln=cris.nextList();
jpayne@68 497
jpayne@68 498 //Check to ensure pairing is as expected
jpayne@68 499 if(ln!=null && !ln.isEmpty()){
jpayne@68 500 Read r=ln.get(0);
jpayne@68 501 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
jpayne@68 502 }
jpayne@68 503
jpayne@68 504 //As long as there is a nonempty read list...
jpayne@68 505 while(ln!=null && ln.size()>0){
jpayne@68 506 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
jpayne@68 507
jpayne@68 508 processList(ln);
jpayne@68 509
jpayne@68 510 //Notify the input stream that the list was used
jpayne@68 511 cris.returnList(ln);
jpayne@68 512 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
jpayne@68 513
jpayne@68 514 //Fetch a new list
jpayne@68 515 ln=cris.nextList();
jpayne@68 516 }
jpayne@68 517
jpayne@68 518 //Notify the input stream that the final list was used
jpayne@68 519 if(ln!=null){
jpayne@68 520 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
jpayne@68 521 }
jpayne@68 522 }
jpayne@68 523
jpayne@68 524 void processList(ListNum<Read> ln){
jpayne@68 525
jpayne@68 526 //Grab the actual read list from the ListNum
jpayne@68 527 final ArrayList<Read> reads=ln.list;
jpayne@68 528
jpayne@68 529 //Loop through each read in the list
jpayne@68 530 for(int idx=0; idx<reads.size(); idx++){
jpayne@68 531 final Read r1=reads.get(idx);
jpayne@68 532 final Read r2=r1.mate;
jpayne@68 533
jpayne@68 534 //Validate reads in worker threads
jpayne@68 535 if(!r1.validated()){r1.validate(true);}
jpayne@68 536 if(r2!=null && !r2.validated()){r2.validate(true);}
jpayne@68 537
jpayne@68 538 //Track the initial length for statistics
jpayne@68 539 final int initialLength1=r1.length();
jpayne@68 540 final int initialLength2=r1.mateLength();
jpayne@68 541
jpayne@68 542 //Increment counters
jpayne@68 543 readsProcessedT+=r1.pairCount();
jpayne@68 544 basesProcessedT+=initialLength1+initialLength2;
jpayne@68 545
jpayne@68 546 {
jpayne@68 547 //Reads are processed in this block.
jpayne@68 548 processReadPair(r1, r2);
jpayne@68 549
jpayne@68 550 // if(!keep){reads.set(idx, null);}
jpayne@68 551 // else{
jpayne@68 552 // readsOutT+=r1.pairCount();
jpayne@68 553 // basesOutT+=r1.pairLength();
jpayne@68 554 // }
jpayne@68 555 }
jpayne@68 556 }
jpayne@68 557
jpayne@68 558 //Output reads to the output stream
jpayne@68 559 // if(ros!=null){ros.add(reads, ln.id);}
jpayne@68 560 }
jpayne@68 561
jpayne@68 562 /**
jpayne@68 563 * Process a read or a read pair.
jpayne@68 564 * @param r1 Read 1
jpayne@68 565 * @param r2 Read 2 (may be null)
jpayne@68 566 * @return True if the reads should be kept, false if they should be discarded.
jpayne@68 567 */
jpayne@68 568 void processReadPair(final Read r1, final Read r2){
jpayne@68 569 boolean pass=passesFilter(r1, k, filter) || passesFilter(r2, k, filter);
jpayne@68 570 if(!pass){return;}
jpayne@68 571 processRead(r1);
jpayne@68 572 processRead(r2);
jpayne@68 573 }
jpayne@68 574
jpayne@68 575 void processRead(final Read r){
jpayne@68 576 Alignment plus=new Alignment(r);
jpayne@68 577 plus.align(ref);
jpayne@68 578
jpayne@68 579 r.reverseComplement();
jpayne@68 580 Alignment minus=new Alignment(r);
jpayne@68 581 minus.align(ref);
jpayne@68 582
jpayne@68 583 Alignment best=null;
jpayne@68 584 if(plus.id>=minus.id){
jpayne@68 585 r.reverseComplement();
jpayne@68 586 best=plus;
jpayne@68 587 }else{
jpayne@68 588 best=minus;
jpayne@68 589 }
jpayne@68 590 if(best.id<minID) {return;}
jpayne@68 591
jpayne@68 592 addToQueue(best, queuesT);
jpayne@68 593 }
jpayne@68 594
jpayne@68 595 /** Number of reads processed by this thread */
jpayne@68 596 protected long readsProcessedT=0;
jpayne@68 597 /** Number of bases processed by this thread */
jpayne@68 598 protected long basesProcessedT=0;
jpayne@68 599
jpayne@68 600 /** Number of reads retained by this thread */
jpayne@68 601 protected long readsOutT=0;
jpayne@68 602 /** Number of bases retained by this thread */
jpayne@68 603 protected long basesOutT=0;
jpayne@68 604
jpayne@68 605 /** True only if this thread has completed successfully */
jpayne@68 606 boolean success=false;
jpayne@68 607
jpayne@68 608
jpayne@68 609 private PriorityQueue<Alignment>[] queuesT;
jpayne@68 610
jpayne@68 611 /** Shared input stream */
jpayne@68 612 private final ConcurrentReadInputStream cris;
jpayne@68 613 /** Thread ID */
jpayne@68 614 final int tid;
jpayne@68 615 }
jpayne@68 616
jpayne@68 617 /*--------------------------------------------------------------*/
jpayne@68 618 /*---------------- Fields ----------------*/
jpayne@68 619 /*--------------------------------------------------------------*/
jpayne@68 620
jpayne@68 621 /** Primary input file path */
jpayne@68 622 private String in1=null;
jpayne@68 623 /** Secondary input file path */
jpayne@68 624 private String in2=null;
jpayne@68 625
jpayne@68 626 private String qfin1=null;
jpayne@68 627 private String qfin2=null;
jpayne@68 628
jpayne@68 629 /** Primary output file path */
jpayne@68 630 private String out1=null;
jpayne@68 631
jpayne@68 632 private String qfout1=null;
jpayne@68 633
jpayne@68 634 private String filterFile;
jpayne@68 635 private String refFile;
jpayne@68 636
jpayne@68 637 /** Override input file extension */
jpayne@68 638 private String extin=null;
jpayne@68 639 /** Override output file extension */
jpayne@68 640 private String extout=null;
jpayne@68 641
jpayne@68 642 /** Whether interleaved was explicitly set. */
jpayne@68 643 private boolean setInterleaved=false;
jpayne@68 644
jpayne@68 645 /** Original ref */
jpayne@68 646 private byte[] ref0;
jpayne@68 647 /** Padded ref */
jpayne@68 648 private byte[] ref;
jpayne@68 649
jpayne@68 650 private int padding=100;
jpayne@68 651 private int queueLen=20;
jpayne@68 652 private int queueWidth=20;
jpayne@68 653 private float minID=0.4f;
jpayne@68 654
jpayne@68 655 private PriorityQueue<Alignment>[] queues;
jpayne@68 656
jpayne@68 657 /*--------------------------------------------------------------*/
jpayne@68 658
jpayne@68 659 /** Number of reads processed */
jpayne@68 660 protected long readsProcessed=0;
jpayne@68 661 /** Number of bases processed */
jpayne@68 662 protected long basesProcessed=0;
jpayne@68 663
jpayne@68 664 /** Number of reads retained */
jpayne@68 665 protected long readsOut=0;
jpayne@68 666 /** Number of bases retained */
jpayne@68 667 protected long basesOut=0;
jpayne@68 668
jpayne@68 669 /** Quit after processing this many input reads; -1 means no limit */
jpayne@68 670 private long maxReads=-1;
jpayne@68 671
jpayne@68 672 /*--------------------------------------------------------------*/
jpayne@68 673 /*---------------- Final Fields ----------------*/
jpayne@68 674 /*--------------------------------------------------------------*/
jpayne@68 675
jpayne@68 676 /** Primary input file */
jpayne@68 677 private final FileFormat ffin1;
jpayne@68 678 /** Secondary input file */
jpayne@68 679 private final FileFormat ffin2;
jpayne@68 680 /** Filter input file */
jpayne@68 681 private final FileFormat fffilter;
jpayne@68 682 /** Ref input file */
jpayne@68 683 private final FileFormat ffref;
jpayne@68 684
jpayne@68 685 /** Primary output file */
jpayne@68 686 private final FileFormat ffout1;
jpayne@68 687
jpayne@68 688 private final LongHashSet filter;
jpayne@68 689 private final int k=31;
jpayne@68 690
jpayne@68 691
jpayne@68 692 /*--------------------------------------------------------------*/
jpayne@68 693 /*---------------- Common Fields ----------------*/
jpayne@68 694 /*--------------------------------------------------------------*/
jpayne@68 695
jpayne@68 696 /** Print status messages to this output stream */
jpayne@68 697 private PrintStream outstream=System.err;
jpayne@68 698 /** Print verbose messages */
jpayne@68 699 public static boolean verbose=false;
jpayne@68 700 /** True if an error was encountered */
jpayne@68 701 public boolean errorState=false;
jpayne@68 702 /** Overwrite existing output files */
jpayne@68 703 private boolean overwrite=false;
jpayne@68 704 /** Append to existing output files */
jpayne@68 705 private boolean append=false;
jpayne@68 706 /** Reads are output in input order */
jpayne@68 707 private boolean ordered=false;
jpayne@68 708
jpayne@68 709 }