jpayne@68: package sketch; jpayne@68: jpayne@68: import java.io.File; jpayne@68: import java.io.PrintStream; jpayne@68: import java.util.ArrayList; jpayne@68: import java.util.Arrays; jpayne@68: jpayne@68: import dna.AminoAcid; jpayne@68: import fileIO.ByteFile; jpayne@68: import fileIO.FileFormat; jpayne@68: import fileIO.ReadWrite; jpayne@68: import shared.Parse; jpayne@68: import shared.Parser; jpayne@68: import shared.PreParser; jpayne@68: import shared.ReadStats; jpayne@68: import shared.Shared; jpayne@68: import shared.Timer; jpayne@68: import shared.Tools; jpayne@68: import stream.ConcurrentReadInputStream; jpayne@68: import stream.ConcurrentReadOutputStream; jpayne@68: import stream.FASTQ; jpayne@68: import stream.FastaReadInputStream; jpayne@68: import stream.Read; jpayne@68: import structures.ListNum; jpayne@68: jpayne@68: /** jpayne@68: * jpayne@68: * @author Brian Bushnell jpayne@68: * @date July 25, 2018 jpayne@68: * jpayne@68: */ jpayne@68: public class KmerLimit extends SketchObject { jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** jpayne@68: * Code entrance from the command line. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public static void main(String[] args){ jpayne@68: //Start a timer immediately upon code entrance. jpayne@68: Timer t=new Timer(); jpayne@68: jpayne@68: //Create an instance of this class jpayne@68: KmerLimit x=new KmerLimit(args); jpayne@68: jpayne@68: //Run the object jpayne@68: x.process(t); jpayne@68: jpayne@68: //Close the print stream if it was redirected jpayne@68: Shared.closeStream(x.outstream); jpayne@68: } jpayne@68: jpayne@68: /** jpayne@68: * Constructor. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public KmerLimit(String[] args){ jpayne@68: jpayne@68: {//Preparse block for help, config files, and outstream jpayne@68: PreParser pp=new PreParser(args, getClass(), false); jpayne@68: args=pp.args; jpayne@68: outstream=pp.outstream; jpayne@68: } jpayne@68: jpayne@68: boolean setInterleaved=false; //Whether interleaved was explicitly set. jpayne@68: jpayne@68: //Set shared static variables jpayne@68: ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; jpayne@68: ReadWrite.MAX_ZIP_THREADS=Shared.threads(); jpayne@68: SketchObject.setKeyFraction(0.1); jpayne@68: defaultParams.minEntropy=0; jpayne@68: defaultParams.minProb=0.2f; jpayne@68: jpayne@68: boolean setHeapSize=false; jpayne@68: int heapSize_=4095; jpayne@68: long targetKmers_=0; jpayne@68: int k_=32; jpayne@68: int minCount_=1; jpayne@68: jpayne@68: //Create a parser object jpayne@68: Parser parser=new Parser(); jpayne@68: parser.overwrite=true; jpayne@68: jpayne@68: //Parse each argument jpayne@68: for(int i=0; i1 ? split[1] : null; jpayne@68: if(b!=null && b.equalsIgnoreCase("null")){b=null;} jpayne@68: jpayne@68: if(a.equals("verbose")){ jpayne@68: verbose=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("ordered")){ jpayne@68: ordered=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("shuffle")){ jpayne@68: shuffle=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("size") || a.equals("heapsize")){ jpayne@68: heapSize_=Parse.parseIntKMG(b); jpayne@68: setHeapSize=true; jpayne@68: }else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){ jpayne@68: targetKmers_=Parse.parseKMG(b); jpayne@68: }else if(a.equals("mincount")){ jpayne@68: minCount_=Parse.parseIntKMG(b); jpayne@68: }else if(parseSketchFlags(arg, a, b)){ jpayne@68: parser.parse(arg, a, b); jpayne@68: }else if(defaultParams.parse(arg, a, b)){ jpayne@68: parser.parse(arg, a, b); jpayne@68: }else if(a.equals("parse_flag_goes_here")){ jpayne@68: long fake_variable=Parse.parseKMG(b); jpayne@68: //Set a variable here jpayne@68: }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser jpayne@68: //do nothing jpayne@68: }else{ jpayne@68: outstream.println("Unknown parameter "+args[i]); jpayne@68: assert(false) : "Unknown parameter "+args[i]; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: if(!setHeapSize && minCount_>1){heapSize_=32000;} jpayne@68: heapSize=heapSize_; jpayne@68: targetKmers=targetKmers_; jpayne@68: k=k_; jpayne@68: minCount=minCount_; jpayne@68: assert(targetKmers>0) : "Must set a kmer limit."; jpayne@68: assert(heapSize>0) : "Heap size must be positive."; jpayne@68: assert(k>0 && k<=32) : "0-1){ jpayne@68: out2=out1.replace("#", "2"); jpayne@68: out1=out1.replace("#", "1"); jpayne@68: } jpayne@68: jpayne@68: //Adjust interleaved detection based on the number of input files jpayne@68: if(in2!=null){ jpayne@68: if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} jpayne@68: FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; jpayne@68: } jpayne@68: jpayne@68: assert(FastaReadInputStream.settingsOK()); jpayne@68: jpayne@68: //Ensure there is an input file jpayne@68: if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} jpayne@68: jpayne@68: //Adjust the number of threads for input file reading jpayne@68: if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ jpayne@68: ByteFile.FORCE_MODE_BF2=true; jpayne@68: } jpayne@68: jpayne@68: //Ensure out2 is not set without out1 jpayne@68: if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");} jpayne@68: jpayne@68: //Adjust interleaved settings based on number of output files jpayne@68: if(!setInterleaved){ jpayne@68: assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n"; jpayne@68: if(in2!=null){ //If there are 2 input streams. jpayne@68: FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; jpayne@68: outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); jpayne@68: }else{ //There is one input stream. jpayne@68: if(out2!=null){ jpayne@68: FASTQ.FORCE_INTERLEAVED=true; jpayne@68: FASTQ.TEST_INTERLEAVED=false; jpayne@68: outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: //Ensure output files can be written jpayne@68: if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){ jpayne@68: outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2); jpayne@68: throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure input files can be read jpayne@68: if(!Tools.testInputFiles(false, true, in1, in2)){ jpayne@68: throw new RuntimeException("\nCan't read some input files.\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure that no file was specified multiple times jpayne@68: if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){ jpayne@68: throw new RuntimeException("\nSome file names were specified multiple times.\n"); jpayne@68: } jpayne@68: jpayne@68: //Create output FileFormat objects jpayne@68: ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered); jpayne@68: ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered); jpayne@68: jpayne@68: //Create input FileFormat objects jpayne@68: ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); jpayne@68: ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); jpayne@68: jpayne@68: minProb=defaultParams.minProb; jpayne@68: minQual=defaultParams.minQual; jpayne@68: jpayne@68: shift=2*k; jpayne@68: shift2=shift-2; jpayne@68: mask=(shift>63 ? -1L : ~((-1L)<1); jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Outer Methods ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Create read streams and process all data */ jpayne@68: void process(Timer t){ jpayne@68: jpayne@68: //Turn off read validation in the input threads to increase speed jpayne@68: final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; jpayne@68: jpayne@68: //Create a read input stream jpayne@68: final ConcurrentReadInputStream cris; jpayne@68: { jpayne@68: cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2); jpayne@68: cris.start(); //Start the stream jpayne@68: if(verbose){outstream.println("Started cris");} jpayne@68: } jpayne@68: boolean paired=cris.paired(); jpayne@68: if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} jpayne@68: jpayne@68: //Optionally create a read output stream jpayne@68: final ConcurrentReadOutputStream ros; jpayne@68: if(ffout1!=null){ jpayne@68: //Select output buffer size based on whether it needs to be ordered jpayne@68: final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); jpayne@68: jpayne@68: //Notify user of output mode jpayne@68: if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){ jpayne@68: outstream.println("Writing interleaved."); jpayne@68: } jpayne@68: jpayne@68: ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false); jpayne@68: ros.start(); //Start the stream jpayne@68: }else{ros=null;} jpayne@68: jpayne@68: //Reset counters jpayne@68: readsProcessed=readsOut=0; jpayne@68: basesProcessed=basesOut=0; jpayne@68: jpayne@68: //Process the reads in separate threads jpayne@68: spawnThreads(cris, ros); jpayne@68: jpayne@68: if(verbose){outstream.println("Finished; closing streams.");} jpayne@68: jpayne@68: //Write anything that was accumulated by ReadStats jpayne@68: errorState|=ReadStats.writeAll(); jpayne@68: //Close the read streams jpayne@68: errorState|=ReadWrite.closeStreams(cris, ros); jpayne@68: jpayne@68: //Reset read validation jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=vic; jpayne@68: jpayne@68: //Report timing and results jpayne@68: t.stop(); jpayne@68: outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); jpayne@68: outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false)); jpayne@68: String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8); jpayne@68: outstream.println("Unique Kmers Out: "+kstring); jpayne@68: jpayne@68: // Sketch sk=new Sketch(sharedHeap, true, true, null); jpayne@68: // outstream.println(sk.genomeSizeEstimate()); jpayne@68: jpayne@68: //Throw an exception of there was an error in a thread jpayne@68: if(errorState){ jpayne@68: throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: /** Spawn process threads */ jpayne@68: private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){ jpayne@68: jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Determine how many threads may be used jpayne@68: final int threads=Tools.min(8, Shared.threads()); jpayne@68: jpayne@68: //Fill a list with ProcessThreads jpayne@68: ArrayList alpt=new ArrayList(threads); jpayne@68: int tSize=heapSize/2; jpayne@68: for(int i=0; i1); jpayne@68: } jpayne@68: jpayne@68: //Called by start() jpayne@68: @Override jpayne@68: public void run(){ jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Process the reads jpayne@68: processInner(); jpayne@68: jpayne@68: //Do anything necessary after processing jpayne@68: jpayne@68: //Indicate successful exit status jpayne@68: success=true; jpayne@68: } jpayne@68: jpayne@68: /** Iterate through the reads */ jpayne@68: void processInner(){ jpayne@68: jpayne@68: //Grab the first ListNum of reads jpayne@68: ListNum ln=cris.nextList(); jpayne@68: //Grab the actual read list from the ListNum jpayne@68: ArrayList reads=(ln!=null ? ln.list : null); jpayne@68: jpayne@68: //Check to ensure pairing is as expected jpayne@68: if(reads!=null && !reads.isEmpty()){ jpayne@68: Read r=reads.get(0); jpayne@68: // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access jpayne@68: } jpayne@68: jpayne@68: //As long as there is a nonempty read list... jpayne@68: while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning jpayne@68: // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access jpayne@68: jpayne@68: //Loop through each read in the list jpayne@68: for(int idx=0; idx=targetKmers){break;} jpayne@68: jpayne@68: for(Read r1 : reads){ jpayne@68: readsOutT+=r1.pairCount(); jpayne@68: basesOutT+=r1.pairLength(); jpayne@68: } jpayne@68: jpayne@68: //Output reads to the output stream jpayne@68: if(ros!=null){ros.add(reads, ln.id);} jpayne@68: jpayne@68: //Notify the input stream that the list was used jpayne@68: cris.returnList(ln); jpayne@68: // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access jpayne@68: jpayne@68: //Fetch a new list jpayne@68: ln=cris.nextList(); jpayne@68: reads=(ln!=null ? ln.list : null); jpayne@68: } jpayne@68: jpayne@68: //Notify the input stream that the final list was used jpayne@68: if(ln!=null){ jpayne@68: if(ln.list!=null){ln.list.clear();} jpayne@68: cris.returnList(ln.id, true); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: /** jpayne@68: * Process a read or a read pair. jpayne@68: * @param r1 Read 1 jpayne@68: * @param r2 Read 2 (may be null) jpayne@68: */ jpayne@68: void processReadPair(final Read r1, final Read r2){ jpayne@68: processReadNucleotide(r1); jpayne@68: if(r2!=null){processReadNucleotide(r2);} jpayne@68: } jpayne@68: jpayne@68: void processReadNucleotide(final Read r){ jpayne@68: final byte[] bases=r.bases; jpayne@68: final byte[] quals=r.quality; jpayne@68: long kmer=0; jpayne@68: long rkmer=0; jpayne@68: int len=0; jpayne@68: assert(!r.aminoacid()); jpayne@68: jpayne@68: final long min=minHashValue; jpayne@68: localHeap.genomeSizeBases+=r.length(); jpayne@68: localHeap.genomeSequences++; jpayne@68: jpayne@68: if(quals==null || (minProb<=0 && minQual<2)){ jpayne@68: for(int i=0; i>>2)|(x2<=k){ jpayne@68: localHeap.genomeSizeKmers++; jpayne@68: final long hashcode=hash(kmer, rkmer); jpayne@68: if(hashcode>min){localHeap.add(hashcode);} jpayne@68: } jpayne@68: } jpayne@68: }else{ jpayne@68: float prob=1; jpayne@68: for(int i=0; i>>2)|(x2<=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual; jpayne@68: prob=prob*align2.QualityTools.PROB_CORRECT[q]; jpayne@68: if(len>k){ jpayne@68: byte oldq=quals[i-k]; jpayne@68: prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq]; jpayne@68: } jpayne@68: if(x<0 || q=k && prob>=minProb){ jpayne@68: localHeap.genomeSizeKmers++; jpayne@68: localHeap.probSum+=prob; jpayne@68: final long hashcode=hash(kmer, rkmer); jpayne@68: if(hashcode>min){localHeap.checkAndAdd(hashcode);} jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private long dumpHeap(){ jpayne@68: long count=0; jpayne@68: synchronized(sharedHeap){ jpayne@68: count=sharedHeap.genomeSizeEstimate(minCount); jpayne@68: if(count