annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/KmerLimit.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package sketch;
jpayne@68 2
jpayne@68 3 import java.io.File;
jpayne@68 4 import java.io.PrintStream;
jpayne@68 5 import java.util.ArrayList;
jpayne@68 6 import java.util.Arrays;
jpayne@68 7
jpayne@68 8 import dna.AminoAcid;
jpayne@68 9 import fileIO.ByteFile;
jpayne@68 10 import fileIO.FileFormat;
jpayne@68 11 import fileIO.ReadWrite;
jpayne@68 12 import shared.Parse;
jpayne@68 13 import shared.Parser;
jpayne@68 14 import shared.PreParser;
jpayne@68 15 import shared.ReadStats;
jpayne@68 16 import shared.Shared;
jpayne@68 17 import shared.Timer;
jpayne@68 18 import shared.Tools;
jpayne@68 19 import stream.ConcurrentReadInputStream;
jpayne@68 20 import stream.ConcurrentReadOutputStream;
jpayne@68 21 import stream.FASTQ;
jpayne@68 22 import stream.FastaReadInputStream;
jpayne@68 23 import stream.Read;
jpayne@68 24 import structures.ListNum;
jpayne@68 25
jpayne@68 26 /**
jpayne@68 27 *
jpayne@68 28 * @author Brian Bushnell
jpayne@68 29 * @date July 25, 2018
jpayne@68 30 *
jpayne@68 31 */
jpayne@68 32 public class KmerLimit extends SketchObject {
jpayne@68 33
jpayne@68 34 /*--------------------------------------------------------------*/
jpayne@68 35 /*---------------- Initialization ----------------*/
jpayne@68 36 /*--------------------------------------------------------------*/
jpayne@68 37
jpayne@68 38 /**
jpayne@68 39 * Code entrance from the command line.
jpayne@68 40 * @param args Command line arguments
jpayne@68 41 */
jpayne@68 42 public static void main(String[] args){
jpayne@68 43 //Start a timer immediately upon code entrance.
jpayne@68 44 Timer t=new Timer();
jpayne@68 45
jpayne@68 46 //Create an instance of this class
jpayne@68 47 KmerLimit x=new KmerLimit(args);
jpayne@68 48
jpayne@68 49 //Run the object
jpayne@68 50 x.process(t);
jpayne@68 51
jpayne@68 52 //Close the print stream if it was redirected
jpayne@68 53 Shared.closeStream(x.outstream);
jpayne@68 54 }
jpayne@68 55
jpayne@68 56 /**
jpayne@68 57 * Constructor.
jpayne@68 58 * @param args Command line arguments
jpayne@68 59 */
jpayne@68 60 public KmerLimit(String[] args){
jpayne@68 61
jpayne@68 62 {//Preparse block for help, config files, and outstream
jpayne@68 63 PreParser pp=new PreParser(args, getClass(), false);
jpayne@68 64 args=pp.args;
jpayne@68 65 outstream=pp.outstream;
jpayne@68 66 }
jpayne@68 67
jpayne@68 68 boolean setInterleaved=false; //Whether interleaved was explicitly set.
jpayne@68 69
jpayne@68 70 //Set shared static variables
jpayne@68 71 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
jpayne@68 72 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
jpayne@68 73 SketchObject.setKeyFraction(0.1);
jpayne@68 74 defaultParams.minEntropy=0;
jpayne@68 75 defaultParams.minProb=0.2f;
jpayne@68 76
jpayne@68 77 boolean setHeapSize=false;
jpayne@68 78 int heapSize_=4095;
jpayne@68 79 long targetKmers_=0;
jpayne@68 80 int k_=32;
jpayne@68 81 int minCount_=1;
jpayne@68 82
jpayne@68 83 //Create a parser object
jpayne@68 84 Parser parser=new Parser();
jpayne@68 85 parser.overwrite=true;
jpayne@68 86
jpayne@68 87 //Parse each argument
jpayne@68 88 for(int i=0; i<args.length; i++){
jpayne@68 89 String arg=args[i];
jpayne@68 90
jpayne@68 91 //Break arguments into their constituent parts, in the form of "a=b"
jpayne@68 92 String[] split=arg.split("=");
jpayne@68 93 String a=split[0].toLowerCase();
jpayne@68 94 String b=split.length>1 ? split[1] : null;
jpayne@68 95 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
jpayne@68 96
jpayne@68 97 if(a.equals("verbose")){
jpayne@68 98 verbose=Parse.parseBoolean(b);
jpayne@68 99 }else if(a.equals("ordered")){
jpayne@68 100 ordered=Parse.parseBoolean(b);
jpayne@68 101 }else if(a.equals("shuffle")){
jpayne@68 102 shuffle=Parse.parseBoolean(b);
jpayne@68 103 }else if(a.equals("size") || a.equals("heapsize")){
jpayne@68 104 heapSize_=Parse.parseIntKMG(b);
jpayne@68 105 setHeapSize=true;
jpayne@68 106 }else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){
jpayne@68 107 targetKmers_=Parse.parseKMG(b);
jpayne@68 108 }else if(a.equals("mincount")){
jpayne@68 109 minCount_=Parse.parseIntKMG(b);
jpayne@68 110 }else if(parseSketchFlags(arg, a, b)){
jpayne@68 111 parser.parse(arg, a, b);
jpayne@68 112 }else if(defaultParams.parse(arg, a, b)){
jpayne@68 113 parser.parse(arg, a, b);
jpayne@68 114 }else if(a.equals("parse_flag_goes_here")){
jpayne@68 115 long fake_variable=Parse.parseKMG(b);
jpayne@68 116 //Set a variable here
jpayne@68 117 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
jpayne@68 118 //do nothing
jpayne@68 119 }else{
jpayne@68 120 outstream.println("Unknown parameter "+args[i]);
jpayne@68 121 assert(false) : "Unknown parameter "+args[i];
jpayne@68 122 }
jpayne@68 123 }
jpayne@68 124
jpayne@68 125 if(!setHeapSize && minCount_>1){heapSize_=32000;}
jpayne@68 126 heapSize=heapSize_;
jpayne@68 127 targetKmers=targetKmers_;
jpayne@68 128 k=k_;
jpayne@68 129 minCount=minCount_;
jpayne@68 130 assert(targetKmers>0) : "Must set a kmer limit.";
jpayne@68 131 assert(heapSize>0) : "Heap size must be positive.";
jpayne@68 132 assert(k>0 && k<=32) : "0<k<33; k="+k;
jpayne@68 133 postParse();
jpayne@68 134
jpayne@68 135 if(minCount>1){
jpayne@68 136 Shared.setBufferLen(800);
jpayne@68 137 }
jpayne@68 138
jpayne@68 139 {//Process parser fields
jpayne@68 140 Parser.processQuality();
jpayne@68 141
jpayne@68 142 maxReads=parser.maxReads;
jpayne@68 143
jpayne@68 144 overwrite=ReadStats.overwrite=parser.overwrite;
jpayne@68 145 append=ReadStats.append=parser.append;
jpayne@68 146 setInterleaved=parser.setInterleaved;
jpayne@68 147
jpayne@68 148 in1=parser.in1;
jpayne@68 149 in2=parser.in2;
jpayne@68 150 qfin1=parser.qfin1;
jpayne@68 151 qfin2=parser.qfin2;
jpayne@68 152
jpayne@68 153 out1=parser.out1;
jpayne@68 154 out2=parser.out2;
jpayne@68 155 qfout1=parser.qfout1;
jpayne@68 156 qfout2=parser.qfout2;
jpayne@68 157
jpayne@68 158 extin=parser.extin;
jpayne@68 159 extout=parser.extout;
jpayne@68 160 }
jpayne@68 161
jpayne@68 162 //Do input file # replacement
jpayne@68 163 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
jpayne@68 164 in2=in1.replace("#", "2");
jpayne@68 165 in1=in1.replace("#", "1");
jpayne@68 166 }
jpayne@68 167
jpayne@68 168 //Do output file # replacement
jpayne@68 169 if(out1!=null && out2==null && out1.indexOf('#')>-1){
jpayne@68 170 out2=out1.replace("#", "2");
jpayne@68 171 out1=out1.replace("#", "1");
jpayne@68 172 }
jpayne@68 173
jpayne@68 174 //Adjust interleaved detection based on the number of input files
jpayne@68 175 if(in2!=null){
jpayne@68 176 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
jpayne@68 177 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
jpayne@68 178 }
jpayne@68 179
jpayne@68 180 assert(FastaReadInputStream.settingsOK());
jpayne@68 181
jpayne@68 182 //Ensure there is an input file
jpayne@68 183 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
jpayne@68 184
jpayne@68 185 //Adjust the number of threads for input file reading
jpayne@68 186 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
jpayne@68 187 ByteFile.FORCE_MODE_BF2=true;
jpayne@68 188 }
jpayne@68 189
jpayne@68 190 //Ensure out2 is not set without out1
jpayne@68 191 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}
jpayne@68 192
jpayne@68 193 //Adjust interleaved settings based on number of output files
jpayne@68 194 if(!setInterleaved){
jpayne@68 195 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n";
jpayne@68 196 if(in2!=null){ //If there are 2 input streams.
jpayne@68 197 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
jpayne@68 198 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
jpayne@68 199 }else{ //There is one input stream.
jpayne@68 200 if(out2!=null){
jpayne@68 201 FASTQ.FORCE_INTERLEAVED=true;
jpayne@68 202 FASTQ.TEST_INTERLEAVED=false;
jpayne@68 203 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
jpayne@68 204 }
jpayne@68 205 }
jpayne@68 206 }
jpayne@68 207
jpayne@68 208 //Ensure output files can be written
jpayne@68 209 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
jpayne@68 210 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
jpayne@68 211 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
jpayne@68 212 }
jpayne@68 213
jpayne@68 214 //Ensure input files can be read
jpayne@68 215 if(!Tools.testInputFiles(false, true, in1, in2)){
jpayne@68 216 throw new RuntimeException("\nCan't read some input files.\n");
jpayne@68 217 }
jpayne@68 218
jpayne@68 219 //Ensure that no file was specified multiple times
jpayne@68 220 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
jpayne@68 221 throw new RuntimeException("\nSome file names were specified multiple times.\n");
jpayne@68 222 }
jpayne@68 223
jpayne@68 224 //Create output FileFormat objects
jpayne@68 225 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
jpayne@68 226 ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
jpayne@68 227
jpayne@68 228 //Create input FileFormat objects
jpayne@68 229 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
jpayne@68 230 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
jpayne@68 231
jpayne@68 232 minProb=defaultParams.minProb;
jpayne@68 233 minQual=defaultParams.minQual;
jpayne@68 234
jpayne@68 235 shift=2*k;
jpayne@68 236 shift2=shift-2;
jpayne@68 237 mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32
jpayne@68 238 sharedHeap=new SketchHeap(heapSize, 0, minCount>1);
jpayne@68 239 }
jpayne@68 240
jpayne@68 241 /*--------------------------------------------------------------*/
jpayne@68 242 /*---------------- Outer Methods ----------------*/
jpayne@68 243 /*--------------------------------------------------------------*/
jpayne@68 244
jpayne@68 245 /** Create read streams and process all data */
jpayne@68 246 void process(Timer t){
jpayne@68 247
jpayne@68 248 //Turn off read validation in the input threads to increase speed
jpayne@68 249 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
jpayne@68 250 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
jpayne@68 251
jpayne@68 252 //Create a read input stream
jpayne@68 253 final ConcurrentReadInputStream cris;
jpayne@68 254 {
jpayne@68 255 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
jpayne@68 256 cris.start(); //Start the stream
jpayne@68 257 if(verbose){outstream.println("Started cris");}
jpayne@68 258 }
jpayne@68 259 boolean paired=cris.paired();
jpayne@68 260 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
jpayne@68 261
jpayne@68 262 //Optionally create a read output stream
jpayne@68 263 final ConcurrentReadOutputStream ros;
jpayne@68 264 if(ffout1!=null){
jpayne@68 265 //Select output buffer size based on whether it needs to be ordered
jpayne@68 266 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
jpayne@68 267
jpayne@68 268 //Notify user of output mode
jpayne@68 269 if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
jpayne@68 270 outstream.println("Writing interleaved.");
jpayne@68 271 }
jpayne@68 272
jpayne@68 273 ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
jpayne@68 274 ros.start(); //Start the stream
jpayne@68 275 }else{ros=null;}
jpayne@68 276
jpayne@68 277 //Reset counters
jpayne@68 278 readsProcessed=readsOut=0;
jpayne@68 279 basesProcessed=basesOut=0;
jpayne@68 280
jpayne@68 281 //Process the reads in separate threads
jpayne@68 282 spawnThreads(cris, ros);
jpayne@68 283
jpayne@68 284 if(verbose){outstream.println("Finished; closing streams.");}
jpayne@68 285
jpayne@68 286 //Write anything that was accumulated by ReadStats
jpayne@68 287 errorState|=ReadStats.writeAll();
jpayne@68 288 //Close the read streams
jpayne@68 289 errorState|=ReadWrite.closeStreams(cris, ros);
jpayne@68 290
jpayne@68 291 //Reset read validation
jpayne@68 292 Read.VALIDATE_IN_CONSTRUCTOR=vic;
jpayne@68 293
jpayne@68 294 //Report timing and results
jpayne@68 295 t.stop();
jpayne@68 296 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
jpayne@68 297 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
jpayne@68 298 String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
jpayne@68 299 outstream.println("Unique Kmers Out: "+kstring);
jpayne@68 300
jpayne@68 301 // Sketch sk=new Sketch(sharedHeap, true, true, null);
jpayne@68 302 // outstream.println(sk.genomeSizeEstimate());
jpayne@68 303
jpayne@68 304 //Throw an exception of there was an error in a thread
jpayne@68 305 if(errorState){
jpayne@68 306 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
jpayne@68 307 }
jpayne@68 308 }
jpayne@68 309
jpayne@68 310 /** Spawn process threads */
jpayne@68 311 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
jpayne@68 312
jpayne@68 313 //Do anything necessary prior to processing
jpayne@68 314
jpayne@68 315 //Determine how many threads may be used
jpayne@68 316 final int threads=Tools.min(8, Shared.threads());
jpayne@68 317
jpayne@68 318 //Fill a list with ProcessThreads
jpayne@68 319 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 320 int tSize=heapSize/2;
jpayne@68 321 for(int i=0; i<threads; i++){
jpayne@68 322 alpt.add(new ProcessThread(cris, ros, i, tSize));
jpayne@68 323 }
jpayne@68 324
jpayne@68 325 //Start the threads
jpayne@68 326 for(ProcessThread pt : alpt){
jpayne@68 327 pt.start();
jpayne@68 328 }
jpayne@68 329
jpayne@68 330 //Wait for completion of all threads
jpayne@68 331 boolean success=true;
jpayne@68 332 for(ProcessThread pt : alpt){
jpayne@68 333
jpayne@68 334 //Wait until this thread has terminated
jpayne@68 335 while(pt.getState()!=Thread.State.TERMINATED){
jpayne@68 336 try {
jpayne@68 337 //Attempt a join operation
jpayne@68 338 pt.join();
jpayne@68 339 } catch (InterruptedException e) {
jpayne@68 340 //Potentially handle this, if it is expected to occur
jpayne@68 341 e.printStackTrace();
jpayne@68 342 }
jpayne@68 343 }
jpayne@68 344
jpayne@68 345 //Accumulate per-thread statistics
jpayne@68 346 readsProcessed+=pt.readsProcessedT;
jpayne@68 347 basesProcessed+=pt.basesProcessedT;
jpayne@68 348 readsOut+=pt.readsOutT;
jpayne@68 349 basesOut+=pt.basesOutT;
jpayne@68 350 success&=pt.success;
jpayne@68 351 }
jpayne@68 352
jpayne@68 353 //Track whether any threads failed
jpayne@68 354 if(!success){errorState=true;}
jpayne@68 355
jpayne@68 356 //Do anything necessary after processing
jpayne@68 357
jpayne@68 358 }
jpayne@68 359
jpayne@68 360 /*--------------------------------------------------------------*/
jpayne@68 361 /*---------------- Inner Methods ----------------*/
jpayne@68 362 /*--------------------------------------------------------------*/
jpayne@68 363
jpayne@68 364 /*--------------------------------------------------------------*/
jpayne@68 365 /*---------------- Inner Classes ----------------*/
jpayne@68 366 /*--------------------------------------------------------------*/
jpayne@68 367
jpayne@68 368 /** This class is static to prevent accidental writing to shared variables.
jpayne@68 369 * It is safe to remove the static modifier. */
jpayne@68 370 private class ProcessThread extends Thread {
jpayne@68 371
jpayne@68 372 //Constructor
jpayne@68 373 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){
jpayne@68 374 cris=cris_;
jpayne@68 375 ros=ros_;
jpayne@68 376 tid=tid_;
jpayne@68 377 localHeap=new SketchHeap(size, 0, minCount>1);
jpayne@68 378 }
jpayne@68 379
jpayne@68 380 //Called by start()
jpayne@68 381 @Override
jpayne@68 382 public void run(){
jpayne@68 383 //Do anything necessary prior to processing
jpayne@68 384
jpayne@68 385 //Process the reads
jpayne@68 386 processInner();
jpayne@68 387
jpayne@68 388 //Do anything necessary after processing
jpayne@68 389
jpayne@68 390 //Indicate successful exit status
jpayne@68 391 success=true;
jpayne@68 392 }
jpayne@68 393
jpayne@68 394 /** Iterate through the reads */
jpayne@68 395 void processInner(){
jpayne@68 396
jpayne@68 397 //Grab the first ListNum of reads
jpayne@68 398 ListNum<Read> ln=cris.nextList();
jpayne@68 399 //Grab the actual read list from the ListNum
jpayne@68 400 ArrayList<Read> reads=(ln!=null ? ln.list : null);
jpayne@68 401
jpayne@68 402 //Check to ensure pairing is as expected
jpayne@68 403 if(reads!=null && !reads.isEmpty()){
jpayne@68 404 Read r=reads.get(0);
jpayne@68 405 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
jpayne@68 406 }
jpayne@68 407
jpayne@68 408 //As long as there is a nonempty read list...
jpayne@68 409 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
jpayne@68 410 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
jpayne@68 411
jpayne@68 412 //Loop through each read in the list
jpayne@68 413 for(int idx=0; idx<reads.size(); idx++){
jpayne@68 414 final Read r1=reads.get(idx);
jpayne@68 415 final Read r2=r1.mate;
jpayne@68 416
jpayne@68 417 //Validate reads in worker threads
jpayne@68 418 if(!r1.validated()){r1.validate(true);}
jpayne@68 419 if(r2!=null && !r2.validated()){r2.validate(true);}
jpayne@68 420
jpayne@68 421 //Track the initial length for statistics
jpayne@68 422 final int initialLength1=r1.length();
jpayne@68 423 final int initialLength2=r1.mateLength();
jpayne@68 424
jpayne@68 425 //Increment counters
jpayne@68 426 readsProcessedT+=r1.pairCount();
jpayne@68 427 basesProcessedT+=initialLength1+initialLength2;
jpayne@68 428
jpayne@68 429 //Reads are processed in this block.
jpayne@68 430 processReadPair(r1, r2);
jpayne@68 431 }
jpayne@68 432
jpayne@68 433 long count=dumpHeap();
jpayne@68 434 if(count>=targetKmers){break;}
jpayne@68 435
jpayne@68 436 for(Read r1 : reads){
jpayne@68 437 readsOutT+=r1.pairCount();
jpayne@68 438 basesOutT+=r1.pairLength();
jpayne@68 439 }
jpayne@68 440
jpayne@68 441 //Output reads to the output stream
jpayne@68 442 if(ros!=null){ros.add(reads, ln.id);}
jpayne@68 443
jpayne@68 444 //Notify the input stream that the list was used
jpayne@68 445 cris.returnList(ln);
jpayne@68 446 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
jpayne@68 447
jpayne@68 448 //Fetch a new list
jpayne@68 449 ln=cris.nextList();
jpayne@68 450 reads=(ln!=null ? ln.list : null);
jpayne@68 451 }
jpayne@68 452
jpayne@68 453 //Notify the input stream that the final list was used
jpayne@68 454 if(ln!=null){
jpayne@68 455 if(ln.list!=null){ln.list.clear();}
jpayne@68 456 cris.returnList(ln.id, true);
jpayne@68 457 }
jpayne@68 458 }
jpayne@68 459
jpayne@68 460 /**
jpayne@68 461 * Process a read or a read pair.
jpayne@68 462 * @param r1 Read 1
jpayne@68 463 * @param r2 Read 2 (may be null)
jpayne@68 464 */
jpayne@68 465 void processReadPair(final Read r1, final Read r2){
jpayne@68 466 processReadNucleotide(r1);
jpayne@68 467 if(r2!=null){processReadNucleotide(r2);}
jpayne@68 468 }
jpayne@68 469
jpayne@68 470 void processReadNucleotide(final Read r){
jpayne@68 471 final byte[] bases=r.bases;
jpayne@68 472 final byte[] quals=r.quality;
jpayne@68 473 long kmer=0;
jpayne@68 474 long rkmer=0;
jpayne@68 475 int len=0;
jpayne@68 476 assert(!r.aminoacid());
jpayne@68 477
jpayne@68 478 final long min=minHashValue;
jpayne@68 479 localHeap.genomeSizeBases+=r.length();
jpayne@68 480 localHeap.genomeSequences++;
jpayne@68 481
jpayne@68 482 if(quals==null || (minProb<=0 && minQual<2)){
jpayne@68 483 for(int i=0; i<bases.length; i++){
jpayne@68 484 byte b=bases[i];
jpayne@68 485 long x=AminoAcid.baseToNumber[b];
jpayne@68 486 long x2=AminoAcid.baseToComplementNumber[b];
jpayne@68 487
jpayne@68 488 kmer=((kmer<<2)|x)&mask;
jpayne@68 489 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
jpayne@68 490
jpayne@68 491 if(x<0){len=0; rkmer=0;}else{len++;}
jpayne@68 492 if(len>=k){
jpayne@68 493 localHeap.genomeSizeKmers++;
jpayne@68 494 final long hashcode=hash(kmer, rkmer);
jpayne@68 495 if(hashcode>min){localHeap.add(hashcode);}
jpayne@68 496 }
jpayne@68 497 }
jpayne@68 498 }else{
jpayne@68 499 float prob=1;
jpayne@68 500 for(int i=0; i<bases.length; i++){
jpayne@68 501 final byte b=bases[i];
jpayne@68 502 final long x=AminoAcid.baseToNumber[b];
jpayne@68 503 final long x2=AminoAcid.baseToComplementNumber[b];
jpayne@68 504
jpayne@68 505 kmer=((kmer<<2)|x)&mask;
jpayne@68 506 rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
jpayne@68 507
jpayne@68 508 {//Quality-related stuff
jpayne@68 509 final byte q=quals[i];
jpayne@68 510 assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual;
jpayne@68 511 prob=prob*align2.QualityTools.PROB_CORRECT[q];
jpayne@68 512 if(len>k){
jpayne@68 513 byte oldq=quals[i-k];
jpayne@68 514 prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq];
jpayne@68 515 }
jpayne@68 516 if(x<0 || q<minQual){
jpayne@68 517 len=0;
jpayne@68 518 kmer=rkmer=0;
jpayne@68 519 prob=1;
jpayne@68 520 }else{
jpayne@68 521 len++;
jpayne@68 522 }
jpayne@68 523 }
jpayne@68 524
jpayne@68 525 if(len>=k && prob>=minProb){
jpayne@68 526 localHeap.genomeSizeKmers++;
jpayne@68 527 localHeap.probSum+=prob;
jpayne@68 528 final long hashcode=hash(kmer, rkmer);
jpayne@68 529 if(hashcode>min){localHeap.checkAndAdd(hashcode);}
jpayne@68 530 }
jpayne@68 531 }
jpayne@68 532 }
jpayne@68 533 }
jpayne@68 534
jpayne@68 535 private long dumpHeap(){
jpayne@68 536 long count=0;
jpayne@68 537 synchronized(sharedHeap){
jpayne@68 538 count=sharedHeap.genomeSizeEstimate(minCount);
jpayne@68 539 if(count<targetKmers){
jpayne@68 540 sharedHeap.add(localHeap);
jpayne@68 541 localHeap.clear();
jpayne@68 542 }
jpayne@68 543 }
jpayne@68 544 return count;
jpayne@68 545 }
jpayne@68 546
jpayne@68 547 /** Number of reads processed by this thread */
jpayne@68 548 protected long readsProcessedT=0;
jpayne@68 549 /** Number of bases processed by this thread */
jpayne@68 550 protected long basesProcessedT=0;
jpayne@68 551
jpayne@68 552 /** Number of reads retained by this thread */
jpayne@68 553 protected long readsOutT=0;
jpayne@68 554 /** Number of bases retained by this thread */
jpayne@68 555 protected long basesOutT=0;
jpayne@68 556
jpayne@68 557 /** True only if this thread has completed successfully */
jpayne@68 558 boolean success=false;
jpayne@68 559
jpayne@68 560 /** Shared input stream */
jpayne@68 561 private final ConcurrentReadInputStream cris;
jpayne@68 562 /** Shared output stream */
jpayne@68 563 private final ConcurrentReadOutputStream ros;
jpayne@68 564 /** Thread ID */
jpayne@68 565 final int tid;
jpayne@68 566
jpayne@68 567 final SketchHeap localHeap;
jpayne@68 568 }
jpayne@68 569
jpayne@68 570 /*--------------------------------------------------------------*/
jpayne@68 571 /*---------------- Fields ----------------*/
jpayne@68 572 /*--------------------------------------------------------------*/
jpayne@68 573
jpayne@68 574 /** Primary input file path */
jpayne@68 575 private String in1=null;
jpayne@68 576 /** Secondary input file path */
jpayne@68 577 private String in2=null;
jpayne@68 578
jpayne@68 579 private String qfin1=null;
jpayne@68 580 private String qfin2=null;
jpayne@68 581
jpayne@68 582 /** Primary output file path */
jpayne@68 583 private String out1=null;
jpayne@68 584 /** Secondary output file path */
jpayne@68 585 private String out2=null;
jpayne@68 586
jpayne@68 587 private String qfout1=null;
jpayne@68 588 private String qfout2=null;
jpayne@68 589
jpayne@68 590 /** Override input file extension */
jpayne@68 591 private String extin=null;
jpayne@68 592 /** Override output file extension */
jpayne@68 593 private String extout=null;
jpayne@68 594
jpayne@68 595 /*--------------------------------------------------------------*/
jpayne@68 596
jpayne@68 597 /** Number of reads processed */
jpayne@68 598 protected long readsProcessed=0;
jpayne@68 599 /** Number of bases processed */
jpayne@68 600 protected long basesProcessed=0;
jpayne@68 601
jpayne@68 602 /** Number of reads retained */
jpayne@68 603 protected long readsOut=0;
jpayne@68 604 /** Number of bases retained */
jpayne@68 605 protected long basesOut=0;
jpayne@68 606
jpayne@68 607 /** Quit after processing this many input reads; -1 means no limit */
jpayne@68 608 private long maxReads=-1;
jpayne@68 609
jpayne@68 610 /*--------------------------------------------------------------*/
jpayne@68 611 /*---------------- Final Fields ----------------*/
jpayne@68 612 /*--------------------------------------------------------------*/
jpayne@68 613
jpayne@68 614 /** Primary input file */
jpayne@68 615 private final FileFormat ffin1;
jpayne@68 616 /** Secondary input file */
jpayne@68 617 private final FileFormat ffin2;
jpayne@68 618
jpayne@68 619 /** Primary output file */
jpayne@68 620 private final FileFormat ffout1;
jpayne@68 621 /** Secondary output file */
jpayne@68 622 private final FileFormat ffout2;
jpayne@68 623
jpayne@68 624 private final SketchHeap sharedHeap;
jpayne@68 625 private final int heapSize;
jpayne@68 626 private final long targetKmers;
jpayne@68 627 private final int minCount;
jpayne@68 628
jpayne@68 629 final int shift;
jpayne@68 630 final int shift2;
jpayne@68 631 final long mask;
jpayne@68 632
jpayne@68 633 final float minProb;
jpayne@68 634 final byte minQual;
jpayne@68 635
jpayne@68 636 /*--------------------------------------------------------------*/
jpayne@68 637 /*---------------- Common Fields ----------------*/
jpayne@68 638 /*--------------------------------------------------------------*/
jpayne@68 639
jpayne@68 640 /** Print status messages to this output stream */
jpayne@68 641 private PrintStream outstream=System.err;
jpayne@68 642 /** Print verbose messages */
jpayne@68 643 public static boolean verbose=false;
jpayne@68 644 /** True if an error was encountered */
jpayne@68 645 public boolean errorState=false;
jpayne@68 646 /** Overwrite existing output files */
jpayne@68 647 private boolean overwrite=true;
jpayne@68 648 /** Append to existing output files */
jpayne@68 649 private boolean append=false;
jpayne@68 650 /** Reads are output in input order (not enabled) */
jpayne@68 651 private boolean ordered=false;
jpayne@68 652 /** Shuffle input */
jpayne@68 653 private boolean shuffle=false;
jpayne@68 654
jpayne@68 655 }