Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/SketchMaker.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 package sketch; | |
2 | |
3 import java.io.File; | |
4 import java.io.PrintStream; | |
5 import java.util.ArrayDeque; | |
6 import java.util.ArrayList; | |
7 import java.util.Arrays; | |
8 import java.util.HashMap; | |
9 import java.util.Map.Entry; | |
10 import java.util.concurrent.atomic.AtomicInteger; | |
11 | |
12 import fileIO.ByteFile; | |
13 import fileIO.ByteStreamWriter; | |
14 import fileIO.FileFormat; | |
15 import fileIO.ReadWrite; | |
16 import shared.Parse; | |
17 import shared.Parser; | |
18 import shared.PreParser; | |
19 import shared.ReadStats; | |
20 import shared.Shared; | |
21 import shared.Timer; | |
22 import shared.Tools; | |
23 import stream.ConcurrentReadInputStream; | |
24 import stream.FASTQ; | |
25 import stream.FastaReadInputStream; | |
26 import stream.Read; | |
27 import structures.ByteBuilder; | |
28 import structures.ListNum; | |
29 import structures.LongList; | |
30 import tax.AccessionToTaxid; | |
31 import tax.GiToTaxid; | |
32 import tax.ImgRecord2; | |
33 import tax.TaxNode; | |
34 import tax.TaxTree; | |
35 | |
36 /** | |
37 * Creates MinHashSketches rapidly. | |
38 * | |
39 * @author Brian Bushnell | |
40 * @date July 6, 2016 | |
41 * | |
42 */ | |
43 public class SketchMaker extends SketchObject { | |
44 | |
45 /*--------------------------------------------------------------*/ | |
46 /*---------------- Initialization ----------------*/ | |
47 /*--------------------------------------------------------------*/ | |
48 | |
49 /** | |
50 * Code entrance from the command line. | |
51 * @param args Command line arguments | |
52 */ | |
53 public static void main(String[] args){ | |
54 //Start a timer immediately upon code entrance. | |
55 Timer t=new Timer(); | |
56 | |
57 final int mode=parseMode(args); | |
58 if(mode==PER_FILE/* || mode==ONE_SKETCH || mode==PER_HEADER*/ || mode==PER_SEQUENCE){//ONE_SKETCH does not work for multiple input files. | |
59 recallCompareSketch(args); | |
60 return; | |
61 } | |
62 | |
63 final int oldBufLen=Shared.bufferLen(); | |
64 | |
65 //Create an instance of this class | |
66 SketchMaker x=new SketchMaker(args); | |
67 | |
68 //Run the object | |
69 x.process(t); | |
70 | |
71 Shared.setBufferLen(oldBufLen); | |
72 | |
73 //Close the print stream if it was redirected | |
74 Shared.closeStream(x.outstream); | |
75 } | |
76 | |
77 private static void recallCompareSketch(String[] args){ | |
78 ArrayList<String> list=new ArrayList<String>(args.length+1); | |
79 for(int i=0; i<args.length; i++){ | |
80 if(args[i].startsWith("out=")){ | |
81 args[i]=args[i].replaceFirst("out=", "outsketch="); | |
82 } | |
83 list.add(args[i]); | |
84 } | |
85 list.add("sketchonly"); | |
86 CompareSketch.main(list.toArray(new String[0])); | |
87 } | |
88 | |
89 /** | |
90 * Constructor. | |
91 * @param args Command line arguments | |
92 */ | |
93 public SketchMaker(String[] args){ | |
94 | |
95 {//Preparse block for help, config files, and outstream | |
96 PreParser pp=new PreParser(args, getClass(), false); | |
97 args=pp.args; | |
98 outstream=pp.outstream; | |
99 } | |
100 | |
101 //Set shared static variables | |
102 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; | |
103 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); | |
104 | |
105 //Create a parser object | |
106 Parser parser=new Parser(); | |
107 | |
108 int minSizeKmers_=100; | |
109 int files_=1; | |
110 int mode_=ONE_SKETCH; | |
111 hashNames=true; | |
112 boolean setPrefilter=false; | |
113 defaultParams.printDepth=defaultParams.printDepth2=defaultParams.printVolume=false; | |
114 | |
115 //Parse each argument | |
116 for(int i=0; i<args.length; i++){ | |
117 String arg=args[i]; | |
118 | |
119 //Break arguments into their constituent parts, in the form of "a=b" | |
120 String[] split=arg.split("="); | |
121 String a=split[0].toLowerCase(); | |
122 String b=split.length>1 ? split[1] : null; | |
123 | |
124 if(a.equals("verbose")){ | |
125 verbose=Parse.parseBoolean(b); | |
126 }else if(a.equals("files")){ | |
127 files_=Integer.parseInt(b); | |
128 }else if(a.equals("minsize")){ | |
129 minSizeKmers_=Parse.parseIntKMG(b); | |
130 }else if(a.equals("prefilter")){ | |
131 prefilter=Parse.parseBoolean(b); | |
132 setPrefilter=true; | |
133 } | |
134 | |
135 else if(a.equals("name") || a.equals("taxname")){ | |
136 outTaxName=b; | |
137 }else if(a.equals("name0")){ | |
138 outName0=b; | |
139 }else if(a.equals("fname")){ | |
140 outFname=b; | |
141 }else if(a.equals("taxid") || a.equals("tid")){ | |
142 outTaxID=Integer.parseInt(b); | |
143 }else if(a.equals("spid")){ | |
144 outSpid=Integer.parseInt(b); | |
145 }else if(a.equals("imgid")){ | |
146 outImgID=Integer.parseInt(b); | |
147 }else if((a.startsWith("meta_") || a.startsWith("mt_")) && b!=null){ | |
148 if(outMeta==null){outMeta=new ArrayList<String>();} | |
149 int underscore=a.indexOf('_', 0); | |
150 outMeta.add(a.substring(underscore+1)+":"+b); | |
151 }else if(a.equals("parsesubunit")){ | |
152 parseSubunit=Parse.parseBoolean(b); | |
153 } | |
154 | |
155 else if(parseMode(arg, a, b)>-1){ | |
156 mode_=parseMode(arg, a, b); | |
157 }else if(a.equals("parse_flag_goes_here")){ | |
158 long fake_variable=Parse.parseKMG(b); | |
159 //Set a variable here | |
160 } | |
161 | |
162 else if(a.equals("table") || a.equals("gi") || a.equals("gitable")){ | |
163 giTableFile=b; | |
164 }else if(a.equals("taxtree") || a.equals("tree")){ | |
165 taxTreeFile=b; | |
166 }else if(a.equals("accession")){ | |
167 accessionFile=b; | |
168 }else if(a.equalsIgnoreCase("img") || a.equals("imgfile") || a.equals("imgdump")){ | |
169 imgFile=b; | |
170 } | |
171 | |
172 else if(a.equals("tossjunk")){ | |
173 tossJunk=Parse.parseBoolean(b); | |
174 } | |
175 | |
176 // else if(a.equals("silva")){//Handled by parser | |
177 // TaxTree.SILVA_MODE=Parse.parseBoolean(b); | |
178 // } | |
179 | |
180 else if(a.equals("taxlevel") || a.equals("tl") || a.equals("level") || a.equals("lv")){ | |
181 taxLevel=TaxTree.parseLevel(b); | |
182 } | |
183 | |
184 else if(parseSketchFlags(arg, a, b)){ | |
185 //do nothing | |
186 // System.err.println("a: "+arg); | |
187 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser | |
188 //do nothing | |
189 // System.err.println("b: "+arg); | |
190 }else if(defaultParams.parse(arg, a, b)){ | |
191 //do nothing | |
192 // System.err.println("c: "+arg); | |
193 // System.err.println(defaultParams.printDepth); | |
194 } | |
195 | |
196 else{ | |
197 outstream.println("Unknown parameter "+args[i]); | |
198 assert(false) : "Unknown parameter "+args[i]; | |
199 } | |
200 } | |
201 if("auto".equalsIgnoreCase(imgFile)){imgFile=TaxTree.defaultImgFile();} | |
202 if("auto".equalsIgnoreCase(taxTreeFile)){taxTreeFile=TaxTree.defaultTreeFile();} | |
203 if("auto".equalsIgnoreCase(giTableFile)){giTableFile=TaxTree.defaultTableFile();} | |
204 if("auto".equalsIgnoreCase(accessionFile)){accessionFile=TaxTree.defaultAccessionFile();} | |
205 | |
206 outMeta=SketchObject.fixMeta(outMeta); | |
207 | |
208 postParse(); | |
209 minSizeKmers=minSizeKmers_; | |
210 mode=mode_; | |
211 | |
212 minSizeBases=minSizeKmers_+k-1; | |
213 | |
214 {//Process parser fields | |
215 Parser.processQuality(); | |
216 | |
217 maxReads=parser.maxReads; | |
218 | |
219 overwrite=ReadStats.overwrite=parser.overwrite; | |
220 append=ReadStats.append=parser.append; | |
221 | |
222 in1=parser.in1; | |
223 in2=parser.in2; | |
224 | |
225 out1=parser.out1; | |
226 | |
227 extin=parser.extin; | |
228 } | |
229 files=(out1==null ? 0 : files_); | |
230 | |
231 if(!setPrefilter && !prefilter && (mode==PER_TAXA || mode==PER_IMG) && in1!=null && !in1.startsWith("stdin") && (AUTOSIZE || AUTOSIZE_LINEAR || targetSketchSize>200)){ | |
232 prefilter=true; | |
233 System.err.println("Enabled prefilter due to running in per-"+(mode==PER_TAXA ? "taxa" : "IMG")+" mode; override with 'prefilter=f'."); | |
234 } | |
235 | |
236 assert(mode!=ONE_SKETCH || files<2) : "Multiple output files are not allowed in single-sketch mode."; | |
237 | |
238 //Do input file # replacement | |
239 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ | |
240 in2=in1.replace("#", "2"); | |
241 in1=in1.replace("#", "1"); | |
242 } | |
243 | |
244 //Adjust interleaved detection based on the number of input files | |
245 if(in2!=null){ | |
246 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} | |
247 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; | |
248 } | |
249 | |
250 assert(FastaReadInputStream.settingsOK()); | |
251 | |
252 //Ensure there is an input file | |
253 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} | |
254 | |
255 //Adjust the number of threads for input file reading | |
256 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ | |
257 ByteFile.FORCE_MODE_BF2=true; | |
258 } | |
259 | |
260 ffout=makeFFArray(out1, files, overwrite, append); | |
261 if(ffout==null || ffout.length<1){ | |
262 System.err.println("WARNING: No output files were specified; no sketches will be written.\n"); | |
263 } | |
264 | |
265 // //Ensure output files can be written | |
266 // if(!Tools.testOutputFiles(overwrite, append, false, out1)){ | |
267 // outstream.println((out1==null)+", "+out1); | |
268 // throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n"); | |
269 // } | |
270 | |
271 //Ensure input files can be read | |
272 if(!Tools.testInputFiles(false, true, in1, in2, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){ | |
273 throw new RuntimeException("\nCan't read some input files.\n"); | |
274 } | |
275 | |
276 //Ensure that no file was specified multiple times | |
277 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){ | |
278 throw new RuntimeException("\nSome file names were specified multiple times.\n"); | |
279 } | |
280 | |
281 //Create input FileFormat objects | |
282 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); | |
283 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); | |
284 | |
285 // assert(false) : defaultParams.trackCounts(); | |
286 tool=new SketchTool(targetSketchSize, defaultParams); | |
287 | |
288 if(taxTreeFile!=null){setTaxtree(taxTreeFile, outstream);} | |
289 | |
290 if(giTableFile!=null){ | |
291 loadGiToTaxid(); | |
292 } | |
293 if(accessionFile!=null){ | |
294 AccessionToTaxid.tree=taxtree; | |
295 outstream.println("Loading accession table."); | |
296 AccessionToTaxid.load(accessionFile); | |
297 System.gc(); | |
298 } | |
299 if(imgFile!=null){ | |
300 TaxTree.loadIMG(imgFile, true, outstream); | |
301 } | |
302 SSUMap.load(outstream); | |
303 | |
304 if(prefilter){ | |
305 if(mode==PER_TAXA){sizeList=sizeList(); sizeMap=null;} | |
306 else if(mode==PER_IMG){sizeMap=sizeMap(); sizeList=null;} | |
307 else{ | |
308 assert(false) : "Wrong mode for prefilter; should be taxa or img."; | |
309 sizeList=null; sizeMap=null; | |
310 } | |
311 }else{ | |
312 sizeList=null; sizeMap=null; | |
313 } | |
314 } | |
315 | |
316 private static FileFormat[] makeFFArray(String fname0, int files, boolean overwrite, boolean append){ | |
317 if(files<1 || fname0==null){return null;} | |
318 String[] fnames=new String[files]; | |
319 FileFormat[] ff=new FileFormat[files]; | |
320 for(int i=0; i<files; i++){ | |
321 String fname=fname0; | |
322 if(files>1){ | |
323 assert(fname.indexOf('#')>-1) : "Output name requires # symbol for multiple files."; | |
324 fname=fname.replaceFirst("#", ""+i); | |
325 } | |
326 fnames[i]=fname; | |
327 ff[i]=FileFormat.testOutput(fname, FileFormat.TEXT, null, true, overwrite, append, false); | |
328 } | |
329 | |
330 if(!Tools.testOutputFiles(overwrite, append, false, fnames)){ | |
331 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+Arrays.toString(fnames)+"\n"); | |
332 } | |
333 | |
334 return ff; | |
335 } | |
336 | |
337 // private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){ | |
338 // if(ff==null || ff.length==0){return null;} | |
339 // ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length]; | |
340 // for(int i=0; i<ff.length; i++){ | |
341 // tsw[i]=new ByteStreamWriter(ff[i]); | |
342 // tsw[i].start(); | |
343 // } | |
344 // return tsw; | |
345 // } | |
346 | |
347 private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){ | |
348 if(ff==null || ff.length==0){return null;} | |
349 ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length]; | |
350 for(int i=0; i<ff.length; i++){ | |
351 tsw[i]=new ByteStreamWriter(ff[i]); | |
352 tsw[i].start(); | |
353 } | |
354 return tsw; | |
355 } | |
356 | |
357 /*--------------------------------------------------------------*/ | |
358 /*---------------- Outer Methods ----------------*/ | |
359 /*--------------------------------------------------------------*/ | |
360 | |
361 //Makes a list of genome sizes (bases, not kmers) per taxa. | |
362 private LongList sizeList(){ | |
363 Timer t=new Timer(); | |
364 Shared.GC_BEFORE_PRINT_MEMORY=true; | |
365 t.start("Making taxa prefilter."); | |
366 //For some reason this function is very slow, using only ~95% java and ~95% bgzip CPU. | |
367 //But that's about the same speed as reformat... BBDuk is only a little faster. | |
368 | |
369 // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads(); | |
370 | |
371 LongList sizes=new LongList(); | |
372 | |
373 //Create a read input stream | |
374 final ConcurrentReadInputStream cris; | |
375 { | |
376 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null); | |
377 // if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);} //Why would I ever want this here? | |
378 cris.start(); //Start the stream | |
379 if(verbose){outstream.println("Started cris");} | |
380 } | |
381 | |
382 //Grab the first ListNum of reads | |
383 ListNum<Read> ln=cris.nextList(); | |
384 //Grab the actual read list from the ListNum | |
385 ArrayList<Read> reads=(ln!=null ? ln.list : null); | |
386 | |
387 //As long as there is a nonempty read list... | |
388 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning | |
389 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access | |
390 | |
391 //Loop through each read in the list | |
392 for(int idx=0; idx<reads.size(); idx++){ | |
393 final Read r1=reads.get(idx); | |
394 | |
395 int taxID=-1; | |
396 TaxNode tn=null; | |
397 if(taxtree!=null){ | |
398 tn=taxtree.parseNodeFromHeader(r1.id, bestEffort); | |
399 // System.err.println("A: "+bestEffort+", "+tn);//123 | |
400 while(tn!=null && tn.pid!=tn.id && tn.level<taxLevel){ | |
401 TaxNode temp=taxtree.getNode(tn.pid); | |
402 if(temp==null || temp.level>=TaxTree.LIFE){break;} | |
403 tn=temp; | |
404 } | |
405 if(tn!=null){taxID=tn.id;} | |
406 } | |
407 | |
408 if(taxID>0){ | |
409 long a=r1.length(); | |
410 long b=r1.mateLength(); | |
411 if(a<k){a=0;} | |
412 if(b<k){b=0;} | |
413 sizes.increment(taxID, a+b); | |
414 } | |
415 } | |
416 | |
417 //Notify the input stream that the list was used | |
418 cris.returnList(ln); | |
419 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access | |
420 | |
421 //Fetch a new list | |
422 ln=cris.nextList(); | |
423 reads=(ln!=null ? ln.list : null); | |
424 } | |
425 | |
426 //Notify the input stream that the final list was used | |
427 if(ln!=null){ | |
428 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); | |
429 } | |
430 | |
431 errorState|=ReadWrite.closeStream(cris); | |
432 // outstream.println("Created prefilter."); | |
433 t.stop("Created prefilter:"); | |
434 Shared.printMemory(); | |
435 System.err.println(); | |
436 | |
437 return sizes; | |
438 } | |
439 | |
440 //Makes a list of genome sizes (bases, not kmers) per img. | |
441 private HashMap<Long, Long> sizeMap(){ | |
442 Timer t=new Timer(); | |
443 t.start("Making img prefilter."); | |
444 | |
445 // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads(); | |
446 | |
447 HashMap<Long, Long> sizes=new HashMap<Long, Long>(); | |
448 | |
449 //Create a read input stream | |
450 final ConcurrentReadInputStream cris; | |
451 { | |
452 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null); | |
453 if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);} | |
454 cris.start(); //Start the stream | |
455 if(verbose){outstream.println("Started cris");} | |
456 } | |
457 | |
458 //Grab the first ListNum of reads | |
459 ListNum<Read> ln=cris.nextList(); | |
460 //Grab the actual read list from the ListNum | |
461 ArrayList<Read> reads=(ln!=null ? ln.list : null); | |
462 | |
463 //As long as there is a nonempty read list... | |
464 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning | |
465 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access | |
466 | |
467 //Loop through each read in the list | |
468 for(int idx=0; idx<reads.size(); idx++){ | |
469 final Read r1=reads.get(idx); | |
470 | |
471 long imgID=ImgRecord2.parseImgId(r1.id, true); | |
472 assert(imgID>-1) : "IMG records must start with IMG number followed by a space: "+r1.id; | |
473 | |
474 if(imgID>0){ | |
475 long a=r1.length(); | |
476 long b=r1.mateLength(); | |
477 if(a<k){a=0;} | |
478 if(b<k){b=0;} | |
479 | |
480 if(a+b>0){ | |
481 Long old=sizes.get(imgID); | |
482 if(old==null){sizes.put(imgID, a+b);} | |
483 else{sizes.put(imgID, a+b+old);} | |
484 } | |
485 } | |
486 } | |
487 | |
488 //Notify the input stream that the list was used | |
489 cris.returnList(ln); | |
490 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access | |
491 | |
492 //Fetch a new list | |
493 ln=cris.nextList(); | |
494 reads=(ln!=null ? ln.list : null); | |
495 } | |
496 | |
497 //Notify the input stream that the final list was used | |
498 if(ln!=null){ | |
499 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); | |
500 } | |
501 | |
502 errorState|=ReadWrite.closeStream(cris); | |
503 // outstream.println("Created prefilter."); | |
504 t.stop("Created prefilter:"); | |
505 Shared.printMemory(); | |
506 System.err.println(); | |
507 | |
508 return sizes; | |
509 } | |
510 | |
511 /** Create read streams and process all data */ | |
512 void process(Timer t){ | |
513 | |
514 //Reset counters | |
515 readsProcessed=0; | |
516 basesProcessed=0; | |
517 | |
518 if(mode==ONE_SKETCH && !forceDisableMultithreadedFastq && Shared.threads()>2 && ffin1.fastq()){ | |
519 // Shared.setBufferLen(2); | |
520 singleSketchMT(); | |
521 }else{ | |
522 final int oldLen=Shared.bufferLen(); | |
523 Shared.capBufferLen(ffin1.fastq() ? 40 : 4); | |
524 | |
525 //Turn off read validation in the input threads to increase speed | |
526 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; | |
527 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; | |
528 | |
529 //Create a read input stream | |
530 final ConcurrentReadInputStream cris; | |
531 { | |
532 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null); | |
533 cris.start(); //Start the stream | |
534 if(verbose){outstream.println("Started cris");} | |
535 } | |
536 | |
537 //Process the reads in separate threads | |
538 spawnThreads(cris); | |
539 | |
540 if(verbose){outstream.println("Finished; closing streams.");} | |
541 | |
542 //Write anything that was accumulated by ReadStats | |
543 errorState|=ReadStats.writeAll(); | |
544 //Close the read streams | |
545 errorState|=ReadWrite.closeStream(cris); | |
546 | |
547 //TODO: Write sketch | |
548 | |
549 //Reset read validation | |
550 Read.VALIDATE_IN_CONSTRUCTOR=vic; | |
551 Shared.setBufferLen(oldLen); | |
552 } | |
553 | |
554 //Report timing and results | |
555 t.stop(); | |
556 outstream.println("Wrote "+sketchesWritten+" of "+sketchesMade+" sketches.\n"); | |
557 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); | |
558 | |
559 //Throw an exception of there was an error in a thread | |
560 if(errorState){ | |
561 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); | |
562 } | |
563 } | |
564 | |
565 private void singleSketchMT(){ | |
566 Timer t=new Timer(); | |
567 Sketch sketch=tool.processReadsMT(ffin1, ffin2, Shared.threads(), | |
568 maxReads, mode, defaultParams.samplerate, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual, false); | |
569 | |
570 if(outTaxID>=0){sketch.taxID=outTaxID;} | |
571 if(outTaxName!=null){sketch.setTaxName(outTaxName);} | |
572 if(outFname!=null){sketch.setFname(outFname);} | |
573 if(outName0!=null){sketch.setName0(outName0);} | |
574 if(outSpid>=0){sketch.spid=outSpid;} | |
575 if(outImgID>=0){sketch.imgID=outImgID;} | |
576 sketch.setMeta(outMeta); | |
577 | |
578 //Accumulate per-thread statistics | |
579 readsProcessed+=sketch.genomeSequences; | |
580 basesProcessed+=sketch.genomeSizeBases; | |
581 kmersProcessed+=sketch.genomeSizeKmers; | |
582 | |
583 sketchesMade+=1; | |
584 | |
585 t.stop("Finished sketching: "); | |
586 Shared.printMemory(); | |
587 | |
588 if(ffout!=null && ffout.length>0){ | |
589 sketch.addSSU(); | |
590 SketchTool.write(sketch, ffout[0]); | |
591 sketchesWritten+=1; | |
592 } | |
593 } | |
594 | |
595 /** Spawn process threads */ | |
596 @SuppressWarnings("unchecked") | |
597 private void spawnThreads(final ConcurrentReadInputStream cris){ | |
598 | |
599 //Do anything necessary prior to processing | |
600 Timer t=new Timer(); | |
601 | |
602 //Determine how many threads may be used | |
603 final int threads=Tools.mid(1, Shared.threads(), 14);//Probably capped for memory reasons. Rarely hits 1200% anyway (was 12, bumped to 14). | |
604 | |
605 //Fill a list with ProcessThreads | |
606 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); | |
607 | |
608 if(mode==PER_TAXA || mode==PER_IMG){ | |
609 longMaps=new HashMap[MAP_WAYS]; | |
610 for(int i=0; i<longMaps.length; i++){ | |
611 longMaps[i]=new HashMap<Long, SketchHeap>(); | |
612 } | |
613 } | |
614 | |
615 if(mode!=ONE_SKETCH){tsw=makeTSWArray(ffout);} | |
616 | |
617 for(int i=0; i<threads; i++){ | |
618 alpt.add(new ProcessThread(cris, i)); | |
619 } | |
620 | |
621 //Start the threads | |
622 for(ProcessThread pt : alpt){ | |
623 pt.start(); | |
624 } | |
625 | |
626 //Wait for completion of all threads | |
627 boolean success=true; | |
628 SketchHeap singleHeap=null; | |
629 for(ProcessThread pt : alpt){ | |
630 | |
631 //Wait until this thread has terminated | |
632 while(pt.getState()!=Thread.State.TERMINATED){ | |
633 try { | |
634 //Attempt a join operation | |
635 pt.join(); | |
636 } catch (InterruptedException e) { | |
637 //Potentially handle this, if it is expected to occur | |
638 e.printStackTrace(); | |
639 } | |
640 } | |
641 | |
642 //Accumulate per-thread statistics | |
643 readsProcessed+=pt.readsProcessedT; | |
644 basesProcessed+=pt.basesProcessedT; | |
645 kmersProcessed+=pt.smm.kmersProcessed; | |
646 | |
647 sketchesMade+=pt.sketchesMadeT; | |
648 sketchesWritten+=pt.sketchesWrittenT; | |
649 | |
650 // System.err.println(pt.sketchesMadeT+" "+pt.sketchesWrittenT); | |
651 | |
652 // System.err.println("pt.readsProcessedT="+pt.readsProcessedT); | |
653 if(mode==ONE_SKETCH){ | |
654 SketchHeap temp=pt.smm.heap; | |
655 | |
656 if(temp==null){ | |
657 //do nothing | |
658 }else if(singleHeap==null){singleHeap=pt.smm.heap;} | |
659 else{singleHeap.add(pt.smm.heap);} | |
660 | |
661 if(singleHeap!=null){ | |
662 if(outTaxID>=0){singleHeap.taxID=outTaxID;} | |
663 if(outTaxName!=null){singleHeap.setTaxName(outTaxName);} | |
664 if(outFname!=null){singleHeap.setFname(outFname);} | |
665 if(outName0!=null){singleHeap.setName0(outName0);} | |
666 if(outImgID>=0){singleHeap.imgID=outImgID;} | |
667 | |
668 singleHeap.genomeSizeBases=basesProcessed; | |
669 singleHeap.genomeSizeKmers=kmersProcessed; | |
670 singleHeap.genomeSequences=readsProcessed; | |
671 } | |
672 } | |
673 success&=pt.success; | |
674 } | |
675 | |
676 if(singleHeap!=null){ | |
677 singleHeap.setFname(ffin1.simpleName()); | |
678 if(singleHeap.name0()==null){ | |
679 singleHeap.setName0(ffin1.simpleName()); | |
680 } | |
681 } | |
682 | |
683 t.stop("Finished sketching: "); | |
684 Shared.printMemory(); | |
685 | |
686 if(ffout!=null){ | |
687 if(mode==PER_TAXA || mode==PER_IMG){ | |
688 if(tsw==null){tsw=makeTSWArray(ffout);} | |
689 success&=writeMap(longMaps); | |
690 }else if(mode==ONE_SKETCH){ | |
691 Sketch sketch=new Sketch(singleHeap, true, tool.trackCounts, outMeta); | |
692 if(outTaxID>=0){sketch.taxID=outTaxID;} | |
693 if(outTaxName!=null){sketch.setTaxName(outTaxName);} | |
694 if(outFname!=null){sketch.setFname(outFname);} | |
695 if(outName0!=null){sketch.setName0(outName0);} | |
696 if(outSpid>=0){sketch.spid=outSpid;} | |
697 if(outImgID>=0){sketch.imgID=outImgID;} | |
698 if(ffout!=null && ffout.length>0){ | |
699 sketch.addSSU(); | |
700 SketchTool.write(sketch, ffout[0]); | |
701 } | |
702 sketchesMade++; | |
703 sketchesWritten++; | |
704 } | |
705 } | |
706 | |
707 if(tsw!=null){ | |
708 for(int i=0; i<tsw.length; i++){tsw[i].poisonAndWait();} | |
709 } | |
710 | |
711 //Track whether any threads failed | |
712 if(!success){errorState=true;} | |
713 | |
714 //Do anything necessary after processing | |
715 | |
716 } | |
717 | |
718 /*--------------------------------------------------------------*/ | |
719 /*---------------- I/O Methods ----------------*/ | |
720 /*--------------------------------------------------------------*/ | |
721 | |
722 private boolean writeMap(HashMap<Long, SketchHeap>[] maps){ | |
723 | |
724 //Determine how many threads may be used | |
725 final int threads=files; | |
726 | |
727 //Fill a list with WriteThreads | |
728 ArrayList<WriteThread> alwt=new ArrayList<WriteThread>(threads); | |
729 | |
730 @SuppressWarnings("unchecked") | |
731 ArrayDeque<SketchHeap>[] heaps=new ArrayDeque[threads]; | |
732 for(int i=0; i<threads; i++){ | |
733 heaps[i]=new ArrayDeque<SketchHeap>(); | |
734 WriteThread wt=new WriteThread(i, heaps[i]); | |
735 alwt.add(wt); | |
736 } | |
737 | |
738 for(int i=0; i<maps.length; i++){ | |
739 HashMap<Long, SketchHeap> longMap=maps[i]; | |
740 for(Entry<Long, SketchHeap> entry : longMap.entrySet()){ | |
741 // set.remove(entry); This will probably not work | |
742 SketchHeap entryHeap=entry.getValue(); | |
743 sketchesMade++; | |
744 if(entryHeap.size()>0 && entryHeap.genomeSizeKmers>=minSizeKmers){ | |
745 heaps[(entry.hashCode()&Integer.MAX_VALUE)%threads].add(entryHeap); | |
746 } | |
747 } | |
748 // intMap.clear(); | |
749 maps[i]=null; | |
750 } | |
751 | |
752 //Start the threads | |
753 for(WriteThread wt : alwt){wt.start();} | |
754 | |
755 //Wait for completion of all threads | |
756 boolean success=true; | |
757 for(WriteThread wt : alwt){ | |
758 | |
759 //Wait until this thread has terminated | |
760 while(wt.getState()!=Thread.State.TERMINATED){ | |
761 try { | |
762 //Attempt a join operation | |
763 wt.join(); | |
764 } catch (InterruptedException e) { | |
765 //Potentially handle this, if it is expected to occur | |
766 e.printStackTrace(); | |
767 } | |
768 } | |
769 // sketchesMade+=wt.sketchesMadeT; | |
770 sketchesWritten+=wt.sketchesWrittenT; | |
771 success&=wt.success; | |
772 } | |
773 return success; | |
774 } | |
775 | |
776 private class WriteThread extends Thread{ | |
777 | |
778 WriteThread(int tnum_, ArrayDeque<SketchHeap> queue_){ | |
779 tnum=tnum_; | |
780 queue=queue_; | |
781 } | |
782 | |
783 @Override | |
784 public void run(){ | |
785 success=false; | |
786 for(SketchHeap polledHeap=queue.poll(); polledHeap!=null; polledHeap=queue.poll()){ | |
787 if(polledHeap.sketchSizeEstimate()>0){ | |
788 Sketch sketch=new Sketch(polledHeap, true, tool.trackCounts, outMeta); | |
789 if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;} | |
790 if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);} | |
791 if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);} | |
792 if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);} | |
793 if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;} | |
794 if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;} | |
795 sketch.addSSU(); | |
796 SketchTool.write(sketch, tsw[tnum], bb); | |
797 sketchesWrittenT++; | |
798 } | |
799 } | |
800 bb=null; | |
801 success=true; | |
802 queue=null; | |
803 } | |
804 | |
805 ArrayDeque<SketchHeap> queue; | |
806 final int tnum; | |
807 private ByteBuilder bb=new ByteBuilder(); | |
808 // long sketchesMadeT=0; | |
809 long sketchesWrittenT=0; | |
810 boolean success=false; | |
811 } | |
812 | |
813 // private void writeOutput(ConcurrentHashMap<Integer, SketchHeap> map){ | |
814 // ByteStreamWriter tsw=new ByteStreamWriter(ffout); | |
815 // tsw.start(); | |
816 // KeySetView<Integer, SketchHeap> y=map.keySet(); | |
817 // for(Integer x : map.keySet()){ | |
818 // SketchHeap smm.heap=map.get(x); | |
819 // Sketch s=tool.toSketch(smm.heap); | |
820 // tool.write(s, tsw); | |
821 // } | |
822 // tsw.poisonAndWait(); | |
823 // } | |
824 | |
825 /*--------------------------------------------------------------*/ | |
826 /*---------------- Tax Methods ----------------*/ | |
827 /*--------------------------------------------------------------*/ | |
828 | |
829 private void loadGiToTaxid(){ | |
830 Timer t=new Timer(); | |
831 outstream.println("Loading gi to taxa translation table."); | |
832 GiToTaxid.initialize(giTableFile); | |
833 t.stop(); | |
834 if(true){ | |
835 outstream.println("Time: \t"+t); | |
836 Shared.printMemory(); | |
837 outstream.println(); | |
838 } | |
839 } | |
840 | |
841 /*--------------------------------------------------------------*/ | |
842 /*---------------- Inner Methods ----------------*/ | |
843 /*--------------------------------------------------------------*/ | |
844 | |
845 /*--------------------------------------------------------------*/ | |
846 /*---------------- Inner Classes ----------------*/ | |
847 /*--------------------------------------------------------------*/ | |
848 | |
849 private class ProcessThread extends Thread { | |
850 | |
851 //Constructor | |
852 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){ | |
853 cris=cris_; | |
854 threadID=tid_; | |
855 | |
856 smm=new SketchMakerMini(tool, mode, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual); | |
857 localMap=(mode==PER_TAXA ? new HashMap<Long, SketchHeap>() : null); | |
858 } | |
859 | |
860 //Called by start() | |
861 @Override | |
862 public void run(){ | |
863 //Do anything necessary prior to processing | |
864 | |
865 //Process the reads | |
866 processInner(); | |
867 | |
868 //Do anything necessary after processing | |
869 bb=null; | |
870 | |
871 //Indicate successful exit status | |
872 success=true; | |
873 } | |
874 | |
875 /** Iterate through the reads */ | |
876 void processInner(){ | |
877 | |
878 //Grab the first ListNum of reads | |
879 ListNum<Read> ln=cris.nextList(); | |
880 //Grab the actual read list from the ListNum | |
881 ArrayList<Read> reads=(ln!=null ? ln.list : null); | |
882 | |
883 //Check to ensure pairing is as expected | |
884 if(reads!=null && !reads.isEmpty()){ | |
885 Read r=reads.get(0); | |
886 assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access | |
887 } | |
888 | |
889 // long cntr1=0, cntr2=0, cntr3=0, cntr4=0; | |
890 | |
891 //As long as there is a nonempty read list... | |
892 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning | |
893 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access | |
894 | |
895 //Loop through each read in the list | |
896 for(int idx=0; idx<reads.size(); idx++){ | |
897 final Read r1=reads.get(idx); | |
898 final Read r2=r1.mate; | |
899 | |
900 processReadPair(r1, r2); | |
901 } | |
902 | |
903 //Notify the input stream that the list was used | |
904 cris.returnList(ln); | |
905 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access | |
906 | |
907 //Fetch a new list | |
908 ln=cris.nextList(); | |
909 reads=(ln!=null ? ln.list : null); | |
910 } | |
911 | |
912 //Notify the input stream that the final list was used | |
913 if(ln!=null){ | |
914 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); | |
915 } | |
916 | |
917 if(mode==PER_TAXA && localMap.size()>0){dumpLocalMap();} | |
918 | |
919 // System.out.println(cntr1+", "+cntr2+", "+cntr3+", "+cntr4); | |
920 } | |
921 | |
922 void processReadPair(Read r1, Read r2){ | |
923 | |
924 if(mode==PER_TAXA){ | |
925 assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap; | |
926 assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap; | |
927 }else if(mode==PER_SEQUENCE || mode==PER_IMG){ | |
928 assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap; | |
929 assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap; | |
930 }else{ | |
931 assert(smm.heap!=null && smm.heap.capacity()>=targetSketchSize) : targetSketchSize+", "+(smm.heap==null ? "null" : ""+smm.heap.capacity()); | |
932 } | |
933 | |
934 //Track the initial length for statistics | |
935 final int initialLength1=r1.length(); | |
936 final int initialLength2=r1.mateLength(); | |
937 final String rid=r1.id; | |
938 | |
939 //Increment counters | |
940 readsProcessedT+=r1.pairCount(); | |
941 basesProcessedT+=initialLength1+initialLength2; | |
942 | |
943 if(initialLength1<k && initialLength2<k){return;} | |
944 final long expectedBases; | |
945 final long imgID; | |
946 { | |
947 int taxID=-1; | |
948 TaxNode tn=null; | |
949 if(taxtree!=null && (mode==PER_TAXA || mode==PER_IMG || mode==PER_SEQUENCE || ((mode==ONE_SKETCH/* || mode==PER_HEADER*/) && smm.heap.taxID<0))){ | |
950 if(mode==PER_IMG){ | |
951 imgID=ImgRecord2.parseImgId(rid, true); | |
952 tn=taxtree.imgToTaxNode(imgID); | |
953 if(tn==null){tn=taxtree.parseNodeFromHeader(rid, bestEffort);} | |
954 // assert(tn!=null || !rid.startsWith("tid")) : imgID+", "+taxID+", "+rid; //123 | |
955 }else{ | |
956 imgID=ImgRecord2.parseImgId(rid, false); | |
957 tn=taxtree.parseNodeFromHeader(rid, bestEffort); | |
958 // assert(tn!=null || !rid.startsWith("tid")) : imgID+", "+taxID+", "+rid; //123 | |
959 } | |
960 // assert(false) : imgID +", "+rid; | |
961 // System.err.println("B: "+bestEffort+", "+tn);//123 | |
962 while(tn!=null && tn.pid!=tn.id && tn.level<taxLevel){ | |
963 TaxNode temp=taxtree.getNode(tn.pid); | |
964 if(temp==null || temp.level>=TaxTree.LIFE){break;} | |
965 tn=temp; | |
966 } | |
967 // assert(tn!=null) : imgID+", "+taxID+", "+rid; //123 | |
968 if(tn!=null){taxID=tn.id;} | |
969 // System.err.println("Node: "+rid+"\n->\n"+tn); | |
970 | |
971 // assert(taxID>0) : imgID+", "+taxID+", "+rid; //123 | |
972 }else{ | |
973 imgID=-1; | |
974 } | |
975 | |
976 final long unitSizeBases; | |
977 if(sizeList!=null){ | |
978 unitSizeBases=taxID<0 ? -1 : sizeList.get(taxID); | |
979 }else if(sizeMap!=null){ | |
980 unitSizeBases=sizeMap.get(imgID); | |
981 }else{ | |
982 unitSizeBases=-1; | |
983 } | |
984 | |
985 | |
986 if(mode==PER_TAXA){ | |
987 if(tossJunk && tn==null){return;} | |
988 if(tn!=null){ | |
989 if(taxID==0 || (tn.level>taxLevel && tn.level>=TaxTree.PHYLUM)){return;} | |
990 TaxNode parent=taxtree.getNode(tn.pid); | |
991 if(parent.pid==parent.id){return;} | |
992 if(prefilter && unitSizeBases>=0 && unitSizeBases<minSizeBases){return;} | |
993 if(tossJunk){ | |
994 if(parent.level==TaxTree.LIFE){return;} | |
995 if(tn.level==TaxTree.NO_RANK){return;} | |
996 } | |
997 } | |
998 } | |
999 | |
1000 if(mode==PER_SEQUENCE){ | |
1001 expectedBases=initialLength1+initialLength2; | |
1002 if(expectedBases<minSizeBases){return;} | |
1003 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize); | |
1004 if(expectedSketchSize<minSketchSize){return;} | |
1005 if(smm.heap==null || smm.heap.capacity()<expectedSketchSize){ | |
1006 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts()); | |
1007 } | |
1008 }else if(mode==PER_TAXA){ | |
1009 if(taxID>=0 && localMap.containsKey((long)taxID)){ | |
1010 smm.heap=localMap.get((long)taxID); | |
1011 assert(smm.heap.taxID==taxID); | |
1012 }else if(sizeList==null){ | |
1013 if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());} | |
1014 }else{ | |
1015 expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2; | |
1016 if(expectedBases<minSizeBases){return;} | |
1017 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize); | |
1018 if(expectedSketchSize<minSketchSize){return;} | |
1019 if(smm.heap==null || smm.heap.capacity()!=expectedSketchSize){ | |
1020 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts()); | |
1021 } | |
1022 // assert(taxID!=96897) : taxID+", "+expectedBases+", "+expectedSketchSize+", "+unitSizeBases+", "+initialLength1+", "+sizeList.get(taxID); | |
1023 } | |
1024 assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; | |
1025 }else if(mode==PER_IMG){ | |
1026 if(sizeMap==null){ | |
1027 if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());} | |
1028 }else{ | |
1029 expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2; | |
1030 if(expectedBases<minSizeBases){return;} | |
1031 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize); | |
1032 if(expectedSketchSize<minSketchSize){return;} | |
1033 if(smm.heap==null || smm.heap.capacity()!=expectedSketchSize){ | |
1034 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts()); | |
1035 } | |
1036 } | |
1037 } | |
1038 | |
1039 assert(smm.heap!=null) : mode+", "+(sizeList==null); | |
1040 assert(taxID<0 || smm.heap.taxID<0 || smm.heap.taxID==taxID); //This is important. | |
1041 assert(imgID<0 || smm.heap.imgID<0 || smm.heap.imgID==imgID); | |
1042 | |
1043 if(smm.heap.taxID<0){smm.heap.taxID=taxID;} | |
1044 if(smm.heap.imgID<0){smm.heap.imgID=imgID;} | |
1045 if(smm.heap.name0()==null){ | |
1046 smm.heap.setName0(rid); | |
1047 } | |
1048 if(smm.heap.taxName()==null && tn!=null){ | |
1049 smm.heap.setTaxName(tn.name); | |
1050 } | |
1051 | |
1052 if(!bestEffort && tn==null){//Try to get a higher-level node | |
1053 TaxNode tn2=taxtree.parseNodeFromHeader(rid, true); | |
1054 if(tn2!=null){ | |
1055 while(tn2!=null && tn2.pid!=tn2.id && tn2.level<taxLevel){ | |
1056 TaxNode temp=taxtree.getNode(tn2.pid); | |
1057 if(temp==null || temp.level>=TaxTree.LIFE){break;} | |
1058 tn2=temp; | |
1059 } | |
1060 if(tn2.level<=taxLevel){ | |
1061 taxID=tn2.id; | |
1062 } | |
1063 if(smm.heap.taxID<0){smm.heap.taxID=tn2.id;} | |
1064 if(smm.heap.taxName()==null && tn2!=null){ | |
1065 smm.heap.setTaxName(tn2.name); | |
1066 } | |
1067 } | |
1068 } | |
1069 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1070 | |
1071 assert(smm.heap.taxID<0 || smm.heap.taxName()!=null) : smm.heap.taxID+", "+smm.heap.taxName()+", "+smm.heap.name()+", "+tn; | |
1072 | |
1073 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1074 if(initialLength1>=k){smm.processRead(r1);} | |
1075 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1076 if(initialLength2>=k){smm.processRead(r2);} | |
1077 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1078 | |
1079 | |
1080 if(mode==PER_SEQUENCE){ | |
1081 manageHeap_perSequence(); | |
1082 }else if(mode==PER_TAXA || mode==PER_IMG){ | |
1083 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1084 manageHeap_perTaxa(taxID, imgID, unitSizeBases); | |
1085 if(localMap.size()>20){dumpLocalMap();} | |
1086 }else if(mode==ONE_SKETCH/* || mode==PER_HEADER*/){ | |
1087 //do nothing | |
1088 }else{ | |
1089 assert(false) : mode; | |
1090 } | |
1091 } | |
1092 } | |
1093 | |
1094 private void manageHeap_perSequence(){ | |
1095 assert(mode==PER_SEQUENCE); | |
1096 writeHeap(smm.heap); | |
1097 } | |
1098 | |
1099 private void manageHeap_perTaxa(final int taxID, final long imgID, final long unitSizeBases){ | |
1100 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1101 assert(mode==PER_TAXA || mode==PER_IMG); | |
1102 | |
1103 if(smm.heap.size()<=0 || ((taxID<0 && imgID<0) && smm.heap.genomeSizeKmers<minSizeKmers)){//Discard | |
1104 assert(!localMap.containsKey((long)taxID)); | |
1105 smm.heap.clear(true); | |
1106 return; | |
1107 } | |
1108 | |
1109 //TODO: | |
1110 //I could, at this point, write to disk if smm.heap.genomeSize==taxSize. | |
1111 //Or if the taxID is unknown. | |
1112 | |
1113 final boolean known=(taxID>=0 || imgID>=0); | |
1114 final boolean unknown=!known; | |
1115 final boolean hasSize=(known && (sizeList!=null || sizeMap!=null)); | |
1116 boolean finished=(unknown || (hasSize && smm.heap.genomeSizeBases>=unitSizeBases)); | |
1117 | |
1118 //For some reason, this assertion fired for a single taxID (52271) halfway through sketching RefSeq. | |
1119 //Likely a transient hardware error. | |
1120 assert(!finished || smm.heap.genomeSizeBases==unitSizeBases) : finished+", "+unknown+", "+hasSize+", "+(sizeList==null)+"\n" | |
1121 +taxID+", "+unitSizeBases+", "+smm.heap.genomeSizeBases+", "+smm.heap.genomeSizeKmers; | |
1122 // if(finished && smm.heap.genomeSizeBases!=unitSizeBases){ | |
1123 // System.err.println("Warning: tid="+taxID+", finished="+finished+", known="+known+ | |
1124 // ", smm.heap.genomeSizeBases="+smm.heap.genomeSizeBases+", unitSizeBases="+unitSizeBases); | |
1125 // } | |
1126 | |
1127 smm.heap.taxID=taxID; | |
1128 smm.heap.imgID=imgID; | |
1129 | |
1130 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1131 | |
1132 final Long key; | |
1133 if(imgID>-1 && (mode==PER_IMG || taxID<1)){key=imgID;} | |
1134 else if(taxID>-1){key=(long)taxID;} | |
1135 else{key=Long.valueOf(nextUnknown.getAndIncrement());} | |
1136 | |
1137 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1138 | |
1139 if(unknown || finished){ | |
1140 writeHeap(smm.heap); | |
1141 smm.heap.clear(true); | |
1142 localMap.remove((long)taxID); | |
1143 return; | |
1144 } | |
1145 | |
1146 //At this point, the taxID is known and this heap does not constitute the whole taxSize, or the taxSize is unknown. | |
1147 if(!hasSize){ | |
1148 final HashMap<Long, SketchHeap> map=longMaps[(int)(key&MAP_MASK)]; | |
1149 final SketchHeap old; | |
1150 synchronized(map){ | |
1151 old=map.get(key); | |
1152 if(old==null){ | |
1153 map.put(key, smm.heap); | |
1154 }else{ | |
1155 old.add(smm.heap); | |
1156 } | |
1157 } | |
1158 if(old==null){ | |
1159 smm.heap=null; | |
1160 }else{ | |
1161 smm.heap.clear(true); | |
1162 assert(!localMap.containsKey((long)taxID)); | |
1163 } | |
1164 localMap.remove((long)taxID); | |
1165 return; | |
1166 } | |
1167 | |
1168 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123 | |
1169 | |
1170 { | |
1171 //At this point, the taxID is and taxSize are known, and this heap does not constitute the whole taxSize. | |
1172 final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize); | |
1173 assert(expectedHeapSize>=3) : expectedHeapSize; | |
1174 // boolean writeHeap=false; | |
1175 // boolean makeHeap=false; | |
1176 | |
1177 SketchHeap local=null; | |
1178 { | |
1179 local=localMap.get(key); | |
1180 assert(local==null || local.taxID==key.intValue()); | |
1181 if(local==smm.heap){ | |
1182 //do nothing | |
1183 smm.heap=null; | |
1184 }else if(local==null){ | |
1185 if(expectedHeapSize==smm.heap.capacity()){//Store the current heap | |
1186 assert(smm.heap.taxID==key.intValue()); | |
1187 assert(smm.heap.name()!=null); | |
1188 localMap.put(key, smm.heap);//Safe | |
1189 smm.heap=null; | |
1190 }else{//Store a precisely-sized heap | |
1191 SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts()); | |
1192 temp.add(smm.heap); | |
1193 assert(temp.taxID==key.intValue()); | |
1194 assert(temp.name()!=null); | |
1195 localMap.put(key, temp);//Looks safe | |
1196 // smm.heap=null; //Not sure which is better | |
1197 smm.heap.clear(true); | |
1198 } | |
1199 }else{ | |
1200 assert(local.taxID==smm.heap.taxID); | |
1201 assert(local!=smm.heap); | |
1202 // assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID+", "+key; //123 | |
1203 // assert(false) : taxID+", "+key; | |
1204 local.add(smm.heap); //This WAS the slow line. It should no longer be executed. | |
1205 // if(local.genomeSizeBases>=unitSizeBases){ | |
1206 // writeHeap=true; | |
1207 // localMap.remove(key); | |
1208 // } | |
1209 // smm.heap=null; //Not sure which is better | |
1210 smm.heap.clear(true); | |
1211 } | |
1212 } | |
1213 } | |
1214 } | |
1215 | |
1216 private void dumpLocalMap(){ | |
1217 | |
1218 for(Entry<Long, SketchHeap> e : localMap.entrySet()){ | |
1219 boolean writeHeap=false; | |
1220 final Long key=e.getKey(); | |
1221 final SketchHeap localHeap=e.getValue(); | |
1222 final HashMap<Long, SketchHeap> map=longMaps[(int)(key&MAP_MASK)]; | |
1223 final SketchHeap old; | |
1224 | |
1225 final long unitSizeBases; | |
1226 if(sizeList!=null){ | |
1227 unitSizeBases=localHeap.taxID<0 ? -1 : sizeList.get((int)localHeap.taxID); | |
1228 }else if(sizeMap!=null){ | |
1229 unitSizeBases=sizeMap.get(localHeap.imgID); | |
1230 }else{ | |
1231 unitSizeBases=-1; | |
1232 } | |
1233 final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize); | |
1234 | |
1235 synchronized(map){ | |
1236 old=map.get(key); | |
1237 if(old==null){ | |
1238 if(expectedHeapSize==localHeap.capacity()){//Store the current heap | |
1239 map.put(key, localHeap); | |
1240 }else{//Store a precisely-sized heap | |
1241 // assert(key.intValue()!=96897) : key+", "+unitSizeBases+", "+", "+sizeList.get(key.intValue()); | |
1242 assert(expectedHeapSize>0) : expectedHeapSize+", "+key+", "+localHeap.taxID+", "+localHeap.name()+", "+unitSizeBases; | |
1243 SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts()); | |
1244 temp.add(localHeap); | |
1245 map.put(key, temp); | |
1246 } | |
1247 }else{ | |
1248 old.add(localHeap); //This was the slow line; should be faster now. | |
1249 if(old.genomeSizeBases>=unitSizeBases){ | |
1250 writeHeap=true; | |
1251 map.remove(key); | |
1252 } | |
1253 } | |
1254 } | |
1255 | |
1256 if(writeHeap){ | |
1257 assert(old!=null); //For compiler | |
1258 assert(old.genomeSizeBases>0) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers; | |
1259 assert(old.genomeSizeBases==unitSizeBases) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers+", "+old.size()+", "+old.taxID; | |
1260 writeHeap(old); | |
1261 } | |
1262 } | |
1263 localMap.clear(); | |
1264 } | |
1265 | |
1266 private boolean writeHeap(SketchHeap heap){ | |
1267 sketchesMadeT++; | |
1268 // assert(heap.size()>0) : heap.size(); //Not really necessary | |
1269 boolean written=false; | |
1270 // assert(heap.heap.size()==heap.set.size()) : heap.heap.size()+", "+heap.set.size(); | |
1271 if(heap.size()>0 && heap.genomeSizeKmers>=minSizeKmers && heap.sketchSizeEstimate()>0){ | |
1272 Sketch sketch=new Sketch(heap, true, tool.trackCounts, outMeta); | |
1273 if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;} | |
1274 if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);} | |
1275 if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);} | |
1276 if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);} | |
1277 if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;} | |
1278 if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;} | |
1279 if(parseSubunit && sketch.name0()!=null){ | |
1280 if(outMeta!=null){ | |
1281 sketch.meta=(ArrayList<String>)sketch.meta.clone(); | |
1282 }else if(sketch.meta==null){ | |
1283 if(sketch.name0().contains("SSU_")){ | |
1284 sketch.addMeta("subunit:ssu"); | |
1285 }else if(sketch.name0().contains("LSU_")){ | |
1286 sketch.addMeta("subunit:lsu"); | |
1287 } | |
1288 } | |
1289 } | |
1290 if(tsw!=null){ | |
1291 final int choice=(sketch.hashCode()&Integer.MAX_VALUE)%files; | |
1292 sketch.addSSU(); | |
1293 synchronized(tsw[choice]){ | |
1294 SketchTool.write(sketch, tsw[choice], bb); | |
1295 sketchesWrittenT++; | |
1296 written=true; | |
1297 } | |
1298 } | |
1299 }else{ | |
1300 heap.clear(true); | |
1301 } | |
1302 assert(heap.genomeSizeBases==0); | |
1303 assert(heap.genomeSequences==0); | |
1304 return written; | |
1305 } | |
1306 | |
1307 /** Number of reads processed by this thread */ | |
1308 protected long readsProcessedT=0; | |
1309 /** Number of bases processed by this thread */ | |
1310 protected long basesProcessedT=0; | |
1311 | |
1312 long sketchesMadeT=0; | |
1313 long sketchesWrittenT=0; | |
1314 | |
1315 /** True only if this thread has completed successfully */ | |
1316 boolean success=false; | |
1317 | |
1318 /** Shared input stream */ | |
1319 private final ConcurrentReadInputStream cris; | |
1320 /** Thread ID */ | |
1321 final int threadID; | |
1322 private ByteBuilder bb=new ByteBuilder(); | |
1323 | |
1324 final SketchMakerMini smm; | |
1325 final HashMap<Long, SketchHeap> localMap; | |
1326 } | |
1327 | |
1328 /*--------------------------------------------------------------*/ | |
1329 /*---------------- Fields ----------------*/ | |
1330 /*--------------------------------------------------------------*/ | |
1331 | |
1332 /** Primary input file path */ | |
1333 private String in1=null; | |
1334 /** Secondary input file path */ | |
1335 private String in2=null; | |
1336 | |
1337 /** Primary output file path */ | |
1338 private String out1=null; | |
1339 | |
1340 /** Override input file extension */ | |
1341 private String extin=null; | |
1342 | |
1343 private String giTableFile=null; | |
1344 private String taxTreeFile=null; | |
1345 private String accessionFile=null; | |
1346 private String imgFile=null; | |
1347 | |
1348 /*Override metadata */ | |
1349 String outTaxName=null; | |
1350 String outFname=null; | |
1351 String outName0=null; | |
1352 int outTaxID=-1; | |
1353 long outSpid=-1; | |
1354 long outImgID=-1; | |
1355 ArrayList<String> outMeta=null; | |
1356 static boolean parseSubunit=false; | |
1357 | |
1358 /*--------------------------------------------------------------*/ | |
1359 | |
1360 /** Number of reads processed */ | |
1361 protected long readsProcessed=0; | |
1362 /** Number of bases processed */ | |
1363 protected long basesProcessed=0; | |
1364 /** Number of bases processed */ | |
1365 protected long kmersProcessed=0; | |
1366 /** Number of sketches started */ | |
1367 protected long sketchesMade=0; | |
1368 /** Number of sketches written */ | |
1369 protected long sketchesWritten=0; | |
1370 | |
1371 /** Quit after processing this many input reads; -1 means no limit */ | |
1372 private long maxReads=-1; | |
1373 | |
1374 final LongList sizeList; | |
1375 final HashMap<Long, Long> sizeMap; | |
1376 | |
1377 private HashMap<Long, SketchHeap> longMaps[]; | |
1378 private ByteStreamWriter tsw[]; | |
1379 | |
1380 /*--------------------------------------------------------------*/ | |
1381 /*---------------- Final Fields ----------------*/ | |
1382 /*--------------------------------------------------------------*/ | |
1383 | |
1384 /** Primary input file */ | |
1385 private final FileFormat ffin1; | |
1386 /** Secondary input file */ | |
1387 private final FileFormat ffin2; | |
1388 | |
1389 /** Primary output files */ | |
1390 private final FileFormat ffout[]; | |
1391 /** Number of output files */ | |
1392 private final int files; | |
1393 | |
1394 final int mode; | |
1395 | |
1396 private final SketchTool tool; | |
1397 | |
1398 /** Don't make sketches from sequences smaller than this */ | |
1399 final int minSizeBases; | |
1400 /** Don't make sketches from sequences smaller than this */ | |
1401 final int minSizeKmers; | |
1402 | |
1403 private int taxLevel=1; | |
1404 private boolean prefilter=false; | |
1405 private boolean tossJunk=true; | |
1406 boolean bestEffort=true; | |
1407 // private final HashMap<Integer, byte[]> ssuMap=null; | |
1408 | |
1409 private final AtomicInteger nextUnknown=new AtomicInteger(minFakeID); | |
1410 | |
1411 private static final int MAP_WAYS=32; | |
1412 private static final int MAP_MASK=MAP_WAYS-1; | |
1413 | |
1414 | |
1415 /*--------------------------------------------------------------*/ | |
1416 /*---------------- Common Fields ----------------*/ | |
1417 /*--------------------------------------------------------------*/ | |
1418 | |
1419 /** Print status messages to this output stream */ | |
1420 private PrintStream outstream=System.err; | |
1421 /** Print verbose messages */ | |
1422 public static boolean verbose=false; | |
1423 /** True if an error was encountered */ | |
1424 public boolean errorState=false; | |
1425 /** Overwrite existing output files */ | |
1426 private boolean overwrite=false; | |
1427 /** Append to existing output files */ | |
1428 private boolean append=false; | |
1429 | |
1430 } |