comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/SketchMaker.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package sketch;
2
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayDeque;
6 import java.util.ArrayList;
7 import java.util.Arrays;
8 import java.util.HashMap;
9 import java.util.Map.Entry;
10 import java.util.concurrent.atomic.AtomicInteger;
11
12 import fileIO.ByteFile;
13 import fileIO.ByteStreamWriter;
14 import fileIO.FileFormat;
15 import fileIO.ReadWrite;
16 import shared.Parse;
17 import shared.Parser;
18 import shared.PreParser;
19 import shared.ReadStats;
20 import shared.Shared;
21 import shared.Timer;
22 import shared.Tools;
23 import stream.ConcurrentReadInputStream;
24 import stream.FASTQ;
25 import stream.FastaReadInputStream;
26 import stream.Read;
27 import structures.ByteBuilder;
28 import structures.ListNum;
29 import structures.LongList;
30 import tax.AccessionToTaxid;
31 import tax.GiToTaxid;
32 import tax.ImgRecord2;
33 import tax.TaxNode;
34 import tax.TaxTree;
35
36 /**
37 * Creates MinHashSketches rapidly.
38 *
39 * @author Brian Bushnell
40 * @date July 6, 2016
41 *
42 */
43 public class SketchMaker extends SketchObject {
44
45 /*--------------------------------------------------------------*/
46 /*---------------- Initialization ----------------*/
47 /*--------------------------------------------------------------*/
48
49 /**
50 * Code entrance from the command line.
51 * @param args Command line arguments
52 */
53 public static void main(String[] args){
54 //Start a timer immediately upon code entrance.
55 Timer t=new Timer();
56
57 final int mode=parseMode(args);
58 if(mode==PER_FILE/* || mode==ONE_SKETCH || mode==PER_HEADER*/ || mode==PER_SEQUENCE){//ONE_SKETCH does not work for multiple input files.
59 recallCompareSketch(args);
60 return;
61 }
62
63 final int oldBufLen=Shared.bufferLen();
64
65 //Create an instance of this class
66 SketchMaker x=new SketchMaker(args);
67
68 //Run the object
69 x.process(t);
70
71 Shared.setBufferLen(oldBufLen);
72
73 //Close the print stream if it was redirected
74 Shared.closeStream(x.outstream);
75 }
76
77 private static void recallCompareSketch(String[] args){
78 ArrayList<String> list=new ArrayList<String>(args.length+1);
79 for(int i=0; i<args.length; i++){
80 if(args[i].startsWith("out=")){
81 args[i]=args[i].replaceFirst("out=", "outsketch=");
82 }
83 list.add(args[i]);
84 }
85 list.add("sketchonly");
86 CompareSketch.main(list.toArray(new String[0]));
87 }
88
89 /**
90 * Constructor.
91 * @param args Command line arguments
92 */
93 public SketchMaker(String[] args){
94
95 {//Preparse block for help, config files, and outstream
96 PreParser pp=new PreParser(args, getClass(), false);
97 args=pp.args;
98 outstream=pp.outstream;
99 }
100
101 //Set shared static variables
102 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
103 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
104
105 //Create a parser object
106 Parser parser=new Parser();
107
108 int minSizeKmers_=100;
109 int files_=1;
110 int mode_=ONE_SKETCH;
111 hashNames=true;
112 boolean setPrefilter=false;
113 defaultParams.printDepth=defaultParams.printDepth2=defaultParams.printVolume=false;
114
115 //Parse each argument
116 for(int i=0; i<args.length; i++){
117 String arg=args[i];
118
119 //Break arguments into their constituent parts, in the form of "a=b"
120 String[] split=arg.split("=");
121 String a=split[0].toLowerCase();
122 String b=split.length>1 ? split[1] : null;
123
124 if(a.equals("verbose")){
125 verbose=Parse.parseBoolean(b);
126 }else if(a.equals("files")){
127 files_=Integer.parseInt(b);
128 }else if(a.equals("minsize")){
129 minSizeKmers_=Parse.parseIntKMG(b);
130 }else if(a.equals("prefilter")){
131 prefilter=Parse.parseBoolean(b);
132 setPrefilter=true;
133 }
134
135 else if(a.equals("name") || a.equals("taxname")){
136 outTaxName=b;
137 }else if(a.equals("name0")){
138 outName0=b;
139 }else if(a.equals("fname")){
140 outFname=b;
141 }else if(a.equals("taxid") || a.equals("tid")){
142 outTaxID=Integer.parseInt(b);
143 }else if(a.equals("spid")){
144 outSpid=Integer.parseInt(b);
145 }else if(a.equals("imgid")){
146 outImgID=Integer.parseInt(b);
147 }else if((a.startsWith("meta_") || a.startsWith("mt_")) && b!=null){
148 if(outMeta==null){outMeta=new ArrayList<String>();}
149 int underscore=a.indexOf('_', 0);
150 outMeta.add(a.substring(underscore+1)+":"+b);
151 }else if(a.equals("parsesubunit")){
152 parseSubunit=Parse.parseBoolean(b);
153 }
154
155 else if(parseMode(arg, a, b)>-1){
156 mode_=parseMode(arg, a, b);
157 }else if(a.equals("parse_flag_goes_here")){
158 long fake_variable=Parse.parseKMG(b);
159 //Set a variable here
160 }
161
162 else if(a.equals("table") || a.equals("gi") || a.equals("gitable")){
163 giTableFile=b;
164 }else if(a.equals("taxtree") || a.equals("tree")){
165 taxTreeFile=b;
166 }else if(a.equals("accession")){
167 accessionFile=b;
168 }else if(a.equalsIgnoreCase("img") || a.equals("imgfile") || a.equals("imgdump")){
169 imgFile=b;
170 }
171
172 else if(a.equals("tossjunk")){
173 tossJunk=Parse.parseBoolean(b);
174 }
175
176 // else if(a.equals("silva")){//Handled by parser
177 // TaxTree.SILVA_MODE=Parse.parseBoolean(b);
178 // }
179
180 else if(a.equals("taxlevel") || a.equals("tl") || a.equals("level") || a.equals("lv")){
181 taxLevel=TaxTree.parseLevel(b);
182 }
183
184 else if(parseSketchFlags(arg, a, b)){
185 //do nothing
186 // System.err.println("a: "+arg);
187 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
188 //do nothing
189 // System.err.println("b: "+arg);
190 }else if(defaultParams.parse(arg, a, b)){
191 //do nothing
192 // System.err.println("c: "+arg);
193 // System.err.println(defaultParams.printDepth);
194 }
195
196 else{
197 outstream.println("Unknown parameter "+args[i]);
198 assert(false) : "Unknown parameter "+args[i];
199 }
200 }
201 if("auto".equalsIgnoreCase(imgFile)){imgFile=TaxTree.defaultImgFile();}
202 if("auto".equalsIgnoreCase(taxTreeFile)){taxTreeFile=TaxTree.defaultTreeFile();}
203 if("auto".equalsIgnoreCase(giTableFile)){giTableFile=TaxTree.defaultTableFile();}
204 if("auto".equalsIgnoreCase(accessionFile)){accessionFile=TaxTree.defaultAccessionFile();}
205
206 outMeta=SketchObject.fixMeta(outMeta);
207
208 postParse();
209 minSizeKmers=minSizeKmers_;
210 mode=mode_;
211
212 minSizeBases=minSizeKmers_+k-1;
213
214 {//Process parser fields
215 Parser.processQuality();
216
217 maxReads=parser.maxReads;
218
219 overwrite=ReadStats.overwrite=parser.overwrite;
220 append=ReadStats.append=parser.append;
221
222 in1=parser.in1;
223 in2=parser.in2;
224
225 out1=parser.out1;
226
227 extin=parser.extin;
228 }
229 files=(out1==null ? 0 : files_);
230
231 if(!setPrefilter && !prefilter && (mode==PER_TAXA || mode==PER_IMG) && in1!=null && !in1.startsWith("stdin") && (AUTOSIZE || AUTOSIZE_LINEAR || targetSketchSize>200)){
232 prefilter=true;
233 System.err.println("Enabled prefilter due to running in per-"+(mode==PER_TAXA ? "taxa" : "IMG")+" mode; override with 'prefilter=f'.");
234 }
235
236 assert(mode!=ONE_SKETCH || files<2) : "Multiple output files are not allowed in single-sketch mode.";
237
238 //Do input file # replacement
239 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
240 in2=in1.replace("#", "2");
241 in1=in1.replace("#", "1");
242 }
243
244 //Adjust interleaved detection based on the number of input files
245 if(in2!=null){
246 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
247 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
248 }
249
250 assert(FastaReadInputStream.settingsOK());
251
252 //Ensure there is an input file
253 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
254
255 //Adjust the number of threads for input file reading
256 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
257 ByteFile.FORCE_MODE_BF2=true;
258 }
259
260 ffout=makeFFArray(out1, files, overwrite, append);
261 if(ffout==null || ffout.length<1){
262 System.err.println("WARNING: No output files were specified; no sketches will be written.\n");
263 }
264
265 // //Ensure output files can be written
266 // if(!Tools.testOutputFiles(overwrite, append, false, out1)){
267 // outstream.println((out1==null)+", "+out1);
268 // throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n");
269 // }
270
271 //Ensure input files can be read
272 if(!Tools.testInputFiles(false, true, in1, in2, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){
273 throw new RuntimeException("\nCan't read some input files.\n");
274 }
275
276 //Ensure that no file was specified multiple times
277 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){
278 throw new RuntimeException("\nSome file names were specified multiple times.\n");
279 }
280
281 //Create input FileFormat objects
282 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
283 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
284
285 // assert(false) : defaultParams.trackCounts();
286 tool=new SketchTool(targetSketchSize, defaultParams);
287
288 if(taxTreeFile!=null){setTaxtree(taxTreeFile, outstream);}
289
290 if(giTableFile!=null){
291 loadGiToTaxid();
292 }
293 if(accessionFile!=null){
294 AccessionToTaxid.tree=taxtree;
295 outstream.println("Loading accession table.");
296 AccessionToTaxid.load(accessionFile);
297 System.gc();
298 }
299 if(imgFile!=null){
300 TaxTree.loadIMG(imgFile, true, outstream);
301 }
302 SSUMap.load(outstream);
303
304 if(prefilter){
305 if(mode==PER_TAXA){sizeList=sizeList(); sizeMap=null;}
306 else if(mode==PER_IMG){sizeMap=sizeMap(); sizeList=null;}
307 else{
308 assert(false) : "Wrong mode for prefilter; should be taxa or img.";
309 sizeList=null; sizeMap=null;
310 }
311 }else{
312 sizeList=null; sizeMap=null;
313 }
314 }
315
316 private static FileFormat[] makeFFArray(String fname0, int files, boolean overwrite, boolean append){
317 if(files<1 || fname0==null){return null;}
318 String[] fnames=new String[files];
319 FileFormat[] ff=new FileFormat[files];
320 for(int i=0; i<files; i++){
321 String fname=fname0;
322 if(files>1){
323 assert(fname.indexOf('#')>-1) : "Output name requires # symbol for multiple files.";
324 fname=fname.replaceFirst("#", ""+i);
325 }
326 fnames[i]=fname;
327 ff[i]=FileFormat.testOutput(fname, FileFormat.TEXT, null, true, overwrite, append, false);
328 }
329
330 if(!Tools.testOutputFiles(overwrite, append, false, fnames)){
331 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+Arrays.toString(fnames)+"\n");
332 }
333
334 return ff;
335 }
336
337 // private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){
338 // if(ff==null || ff.length==0){return null;}
339 // ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length];
340 // for(int i=0; i<ff.length; i++){
341 // tsw[i]=new ByteStreamWriter(ff[i]);
342 // tsw[i].start();
343 // }
344 // return tsw;
345 // }
346
347 private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){
348 if(ff==null || ff.length==0){return null;}
349 ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length];
350 for(int i=0; i<ff.length; i++){
351 tsw[i]=new ByteStreamWriter(ff[i]);
352 tsw[i].start();
353 }
354 return tsw;
355 }
356
357 /*--------------------------------------------------------------*/
358 /*---------------- Outer Methods ----------------*/
359 /*--------------------------------------------------------------*/
360
361 //Makes a list of genome sizes (bases, not kmers) per taxa.
362 private LongList sizeList(){
363 Timer t=new Timer();
364 Shared.GC_BEFORE_PRINT_MEMORY=true;
365 t.start("Making taxa prefilter.");
366 //For some reason this function is very slow, using only ~95% java and ~95% bgzip CPU.
367 //But that's about the same speed as reformat... BBDuk is only a little faster.
368
369 // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads();
370
371 LongList sizes=new LongList();
372
373 //Create a read input stream
374 final ConcurrentReadInputStream cris;
375 {
376 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
377 // if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);} //Why would I ever want this here?
378 cris.start(); //Start the stream
379 if(verbose){outstream.println("Started cris");}
380 }
381
382 //Grab the first ListNum of reads
383 ListNum<Read> ln=cris.nextList();
384 //Grab the actual read list from the ListNum
385 ArrayList<Read> reads=(ln!=null ? ln.list : null);
386
387 //As long as there is a nonempty read list...
388 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
389 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
390
391 //Loop through each read in the list
392 for(int idx=0; idx<reads.size(); idx++){
393 final Read r1=reads.get(idx);
394
395 int taxID=-1;
396 TaxNode tn=null;
397 if(taxtree!=null){
398 tn=taxtree.parseNodeFromHeader(r1.id, bestEffort);
399 // System.err.println("A: "+bestEffort+", "+tn);//123
400 while(tn!=null && tn.pid!=tn.id && tn.level<taxLevel){
401 TaxNode temp=taxtree.getNode(tn.pid);
402 if(temp==null || temp.level>=TaxTree.LIFE){break;}
403 tn=temp;
404 }
405 if(tn!=null){taxID=tn.id;}
406 }
407
408 if(taxID>0){
409 long a=r1.length();
410 long b=r1.mateLength();
411 if(a<k){a=0;}
412 if(b<k){b=0;}
413 sizes.increment(taxID, a+b);
414 }
415 }
416
417 //Notify the input stream that the list was used
418 cris.returnList(ln);
419 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
420
421 //Fetch a new list
422 ln=cris.nextList();
423 reads=(ln!=null ? ln.list : null);
424 }
425
426 //Notify the input stream that the final list was used
427 if(ln!=null){
428 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
429 }
430
431 errorState|=ReadWrite.closeStream(cris);
432 // outstream.println("Created prefilter.");
433 t.stop("Created prefilter:");
434 Shared.printMemory();
435 System.err.println();
436
437 return sizes;
438 }
439
440 //Makes a list of genome sizes (bases, not kmers) per img.
441 private HashMap<Long, Long> sizeMap(){
442 Timer t=new Timer();
443 t.start("Making img prefilter.");
444
445 // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads();
446
447 HashMap<Long, Long> sizes=new HashMap<Long, Long>();
448
449 //Create a read input stream
450 final ConcurrentReadInputStream cris;
451 {
452 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
453 if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);}
454 cris.start(); //Start the stream
455 if(verbose){outstream.println("Started cris");}
456 }
457
458 //Grab the first ListNum of reads
459 ListNum<Read> ln=cris.nextList();
460 //Grab the actual read list from the ListNum
461 ArrayList<Read> reads=(ln!=null ? ln.list : null);
462
463 //As long as there is a nonempty read list...
464 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
465 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
466
467 //Loop through each read in the list
468 for(int idx=0; idx<reads.size(); idx++){
469 final Read r1=reads.get(idx);
470
471 long imgID=ImgRecord2.parseImgId(r1.id, true);
472 assert(imgID>-1) : "IMG records must start with IMG number followed by a space: "+r1.id;
473
474 if(imgID>0){
475 long a=r1.length();
476 long b=r1.mateLength();
477 if(a<k){a=0;}
478 if(b<k){b=0;}
479
480 if(a+b>0){
481 Long old=sizes.get(imgID);
482 if(old==null){sizes.put(imgID, a+b);}
483 else{sizes.put(imgID, a+b+old);}
484 }
485 }
486 }
487
488 //Notify the input stream that the list was used
489 cris.returnList(ln);
490 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
491
492 //Fetch a new list
493 ln=cris.nextList();
494 reads=(ln!=null ? ln.list : null);
495 }
496
497 //Notify the input stream that the final list was used
498 if(ln!=null){
499 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
500 }
501
502 errorState|=ReadWrite.closeStream(cris);
503 // outstream.println("Created prefilter.");
504 t.stop("Created prefilter:");
505 Shared.printMemory();
506 System.err.println();
507
508 return sizes;
509 }
510
511 /** Create read streams and process all data */
512 void process(Timer t){
513
514 //Reset counters
515 readsProcessed=0;
516 basesProcessed=0;
517
518 if(mode==ONE_SKETCH && !forceDisableMultithreadedFastq && Shared.threads()>2 && ffin1.fastq()){
519 // Shared.setBufferLen(2);
520 singleSketchMT();
521 }else{
522 final int oldLen=Shared.bufferLen();
523 Shared.capBufferLen(ffin1.fastq() ? 40 : 4);
524
525 //Turn off read validation in the input threads to increase speed
526 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
527 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
528
529 //Create a read input stream
530 final ConcurrentReadInputStream cris;
531 {
532 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
533 cris.start(); //Start the stream
534 if(verbose){outstream.println("Started cris");}
535 }
536
537 //Process the reads in separate threads
538 spawnThreads(cris);
539
540 if(verbose){outstream.println("Finished; closing streams.");}
541
542 //Write anything that was accumulated by ReadStats
543 errorState|=ReadStats.writeAll();
544 //Close the read streams
545 errorState|=ReadWrite.closeStream(cris);
546
547 //TODO: Write sketch
548
549 //Reset read validation
550 Read.VALIDATE_IN_CONSTRUCTOR=vic;
551 Shared.setBufferLen(oldLen);
552 }
553
554 //Report timing and results
555 t.stop();
556 outstream.println("Wrote "+sketchesWritten+" of "+sketchesMade+" sketches.\n");
557 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
558
559 //Throw an exception of there was an error in a thread
560 if(errorState){
561 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
562 }
563 }
564
565 private void singleSketchMT(){
566 Timer t=new Timer();
567 Sketch sketch=tool.processReadsMT(ffin1, ffin2, Shared.threads(),
568 maxReads, mode, defaultParams.samplerate, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual, false);
569
570 if(outTaxID>=0){sketch.taxID=outTaxID;}
571 if(outTaxName!=null){sketch.setTaxName(outTaxName);}
572 if(outFname!=null){sketch.setFname(outFname);}
573 if(outName0!=null){sketch.setName0(outName0);}
574 if(outSpid>=0){sketch.spid=outSpid;}
575 if(outImgID>=0){sketch.imgID=outImgID;}
576 sketch.setMeta(outMeta);
577
578 //Accumulate per-thread statistics
579 readsProcessed+=sketch.genomeSequences;
580 basesProcessed+=sketch.genomeSizeBases;
581 kmersProcessed+=sketch.genomeSizeKmers;
582
583 sketchesMade+=1;
584
585 t.stop("Finished sketching: ");
586 Shared.printMemory();
587
588 if(ffout!=null && ffout.length>0){
589 sketch.addSSU();
590 SketchTool.write(sketch, ffout[0]);
591 sketchesWritten+=1;
592 }
593 }
594
595 /** Spawn process threads */
596 @SuppressWarnings("unchecked")
597 private void spawnThreads(final ConcurrentReadInputStream cris){
598
599 //Do anything necessary prior to processing
600 Timer t=new Timer();
601
602 //Determine how many threads may be used
603 final int threads=Tools.mid(1, Shared.threads(), 14);//Probably capped for memory reasons. Rarely hits 1200% anyway (was 12, bumped to 14).
604
605 //Fill a list with ProcessThreads
606 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
607
608 if(mode==PER_TAXA || mode==PER_IMG){
609 longMaps=new HashMap[MAP_WAYS];
610 for(int i=0; i<longMaps.length; i++){
611 longMaps[i]=new HashMap<Long, SketchHeap>();
612 }
613 }
614
615 if(mode!=ONE_SKETCH){tsw=makeTSWArray(ffout);}
616
617 for(int i=0; i<threads; i++){
618 alpt.add(new ProcessThread(cris, i));
619 }
620
621 //Start the threads
622 for(ProcessThread pt : alpt){
623 pt.start();
624 }
625
626 //Wait for completion of all threads
627 boolean success=true;
628 SketchHeap singleHeap=null;
629 for(ProcessThread pt : alpt){
630
631 //Wait until this thread has terminated
632 while(pt.getState()!=Thread.State.TERMINATED){
633 try {
634 //Attempt a join operation
635 pt.join();
636 } catch (InterruptedException e) {
637 //Potentially handle this, if it is expected to occur
638 e.printStackTrace();
639 }
640 }
641
642 //Accumulate per-thread statistics
643 readsProcessed+=pt.readsProcessedT;
644 basesProcessed+=pt.basesProcessedT;
645 kmersProcessed+=pt.smm.kmersProcessed;
646
647 sketchesMade+=pt.sketchesMadeT;
648 sketchesWritten+=pt.sketchesWrittenT;
649
650 // System.err.println(pt.sketchesMadeT+" "+pt.sketchesWrittenT);
651
652 // System.err.println("pt.readsProcessedT="+pt.readsProcessedT);
653 if(mode==ONE_SKETCH){
654 SketchHeap temp=pt.smm.heap;
655
656 if(temp==null){
657 //do nothing
658 }else if(singleHeap==null){singleHeap=pt.smm.heap;}
659 else{singleHeap.add(pt.smm.heap);}
660
661 if(singleHeap!=null){
662 if(outTaxID>=0){singleHeap.taxID=outTaxID;}
663 if(outTaxName!=null){singleHeap.setTaxName(outTaxName);}
664 if(outFname!=null){singleHeap.setFname(outFname);}
665 if(outName0!=null){singleHeap.setName0(outName0);}
666 if(outImgID>=0){singleHeap.imgID=outImgID;}
667
668 singleHeap.genomeSizeBases=basesProcessed;
669 singleHeap.genomeSizeKmers=kmersProcessed;
670 singleHeap.genomeSequences=readsProcessed;
671 }
672 }
673 success&=pt.success;
674 }
675
676 if(singleHeap!=null){
677 singleHeap.setFname(ffin1.simpleName());
678 if(singleHeap.name0()==null){
679 singleHeap.setName0(ffin1.simpleName());
680 }
681 }
682
683 t.stop("Finished sketching: ");
684 Shared.printMemory();
685
686 if(ffout!=null){
687 if(mode==PER_TAXA || mode==PER_IMG){
688 if(tsw==null){tsw=makeTSWArray(ffout);}
689 success&=writeMap(longMaps);
690 }else if(mode==ONE_SKETCH){
691 Sketch sketch=new Sketch(singleHeap, true, tool.trackCounts, outMeta);
692 if(outTaxID>=0){sketch.taxID=outTaxID;}
693 if(outTaxName!=null){sketch.setTaxName(outTaxName);}
694 if(outFname!=null){sketch.setFname(outFname);}
695 if(outName0!=null){sketch.setName0(outName0);}
696 if(outSpid>=0){sketch.spid=outSpid;}
697 if(outImgID>=0){sketch.imgID=outImgID;}
698 if(ffout!=null && ffout.length>0){
699 sketch.addSSU();
700 SketchTool.write(sketch, ffout[0]);
701 }
702 sketchesMade++;
703 sketchesWritten++;
704 }
705 }
706
707 if(tsw!=null){
708 for(int i=0; i<tsw.length; i++){tsw[i].poisonAndWait();}
709 }
710
711 //Track whether any threads failed
712 if(!success){errorState=true;}
713
714 //Do anything necessary after processing
715
716 }
717
718 /*--------------------------------------------------------------*/
719 /*---------------- I/O Methods ----------------*/
720 /*--------------------------------------------------------------*/
721
722 private boolean writeMap(HashMap<Long, SketchHeap>[] maps){
723
724 //Determine how many threads may be used
725 final int threads=files;
726
727 //Fill a list with WriteThreads
728 ArrayList<WriteThread> alwt=new ArrayList<WriteThread>(threads);
729
730 @SuppressWarnings("unchecked")
731 ArrayDeque<SketchHeap>[] heaps=new ArrayDeque[threads];
732 for(int i=0; i<threads; i++){
733 heaps[i]=new ArrayDeque<SketchHeap>();
734 WriteThread wt=new WriteThread(i, heaps[i]);
735 alwt.add(wt);
736 }
737
738 for(int i=0; i<maps.length; i++){
739 HashMap<Long, SketchHeap> longMap=maps[i];
740 for(Entry<Long, SketchHeap> entry : longMap.entrySet()){
741 // set.remove(entry); This will probably not work
742 SketchHeap entryHeap=entry.getValue();
743 sketchesMade++;
744 if(entryHeap.size()>0 && entryHeap.genomeSizeKmers>=minSizeKmers){
745 heaps[(entry.hashCode()&Integer.MAX_VALUE)%threads].add(entryHeap);
746 }
747 }
748 // intMap.clear();
749 maps[i]=null;
750 }
751
752 //Start the threads
753 for(WriteThread wt : alwt){wt.start();}
754
755 //Wait for completion of all threads
756 boolean success=true;
757 for(WriteThread wt : alwt){
758
759 //Wait until this thread has terminated
760 while(wt.getState()!=Thread.State.TERMINATED){
761 try {
762 //Attempt a join operation
763 wt.join();
764 } catch (InterruptedException e) {
765 //Potentially handle this, if it is expected to occur
766 e.printStackTrace();
767 }
768 }
769 // sketchesMade+=wt.sketchesMadeT;
770 sketchesWritten+=wt.sketchesWrittenT;
771 success&=wt.success;
772 }
773 return success;
774 }
775
776 private class WriteThread extends Thread{
777
778 WriteThread(int tnum_, ArrayDeque<SketchHeap> queue_){
779 tnum=tnum_;
780 queue=queue_;
781 }
782
783 @Override
784 public void run(){
785 success=false;
786 for(SketchHeap polledHeap=queue.poll(); polledHeap!=null; polledHeap=queue.poll()){
787 if(polledHeap.sketchSizeEstimate()>0){
788 Sketch sketch=new Sketch(polledHeap, true, tool.trackCounts, outMeta);
789 if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;}
790 if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);}
791 if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);}
792 if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);}
793 if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;}
794 if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;}
795 sketch.addSSU();
796 SketchTool.write(sketch, tsw[tnum], bb);
797 sketchesWrittenT++;
798 }
799 }
800 bb=null;
801 success=true;
802 queue=null;
803 }
804
805 ArrayDeque<SketchHeap> queue;
806 final int tnum;
807 private ByteBuilder bb=new ByteBuilder();
808 // long sketchesMadeT=0;
809 long sketchesWrittenT=0;
810 boolean success=false;
811 }
812
813 // private void writeOutput(ConcurrentHashMap<Integer, SketchHeap> map){
814 // ByteStreamWriter tsw=new ByteStreamWriter(ffout);
815 // tsw.start();
816 // KeySetView<Integer, SketchHeap> y=map.keySet();
817 // for(Integer x : map.keySet()){
818 // SketchHeap smm.heap=map.get(x);
819 // Sketch s=tool.toSketch(smm.heap);
820 // tool.write(s, tsw);
821 // }
822 // tsw.poisonAndWait();
823 // }
824
825 /*--------------------------------------------------------------*/
826 /*---------------- Tax Methods ----------------*/
827 /*--------------------------------------------------------------*/
828
829 private void loadGiToTaxid(){
830 Timer t=new Timer();
831 outstream.println("Loading gi to taxa translation table.");
832 GiToTaxid.initialize(giTableFile);
833 t.stop();
834 if(true){
835 outstream.println("Time: \t"+t);
836 Shared.printMemory();
837 outstream.println();
838 }
839 }
840
841 /*--------------------------------------------------------------*/
842 /*---------------- Inner Methods ----------------*/
843 /*--------------------------------------------------------------*/
844
845 /*--------------------------------------------------------------*/
846 /*---------------- Inner Classes ----------------*/
847 /*--------------------------------------------------------------*/
848
849 private class ProcessThread extends Thread {
850
851 //Constructor
852 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){
853 cris=cris_;
854 threadID=tid_;
855
856 smm=new SketchMakerMini(tool, mode, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual);
857 localMap=(mode==PER_TAXA ? new HashMap<Long, SketchHeap>() : null);
858 }
859
860 //Called by start()
861 @Override
862 public void run(){
863 //Do anything necessary prior to processing
864
865 //Process the reads
866 processInner();
867
868 //Do anything necessary after processing
869 bb=null;
870
871 //Indicate successful exit status
872 success=true;
873 }
874
875 /** Iterate through the reads */
876 void processInner(){
877
878 //Grab the first ListNum of reads
879 ListNum<Read> ln=cris.nextList();
880 //Grab the actual read list from the ListNum
881 ArrayList<Read> reads=(ln!=null ? ln.list : null);
882
883 //Check to ensure pairing is as expected
884 if(reads!=null && !reads.isEmpty()){
885 Read r=reads.get(0);
886 assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
887 }
888
889 // long cntr1=0, cntr2=0, cntr3=0, cntr4=0;
890
891 //As long as there is a nonempty read list...
892 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
893 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
894
895 //Loop through each read in the list
896 for(int idx=0; idx<reads.size(); idx++){
897 final Read r1=reads.get(idx);
898 final Read r2=r1.mate;
899
900 processReadPair(r1, r2);
901 }
902
903 //Notify the input stream that the list was used
904 cris.returnList(ln);
905 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
906
907 //Fetch a new list
908 ln=cris.nextList();
909 reads=(ln!=null ? ln.list : null);
910 }
911
912 //Notify the input stream that the final list was used
913 if(ln!=null){
914 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
915 }
916
917 if(mode==PER_TAXA && localMap.size()>0){dumpLocalMap();}
918
919 // System.out.println(cntr1+", "+cntr2+", "+cntr3+", "+cntr4);
920 }
921
922 void processReadPair(Read r1, Read r2){
923
924 if(mode==PER_TAXA){
925 assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap;
926 assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap;
927 }else if(mode==PER_SEQUENCE || mode==PER_IMG){
928 assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap;
929 assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap;
930 }else{
931 assert(smm.heap!=null && smm.heap.capacity()>=targetSketchSize) : targetSketchSize+", "+(smm.heap==null ? "null" : ""+smm.heap.capacity());
932 }
933
934 //Track the initial length for statistics
935 final int initialLength1=r1.length();
936 final int initialLength2=r1.mateLength();
937 final String rid=r1.id;
938
939 //Increment counters
940 readsProcessedT+=r1.pairCount();
941 basesProcessedT+=initialLength1+initialLength2;
942
943 if(initialLength1<k && initialLength2<k){return;}
944 final long expectedBases;
945 final long imgID;
946 {
947 int taxID=-1;
948 TaxNode tn=null;
949 if(taxtree!=null && (mode==PER_TAXA || mode==PER_IMG || mode==PER_SEQUENCE || ((mode==ONE_SKETCH/* || mode==PER_HEADER*/) && smm.heap.taxID<0))){
950 if(mode==PER_IMG){
951 imgID=ImgRecord2.parseImgId(rid, true);
952 tn=taxtree.imgToTaxNode(imgID);
953 if(tn==null){tn=taxtree.parseNodeFromHeader(rid, bestEffort);}
954 // assert(tn!=null || !rid.startsWith("tid")) : imgID+", "+taxID+", "+rid; //123
955 }else{
956 imgID=ImgRecord2.parseImgId(rid, false);
957 tn=taxtree.parseNodeFromHeader(rid, bestEffort);
958 // assert(tn!=null || !rid.startsWith("tid")) : imgID+", "+taxID+", "+rid; //123
959 }
960 // assert(false) : imgID +", "+rid;
961 // System.err.println("B: "+bestEffort+", "+tn);//123
962 while(tn!=null && tn.pid!=tn.id && tn.level<taxLevel){
963 TaxNode temp=taxtree.getNode(tn.pid);
964 if(temp==null || temp.level>=TaxTree.LIFE){break;}
965 tn=temp;
966 }
967 // assert(tn!=null) : imgID+", "+taxID+", "+rid; //123
968 if(tn!=null){taxID=tn.id;}
969 // System.err.println("Node: "+rid+"\n->\n"+tn);
970
971 // assert(taxID>0) : imgID+", "+taxID+", "+rid; //123
972 }else{
973 imgID=-1;
974 }
975
976 final long unitSizeBases;
977 if(sizeList!=null){
978 unitSizeBases=taxID<0 ? -1 : sizeList.get(taxID);
979 }else if(sizeMap!=null){
980 unitSizeBases=sizeMap.get(imgID);
981 }else{
982 unitSizeBases=-1;
983 }
984
985
986 if(mode==PER_TAXA){
987 if(tossJunk && tn==null){return;}
988 if(tn!=null){
989 if(taxID==0 || (tn.level>taxLevel && tn.level>=TaxTree.PHYLUM)){return;}
990 TaxNode parent=taxtree.getNode(tn.pid);
991 if(parent.pid==parent.id){return;}
992 if(prefilter && unitSizeBases>=0 && unitSizeBases<minSizeBases){return;}
993 if(tossJunk){
994 if(parent.level==TaxTree.LIFE){return;}
995 if(tn.level==TaxTree.NO_RANK){return;}
996 }
997 }
998 }
999
1000 if(mode==PER_SEQUENCE){
1001 expectedBases=initialLength1+initialLength2;
1002 if(expectedBases<minSizeBases){return;}
1003 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
1004 if(expectedSketchSize<minSketchSize){return;}
1005 if(smm.heap==null || smm.heap.capacity()<expectedSketchSize){
1006 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
1007 }
1008 }else if(mode==PER_TAXA){
1009 if(taxID>=0 && localMap.containsKey((long)taxID)){
1010 smm.heap=localMap.get((long)taxID);
1011 assert(smm.heap.taxID==taxID);
1012 }else if(sizeList==null){
1013 if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());}
1014 }else{
1015 expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2;
1016 if(expectedBases<minSizeBases){return;}
1017 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
1018 if(expectedSketchSize<minSketchSize){return;}
1019 if(smm.heap==null || smm.heap.capacity()!=expectedSketchSize){
1020 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
1021 }
1022 // assert(taxID!=96897) : taxID+", "+expectedBases+", "+expectedSketchSize+", "+unitSizeBases+", "+initialLength1+", "+sizeList.get(taxID);
1023 }
1024 assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID;
1025 }else if(mode==PER_IMG){
1026 if(sizeMap==null){
1027 if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());}
1028 }else{
1029 expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2;
1030 if(expectedBases<minSizeBases){return;}
1031 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
1032 if(expectedSketchSize<minSketchSize){return;}
1033 if(smm.heap==null || smm.heap.capacity()!=expectedSketchSize){
1034 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
1035 }
1036 }
1037 }
1038
1039 assert(smm.heap!=null) : mode+", "+(sizeList==null);
1040 assert(taxID<0 || smm.heap.taxID<0 || smm.heap.taxID==taxID); //This is important.
1041 assert(imgID<0 || smm.heap.imgID<0 || smm.heap.imgID==imgID);
1042
1043 if(smm.heap.taxID<0){smm.heap.taxID=taxID;}
1044 if(smm.heap.imgID<0){smm.heap.imgID=imgID;}
1045 if(smm.heap.name0()==null){
1046 smm.heap.setName0(rid);
1047 }
1048 if(smm.heap.taxName()==null && tn!=null){
1049 smm.heap.setTaxName(tn.name);
1050 }
1051
1052 if(!bestEffort && tn==null){//Try to get a higher-level node
1053 TaxNode tn2=taxtree.parseNodeFromHeader(rid, true);
1054 if(tn2!=null){
1055 while(tn2!=null && tn2.pid!=tn2.id && tn2.level<taxLevel){
1056 TaxNode temp=taxtree.getNode(tn2.pid);
1057 if(temp==null || temp.level>=TaxTree.LIFE){break;}
1058 tn2=temp;
1059 }
1060 if(tn2.level<=taxLevel){
1061 taxID=tn2.id;
1062 }
1063 if(smm.heap.taxID<0){smm.heap.taxID=tn2.id;}
1064 if(smm.heap.taxName()==null && tn2!=null){
1065 smm.heap.setTaxName(tn2.name);
1066 }
1067 }
1068 }
1069 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1070
1071 assert(smm.heap.taxID<0 || smm.heap.taxName()!=null) : smm.heap.taxID+", "+smm.heap.taxName()+", "+smm.heap.name()+", "+tn;
1072
1073 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1074 if(initialLength1>=k){smm.processRead(r1);}
1075 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1076 if(initialLength2>=k){smm.processRead(r2);}
1077 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1078
1079
1080 if(mode==PER_SEQUENCE){
1081 manageHeap_perSequence();
1082 }else if(mode==PER_TAXA || mode==PER_IMG){
1083 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1084 manageHeap_perTaxa(taxID, imgID, unitSizeBases);
1085 if(localMap.size()>20){dumpLocalMap();}
1086 }else if(mode==ONE_SKETCH/* || mode==PER_HEADER*/){
1087 //do nothing
1088 }else{
1089 assert(false) : mode;
1090 }
1091 }
1092 }
1093
1094 private void manageHeap_perSequence(){
1095 assert(mode==PER_SEQUENCE);
1096 writeHeap(smm.heap);
1097 }
1098
1099 private void manageHeap_perTaxa(final int taxID, final long imgID, final long unitSizeBases){
1100 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1101 assert(mode==PER_TAXA || mode==PER_IMG);
1102
1103 if(smm.heap.size()<=0 || ((taxID<0 && imgID<0) && smm.heap.genomeSizeKmers<minSizeKmers)){//Discard
1104 assert(!localMap.containsKey((long)taxID));
1105 smm.heap.clear(true);
1106 return;
1107 }
1108
1109 //TODO:
1110 //I could, at this point, write to disk if smm.heap.genomeSize==taxSize.
1111 //Or if the taxID is unknown.
1112
1113 final boolean known=(taxID>=0 || imgID>=0);
1114 final boolean unknown=!known;
1115 final boolean hasSize=(known && (sizeList!=null || sizeMap!=null));
1116 boolean finished=(unknown || (hasSize && smm.heap.genomeSizeBases>=unitSizeBases));
1117
1118 //For some reason, this assertion fired for a single taxID (52271) halfway through sketching RefSeq.
1119 //Likely a transient hardware error.
1120 assert(!finished || smm.heap.genomeSizeBases==unitSizeBases) : finished+", "+unknown+", "+hasSize+", "+(sizeList==null)+"\n"
1121 +taxID+", "+unitSizeBases+", "+smm.heap.genomeSizeBases+", "+smm.heap.genomeSizeKmers;
1122 // if(finished && smm.heap.genomeSizeBases!=unitSizeBases){
1123 // System.err.println("Warning: tid="+taxID+", finished="+finished+", known="+known+
1124 // ", smm.heap.genomeSizeBases="+smm.heap.genomeSizeBases+", unitSizeBases="+unitSizeBases);
1125 // }
1126
1127 smm.heap.taxID=taxID;
1128 smm.heap.imgID=imgID;
1129
1130 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1131
1132 final Long key;
1133 if(imgID>-1 && (mode==PER_IMG || taxID<1)){key=imgID;}
1134 else if(taxID>-1){key=(long)taxID;}
1135 else{key=Long.valueOf(nextUnknown.getAndIncrement());}
1136
1137 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1138
1139 if(unknown || finished){
1140 writeHeap(smm.heap);
1141 smm.heap.clear(true);
1142 localMap.remove((long)taxID);
1143 return;
1144 }
1145
1146 //At this point, the taxID is known and this heap does not constitute the whole taxSize, or the taxSize is unknown.
1147 if(!hasSize){
1148 final HashMap<Long, SketchHeap> map=longMaps[(int)(key&MAP_MASK)];
1149 final SketchHeap old;
1150 synchronized(map){
1151 old=map.get(key);
1152 if(old==null){
1153 map.put(key, smm.heap);
1154 }else{
1155 old.add(smm.heap);
1156 }
1157 }
1158 if(old==null){
1159 smm.heap=null;
1160 }else{
1161 smm.heap.clear(true);
1162 assert(!localMap.containsKey((long)taxID));
1163 }
1164 localMap.remove((long)taxID);
1165 return;
1166 }
1167
1168 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
1169
1170 {
1171 //At this point, the taxID is and taxSize are known, and this heap does not constitute the whole taxSize.
1172 final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize);
1173 assert(expectedHeapSize>=3) : expectedHeapSize;
1174 // boolean writeHeap=false;
1175 // boolean makeHeap=false;
1176
1177 SketchHeap local=null;
1178 {
1179 local=localMap.get(key);
1180 assert(local==null || local.taxID==key.intValue());
1181 if(local==smm.heap){
1182 //do nothing
1183 smm.heap=null;
1184 }else if(local==null){
1185 if(expectedHeapSize==smm.heap.capacity()){//Store the current heap
1186 assert(smm.heap.taxID==key.intValue());
1187 assert(smm.heap.name()!=null);
1188 localMap.put(key, smm.heap);//Safe
1189 smm.heap=null;
1190 }else{//Store a precisely-sized heap
1191 SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
1192 temp.add(smm.heap);
1193 assert(temp.taxID==key.intValue());
1194 assert(temp.name()!=null);
1195 localMap.put(key, temp);//Looks safe
1196 // smm.heap=null; //Not sure which is better
1197 smm.heap.clear(true);
1198 }
1199 }else{
1200 assert(local.taxID==smm.heap.taxID);
1201 assert(local!=smm.heap);
1202 // assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID+", "+key; //123
1203 // assert(false) : taxID+", "+key;
1204 local.add(smm.heap); //This WAS the slow line. It should no longer be executed.
1205 // if(local.genomeSizeBases>=unitSizeBases){
1206 // writeHeap=true;
1207 // localMap.remove(key);
1208 // }
1209 // smm.heap=null; //Not sure which is better
1210 smm.heap.clear(true);
1211 }
1212 }
1213 }
1214 }
1215
1216 private void dumpLocalMap(){
1217
1218 for(Entry<Long, SketchHeap> e : localMap.entrySet()){
1219 boolean writeHeap=false;
1220 final Long key=e.getKey();
1221 final SketchHeap localHeap=e.getValue();
1222 final HashMap<Long, SketchHeap> map=longMaps[(int)(key&MAP_MASK)];
1223 final SketchHeap old;
1224
1225 final long unitSizeBases;
1226 if(sizeList!=null){
1227 unitSizeBases=localHeap.taxID<0 ? -1 : sizeList.get((int)localHeap.taxID);
1228 }else if(sizeMap!=null){
1229 unitSizeBases=sizeMap.get(localHeap.imgID);
1230 }else{
1231 unitSizeBases=-1;
1232 }
1233 final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize);
1234
1235 synchronized(map){
1236 old=map.get(key);
1237 if(old==null){
1238 if(expectedHeapSize==localHeap.capacity()){//Store the current heap
1239 map.put(key, localHeap);
1240 }else{//Store a precisely-sized heap
1241 // assert(key.intValue()!=96897) : key+", "+unitSizeBases+", "+", "+sizeList.get(key.intValue());
1242 assert(expectedHeapSize>0) : expectedHeapSize+", "+key+", "+localHeap.taxID+", "+localHeap.name()+", "+unitSizeBases;
1243 SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
1244 temp.add(localHeap);
1245 map.put(key, temp);
1246 }
1247 }else{
1248 old.add(localHeap); //This was the slow line; should be faster now.
1249 if(old.genomeSizeBases>=unitSizeBases){
1250 writeHeap=true;
1251 map.remove(key);
1252 }
1253 }
1254 }
1255
1256 if(writeHeap){
1257 assert(old!=null); //For compiler
1258 assert(old.genomeSizeBases>0) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers;
1259 assert(old.genomeSizeBases==unitSizeBases) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers+", "+old.size()+", "+old.taxID;
1260 writeHeap(old);
1261 }
1262 }
1263 localMap.clear();
1264 }
1265
1266 private boolean writeHeap(SketchHeap heap){
1267 sketchesMadeT++;
1268 // assert(heap.size()>0) : heap.size(); //Not really necessary
1269 boolean written=false;
1270 // assert(heap.heap.size()==heap.set.size()) : heap.heap.size()+", "+heap.set.size();
1271 if(heap.size()>0 && heap.genomeSizeKmers>=minSizeKmers && heap.sketchSizeEstimate()>0){
1272 Sketch sketch=new Sketch(heap, true, tool.trackCounts, outMeta);
1273 if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;}
1274 if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);}
1275 if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);}
1276 if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);}
1277 if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;}
1278 if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;}
1279 if(parseSubunit && sketch.name0()!=null){
1280 if(outMeta!=null){
1281 sketch.meta=(ArrayList<String>)sketch.meta.clone();
1282 }else if(sketch.meta==null){
1283 if(sketch.name0().contains("SSU_")){
1284 sketch.addMeta("subunit:ssu");
1285 }else if(sketch.name0().contains("LSU_")){
1286 sketch.addMeta("subunit:lsu");
1287 }
1288 }
1289 }
1290 if(tsw!=null){
1291 final int choice=(sketch.hashCode()&Integer.MAX_VALUE)%files;
1292 sketch.addSSU();
1293 synchronized(tsw[choice]){
1294 SketchTool.write(sketch, tsw[choice], bb);
1295 sketchesWrittenT++;
1296 written=true;
1297 }
1298 }
1299 }else{
1300 heap.clear(true);
1301 }
1302 assert(heap.genomeSizeBases==0);
1303 assert(heap.genomeSequences==0);
1304 return written;
1305 }
1306
1307 /** Number of reads processed by this thread */
1308 protected long readsProcessedT=0;
1309 /** Number of bases processed by this thread */
1310 protected long basesProcessedT=0;
1311
1312 long sketchesMadeT=0;
1313 long sketchesWrittenT=0;
1314
1315 /** True only if this thread has completed successfully */
1316 boolean success=false;
1317
1318 /** Shared input stream */
1319 private final ConcurrentReadInputStream cris;
1320 /** Thread ID */
1321 final int threadID;
1322 private ByteBuilder bb=new ByteBuilder();
1323
1324 final SketchMakerMini smm;
1325 final HashMap<Long, SketchHeap> localMap;
1326 }
1327
1328 /*--------------------------------------------------------------*/
1329 /*---------------- Fields ----------------*/
1330 /*--------------------------------------------------------------*/
1331
1332 /** Primary input file path */
1333 private String in1=null;
1334 /** Secondary input file path */
1335 private String in2=null;
1336
1337 /** Primary output file path */
1338 private String out1=null;
1339
1340 /** Override input file extension */
1341 private String extin=null;
1342
1343 private String giTableFile=null;
1344 private String taxTreeFile=null;
1345 private String accessionFile=null;
1346 private String imgFile=null;
1347
1348 /*Override metadata */
1349 String outTaxName=null;
1350 String outFname=null;
1351 String outName0=null;
1352 int outTaxID=-1;
1353 long outSpid=-1;
1354 long outImgID=-1;
1355 ArrayList<String> outMeta=null;
1356 static boolean parseSubunit=false;
1357
1358 /*--------------------------------------------------------------*/
1359
1360 /** Number of reads processed */
1361 protected long readsProcessed=0;
1362 /** Number of bases processed */
1363 protected long basesProcessed=0;
1364 /** Number of bases processed */
1365 protected long kmersProcessed=0;
1366 /** Number of sketches started */
1367 protected long sketchesMade=0;
1368 /** Number of sketches written */
1369 protected long sketchesWritten=0;
1370
1371 /** Quit after processing this many input reads; -1 means no limit */
1372 private long maxReads=-1;
1373
1374 final LongList sizeList;
1375 final HashMap<Long, Long> sizeMap;
1376
1377 private HashMap<Long, SketchHeap> longMaps[];
1378 private ByteStreamWriter tsw[];
1379
1380 /*--------------------------------------------------------------*/
1381 /*---------------- Final Fields ----------------*/
1382 /*--------------------------------------------------------------*/
1383
1384 /** Primary input file */
1385 private final FileFormat ffin1;
1386 /** Secondary input file */
1387 private final FileFormat ffin2;
1388
1389 /** Primary output files */
1390 private final FileFormat ffout[];
1391 /** Number of output files */
1392 private final int files;
1393
1394 final int mode;
1395
1396 private final SketchTool tool;
1397
1398 /** Don't make sketches from sequences smaller than this */
1399 final int minSizeBases;
1400 /** Don't make sketches from sequences smaller than this */
1401 final int minSizeKmers;
1402
1403 private int taxLevel=1;
1404 private boolean prefilter=false;
1405 private boolean tossJunk=true;
1406 boolean bestEffort=true;
1407 // private final HashMap<Integer, byte[]> ssuMap=null;
1408
1409 private final AtomicInteger nextUnknown=new AtomicInteger(minFakeID);
1410
1411 private static final int MAP_WAYS=32;
1412 private static final int MAP_MASK=MAP_WAYS-1;
1413
1414
1415 /*--------------------------------------------------------------*/
1416 /*---------------- Common Fields ----------------*/
1417 /*--------------------------------------------------------------*/
1418
1419 /** Print status messages to this output stream */
1420 private PrintStream outstream=System.err;
1421 /** Print verbose messages */
1422 public static boolean verbose=false;
1423 /** True if an error was encountered */
1424 public boolean errorState=false;
1425 /** Overwrite existing output files */
1426 private boolean overwrite=false;
1427 /** Append to existing output files */
1428 private boolean append=false;
1429
1430 }