jpayne@68: package sketch; jpayne@68: jpayne@68: import java.io.File; jpayne@68: import java.io.PrintStream; jpayne@68: import java.util.ArrayDeque; jpayne@68: import java.util.ArrayList; jpayne@68: import java.util.Arrays; jpayne@68: import java.util.HashMap; jpayne@68: import java.util.Map.Entry; jpayne@68: import java.util.concurrent.atomic.AtomicInteger; jpayne@68: jpayne@68: import fileIO.ByteFile; jpayne@68: import fileIO.ByteStreamWriter; jpayne@68: import fileIO.FileFormat; jpayne@68: import fileIO.ReadWrite; jpayne@68: import shared.Parse; jpayne@68: import shared.Parser; jpayne@68: import shared.PreParser; jpayne@68: import shared.ReadStats; jpayne@68: import shared.Shared; jpayne@68: import shared.Timer; jpayne@68: import shared.Tools; jpayne@68: import stream.ConcurrentReadInputStream; jpayne@68: import stream.FASTQ; jpayne@68: import stream.FastaReadInputStream; jpayne@68: import stream.Read; jpayne@68: import structures.ByteBuilder; jpayne@68: import structures.ListNum; jpayne@68: import structures.LongList; jpayne@68: import tax.AccessionToTaxid; jpayne@68: import tax.GiToTaxid; jpayne@68: import tax.ImgRecord2; jpayne@68: import tax.TaxNode; jpayne@68: import tax.TaxTree; jpayne@68: jpayne@68: /** jpayne@68: * Creates MinHashSketches rapidly. jpayne@68: * jpayne@68: * @author Brian Bushnell jpayne@68: * @date July 6, 2016 jpayne@68: * jpayne@68: */ jpayne@68: public class SketchMaker extends SketchObject { jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** jpayne@68: * Code entrance from the command line. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public static void main(String[] args){ jpayne@68: //Start a timer immediately upon code entrance. 
jpayne@68: Timer t=new Timer(); jpayne@68: jpayne@68: final int mode=parseMode(args); jpayne@68: if(mode==PER_FILE/* || mode==ONE_SKETCH || mode==PER_HEADER*/ || mode==PER_SEQUENCE){//ONE_SKETCH does not work for multiple input files. jpayne@68: recallCompareSketch(args); jpayne@68: return; jpayne@68: } jpayne@68: jpayne@68: final int oldBufLen=Shared.bufferLen(); jpayne@68: jpayne@68: //Create an instance of this class jpayne@68: SketchMaker x=new SketchMaker(args); jpayne@68: jpayne@68: //Run the object jpayne@68: x.process(t); jpayne@68: jpayne@68: Shared.setBufferLen(oldBufLen); jpayne@68: jpayne@68: //Close the print stream if it was redirected jpayne@68: Shared.closeStream(x.outstream); jpayne@68: } jpayne@68: jpayne@68: private static void recallCompareSketch(String[] args){ jpayne@68: ArrayList list=new ArrayList(args.length+1); jpayne@68: for(int i=0; i1 ? split[1] : null; jpayne@68: jpayne@68: if(a.equals("verbose")){ jpayne@68: verbose=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("files")){ jpayne@68: files_=Integer.parseInt(b); jpayne@68: }else if(a.equals("minsize")){ jpayne@68: minSizeKmers_=Parse.parseIntKMG(b); jpayne@68: }else if(a.equals("prefilter")){ jpayne@68: prefilter=Parse.parseBoolean(b); jpayne@68: setPrefilter=true; jpayne@68: } jpayne@68: jpayne@68: else if(a.equals("name") || a.equals("taxname")){ jpayne@68: outTaxName=b; jpayne@68: }else if(a.equals("name0")){ jpayne@68: outName0=b; jpayne@68: }else if(a.equals("fname")){ jpayne@68: outFname=b; jpayne@68: }else if(a.equals("taxid") || a.equals("tid")){ jpayne@68: outTaxID=Integer.parseInt(b); jpayne@68: }else if(a.equals("spid")){ jpayne@68: outSpid=Integer.parseInt(b); jpayne@68: }else if(a.equals("imgid")){ jpayne@68: outImgID=Integer.parseInt(b); jpayne@68: }else if((a.startsWith("meta_") || a.startsWith("mt_")) && b!=null){ jpayne@68: if(outMeta==null){outMeta=new ArrayList();} jpayne@68: int underscore=a.indexOf('_', 0); jpayne@68: 
outMeta.add(a.substring(underscore+1)+":"+b); jpayne@68: }else if(a.equals("parsesubunit")){ jpayne@68: parseSubunit=Parse.parseBoolean(b); jpayne@68: } jpayne@68: jpayne@68: else if(parseMode(arg, a, b)>-1){ jpayne@68: mode_=parseMode(arg, a, b); jpayne@68: }else if(a.equals("parse_flag_goes_here")){ jpayne@68: long fake_variable=Parse.parseKMG(b); jpayne@68: //Set a variable here jpayne@68: } jpayne@68: jpayne@68: else if(a.equals("table") || a.equals("gi") || a.equals("gitable")){ jpayne@68: giTableFile=b; jpayne@68: }else if(a.equals("taxtree") || a.equals("tree")){ jpayne@68: taxTreeFile=b; jpayne@68: }else if(a.equals("accession")){ jpayne@68: accessionFile=b; jpayne@68: }else if(a.equalsIgnoreCase("img") || a.equals("imgfile") || a.equals("imgdump")){ jpayne@68: imgFile=b; jpayne@68: } jpayne@68: jpayne@68: else if(a.equals("tossjunk")){ jpayne@68: tossJunk=Parse.parseBoolean(b); jpayne@68: } jpayne@68: jpayne@68: // else if(a.equals("silva")){//Handled by parser jpayne@68: // TaxTree.SILVA_MODE=Parse.parseBoolean(b); jpayne@68: // } jpayne@68: jpayne@68: else if(a.equals("taxlevel") || a.equals("tl") || a.equals("level") || a.equals("lv")){ jpayne@68: taxLevel=TaxTree.parseLevel(b); jpayne@68: } jpayne@68: jpayne@68: else if(parseSketchFlags(arg, a, b)){ jpayne@68: //do nothing jpayne@68: // System.err.println("a: "+arg); jpayne@68: }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser jpayne@68: //do nothing jpayne@68: // System.err.println("b: "+arg); jpayne@68: }else if(defaultParams.parse(arg, a, b)){ jpayne@68: //do nothing jpayne@68: // System.err.println("c: "+arg); jpayne@68: // System.err.println(defaultParams.printDepth); jpayne@68: } jpayne@68: jpayne@68: else{ jpayne@68: outstream.println("Unknown parameter "+args[i]); jpayne@68: assert(false) : "Unknown parameter "+args[i]; jpayne@68: } jpayne@68: } jpayne@68: if("auto".equalsIgnoreCase(imgFile)){imgFile=TaxTree.defaultImgFile();} jpayne@68: 
if("auto".equalsIgnoreCase(taxTreeFile)){taxTreeFile=TaxTree.defaultTreeFile();} jpayne@68: if("auto".equalsIgnoreCase(giTableFile)){giTableFile=TaxTree.defaultTableFile();} jpayne@68: if("auto".equalsIgnoreCase(accessionFile)){accessionFile=TaxTree.defaultAccessionFile();} jpayne@68: jpayne@68: outMeta=SketchObject.fixMeta(outMeta); jpayne@68: jpayne@68: postParse(); jpayne@68: minSizeKmers=minSizeKmers_; jpayne@68: mode=mode_; jpayne@68: jpayne@68: minSizeBases=minSizeKmers_+k-1; jpayne@68: jpayne@68: {//Process parser fields jpayne@68: Parser.processQuality(); jpayne@68: jpayne@68: maxReads=parser.maxReads; jpayne@68: jpayne@68: overwrite=ReadStats.overwrite=parser.overwrite; jpayne@68: append=ReadStats.append=parser.append; jpayne@68: jpayne@68: in1=parser.in1; jpayne@68: in2=parser.in2; jpayne@68: jpayne@68: out1=parser.out1; jpayne@68: jpayne@68: extin=parser.extin; jpayne@68: } jpayne@68: files=(out1==null ? 0 : files_); jpayne@68: jpayne@68: if(!setPrefilter && !prefilter && (mode==PER_TAXA || mode==PER_IMG) && in1!=null && !in1.startsWith("stdin") && (AUTOSIZE || AUTOSIZE_LINEAR || targetSketchSize>200)){ jpayne@68: prefilter=true; jpayne@68: System.err.println("Enabled prefilter due to running in per-"+(mode==PER_TAXA ? 
"taxa" : "IMG")+" mode; override with 'prefilter=f'."); jpayne@68: } jpayne@68: jpayne@68: assert(mode!=ONE_SKETCH || files<2) : "Multiple output files are not allowed in single-sketch mode."; jpayne@68: jpayne@68: //Do input file # replacement jpayne@68: if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ jpayne@68: in2=in1.replace("#", "2"); jpayne@68: in1=in1.replace("#", "1"); jpayne@68: } jpayne@68: jpayne@68: //Adjust interleaved detection based on the number of input files jpayne@68: if(in2!=null){ jpayne@68: if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} jpayne@68: FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; jpayne@68: } jpayne@68: jpayne@68: assert(FastaReadInputStream.settingsOK()); jpayne@68: jpayne@68: //Ensure there is an input file jpayne@68: if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} jpayne@68: jpayne@68: //Adjust the number of threads for input file reading jpayne@68: if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ jpayne@68: ByteFile.FORCE_MODE_BF2=true; jpayne@68: } jpayne@68: jpayne@68: ffout=makeFFArray(out1, files, overwrite, append); jpayne@68: if(ffout==null || ffout.length<1){ jpayne@68: System.err.println("WARNING: No output files were specified; no sketches will be written.\n"); jpayne@68: } jpayne@68: jpayne@68: // //Ensure output files can be written jpayne@68: // if(!Tools.testOutputFiles(overwrite, append, false, out1)){ jpayne@68: // outstream.println((out1==null)+", "+out1); jpayne@68: // throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n"); jpayne@68: // } jpayne@68: jpayne@68: //Ensure input files can be read jpayne@68: if(!Tools.testInputFiles(false, true, in1, in2, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){ jpayne@68: throw new RuntimeException("\nCan't read some input 
files.\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure that no file was specified multiple times jpayne@68: if(!Tools.testForDuplicateFiles(true, in1, in2, out1, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){ jpayne@68: throw new RuntimeException("\nSome file names were specified multiple times.\n"); jpayne@68: } jpayne@68: jpayne@68: //Create input FileFormat objects jpayne@68: ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); jpayne@68: ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); jpayne@68: jpayne@68: // assert(false) : defaultParams.trackCounts(); jpayne@68: tool=new SketchTool(targetSketchSize, defaultParams); jpayne@68: jpayne@68: if(taxTreeFile!=null){setTaxtree(taxTreeFile, outstream);} jpayne@68: jpayne@68: if(giTableFile!=null){ jpayne@68: loadGiToTaxid(); jpayne@68: } jpayne@68: if(accessionFile!=null){ jpayne@68: AccessionToTaxid.tree=taxtree; jpayne@68: outstream.println("Loading accession table."); jpayne@68: AccessionToTaxid.load(accessionFile); jpayne@68: System.gc(); jpayne@68: } jpayne@68: if(imgFile!=null){ jpayne@68: TaxTree.loadIMG(imgFile, true, outstream); jpayne@68: } jpayne@68: SSUMap.load(outstream); jpayne@68: jpayne@68: if(prefilter){ jpayne@68: if(mode==PER_TAXA){sizeList=sizeList(); sizeMap=null;} jpayne@68: else if(mode==PER_IMG){sizeMap=sizeMap(); sizeList=null;} jpayne@68: else{ jpayne@68: assert(false) : "Wrong mode for prefilter; should be taxa or img."; jpayne@68: sizeList=null; sizeMap=null; jpayne@68: } jpayne@68: }else{ jpayne@68: sizeList=null; sizeMap=null; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private static FileFormat[] makeFFArray(String fname0, int files, boolean overwrite, boolean append){ jpayne@68: if(files<1 || fname0==null){return null;} jpayne@68: String[] fnames=new String[files]; jpayne@68: FileFormat[] ff=new FileFormat[files]; jpayne@68: for(int i=0; i1){ jpayne@68: assert(fname.indexOf('#')>-1) : "Output name requires # symbol for 
multiple files."; jpayne@68: fname=fname.replaceFirst("#", ""+i); jpayne@68: } jpayne@68: fnames[i]=fname; jpayne@68: ff[i]=FileFormat.testOutput(fname, FileFormat.TEXT, null, true, overwrite, append, false); jpayne@68: } jpayne@68: jpayne@68: if(!Tools.testOutputFiles(overwrite, append, false, fnames)){ jpayne@68: throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+Arrays.toString(fnames)+"\n"); jpayne@68: } jpayne@68: jpayne@68: return ff; jpayne@68: } jpayne@68: jpayne@68: // private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){ jpayne@68: // if(ff==null || ff.length==0){return null;} jpayne@68: // ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length]; jpayne@68: // for(int i=0; i ln=cris.nextList(); jpayne@68: //Grab the actual read list from the ListNum jpayne@68: ArrayList reads=(ln!=null ? ln.list : null); jpayne@68: jpayne@68: //As long as there is a nonempty read list... jpayne@68: while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning jpayne@68: // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access jpayne@68: jpayne@68: //Loop through each read in the list jpayne@68: for(int idx=0; idx=TaxTree.LIFE){break;} jpayne@68: tn=temp; jpayne@68: } jpayne@68: if(tn!=null){taxID=tn.id;} jpayne@68: } jpayne@68: jpayne@68: if(taxID>0){ jpayne@68: long a=r1.length(); jpayne@68: long b=r1.mateLength(); jpayne@68: if(a sizeMap(){ jpayne@68: Timer t=new Timer(); jpayne@68: t.start("Making img prefilter."); jpayne@68: jpayne@68: // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads(); jpayne@68: jpayne@68: HashMap sizes=new HashMap(); jpayne@68: jpayne@68: //Create a read input stream jpayne@68: final ConcurrentReadInputStream cris; jpayne@68: { jpayne@68: cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null); jpayne@68: 
if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);} jpayne@68: cris.start(); //Start the stream jpayne@68: if(verbose){outstream.println("Started cris");} jpayne@68: } jpayne@68: jpayne@68: //Grab the first ListNum of reads jpayne@68: ListNum ln=cris.nextList(); jpayne@68: //Grab the actual read list from the ListNum jpayne@68: ArrayList reads=(ln!=null ? ln.list : null); jpayne@68: jpayne@68: //As long as there is a nonempty read list... jpayne@68: while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning jpayne@68: // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access jpayne@68: jpayne@68: //Loop through each read in the list jpayne@68: for(int idx=0; idx-1) : "IMG records must start with IMG number followed by a space: "+r1.id; jpayne@68: jpayne@68: if(imgID>0){ jpayne@68: long a=r1.length(); jpayne@68: long b=r1.mateLength(); jpayne@68: if(a0){ jpayne@68: Long old=sizes.get(imgID); jpayne@68: if(old==null){sizes.put(imgID, a+b);} jpayne@68: else{sizes.put(imgID, a+b+old);} jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: //Notify the input stream that the list was used jpayne@68: cris.returnList(ln); jpayne@68: // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access jpayne@68: jpayne@68: //Fetch a new list jpayne@68: ln=cris.nextList(); jpayne@68: reads=(ln!=null ? 
ln.list : null); jpayne@68: } jpayne@68: jpayne@68: //Notify the input stream that the final list was used jpayne@68: if(ln!=null){ jpayne@68: cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); jpayne@68: } jpayne@68: jpayne@68: errorState|=ReadWrite.closeStream(cris); jpayne@68: // outstream.println("Created prefilter."); jpayne@68: t.stop("Created prefilter:"); jpayne@68: Shared.printMemory(); jpayne@68: System.err.println(); jpayne@68: jpayne@68: return sizes; jpayne@68: } jpayne@68: jpayne@68: /** Create read streams and process all data */ jpayne@68: void process(Timer t){ jpayne@68: jpayne@68: //Reset counters jpayne@68: readsProcessed=0; jpayne@68: basesProcessed=0; jpayne@68: jpayne@68: if(mode==ONE_SKETCH && !forceDisableMultithreadedFastq && Shared.threads()>2 && ffin1.fastq()){ jpayne@68: // Shared.setBufferLen(2); jpayne@68: singleSketchMT(); jpayne@68: }else{ jpayne@68: final int oldLen=Shared.bufferLen(); jpayne@68: Shared.capBufferLen(ffin1.fastq() ? 40 : 4); jpayne@68: jpayne@68: //Turn off read validation in the input threads to increase speed jpayne@68: final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; jpayne@68: jpayne@68: //Create a read input stream jpayne@68: final ConcurrentReadInputStream cris; jpayne@68: { jpayne@68: cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null); jpayne@68: cris.start(); //Start the stream jpayne@68: if(verbose){outstream.println("Started cris");} jpayne@68: } jpayne@68: jpayne@68: //Process the reads in separate threads jpayne@68: spawnThreads(cris); jpayne@68: jpayne@68: if(verbose){outstream.println("Finished; closing streams.");} jpayne@68: jpayne@68: //Write anything that was accumulated by ReadStats jpayne@68: errorState|=ReadStats.writeAll(); jpayne@68: //Close the read streams jpayne@68: errorState|=ReadWrite.closeStream(cris); jpayne@68: jpayne@68: //TODO: Write sketch jpayne@68: jpayne@68: //Reset read 
validation jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=vic; jpayne@68: Shared.setBufferLen(oldLen); jpayne@68: } jpayne@68: jpayne@68: //Report timing and results jpayne@68: t.stop(); jpayne@68: outstream.println("Wrote "+sketchesWritten+" of "+sketchesMade+" sketches.\n"); jpayne@68: outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); jpayne@68: jpayne@68: //Throw an exception of there was an error in a thread jpayne@68: if(errorState){ jpayne@68: throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private void singleSketchMT(){ jpayne@68: Timer t=new Timer(); jpayne@68: Sketch sketch=tool.processReadsMT(ffin1, ffin2, Shared.threads(), jpayne@68: maxReads, mode, defaultParams.samplerate, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual, false); jpayne@68: jpayne@68: if(outTaxID>=0){sketch.taxID=outTaxID;} jpayne@68: if(outTaxName!=null){sketch.setTaxName(outTaxName);} jpayne@68: if(outFname!=null){sketch.setFname(outFname);} jpayne@68: if(outName0!=null){sketch.setName0(outName0);} jpayne@68: if(outSpid>=0){sketch.spid=outSpid;} jpayne@68: if(outImgID>=0){sketch.imgID=outImgID;} jpayne@68: sketch.setMeta(outMeta); jpayne@68: jpayne@68: //Accumulate per-thread statistics jpayne@68: readsProcessed+=sketch.genomeSequences; jpayne@68: basesProcessed+=sketch.genomeSizeBases; jpayne@68: kmersProcessed+=sketch.genomeSizeKmers; jpayne@68: jpayne@68: sketchesMade+=1; jpayne@68: jpayne@68: t.stop("Finished sketching: "); jpayne@68: Shared.printMemory(); jpayne@68: jpayne@68: if(ffout!=null && ffout.length>0){ jpayne@68: sketch.addSSU(); jpayne@68: SketchTool.write(sketch, ffout[0]); jpayne@68: sketchesWritten+=1; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: /** Spawn process threads */ jpayne@68: @SuppressWarnings("unchecked") jpayne@68: private void spawnThreads(final ConcurrentReadInputStream cris){ jpayne@68: 
jpayne@68: //Do anything necessary prior to processing jpayne@68: Timer t=new Timer(); jpayne@68: jpayne@68: //Determine how many threads may be used jpayne@68: final int threads=Tools.mid(1, Shared.threads(), 14);//Probably capped for memory reasons. Rarely hits 1200% anyway (was 12, bumped to 14). jpayne@68: jpayne@68: //Fill a list with ProcessThreads jpayne@68: ArrayList alpt=new ArrayList(threads); jpayne@68: jpayne@68: if(mode==PER_TAXA || mode==PER_IMG){ jpayne@68: longMaps=new HashMap[MAP_WAYS]; jpayne@68: for(int i=0; i(); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: if(mode!=ONE_SKETCH){tsw=makeTSWArray(ffout);} jpayne@68: jpayne@68: for(int i=0; i=0){sketch.taxID=outTaxID;} jpayne@68: if(outTaxName!=null){sketch.setTaxName(outTaxName);} jpayne@68: if(outFname!=null){sketch.setFname(outFname);} jpayne@68: if(outName0!=null){sketch.setName0(outName0);} jpayne@68: if(outSpid>=0){sketch.spid=outSpid;} jpayne@68: if(outImgID>=0){sketch.imgID=outImgID;} jpayne@68: if(ffout!=null && ffout.length>0){ jpayne@68: sketch.addSSU(); jpayne@68: SketchTool.write(sketch, ffout[0]); jpayne@68: } jpayne@68: sketchesMade++; jpayne@68: sketchesWritten++; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: if(tsw!=null){ jpayne@68: for(int i=0; i[] maps){ jpayne@68: jpayne@68: //Determine how many threads may be used jpayne@68: final int threads=files; jpayne@68: jpayne@68: //Fill a list with WriteThreads jpayne@68: ArrayList alwt=new ArrayList(threads); jpayne@68: jpayne@68: @SuppressWarnings("unchecked") jpayne@68: ArrayDeque[] heaps=new ArrayDeque[threads]; jpayne@68: for(int i=0; i(); jpayne@68: WriteThread wt=new WriteThread(i, heaps[i]); jpayne@68: alwt.add(wt); jpayne@68: } jpayne@68: jpayne@68: for(int i=0; i longMap=maps[i]; jpayne@68: for(Entry entry : longMap.entrySet()){ jpayne@68: // set.remove(entry); This will probably not work jpayne@68: SketchHeap entryHeap=entry.getValue(); jpayne@68: sketchesMade++; jpayne@68: if(entryHeap.size()>0 && 
entryHeap.genomeSizeKmers>=minSizeKmers){ jpayne@68: heaps[(entry.hashCode()&Integer.MAX_VALUE)%threads].add(entryHeap); jpayne@68: } jpayne@68: } jpayne@68: // intMap.clear(); jpayne@68: maps[i]=null; jpayne@68: } jpayne@68: jpayne@68: //Start the threads jpayne@68: for(WriteThread wt : alwt){wt.start();} jpayne@68: jpayne@68: //Wait for completion of all threads jpayne@68: boolean success=true; jpayne@68: for(WriteThread wt : alwt){ jpayne@68: jpayne@68: //Wait until this thread has terminated jpayne@68: while(wt.getState()!=Thread.State.TERMINATED){ jpayne@68: try { jpayne@68: //Attempt a join operation jpayne@68: wt.join(); jpayne@68: } catch (InterruptedException e) { jpayne@68: //Potentially handle this, if it is expected to occur jpayne@68: e.printStackTrace(); jpayne@68: } jpayne@68: } jpayne@68: // sketchesMade+=wt.sketchesMadeT; jpayne@68: sketchesWritten+=wt.sketchesWrittenT; jpayne@68: success&=wt.success; jpayne@68: } jpayne@68: return success; jpayne@68: } jpayne@68: jpayne@68: private class WriteThread extends Thread{ jpayne@68: jpayne@68: WriteThread(int tnum_, ArrayDeque queue_){ jpayne@68: tnum=tnum_; jpayne@68: queue=queue_; jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: public void run(){ jpayne@68: success=false; jpayne@68: for(SketchHeap polledHeap=queue.poll(); polledHeap!=null; polledHeap=queue.poll()){ jpayne@68: if(polledHeap.sketchSizeEstimate()>0){ jpayne@68: Sketch sketch=new Sketch(polledHeap, true, tool.trackCounts, outMeta); jpayne@68: if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;} jpayne@68: if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);} jpayne@68: if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);} jpayne@68: if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);} jpayne@68: if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;} jpayne@68: if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;} jpayne@68: sketch.addSSU(); jpayne@68: 
SketchTool.write(sketch, tsw[tnum], bb); jpayne@68: sketchesWrittenT++; jpayne@68: } jpayne@68: } jpayne@68: bb=null; jpayne@68: success=true; jpayne@68: queue=null; jpayne@68: } jpayne@68: jpayne@68: ArrayDeque queue; jpayne@68: final int tnum; jpayne@68: private ByteBuilder bb=new ByteBuilder(); jpayne@68: // long sketchesMadeT=0; jpayne@68: long sketchesWrittenT=0; jpayne@68: boolean success=false; jpayne@68: } jpayne@68: jpayne@68: // private void writeOutput(ConcurrentHashMap map){ jpayne@68: // ByteStreamWriter tsw=new ByteStreamWriter(ffout); jpayne@68: // tsw.start(); jpayne@68: // KeySetView y=map.keySet(); jpayne@68: // for(Integer x : map.keySet()){ jpayne@68: // SketchHeap smm.heap=map.get(x); jpayne@68: // Sketch s=tool.toSketch(smm.heap); jpayne@68: // tool.write(s, tsw); jpayne@68: // } jpayne@68: // tsw.poisonAndWait(); jpayne@68: // } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Tax Methods ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: private void loadGiToTaxid(){ jpayne@68: Timer t=new Timer(); jpayne@68: outstream.println("Loading gi to taxa translation table."); jpayne@68: GiToTaxid.initialize(giTableFile); jpayne@68: t.stop(); jpayne@68: if(true){ jpayne@68: outstream.println("Time: \t"+t); jpayne@68: Shared.printMemory(); jpayne@68: outstream.println(); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Inner Methods ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Inner Classes ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: private class ProcessThread extends Thread { 
jpayne@68: jpayne@68: //Constructor jpayne@68: ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){ jpayne@68: cris=cris_; jpayne@68: threadID=tid_; jpayne@68: jpayne@68: smm=new SketchMakerMini(tool, mode, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual); jpayne@68: localMap=(mode==PER_TAXA ? new HashMap() : null); jpayne@68: } jpayne@68: jpayne@68: //Called by start() jpayne@68: @Override jpayne@68: public void run(){ jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Process the reads jpayne@68: processInner(); jpayne@68: jpayne@68: //Do anything necessary after processing jpayne@68: bb=null; jpayne@68: jpayne@68: //Indicate successful exit status jpayne@68: success=true; jpayne@68: } jpayne@68: jpayne@68: /** Iterate through the reads */ jpayne@68: void processInner(){ jpayne@68: jpayne@68: //Grab the first ListNum of reads jpayne@68: ListNum ln=cris.nextList(); jpayne@68: //Grab the actual read list from the ListNum jpayne@68: ArrayList reads=(ln!=null ? ln.list : null); jpayne@68: jpayne@68: //Check to ensure pairing is as expected jpayne@68: if(reads!=null && !reads.isEmpty()){ jpayne@68: Read r=reads.get(0); jpayne@68: assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access jpayne@68: } jpayne@68: jpayne@68: // long cntr1=0, cntr2=0, cntr3=0, cntr4=0; jpayne@68: jpayne@68: //As long as there is a nonempty read list... 
jpayne@68: while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning jpayne@68: // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access jpayne@68: jpayne@68: //Loop through each read in the list jpayne@68: for(int idx=0; idx0){dumpLocalMap();} jpayne@68: jpayne@68: // System.out.println(cntr1+", "+cntr2+", "+cntr3+", "+cntr4); jpayne@68: } jpayne@68: jpayne@68: void processReadPair(Read r1, Read r2){ jpayne@68: jpayne@68: if(mode==PER_TAXA){ jpayne@68: assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap; jpayne@68: assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap; jpayne@68: }else if(mode==PER_SEQUENCE || mode==PER_IMG){ jpayne@68: assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap; jpayne@68: assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap; jpayne@68: }else{ jpayne@68: assert(smm.heap!=null && smm.heap.capacity()>=targetSketchSize) : targetSketchSize+", "+(smm.heap==null ? 
"null" : ""+smm.heap.capacity()); jpayne@68: } jpayne@68: jpayne@68: //Track the initial length for statistics jpayne@68: final int initialLength1=r1.length(); jpayne@68: final int initialLength2=r1.mateLength(); jpayne@68: final String rid=r1.id; jpayne@68: jpayne@68: //Increment counters jpayne@68: readsProcessedT+=r1.pairCount(); jpayne@68: basesProcessedT+=initialLength1+initialLength2; jpayne@68: jpayne@68: if(initialLength1=TaxTree.LIFE){break;} jpayne@68: tn=temp; jpayne@68: } jpayne@68: // assert(tn!=null) : imgID+", "+taxID+", "+rid; //123 jpayne@68: if(tn!=null){taxID=tn.id;} jpayne@68: // System.err.println("Node: "+rid+"\n->\n"+tn); jpayne@68: jpayne@68: // assert(taxID>0) : imgID+", "+taxID+", "+rid; //123 jpayne@68: }else{ jpayne@68: imgID=-1; jpayne@68: } jpayne@68: jpayne@68: final long unitSizeBases; jpayne@68: if(sizeList!=null){ jpayne@68: unitSizeBases=taxID<0 ? -1 : sizeList.get(taxID); jpayne@68: }else if(sizeMap!=null){ jpayne@68: unitSizeBases=sizeMap.get(imgID); jpayne@68: }else{ jpayne@68: unitSizeBases=-1; jpayne@68: } jpayne@68: jpayne@68: jpayne@68: if(mode==PER_TAXA){ jpayne@68: if(tossJunk && tn==null){return;} jpayne@68: if(tn!=null){ jpayne@68: if(taxID==0 || (tn.level>taxLevel && tn.level>=TaxTree.PHYLUM)){return;} jpayne@68: TaxNode parent=taxtree.getNode(tn.pid); jpayne@68: if(parent.pid==parent.id){return;} jpayne@68: if(prefilter && unitSizeBases>=0 && unitSizeBases=0 && localMap.containsKey((long)taxID)){ jpayne@68: smm.heap=localMap.get((long)taxID); jpayne@68: assert(smm.heap.taxID==taxID); jpayne@68: }else if(sizeList==null){ jpayne@68: if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());} jpayne@68: }else{ jpayne@68: expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2; jpayne@68: if(expectedBases-1 ? 
unitSizeBases : initialLength1+initialLength2; jpayne@68: if(expectedBases=TaxTree.LIFE){break;} jpayne@68: tn2=temp; jpayne@68: } jpayne@68: if(tn2.level<=taxLevel){ jpayne@68: taxID=tn2.id; jpayne@68: } jpayne@68: if(smm.heap.taxID<0){smm.heap.taxID=tn2.id;} jpayne@68: if(smm.heap.taxName()==null && tn2!=null){ jpayne@68: smm.heap.setTaxName(tn2.name); jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: jpayne@68: assert(smm.heap.taxID<0 || smm.heap.taxName()!=null) : smm.heap.taxID+", "+smm.heap.taxName()+", "+smm.heap.name()+", "+tn; jpayne@68: jpayne@68: //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: if(initialLength1>=k){smm.processRead(r1);} jpayne@68: //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: if(initialLength2>=k){smm.processRead(r2);} jpayne@68: //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: jpayne@68: jpayne@68: if(mode==PER_SEQUENCE){ jpayne@68: manageHeap_perSequence(); jpayne@68: }else if(mode==PER_TAXA || mode==PER_IMG){ jpayne@68: //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: manageHeap_perTaxa(taxID, imgID, unitSizeBases); jpayne@68: if(localMap.size()>20){dumpLocalMap();} jpayne@68: }else if(mode==ONE_SKETCH/* || mode==PER_HEADER*/){ jpayne@68: //do nothing jpayne@68: }else{ jpayne@68: assert(false) : mode; jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private void manageHeap_perSequence(){ jpayne@68: assert(mode==PER_SEQUENCE); jpayne@68: writeHeap(smm.heap); jpayne@68: } jpayne@68: jpayne@68: private void manageHeap_perTaxa(final int taxID, final long imgID, final long unitSizeBases){ jpayne@68: //assert(!localMap.containsKey((long)taxID) || 
localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: assert(mode==PER_TAXA || mode==PER_IMG); jpayne@68: jpayne@68: if(smm.heap.size()<=0 || ((taxID<0 && imgID<0) && smm.heap.genomeSizeKmers=0 || imgID>=0); jpayne@68: final boolean unknown=!known; jpayne@68: final boolean hasSize=(known && (sizeList!=null || sizeMap!=null)); jpayne@68: boolean finished=(unknown || (hasSize && smm.heap.genomeSizeBases>=unitSizeBases)); jpayne@68: jpayne@68: //For some reason, this assertion fired for a single taxID (52271) halfway through sketching RefSeq. jpayne@68: //Likely a transient hardware error. jpayne@68: assert(!finished || smm.heap.genomeSizeBases==unitSizeBases) : finished+", "+unknown+", "+hasSize+", "+(sizeList==null)+"\n" jpayne@68: +taxID+", "+unitSizeBases+", "+smm.heap.genomeSizeBases+", "+smm.heap.genomeSizeKmers; jpayne@68: // if(finished && smm.heap.genomeSizeBases!=unitSizeBases){ jpayne@68: // System.err.println("Warning: tid="+taxID+", finished="+finished+", known="+known+ jpayne@68: // ", smm.heap.genomeSizeBases="+smm.heap.genomeSizeBases+", unitSizeBases="+unitSizeBases); jpayne@68: // } jpayne@68: jpayne@68: smm.heap.taxID=taxID; jpayne@68: smm.heap.imgID=imgID; jpayne@68: jpayne@68: //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: jpayne@68: final Long key; jpayne@68: if(imgID>-1 && (mode==PER_IMG || taxID<1)){key=imgID;} jpayne@68: else if(taxID>-1){key=(long)taxID;} jpayne@68: else{key=Long.valueOf(nextUnknown.getAndIncrement());} jpayne@68: jpayne@68: //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: jpayne@68: if(unknown || finished){ jpayne@68: writeHeap(smm.heap); jpayne@68: smm.heap.clear(true); jpayne@68: localMap.remove((long)taxID); jpayne@68: return; jpayne@68: } jpayne@68: jpayne@68: //At this point, the taxID is known and this heap does not constitute the whole taxSize, or the taxSize is unknown. 
jpayne@68: if(!hasSize){ jpayne@68: final HashMap map=longMaps[(int)(key&MAP_MASK)]; jpayne@68: final SketchHeap old; jpayne@68: synchronized(map){ jpayne@68: old=map.get(key); jpayne@68: if(old==null){ jpayne@68: map.put(key, smm.heap); jpayne@68: }else{ jpayne@68: old.add(smm.heap); jpayne@68: } jpayne@68: } jpayne@68: /* When old==null the shared map now holds this heap object, so the reference is dropped rather than cleared and reused. */ if(old==null){ jpayne@68: smm.heap=null; jpayne@68: }else{ jpayne@68: smm.heap.clear(true); jpayne@68: assert(!localMap.containsKey((long)taxID)); jpayne@68: } jpayne@68: localMap.remove((long)taxID); jpayne@68: return; jpayne@68: } jpayne@68: jpayne@68: //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 jpayne@68: jpayne@68: { jpayne@68: //At this point, the taxID and taxSize are known, and this heap does not constitute the whole taxSize. jpayne@68: final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize); jpayne@68: assert(expectedHeapSize>=3) : expectedHeapSize; jpayne@68: // boolean writeHeap=false; jpayne@68: // boolean makeHeap=false; jpayne@68: jpayne@68: SketchHeap local=null; jpayne@68: { jpayne@68: local=localMap.get(key); jpayne@68: assert(local==null || local.taxID==key.intValue()); jpayne@68: if(local==smm.heap){ jpayne@68: //The heap is already the one stored in localMap; just drop this reference to it jpayne@68: smm.heap=null; jpayne@68: }else if(local==null){ jpayne@68: if(expectedHeapSize==smm.heap.capacity()){//Store the current heap jpayne@68: assert(smm.heap.taxID==key.intValue()); jpayne@68: assert(smm.heap.name()!=null); jpayne@68: localMap.put(key, smm.heap);//Safe jpayne@68: smm.heap=null; jpayne@68: }else{//Store a precisely-sized heap jpayne@68: SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts()); jpayne@68: temp.add(smm.heap); jpayne@68: assert(temp.taxID==key.intValue()); jpayne@68: assert(temp.name()!=null); jpayne@68: localMap.put(key, temp);//Looks safe jpayne@68: // smm.heap=null; //Not sure which is better jpayne@68: smm.heap.clear(true);
jpayne@68: } jpayne@68: }else{ jpayne@68: assert(local.taxID==smm.heap.taxID); jpayne@68: assert(local!=smm.heap); jpayne@68: // assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID+", "+key; //123 jpayne@68: // assert(false) : taxID+", "+key; jpayne@68: local.add(smm.heap); //This WAS the slow line. It should no longer be executed. jpayne@68: // if(local.genomeSizeBases>=unitSizeBases){ jpayne@68: // writeHeap=true; jpayne@68: // localMap.remove(key); jpayne@68: // } jpayne@68: // smm.heap=null; //Not sure which is better jpayne@68: smm.heap.clear(true); jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: /** Moves every heap held in this thread's localMap into the shared longMaps entry chosen by key&MAP_MASK, synchronizing on that map. If the shared map has no heap for the key, the local heap is stored (or, when its capacity differs from toSketchSize(unitSizeBases, ...), a new heap of that capacity with the local heap added to it). Otherwise the local heap is added to the existing one, and once the combined genomeSizeBases reaches unitSizeBases that heap is removed from the shared map and passed to writeHeap. unitSizeBases comes from sizeList by taxID or from sizeMap by imgID, and is -1 when no size table is set or the taxID is negative. localMap is cleared at the end. */ private void dumpLocalMap(){ jpayne@68: jpayne@68: for(Entry e : localMap.entrySet()){ jpayne@68: boolean writeHeap=false; jpayne@68: final Long key=e.getKey(); jpayne@68: final SketchHeap localHeap=e.getValue(); jpayne@68: final HashMap map=longMaps[(int)(key&MAP_MASK)]; jpayne@68: final SketchHeap old; jpayne@68: jpayne@68: final long unitSizeBases; jpayne@68: if(sizeList!=null){ jpayne@68: unitSizeBases=localHeap.taxID<0 ?
-1 : sizeList.get((int)localHeap.taxID); jpayne@68: }else if(sizeMap!=null){ jpayne@68: unitSizeBases=sizeMap.get(localHeap.imgID); jpayne@68: }else{ jpayne@68: unitSizeBases=-1; jpayne@68: } jpayne@68: final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize); jpayne@68: jpayne@68: synchronized(map){ jpayne@68: old=map.get(key); jpayne@68: if(old==null){ jpayne@68: if(expectedHeapSize==localHeap.capacity()){//Store the current heap jpayne@68: map.put(key, localHeap); jpayne@68: }else{//Store a precisely-sized heap jpayne@68: // assert(key.intValue()!=96897) : key+", "+unitSizeBases+", "+", "+sizeList.get(key.intValue()); jpayne@68: assert(expectedHeapSize>0) : expectedHeapSize+", "+key+", "+localHeap.taxID+", "+localHeap.name()+", "+unitSizeBases; jpayne@68: SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts()); jpayne@68: temp.add(localHeap); jpayne@68: map.put(key, temp); jpayne@68: } jpayne@68: }else{ jpayne@68: old.add(localHeap); //This was the slow line; should be faster now. 
jpayne@68: if(old.genomeSizeBases>=unitSizeBases){ jpayne@68: writeHeap=true; jpayne@68: map.remove(key); jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: if(writeHeap){ jpayne@68: assert(old!=null); //For compiler jpayne@68: assert(old.genomeSizeBases>0) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers; jpayne@68: assert(old.genomeSizeBases==unitSizeBases) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers+", "+old.size()+", "+old.taxID; jpayne@68: writeHeap(old); jpayne@68: } jpayne@68: } jpayne@68: localMap.clear(); jpayne@68: } jpayne@68: jpayne@68: /** Turns a heap into a Sketch and writes it. Always increments sketchesMadeT. A Sketch is built only when the heap is non-empty, has at least minSizeKmers genome kmers, and reports a positive sketchSizeEstimate(); the out* override fields (taxID, taxName, fname, name0, spid, imgID) are then applied wherever the sketch has no value of its own. If tsw is non-null the sketch is written to the writer selected by its hash code modulo files, while synchronized on that writer, and sketchesWrittenT is incremented. A heap that fails the checks is cleared instead. @param heap Heap to convert; asserted to have zero genomeSizeBases and genomeSequences on exit. @return true if the sketch was written to a writer. */ private boolean writeHeap(SketchHeap heap){ jpayne@68: sketchesMadeT++; jpayne@68: // assert(heap.size()>0) : heap.size(); //Not really necessary jpayne@68: boolean written=false; jpayne@68: // assert(heap.heap.size()==heap.set.size()) : heap.heap.size()+", "+heap.set.size(); jpayne@68: if(heap.size()>0 && heap.genomeSizeKmers>=minSizeKmers && heap.sketchSizeEstimate()>0){ jpayne@68: Sketch sketch=new Sketch(heap, true, tool.trackCounts, outMeta); jpayne@68: if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;} jpayne@68: if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);} jpayne@68: if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);} jpayne@68: if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);} jpayne@68: if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;} jpayne@68: if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;} jpayne@68: if(parseSubunit && sketch.name0()!=null){ jpayne@68: if(outMeta!=null){ jpayne@68: sketch.meta=(ArrayList)sketch.meta.clone(); jpayne@68: }else if(sketch.meta==null){ jpayne@68: if(sketch.name0().contains("SSU_")){ jpayne@68: sketch.addMeta("subunit:ssu"); jpayne@68: }else if(sketch.name0().contains("LSU_")){ jpayne@68: sketch.addMeta("subunit:lsu"); jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: if(tsw!=null){ jpayne@68: final int
choice=(sketch.hashCode()&Integer.MAX_VALUE)%files; jpayne@68: sketch.addSSU(); jpayne@68: synchronized(tsw[choice]){ jpayne@68: SketchTool.write(sketch, tsw[choice], bb); jpayne@68: sketchesWrittenT++; jpayne@68: written=true; jpayne@68: } jpayne@68: } jpayne@68: }else{ jpayne@68: heap.clear(true); jpayne@68: } jpayne@68: assert(heap.genomeSizeBases==0); jpayne@68: assert(heap.genomeSequences==0); jpayne@68: return written; jpayne@68: } jpayne@68: jpayne@68: /** Number of reads processed by this thread */ jpayne@68: protected long readsProcessedT=0; jpayne@68: /** Number of bases processed by this thread */ jpayne@68: protected long basesProcessedT=0; jpayne@68: jpayne@68: long sketchesMadeT=0; jpayne@68: long sketchesWrittenT=0; jpayne@68: jpayne@68: /** True only if this thread has completed successfully */ jpayne@68: boolean success=false; jpayne@68: jpayne@68: /** Shared input stream */ jpayne@68: private final ConcurrentReadInputStream cris; jpayne@68: /** Thread ID */ jpayne@68: final int threadID; jpayne@68: private ByteBuilder bb=new ByteBuilder(); jpayne@68: jpayne@68: final SketchMakerMini smm; jpayne@68: final HashMap localMap; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Primary input file path */ jpayne@68: private String in1=null; jpayne@68: /** Secondary input file path */ jpayne@68: private String in2=null; jpayne@68: jpayne@68: /** Primary output file path */ jpayne@68: private String out1=null; jpayne@68: jpayne@68: /** Override input file extension */ jpayne@68: private String extin=null; jpayne@68: jpayne@68: private String giTableFile=null; jpayne@68: private String taxTreeFile=null; jpayne@68: private String accessionFile=null; jpayne@68: private String imgFile=null; jpayne@68: jpayne@68: /*Override metadata */ jpayne@68: String 
outTaxName=null; jpayne@68: String outFname=null; jpayne@68: String outName0=null; jpayne@68: int outTaxID=-1; jpayne@68: long outSpid=-1; jpayne@68: long outImgID=-1; jpayne@68: ArrayList outMeta=null; jpayne@68: static boolean parseSubunit=false; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Number of reads processed */ jpayne@68: protected long readsProcessed=0; jpayne@68: /** Number of bases processed */ jpayne@68: protected long basesProcessed=0; jpayne@68: /** Number of kmers processed */ jpayne@68: protected long kmersProcessed=0; jpayne@68: /** Number of sketches started */ jpayne@68: protected long sketchesMade=0; jpayne@68: /** Number of sketches written */ jpayne@68: protected long sketchesWritten=0; jpayne@68: jpayne@68: /** Quit after processing this many input reads; -1 means no limit */ jpayne@68: private long maxReads=-1; jpayne@68: jpayne@68: /** Expected size in bases, looked up by taxID; may be null */ final LongList sizeList; jpayne@68: /** Expected size in bases, looked up by imgID; may be null */ final HashMap sizeMap; jpayne@68: jpayne@68: /** Shared heap maps; the map for a key is longMaps[key&MAP_MASK], and it is accessed while synchronized on that map */ private HashMap longMaps[]; jpayne@68: /** Output writers, indexed by sketch hash code modulo files; when null, sketches are built but not written */ private ByteStreamWriter tsw[]; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Final Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Primary input file */ jpayne@68: private final FileFormat ffin1; jpayne@68: /** Secondary input file */ jpayne@68: private final FileFormat ffin2; jpayne@68: jpayne@68: /** Primary output files */ jpayne@68: private final FileFormat ffout[]; jpayne@68: /** Number of output files */ jpayne@68: private final int files; jpayne@68: jpayne@68: final int mode; jpayne@68: jpayne@68: private final SketchTool tool; jpayne@68: jpayne@68: /** Don't make sketches from sequences smaller than this, measured in bases */ jpayne@68: final int minSizeBases; jpayne@68: /** Don't make sketches from sequences smaller than this, measured in kmers */ jpayne@68: final int minSizeKmers; jpayne@68:
jpayne@68: /** Taxonomic level compared against a TaxNode's level when choosing the taxID for a heap */ private int taxLevel=1; jpayne@68: private boolean prefilter=false; jpayne@68: private boolean tossJunk=true; jpayne@68: boolean bestEffort=true; jpayne@68: // private final HashMap ssuMap=null; jpayne@68: jpayne@68: /** Supplies map keys for heaps that have neither a taxID nor an imgID; starts at minFakeID */ jpayne@68: private final AtomicInteger nextUnknown=new AtomicInteger(minFakeID); jpayne@68: jpayne@68: /** Presumably the number of maps in longMaps - TODO confirm against the allocation site */ jpayne@68: private static final int MAP_WAYS=32; jpayne@68: /** Mask applied to a key to select its map in longMaps */ jpayne@68: private static final int MAP_MASK=MAP_WAYS-1; jpayne@68: jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Common Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Print status messages to this output stream */ jpayne@68: private PrintStream outstream=System.err; jpayne@68: /** Print verbose messages */ jpayne@68: public static boolean verbose=false; jpayne@68: /** True if an error was encountered */ jpayne@68: public boolean errorState=false; jpayne@68: /** Overwrite existing output files */ jpayne@68: private boolean overwrite=false; jpayne@68: /** Append to existing output files */ jpayne@68: private boolean append=false; jpayne@68: jpayne@68: }