jpayne@68
|
1 package sketch;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.io.File;
|
jpayne@68
|
4 import java.io.PrintStream;
|
jpayne@68
|
5 import java.util.ArrayDeque;
|
jpayne@68
|
6 import java.util.ArrayList;
|
jpayne@68
|
7 import java.util.Arrays;
|
jpayne@68
|
8 import java.util.HashMap;
|
jpayne@68
|
9 import java.util.Map.Entry;
|
jpayne@68
|
10 import java.util.concurrent.atomic.AtomicInteger;
|
jpayne@68
|
11
|
jpayne@68
|
12 import fileIO.ByteFile;
|
jpayne@68
|
13 import fileIO.ByteStreamWriter;
|
jpayne@68
|
14 import fileIO.FileFormat;
|
jpayne@68
|
15 import fileIO.ReadWrite;
|
jpayne@68
|
16 import shared.Parse;
|
jpayne@68
|
17 import shared.Parser;
|
jpayne@68
|
18 import shared.PreParser;
|
jpayne@68
|
19 import shared.ReadStats;
|
jpayne@68
|
20 import shared.Shared;
|
jpayne@68
|
21 import shared.Timer;
|
jpayne@68
|
22 import shared.Tools;
|
jpayne@68
|
23 import stream.ConcurrentReadInputStream;
|
jpayne@68
|
24 import stream.FASTQ;
|
jpayne@68
|
25 import stream.FastaReadInputStream;
|
jpayne@68
|
26 import stream.Read;
|
jpayne@68
|
27 import structures.ByteBuilder;
|
jpayne@68
|
28 import structures.ListNum;
|
jpayne@68
|
29 import structures.LongList;
|
jpayne@68
|
30 import tax.AccessionToTaxid;
|
jpayne@68
|
31 import tax.GiToTaxid;
|
jpayne@68
|
32 import tax.ImgRecord2;
|
jpayne@68
|
33 import tax.TaxNode;
|
jpayne@68
|
34 import tax.TaxTree;
|
jpayne@68
|
35
|
jpayne@68
|
36 /**
|
jpayne@68
|
37 * Creates MinHashSketches rapidly.
|
jpayne@68
|
38 *
|
jpayne@68
|
39 * @author Brian Bushnell
|
jpayne@68
|
40 * @date July 6, 2016
|
jpayne@68
|
41 *
|
jpayne@68
|
42 */
|
jpayne@68
|
43 public class SketchMaker extends SketchObject {
|
jpayne@68
|
44
|
jpayne@68
|
45 /*--------------------------------------------------------------*/
|
jpayne@68
|
46 /*---------------- Initialization ----------------*/
|
jpayne@68
|
47 /*--------------------------------------------------------------*/
|
jpayne@68
|
48
|
jpayne@68
|
49 /**
|
jpayne@68
|
50 * Code entrance from the command line.
|
jpayne@68
|
51 * @param args Command line arguments
|
jpayne@68
|
52 */
|
jpayne@68
|
53 public static void main(String[] args){
|
jpayne@68
|
54 //Start a timer immediately upon code entrance.
|
jpayne@68
|
55 Timer t=new Timer();
|
jpayne@68
|
56
|
jpayne@68
|
57 final int mode=parseMode(args);
|
jpayne@68
|
58 if(mode==PER_FILE/* || mode==ONE_SKETCH || mode==PER_HEADER*/ || mode==PER_SEQUENCE){//ONE_SKETCH does not work for multiple input files.
|
jpayne@68
|
59 recallCompareSketch(args);
|
jpayne@68
|
60 return;
|
jpayne@68
|
61 }
|
jpayne@68
|
62
|
jpayne@68
|
63 final int oldBufLen=Shared.bufferLen();
|
jpayne@68
|
64
|
jpayne@68
|
65 //Create an instance of this class
|
jpayne@68
|
66 SketchMaker x=new SketchMaker(args);
|
jpayne@68
|
67
|
jpayne@68
|
68 //Run the object
|
jpayne@68
|
69 x.process(t);
|
jpayne@68
|
70
|
jpayne@68
|
71 Shared.setBufferLen(oldBufLen);
|
jpayne@68
|
72
|
jpayne@68
|
73 //Close the print stream if it was redirected
|
jpayne@68
|
74 Shared.closeStream(x.outstream);
|
jpayne@68
|
75 }
|
jpayne@68
|
76
|
jpayne@68
|
77 private static void recallCompareSketch(String[] args){
|
jpayne@68
|
78 ArrayList<String> list=new ArrayList<String>(args.length+1);
|
jpayne@68
|
79 for(int i=0; i<args.length; i++){
|
jpayne@68
|
80 if(args[i].startsWith("out=")){
|
jpayne@68
|
81 args[i]=args[i].replaceFirst("out=", "outsketch=");
|
jpayne@68
|
82 }
|
jpayne@68
|
83 list.add(args[i]);
|
jpayne@68
|
84 }
|
jpayne@68
|
85 list.add("sketchonly");
|
jpayne@68
|
86 CompareSketch.main(list.toArray(new String[0]));
|
jpayne@68
|
87 }
|
jpayne@68
|
88
|
jpayne@68
|
89 /**
|
jpayne@68
|
90 * Constructor.
|
jpayne@68
|
91 * @param args Command line arguments
|
jpayne@68
|
92 */
|
jpayne@68
|
93 public SketchMaker(String[] args){
|
jpayne@68
|
94
|
jpayne@68
|
95 {//Preparse block for help, config files, and outstream
|
jpayne@68
|
96 PreParser pp=new PreParser(args, getClass(), false);
|
jpayne@68
|
97 args=pp.args;
|
jpayne@68
|
98 outstream=pp.outstream;
|
jpayne@68
|
99 }
|
jpayne@68
|
100
|
jpayne@68
|
101 //Set shared static variables
|
jpayne@68
|
102 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
|
jpayne@68
|
103 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
|
jpayne@68
|
104
|
jpayne@68
|
105 //Create a parser object
|
jpayne@68
|
106 Parser parser=new Parser();
|
jpayne@68
|
107
|
jpayne@68
|
108 int minSizeKmers_=100;
|
jpayne@68
|
109 int files_=1;
|
jpayne@68
|
110 int mode_=ONE_SKETCH;
|
jpayne@68
|
111 hashNames=true;
|
jpayne@68
|
112 boolean setPrefilter=false;
|
jpayne@68
|
113 defaultParams.printDepth=defaultParams.printDepth2=defaultParams.printVolume=false;
|
jpayne@68
|
114
|
jpayne@68
|
115 //Parse each argument
|
jpayne@68
|
116 for(int i=0; i<args.length; i++){
|
jpayne@68
|
117 String arg=args[i];
|
jpayne@68
|
118
|
jpayne@68
|
119 //Break arguments into their constituent parts, in the form of "a=b"
|
jpayne@68
|
120 String[] split=arg.split("=");
|
jpayne@68
|
121 String a=split[0].toLowerCase();
|
jpayne@68
|
122 String b=split.length>1 ? split[1] : null;
|
jpayne@68
|
123
|
jpayne@68
|
124 if(a.equals("verbose")){
|
jpayne@68
|
125 verbose=Parse.parseBoolean(b);
|
jpayne@68
|
126 }else if(a.equals("files")){
|
jpayne@68
|
127 files_=Integer.parseInt(b);
|
jpayne@68
|
128 }else if(a.equals("minsize")){
|
jpayne@68
|
129 minSizeKmers_=Parse.parseIntKMG(b);
|
jpayne@68
|
130 }else if(a.equals("prefilter")){
|
jpayne@68
|
131 prefilter=Parse.parseBoolean(b);
|
jpayne@68
|
132 setPrefilter=true;
|
jpayne@68
|
133 }
|
jpayne@68
|
134
|
jpayne@68
|
135 else if(a.equals("name") || a.equals("taxname")){
|
jpayne@68
|
136 outTaxName=b;
|
jpayne@68
|
137 }else if(a.equals("name0")){
|
jpayne@68
|
138 outName0=b;
|
jpayne@68
|
139 }else if(a.equals("fname")){
|
jpayne@68
|
140 outFname=b;
|
jpayne@68
|
141 }else if(a.equals("taxid") || a.equals("tid")){
|
jpayne@68
|
142 outTaxID=Integer.parseInt(b);
|
jpayne@68
|
143 }else if(a.equals("spid")){
|
jpayne@68
|
144 outSpid=Integer.parseInt(b);
|
jpayne@68
|
145 }else if(a.equals("imgid")){
|
jpayne@68
|
146 outImgID=Integer.parseInt(b);
|
jpayne@68
|
147 }else if((a.startsWith("meta_") || a.startsWith("mt_")) && b!=null){
|
jpayne@68
|
148 if(outMeta==null){outMeta=new ArrayList<String>();}
|
jpayne@68
|
149 int underscore=a.indexOf('_', 0);
|
jpayne@68
|
150 outMeta.add(a.substring(underscore+1)+":"+b);
|
jpayne@68
|
151 }else if(a.equals("parsesubunit")){
|
jpayne@68
|
152 parseSubunit=Parse.parseBoolean(b);
|
jpayne@68
|
153 }
|
jpayne@68
|
154
|
jpayne@68
|
155 else if(parseMode(arg, a, b)>-1){
|
jpayne@68
|
156 mode_=parseMode(arg, a, b);
|
jpayne@68
|
157 }else if(a.equals("parse_flag_goes_here")){
|
jpayne@68
|
158 long fake_variable=Parse.parseKMG(b);
|
jpayne@68
|
159 //Set a variable here
|
jpayne@68
|
160 }
|
jpayne@68
|
161
|
jpayne@68
|
162 else if(a.equals("table") || a.equals("gi") || a.equals("gitable")){
|
jpayne@68
|
163 giTableFile=b;
|
jpayne@68
|
164 }else if(a.equals("taxtree") || a.equals("tree")){
|
jpayne@68
|
165 taxTreeFile=b;
|
jpayne@68
|
166 }else if(a.equals("accession")){
|
jpayne@68
|
167 accessionFile=b;
|
jpayne@68
|
168 }else if(a.equalsIgnoreCase("img") || a.equals("imgfile") || a.equals("imgdump")){
|
jpayne@68
|
169 imgFile=b;
|
jpayne@68
|
170 }
|
jpayne@68
|
171
|
jpayne@68
|
172 else if(a.equals("tossjunk")){
|
jpayne@68
|
173 tossJunk=Parse.parseBoolean(b);
|
jpayne@68
|
174 }
|
jpayne@68
|
175
|
jpayne@68
|
176 // else if(a.equals("silva")){//Handled by parser
|
jpayne@68
|
177 // TaxTree.SILVA_MODE=Parse.parseBoolean(b);
|
jpayne@68
|
178 // }
|
jpayne@68
|
179
|
jpayne@68
|
180 else if(a.equals("taxlevel") || a.equals("tl") || a.equals("level") || a.equals("lv")){
|
jpayne@68
|
181 taxLevel=TaxTree.parseLevel(b);
|
jpayne@68
|
182 }
|
jpayne@68
|
183
|
jpayne@68
|
184 else if(parseSketchFlags(arg, a, b)){
|
jpayne@68
|
185 //do nothing
|
jpayne@68
|
186 // System.err.println("a: "+arg);
|
jpayne@68
|
187 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
|
jpayne@68
|
188 //do nothing
|
jpayne@68
|
189 // System.err.println("b: "+arg);
|
jpayne@68
|
190 }else if(defaultParams.parse(arg, a, b)){
|
jpayne@68
|
191 //do nothing
|
jpayne@68
|
192 // System.err.println("c: "+arg);
|
jpayne@68
|
193 // System.err.println(defaultParams.printDepth);
|
jpayne@68
|
194 }
|
jpayne@68
|
195
|
jpayne@68
|
196 else{
|
jpayne@68
|
197 outstream.println("Unknown parameter "+args[i]);
|
jpayne@68
|
198 assert(false) : "Unknown parameter "+args[i];
|
jpayne@68
|
199 }
|
jpayne@68
|
200 }
|
jpayne@68
|
201 if("auto".equalsIgnoreCase(imgFile)){imgFile=TaxTree.defaultImgFile();}
|
jpayne@68
|
202 if("auto".equalsIgnoreCase(taxTreeFile)){taxTreeFile=TaxTree.defaultTreeFile();}
|
jpayne@68
|
203 if("auto".equalsIgnoreCase(giTableFile)){giTableFile=TaxTree.defaultTableFile();}
|
jpayne@68
|
204 if("auto".equalsIgnoreCase(accessionFile)){accessionFile=TaxTree.defaultAccessionFile();}
|
jpayne@68
|
205
|
jpayne@68
|
206 outMeta=SketchObject.fixMeta(outMeta);
|
jpayne@68
|
207
|
jpayne@68
|
208 postParse();
|
jpayne@68
|
209 minSizeKmers=minSizeKmers_;
|
jpayne@68
|
210 mode=mode_;
|
jpayne@68
|
211
|
jpayne@68
|
212 minSizeBases=minSizeKmers_+k-1;
|
jpayne@68
|
213
|
jpayne@68
|
214 {//Process parser fields
|
jpayne@68
|
215 Parser.processQuality();
|
jpayne@68
|
216
|
jpayne@68
|
217 maxReads=parser.maxReads;
|
jpayne@68
|
218
|
jpayne@68
|
219 overwrite=ReadStats.overwrite=parser.overwrite;
|
jpayne@68
|
220 append=ReadStats.append=parser.append;
|
jpayne@68
|
221
|
jpayne@68
|
222 in1=parser.in1;
|
jpayne@68
|
223 in2=parser.in2;
|
jpayne@68
|
224
|
jpayne@68
|
225 out1=parser.out1;
|
jpayne@68
|
226
|
jpayne@68
|
227 extin=parser.extin;
|
jpayne@68
|
228 }
|
jpayne@68
|
229 files=(out1==null ? 0 : files_);
|
jpayne@68
|
230
|
jpayne@68
|
231 if(!setPrefilter && !prefilter && (mode==PER_TAXA || mode==PER_IMG) && in1!=null && !in1.startsWith("stdin") && (AUTOSIZE || AUTOSIZE_LINEAR || targetSketchSize>200)){
|
jpayne@68
|
232 prefilter=true;
|
jpayne@68
|
233 System.err.println("Enabled prefilter due to running in per-"+(mode==PER_TAXA ? "taxa" : "IMG")+" mode; override with 'prefilter=f'.");
|
jpayne@68
|
234 }
|
jpayne@68
|
235
|
jpayne@68
|
236 assert(mode!=ONE_SKETCH || files<2) : "Multiple output files are not allowed in single-sketch mode.";
|
jpayne@68
|
237
|
jpayne@68
|
238 //Do input file # replacement
|
jpayne@68
|
239 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
|
jpayne@68
|
240 in2=in1.replace("#", "2");
|
jpayne@68
|
241 in1=in1.replace("#", "1");
|
jpayne@68
|
242 }
|
jpayne@68
|
243
|
jpayne@68
|
244 //Adjust interleaved detection based on the number of input files
|
jpayne@68
|
245 if(in2!=null){
|
jpayne@68
|
246 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
|
jpayne@68
|
247 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
248 }
|
jpayne@68
|
249
|
jpayne@68
|
250 assert(FastaReadInputStream.settingsOK());
|
jpayne@68
|
251
|
jpayne@68
|
252 //Ensure there is an input file
|
jpayne@68
|
253 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
|
jpayne@68
|
254
|
jpayne@68
|
255 //Adjust the number of threads for input file reading
|
jpayne@68
|
256 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
|
jpayne@68
|
257 ByteFile.FORCE_MODE_BF2=true;
|
jpayne@68
|
258 }
|
jpayne@68
|
259
|
jpayne@68
|
260 ffout=makeFFArray(out1, files, overwrite, append);
|
jpayne@68
|
261 if(ffout==null || ffout.length<1){
|
jpayne@68
|
262 System.err.println("WARNING: No output files were specified; no sketches will be written.\n");
|
jpayne@68
|
263 }
|
jpayne@68
|
264
|
jpayne@68
|
265 // //Ensure output files can be written
|
jpayne@68
|
266 // if(!Tools.testOutputFiles(overwrite, append, false, out1)){
|
jpayne@68
|
267 // outstream.println((out1==null)+", "+out1);
|
jpayne@68
|
268 // throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n");
|
jpayne@68
|
269 // }
|
jpayne@68
|
270
|
jpayne@68
|
271 //Ensure input files can be read
|
jpayne@68
|
272 if(!Tools.testInputFiles(false, true, in1, in2, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){
|
jpayne@68
|
273 throw new RuntimeException("\nCan't read some input files.\n");
|
jpayne@68
|
274 }
|
jpayne@68
|
275
|
jpayne@68
|
276 //Ensure that no file was specified multiple times
|
jpayne@68
|
277 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){
|
jpayne@68
|
278 throw new RuntimeException("\nSome file names were specified multiple times.\n");
|
jpayne@68
|
279 }
|
jpayne@68
|
280
|
jpayne@68
|
281 //Create input FileFormat objects
|
jpayne@68
|
282 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
283 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
284
|
jpayne@68
|
285 // assert(false) : defaultParams.trackCounts();
|
jpayne@68
|
286 tool=new SketchTool(targetSketchSize, defaultParams);
|
jpayne@68
|
287
|
jpayne@68
|
288 if(taxTreeFile!=null){setTaxtree(taxTreeFile, outstream);}
|
jpayne@68
|
289
|
jpayne@68
|
290 if(giTableFile!=null){
|
jpayne@68
|
291 loadGiToTaxid();
|
jpayne@68
|
292 }
|
jpayne@68
|
293 if(accessionFile!=null){
|
jpayne@68
|
294 AccessionToTaxid.tree=taxtree;
|
jpayne@68
|
295 outstream.println("Loading accession table.");
|
jpayne@68
|
296 AccessionToTaxid.load(accessionFile);
|
jpayne@68
|
297 System.gc();
|
jpayne@68
|
298 }
|
jpayne@68
|
299 if(imgFile!=null){
|
jpayne@68
|
300 TaxTree.loadIMG(imgFile, true, outstream);
|
jpayne@68
|
301 }
|
jpayne@68
|
302 SSUMap.load(outstream);
|
jpayne@68
|
303
|
jpayne@68
|
304 if(prefilter){
|
jpayne@68
|
305 if(mode==PER_TAXA){sizeList=sizeList(); sizeMap=null;}
|
jpayne@68
|
306 else if(mode==PER_IMG){sizeMap=sizeMap(); sizeList=null;}
|
jpayne@68
|
307 else{
|
jpayne@68
|
308 assert(false) : "Wrong mode for prefilter; should be taxa or img.";
|
jpayne@68
|
309 sizeList=null; sizeMap=null;
|
jpayne@68
|
310 }
|
jpayne@68
|
311 }else{
|
jpayne@68
|
312 sizeList=null; sizeMap=null;
|
jpayne@68
|
313 }
|
jpayne@68
|
314 }
|
jpayne@68
|
315
|
jpayne@68
|
316 private static FileFormat[] makeFFArray(String fname0, int files, boolean overwrite, boolean append){
|
jpayne@68
|
317 if(files<1 || fname0==null){return null;}
|
jpayne@68
|
318 String[] fnames=new String[files];
|
jpayne@68
|
319 FileFormat[] ff=new FileFormat[files];
|
jpayne@68
|
320 for(int i=0; i<files; i++){
|
jpayne@68
|
321 String fname=fname0;
|
jpayne@68
|
322 if(files>1){
|
jpayne@68
|
323 assert(fname.indexOf('#')>-1) : "Output name requires # symbol for multiple files.";
|
jpayne@68
|
324 fname=fname.replaceFirst("#", ""+i);
|
jpayne@68
|
325 }
|
jpayne@68
|
326 fnames[i]=fname;
|
jpayne@68
|
327 ff[i]=FileFormat.testOutput(fname, FileFormat.TEXT, null, true, overwrite, append, false);
|
jpayne@68
|
328 }
|
jpayne@68
|
329
|
jpayne@68
|
330 if(!Tools.testOutputFiles(overwrite, append, false, fnames)){
|
jpayne@68
|
331 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+Arrays.toString(fnames)+"\n");
|
jpayne@68
|
332 }
|
jpayne@68
|
333
|
jpayne@68
|
334 return ff;
|
jpayne@68
|
335 }
|
jpayne@68
|
336
|
jpayne@68
|
337 // private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){
|
jpayne@68
|
338 // if(ff==null || ff.length==0){return null;}
|
jpayne@68
|
339 // ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length];
|
jpayne@68
|
340 // for(int i=0; i<ff.length; i++){
|
jpayne@68
|
341 // tsw[i]=new ByteStreamWriter(ff[i]);
|
jpayne@68
|
342 // tsw[i].start();
|
jpayne@68
|
343 // }
|
jpayne@68
|
344 // return tsw;
|
jpayne@68
|
345 // }
|
jpayne@68
|
346
|
jpayne@68
|
347 private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){
|
jpayne@68
|
348 if(ff==null || ff.length==0){return null;}
|
jpayne@68
|
349 ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length];
|
jpayne@68
|
350 for(int i=0; i<ff.length; i++){
|
jpayne@68
|
351 tsw[i]=new ByteStreamWriter(ff[i]);
|
jpayne@68
|
352 tsw[i].start();
|
jpayne@68
|
353 }
|
jpayne@68
|
354 return tsw;
|
jpayne@68
|
355 }
|
jpayne@68
|
356
|
jpayne@68
|
357 /*--------------------------------------------------------------*/
|
jpayne@68
|
358 /*---------------- Outer Methods ----------------*/
|
jpayne@68
|
359 /*--------------------------------------------------------------*/
|
jpayne@68
|
360
|
jpayne@68
|
361 //Makes a list of genome sizes (bases, not kmers) per taxa.
|
jpayne@68
|
362 private LongList sizeList(){
|
jpayne@68
|
363 Timer t=new Timer();
|
jpayne@68
|
364 Shared.GC_BEFORE_PRINT_MEMORY=true;
|
jpayne@68
|
365 t.start("Making taxa prefilter.");
|
jpayne@68
|
366 //For some reason this function is very slow, using only ~95% java and ~95% bgzip CPU.
|
jpayne@68
|
367 //But that's about the same speed as reformat... BBDuk is only a little faster.
|
jpayne@68
|
368
|
jpayne@68
|
369 // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads();
|
jpayne@68
|
370
|
jpayne@68
|
371 LongList sizes=new LongList();
|
jpayne@68
|
372
|
jpayne@68
|
373 //Create a read input stream
|
jpayne@68
|
374 final ConcurrentReadInputStream cris;
|
jpayne@68
|
375 {
|
jpayne@68
|
376 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
|
jpayne@68
|
377 // if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);} //Why would I ever want this here?
|
jpayne@68
|
378 cris.start(); //Start the stream
|
jpayne@68
|
379 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
380 }
|
jpayne@68
|
381
|
jpayne@68
|
382 //Grab the first ListNum of reads
|
jpayne@68
|
383 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
384 //Grab the actual read list from the ListNum
|
jpayne@68
|
385 ArrayList<Read> reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
386
|
jpayne@68
|
387 //As long as there is a nonempty read list...
|
jpayne@68
|
388 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
|
jpayne@68
|
389 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
|
jpayne@68
|
390
|
jpayne@68
|
391 //Loop through each read in the list
|
jpayne@68
|
392 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
393 final Read r1=reads.get(idx);
|
jpayne@68
|
394
|
jpayne@68
|
395 int taxID=-1;
|
jpayne@68
|
396 TaxNode tn=null;
|
jpayne@68
|
397 if(taxtree!=null){
|
jpayne@68
|
398 tn=taxtree.parseNodeFromHeader(r1.id, bestEffort);
|
jpayne@68
|
399 // System.err.println("A: "+bestEffort+", "+tn);//123
|
jpayne@68
|
400 while(tn!=null && tn.pid!=tn.id && tn.level<taxLevel){
|
jpayne@68
|
401 TaxNode temp=taxtree.getNode(tn.pid);
|
jpayne@68
|
402 if(temp==null || temp.level>=TaxTree.LIFE){break;}
|
jpayne@68
|
403 tn=temp;
|
jpayne@68
|
404 }
|
jpayne@68
|
405 if(tn!=null){taxID=tn.id;}
|
jpayne@68
|
406 }
|
jpayne@68
|
407
|
jpayne@68
|
408 if(taxID>0){
|
jpayne@68
|
409 long a=r1.length();
|
jpayne@68
|
410 long b=r1.mateLength();
|
jpayne@68
|
411 if(a<k){a=0;}
|
jpayne@68
|
412 if(b<k){b=0;}
|
jpayne@68
|
413 sizes.increment(taxID, a+b);
|
jpayne@68
|
414 }
|
jpayne@68
|
415 }
|
jpayne@68
|
416
|
jpayne@68
|
417 //Notify the input stream that the list was used
|
jpayne@68
|
418 cris.returnList(ln);
|
jpayne@68
|
419 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
|
jpayne@68
|
420
|
jpayne@68
|
421 //Fetch a new list
|
jpayne@68
|
422 ln=cris.nextList();
|
jpayne@68
|
423 reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
424 }
|
jpayne@68
|
425
|
jpayne@68
|
426 //Notify the input stream that the final list was used
|
jpayne@68
|
427 if(ln!=null){
|
jpayne@68
|
428 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
|
jpayne@68
|
429 }
|
jpayne@68
|
430
|
jpayne@68
|
431 errorState|=ReadWrite.closeStream(cris);
|
jpayne@68
|
432 // outstream.println("Created prefilter.");
|
jpayne@68
|
433 t.stop("Created prefilter:");
|
jpayne@68
|
434 Shared.printMemory();
|
jpayne@68
|
435 System.err.println();
|
jpayne@68
|
436
|
jpayne@68
|
437 return sizes;
|
jpayne@68
|
438 }
|
jpayne@68
|
439
|
jpayne@68
|
440 //Makes a list of genome sizes (bases, not kmers) per img.
|
jpayne@68
|
441 private HashMap<Long, Long> sizeMap(){
|
jpayne@68
|
442 Timer t=new Timer();
|
jpayne@68
|
443 t.start("Making img prefilter.");
|
jpayne@68
|
444
|
jpayne@68
|
445 // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads();
|
jpayne@68
|
446
|
jpayne@68
|
447 HashMap<Long, Long> sizes=new HashMap<Long, Long>();
|
jpayne@68
|
448
|
jpayne@68
|
449 //Create a read input stream
|
jpayne@68
|
450 final ConcurrentReadInputStream cris;
|
jpayne@68
|
451 {
|
jpayne@68
|
452 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
|
jpayne@68
|
453 if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);}
|
jpayne@68
|
454 cris.start(); //Start the stream
|
jpayne@68
|
455 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
456 }
|
jpayne@68
|
457
|
jpayne@68
|
458 //Grab the first ListNum of reads
|
jpayne@68
|
459 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
460 //Grab the actual read list from the ListNum
|
jpayne@68
|
461 ArrayList<Read> reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
462
|
jpayne@68
|
463 //As long as there is a nonempty read list...
|
jpayne@68
|
464 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
|
jpayne@68
|
465 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
|
jpayne@68
|
466
|
jpayne@68
|
467 //Loop through each read in the list
|
jpayne@68
|
468 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
469 final Read r1=reads.get(idx);
|
jpayne@68
|
470
|
jpayne@68
|
471 long imgID=ImgRecord2.parseImgId(r1.id, true);
|
jpayne@68
|
472 assert(imgID>-1) : "IMG records must start with IMG number followed by a space: "+r1.id;
|
jpayne@68
|
473
|
jpayne@68
|
474 if(imgID>0){
|
jpayne@68
|
475 long a=r1.length();
|
jpayne@68
|
476 long b=r1.mateLength();
|
jpayne@68
|
477 if(a<k){a=0;}
|
jpayne@68
|
478 if(b<k){b=0;}
|
jpayne@68
|
479
|
jpayne@68
|
480 if(a+b>0){
|
jpayne@68
|
481 Long old=sizes.get(imgID);
|
jpayne@68
|
482 if(old==null){sizes.put(imgID, a+b);}
|
jpayne@68
|
483 else{sizes.put(imgID, a+b+old);}
|
jpayne@68
|
484 }
|
jpayne@68
|
485 }
|
jpayne@68
|
486 }
|
jpayne@68
|
487
|
jpayne@68
|
488 //Notify the input stream that the list was used
|
jpayne@68
|
489 cris.returnList(ln);
|
jpayne@68
|
490 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
|
jpayne@68
|
491
|
jpayne@68
|
492 //Fetch a new list
|
jpayne@68
|
493 ln=cris.nextList();
|
jpayne@68
|
494 reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
495 }
|
jpayne@68
|
496
|
jpayne@68
|
497 //Notify the input stream that the final list was used
|
jpayne@68
|
498 if(ln!=null){
|
jpayne@68
|
499 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
|
jpayne@68
|
500 }
|
jpayne@68
|
501
|
jpayne@68
|
502 errorState|=ReadWrite.closeStream(cris);
|
jpayne@68
|
503 // outstream.println("Created prefilter.");
|
jpayne@68
|
504 t.stop("Created prefilter:");
|
jpayne@68
|
505 Shared.printMemory();
|
jpayne@68
|
506 System.err.println();
|
jpayne@68
|
507
|
jpayne@68
|
508 return sizes;
|
jpayne@68
|
509 }
|
jpayne@68
|
510
|
jpayne@68
|
511 /** Create read streams and process all data */
|
jpayne@68
|
512 void process(Timer t){
|
jpayne@68
|
513
|
jpayne@68
|
514 //Reset counters
|
jpayne@68
|
515 readsProcessed=0;
|
jpayne@68
|
516 basesProcessed=0;
|
jpayne@68
|
517
|
jpayne@68
|
518 if(mode==ONE_SKETCH && !forceDisableMultithreadedFastq && Shared.threads()>2 && ffin1.fastq()){
|
jpayne@68
|
519 // Shared.setBufferLen(2);
|
jpayne@68
|
520 singleSketchMT();
|
jpayne@68
|
521 }else{
|
jpayne@68
|
522 final int oldLen=Shared.bufferLen();
|
jpayne@68
|
523 Shared.capBufferLen(ffin1.fastq() ? 40 : 4);
|
jpayne@68
|
524
|
jpayne@68
|
525 //Turn off read validation in the input threads to increase speed
|
jpayne@68
|
526 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
|
jpayne@68
|
527 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
|
jpayne@68
|
528
|
jpayne@68
|
529 //Create a read input stream
|
jpayne@68
|
530 final ConcurrentReadInputStream cris;
|
jpayne@68
|
531 {
|
jpayne@68
|
532 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
|
jpayne@68
|
533 cris.start(); //Start the stream
|
jpayne@68
|
534 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
535 }
|
jpayne@68
|
536
|
jpayne@68
|
537 //Process the reads in separate threads
|
jpayne@68
|
538 spawnThreads(cris);
|
jpayne@68
|
539
|
jpayne@68
|
540 if(verbose){outstream.println("Finished; closing streams.");}
|
jpayne@68
|
541
|
jpayne@68
|
542 //Write anything that was accumulated by ReadStats
|
jpayne@68
|
543 errorState|=ReadStats.writeAll();
|
jpayne@68
|
544 //Close the read streams
|
jpayne@68
|
545 errorState|=ReadWrite.closeStream(cris);
|
jpayne@68
|
546
|
jpayne@68
|
547 //TODO: Write sketch
|
jpayne@68
|
548
|
jpayne@68
|
549 //Reset read validation
|
jpayne@68
|
550 Read.VALIDATE_IN_CONSTRUCTOR=vic;
|
jpayne@68
|
551 Shared.setBufferLen(oldLen);
|
jpayne@68
|
552 }
|
jpayne@68
|
553
|
jpayne@68
|
554 //Report timing and results
|
jpayne@68
|
555 t.stop();
|
jpayne@68
|
556 outstream.println("Wrote "+sketchesWritten+" of "+sketchesMade+" sketches.\n");
|
jpayne@68
|
557 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
|
jpayne@68
|
558
|
jpayne@68
|
559 //Throw an exception of there was an error in a thread
|
jpayne@68
|
560 if(errorState){
|
jpayne@68
|
561 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
|
jpayne@68
|
562 }
|
jpayne@68
|
563 }
|
jpayne@68
|
564
|
jpayne@68
|
565 private void singleSketchMT(){
|
jpayne@68
|
566 Timer t=new Timer();
|
jpayne@68
|
567 Sketch sketch=tool.processReadsMT(ffin1, ffin2, Shared.threads(),
|
jpayne@68
|
568 maxReads, mode, defaultParams.samplerate, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual, false);
|
jpayne@68
|
569
|
jpayne@68
|
570 if(outTaxID>=0){sketch.taxID=outTaxID;}
|
jpayne@68
|
571 if(outTaxName!=null){sketch.setTaxName(outTaxName);}
|
jpayne@68
|
572 if(outFname!=null){sketch.setFname(outFname);}
|
jpayne@68
|
573 if(outName0!=null){sketch.setName0(outName0);}
|
jpayne@68
|
574 if(outSpid>=0){sketch.spid=outSpid;}
|
jpayne@68
|
575 if(outImgID>=0){sketch.imgID=outImgID;}
|
jpayne@68
|
576 sketch.setMeta(outMeta);
|
jpayne@68
|
577
|
jpayne@68
|
578 //Accumulate per-thread statistics
|
jpayne@68
|
579 readsProcessed+=sketch.genomeSequences;
|
jpayne@68
|
580 basesProcessed+=sketch.genomeSizeBases;
|
jpayne@68
|
581 kmersProcessed+=sketch.genomeSizeKmers;
|
jpayne@68
|
582
|
jpayne@68
|
583 sketchesMade+=1;
|
jpayne@68
|
584
|
jpayne@68
|
585 t.stop("Finished sketching: ");
|
jpayne@68
|
586 Shared.printMemory();
|
jpayne@68
|
587
|
jpayne@68
|
588 if(ffout!=null && ffout.length>0){
|
jpayne@68
|
589 sketch.addSSU();
|
jpayne@68
|
590 SketchTool.write(sketch, ffout[0]);
|
jpayne@68
|
591 sketchesWritten+=1;
|
jpayne@68
|
592 }
|
jpayne@68
|
593 }
|
jpayne@68
|
594
|
jpayne@68
|
595 /** Spawn process threads */
|
jpayne@68
|
596 @SuppressWarnings("unchecked")
|
jpayne@68
|
597 private void spawnThreads(final ConcurrentReadInputStream cris){
|
jpayne@68
|
598
|
jpayne@68
|
599 //Do anything necessary prior to processing
|
jpayne@68
|
600 Timer t=new Timer();
|
jpayne@68
|
601
|
jpayne@68
|
602 //Determine how many threads may be used
|
jpayne@68
|
603 final int threads=Tools.mid(1, Shared.threads(), 14);//Probably capped for memory reasons. Rarely hits 1200% anyway (was 12, bumped to 14).
|
jpayne@68
|
604
|
jpayne@68
|
605 //Fill a list with ProcessThreads
|
jpayne@68
|
606 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
|
jpayne@68
|
607
|
jpayne@68
|
608 if(mode==PER_TAXA || mode==PER_IMG){
|
jpayne@68
|
609 longMaps=new HashMap[MAP_WAYS];
|
jpayne@68
|
610 for(int i=0; i<longMaps.length; i++){
|
jpayne@68
|
611 longMaps[i]=new HashMap<Long, SketchHeap>();
|
jpayne@68
|
612 }
|
jpayne@68
|
613 }
|
jpayne@68
|
614
|
jpayne@68
|
615 if(mode!=ONE_SKETCH){tsw=makeTSWArray(ffout);}
|
jpayne@68
|
616
|
jpayne@68
|
617 for(int i=0; i<threads; i++){
|
jpayne@68
|
618 alpt.add(new ProcessThread(cris, i));
|
jpayne@68
|
619 }
|
jpayne@68
|
620
|
jpayne@68
|
621 //Start the threads
|
jpayne@68
|
622 for(ProcessThread pt : alpt){
|
jpayne@68
|
623 pt.start();
|
jpayne@68
|
624 }
|
jpayne@68
|
625
|
jpayne@68
|
626 //Wait for completion of all threads
|
jpayne@68
|
627 boolean success=true;
|
jpayne@68
|
628 SketchHeap singleHeap=null;
|
jpayne@68
|
629 for(ProcessThread pt : alpt){
|
jpayne@68
|
630
|
jpayne@68
|
631 //Wait until this thread has terminated
|
jpayne@68
|
632 while(pt.getState()!=Thread.State.TERMINATED){
|
jpayne@68
|
633 try {
|
jpayne@68
|
634 //Attempt a join operation
|
jpayne@68
|
635 pt.join();
|
jpayne@68
|
636 } catch (InterruptedException e) {
|
jpayne@68
|
637 //Potentially handle this, if it is expected to occur
|
jpayne@68
|
638 e.printStackTrace();
|
jpayne@68
|
639 }
|
jpayne@68
|
640 }
|
jpayne@68
|
641
|
jpayne@68
|
642 //Accumulate per-thread statistics
|
jpayne@68
|
643 readsProcessed+=pt.readsProcessedT;
|
jpayne@68
|
644 basesProcessed+=pt.basesProcessedT;
|
jpayne@68
|
645 kmersProcessed+=pt.smm.kmersProcessed;
|
jpayne@68
|
646
|
jpayne@68
|
647 sketchesMade+=pt.sketchesMadeT;
|
jpayne@68
|
648 sketchesWritten+=pt.sketchesWrittenT;
|
jpayne@68
|
649
|
jpayne@68
|
650 // System.err.println(pt.sketchesMadeT+" "+pt.sketchesWrittenT);
|
jpayne@68
|
651
|
jpayne@68
|
652 // System.err.println("pt.readsProcessedT="+pt.readsProcessedT);
|
jpayne@68
|
653 if(mode==ONE_SKETCH){
|
jpayne@68
|
654 SketchHeap temp=pt.smm.heap;
|
jpayne@68
|
655
|
jpayne@68
|
656 if(temp==null){
|
jpayne@68
|
657 //do nothing
|
jpayne@68
|
658 }else if(singleHeap==null){singleHeap=pt.smm.heap;}
|
jpayne@68
|
659 else{singleHeap.add(pt.smm.heap);}
|
jpayne@68
|
660
|
jpayne@68
|
661 if(singleHeap!=null){
|
jpayne@68
|
662 if(outTaxID>=0){singleHeap.taxID=outTaxID;}
|
jpayne@68
|
663 if(outTaxName!=null){singleHeap.setTaxName(outTaxName);}
|
jpayne@68
|
664 if(outFname!=null){singleHeap.setFname(outFname);}
|
jpayne@68
|
665 if(outName0!=null){singleHeap.setName0(outName0);}
|
jpayne@68
|
666 if(outImgID>=0){singleHeap.imgID=outImgID;}
|
jpayne@68
|
667
|
jpayne@68
|
668 singleHeap.genomeSizeBases=basesProcessed;
|
jpayne@68
|
669 singleHeap.genomeSizeKmers=kmersProcessed;
|
jpayne@68
|
670 singleHeap.genomeSequences=readsProcessed;
|
jpayne@68
|
671 }
|
jpayne@68
|
672 }
|
jpayne@68
|
673 success&=pt.success;
|
jpayne@68
|
674 }
|
jpayne@68
|
675
|
jpayne@68
|
676 if(singleHeap!=null){
|
jpayne@68
|
677 singleHeap.setFname(ffin1.simpleName());
|
jpayne@68
|
678 if(singleHeap.name0()==null){
|
jpayne@68
|
679 singleHeap.setName0(ffin1.simpleName());
|
jpayne@68
|
680 }
|
jpayne@68
|
681 }
|
jpayne@68
|
682
|
jpayne@68
|
683 t.stop("Finished sketching: ");
|
jpayne@68
|
684 Shared.printMemory();
|
jpayne@68
|
685
|
jpayne@68
|
686 if(ffout!=null){
|
jpayne@68
|
687 if(mode==PER_TAXA || mode==PER_IMG){
|
jpayne@68
|
688 if(tsw==null){tsw=makeTSWArray(ffout);}
|
jpayne@68
|
689 success&=writeMap(longMaps);
|
jpayne@68
|
690 }else if(mode==ONE_SKETCH){
|
jpayne@68
|
691 Sketch sketch=new Sketch(singleHeap, true, tool.trackCounts, outMeta);
|
jpayne@68
|
692 if(outTaxID>=0){sketch.taxID=outTaxID;}
|
jpayne@68
|
693 if(outTaxName!=null){sketch.setTaxName(outTaxName);}
|
jpayne@68
|
694 if(outFname!=null){sketch.setFname(outFname);}
|
jpayne@68
|
695 if(outName0!=null){sketch.setName0(outName0);}
|
jpayne@68
|
696 if(outSpid>=0){sketch.spid=outSpid;}
|
jpayne@68
|
697 if(outImgID>=0){sketch.imgID=outImgID;}
|
jpayne@68
|
698 if(ffout!=null && ffout.length>0){
|
jpayne@68
|
699 sketch.addSSU();
|
jpayne@68
|
700 SketchTool.write(sketch, ffout[0]);
|
jpayne@68
|
701 }
|
jpayne@68
|
702 sketchesMade++;
|
jpayne@68
|
703 sketchesWritten++;
|
jpayne@68
|
704 }
|
jpayne@68
|
705 }
|
jpayne@68
|
706
|
jpayne@68
|
707 if(tsw!=null){
|
jpayne@68
|
708 for(int i=0; i<tsw.length; i++){tsw[i].poisonAndWait();}
|
jpayne@68
|
709 }
|
jpayne@68
|
710
|
jpayne@68
|
711 //Track whether any threads failed
|
jpayne@68
|
712 if(!success){errorState=true;}
|
jpayne@68
|
713
|
jpayne@68
|
714 //Do anything necessary after processing
|
jpayne@68
|
715
|
jpayne@68
|
716 }
|
jpayne@68
|
717
|
jpayne@68
|
718 /*--------------------------------------------------------------*/
|
jpayne@68
|
719 /*---------------- I/O Methods ----------------*/
|
jpayne@68
|
720 /*--------------------------------------------------------------*/
|
jpayne@68
|
721
|
jpayne@68
|
722 private boolean writeMap(HashMap<Long, SketchHeap>[] maps){
|
jpayne@68
|
723
|
jpayne@68
|
724 //Determine how many threads may be used
|
jpayne@68
|
725 final int threads=files;
|
jpayne@68
|
726
|
jpayne@68
|
727 //Fill a list with WriteThreads
|
jpayne@68
|
728 ArrayList<WriteThread> alwt=new ArrayList<WriteThread>(threads);
|
jpayne@68
|
729
|
jpayne@68
|
730 @SuppressWarnings("unchecked")
|
jpayne@68
|
731 ArrayDeque<SketchHeap>[] heaps=new ArrayDeque[threads];
|
jpayne@68
|
732 for(int i=0; i<threads; i++){
|
jpayne@68
|
733 heaps[i]=new ArrayDeque<SketchHeap>();
|
jpayne@68
|
734 WriteThread wt=new WriteThread(i, heaps[i]);
|
jpayne@68
|
735 alwt.add(wt);
|
jpayne@68
|
736 }
|
jpayne@68
|
737
|
jpayne@68
|
738 for(int i=0; i<maps.length; i++){
|
jpayne@68
|
739 HashMap<Long, SketchHeap> longMap=maps[i];
|
jpayne@68
|
740 for(Entry<Long, SketchHeap> entry : longMap.entrySet()){
|
jpayne@68
|
741 // set.remove(entry); This will probably not work
|
jpayne@68
|
742 SketchHeap entryHeap=entry.getValue();
|
jpayne@68
|
743 sketchesMade++;
|
jpayne@68
|
744 if(entryHeap.size()>0 && entryHeap.genomeSizeKmers>=minSizeKmers){
|
jpayne@68
|
745 heaps[(entry.hashCode()&Integer.MAX_VALUE)%threads].add(entryHeap);
|
jpayne@68
|
746 }
|
jpayne@68
|
747 }
|
jpayne@68
|
748 // intMap.clear();
|
jpayne@68
|
749 maps[i]=null;
|
jpayne@68
|
750 }
|
jpayne@68
|
751
|
jpayne@68
|
752 //Start the threads
|
jpayne@68
|
753 for(WriteThread wt : alwt){wt.start();}
|
jpayne@68
|
754
|
jpayne@68
|
755 //Wait for completion of all threads
|
jpayne@68
|
756 boolean success=true;
|
jpayne@68
|
757 for(WriteThread wt : alwt){
|
jpayne@68
|
758
|
jpayne@68
|
759 //Wait until this thread has terminated
|
jpayne@68
|
760 while(wt.getState()!=Thread.State.TERMINATED){
|
jpayne@68
|
761 try {
|
jpayne@68
|
762 //Attempt a join operation
|
jpayne@68
|
763 wt.join();
|
jpayne@68
|
764 } catch (InterruptedException e) {
|
jpayne@68
|
765 //Potentially handle this, if it is expected to occur
|
jpayne@68
|
766 e.printStackTrace();
|
jpayne@68
|
767 }
|
jpayne@68
|
768 }
|
jpayne@68
|
769 // sketchesMade+=wt.sketchesMadeT;
|
jpayne@68
|
770 sketchesWritten+=wt.sketchesWrittenT;
|
jpayne@68
|
771 success&=wt.success;
|
jpayne@68
|
772 }
|
jpayne@68
|
773 return success;
|
jpayne@68
|
774 }
|
jpayne@68
|
775
|
jpayne@68
|
776 private class WriteThread extends Thread{
|
jpayne@68
|
777
|
jpayne@68
|
778 WriteThread(int tnum_, ArrayDeque<SketchHeap> queue_){
|
jpayne@68
|
779 tnum=tnum_;
|
jpayne@68
|
780 queue=queue_;
|
jpayne@68
|
781 }
|
jpayne@68
|
782
|
jpayne@68
|
783 @Override
|
jpayne@68
|
784 public void run(){
|
jpayne@68
|
785 success=false;
|
jpayne@68
|
786 for(SketchHeap polledHeap=queue.poll(); polledHeap!=null; polledHeap=queue.poll()){
|
jpayne@68
|
787 if(polledHeap.sketchSizeEstimate()>0){
|
jpayne@68
|
788 Sketch sketch=new Sketch(polledHeap, true, tool.trackCounts, outMeta);
|
jpayne@68
|
789 if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;}
|
jpayne@68
|
790 if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);}
|
jpayne@68
|
791 if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);}
|
jpayne@68
|
792 if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);}
|
jpayne@68
|
793 if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;}
|
jpayne@68
|
794 if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;}
|
jpayne@68
|
795 sketch.addSSU();
|
jpayne@68
|
796 SketchTool.write(sketch, tsw[tnum], bb);
|
jpayne@68
|
797 sketchesWrittenT++;
|
jpayne@68
|
798 }
|
jpayne@68
|
799 }
|
jpayne@68
|
800 bb=null;
|
jpayne@68
|
801 success=true;
|
jpayne@68
|
802 queue=null;
|
jpayne@68
|
803 }
|
jpayne@68
|
804
|
jpayne@68
|
805 ArrayDeque<SketchHeap> queue;
|
jpayne@68
|
806 final int tnum;
|
jpayne@68
|
807 private ByteBuilder bb=new ByteBuilder();
|
jpayne@68
|
808 // long sketchesMadeT=0;
|
jpayne@68
|
809 long sketchesWrittenT=0;
|
jpayne@68
|
810 boolean success=false;
|
jpayne@68
|
811 }
|
jpayne@68
|
812
|
jpayne@68
|
813 // private void writeOutput(ConcurrentHashMap<Integer, SketchHeap> map){
|
jpayne@68
|
814 // ByteStreamWriter tsw=new ByteStreamWriter(ffout);
|
jpayne@68
|
815 // tsw.start();
|
jpayne@68
|
816 // KeySetView<Integer, SketchHeap> y=map.keySet();
|
jpayne@68
|
817 // for(Integer x : map.keySet()){
|
jpayne@68
|
818 // SketchHeap smm.heap=map.get(x);
|
jpayne@68
|
819 // Sketch s=tool.toSketch(smm.heap);
|
jpayne@68
|
820 // tool.write(s, tsw);
|
jpayne@68
|
821 // }
|
jpayne@68
|
822 // tsw.poisonAndWait();
|
jpayne@68
|
823 // }
|
jpayne@68
|
824
|
jpayne@68
|
825 /*--------------------------------------------------------------*/
|
jpayne@68
|
826 /*---------------- Tax Methods ----------------*/
|
jpayne@68
|
827 /*--------------------------------------------------------------*/
|
jpayne@68
|
828
|
jpayne@68
|
829 private void loadGiToTaxid(){
|
jpayne@68
|
830 Timer t=new Timer();
|
jpayne@68
|
831 outstream.println("Loading gi to taxa translation table.");
|
jpayne@68
|
832 GiToTaxid.initialize(giTableFile);
|
jpayne@68
|
833 t.stop();
|
jpayne@68
|
834 if(true){
|
jpayne@68
|
835 outstream.println("Time: \t"+t);
|
jpayne@68
|
836 Shared.printMemory();
|
jpayne@68
|
837 outstream.println();
|
jpayne@68
|
838 }
|
jpayne@68
|
839 }
|
jpayne@68
|
840
|
jpayne@68
|
841 /*--------------------------------------------------------------*/
|
jpayne@68
|
842 /*---------------- Inner Methods ----------------*/
|
jpayne@68
|
843 /*--------------------------------------------------------------*/
|
jpayne@68
|
844
|
jpayne@68
|
845 /*--------------------------------------------------------------*/
|
jpayne@68
|
846 /*---------------- Inner Classes ----------------*/
|
jpayne@68
|
847 /*--------------------------------------------------------------*/
|
jpayne@68
|
848
|
jpayne@68
|
849 private class ProcessThread extends Thread {
|
jpayne@68
|
850
|
jpayne@68
|
851 //Constructor
|
jpayne@68
|
852 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){
|
jpayne@68
|
853 cris=cris_;
|
jpayne@68
|
854 threadID=tid_;
|
jpayne@68
|
855
|
jpayne@68
|
856 smm=new SketchMakerMini(tool, mode, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual);
|
jpayne@68
|
857 localMap=(mode==PER_TAXA ? new HashMap<Long, SketchHeap>() : null);
|
jpayne@68
|
858 }
|
jpayne@68
|
859
|
jpayne@68
|
860 //Called by start()
|
jpayne@68
|
861 @Override
|
jpayne@68
|
862 public void run(){
|
jpayne@68
|
863 //Do anything necessary prior to processing
|
jpayne@68
|
864
|
jpayne@68
|
865 //Process the reads
|
jpayne@68
|
866 processInner();
|
jpayne@68
|
867
|
jpayne@68
|
868 //Do anything necessary after processing
|
jpayne@68
|
869 bb=null;
|
jpayne@68
|
870
|
jpayne@68
|
871 //Indicate successful exit status
|
jpayne@68
|
872 success=true;
|
jpayne@68
|
873 }
|
jpayne@68
|
874
|
jpayne@68
|
875 /** Iterate through the reads */
|
jpayne@68
|
876 void processInner(){
|
jpayne@68
|
877
|
jpayne@68
|
878 //Grab the first ListNum of reads
|
jpayne@68
|
879 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
880 //Grab the actual read list from the ListNum
|
jpayne@68
|
881 ArrayList<Read> reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
882
|
jpayne@68
|
883 //Check to ensure pairing is as expected
|
jpayne@68
|
884 if(reads!=null && !reads.isEmpty()){
|
jpayne@68
|
885 Read r=reads.get(0);
|
jpayne@68
|
886 assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
|
jpayne@68
|
887 }
|
jpayne@68
|
888
|
jpayne@68
|
889 // long cntr1=0, cntr2=0, cntr3=0, cntr4=0;
|
jpayne@68
|
890
|
jpayne@68
|
891 //As long as there is a nonempty read list...
|
jpayne@68
|
892 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
|
jpayne@68
|
893 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
|
jpayne@68
|
894
|
jpayne@68
|
895 //Loop through each read in the list
|
jpayne@68
|
896 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
897 final Read r1=reads.get(idx);
|
jpayne@68
|
898 final Read r2=r1.mate;
|
jpayne@68
|
899
|
jpayne@68
|
900 processReadPair(r1, r2);
|
jpayne@68
|
901 }
|
jpayne@68
|
902
|
jpayne@68
|
903 //Notify the input stream that the list was used
|
jpayne@68
|
904 cris.returnList(ln);
|
jpayne@68
|
905 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
|
jpayne@68
|
906
|
jpayne@68
|
907 //Fetch a new list
|
jpayne@68
|
908 ln=cris.nextList();
|
jpayne@68
|
909 reads=(ln!=null ? ln.list : null);
|
jpayne@68
|
910 }
|
jpayne@68
|
911
|
jpayne@68
|
912 //Notify the input stream that the final list was used
|
jpayne@68
|
913 if(ln!=null){
|
jpayne@68
|
914 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
|
jpayne@68
|
915 }
|
jpayne@68
|
916
|
jpayne@68
|
917 if(mode==PER_TAXA && localMap.size()>0){dumpLocalMap();}
|
jpayne@68
|
918
|
jpayne@68
|
919 // System.out.println(cntr1+", "+cntr2+", "+cntr3+", "+cntr4);
|
jpayne@68
|
920 }
|
jpayne@68
|
921
|
jpayne@68
|
922 void processReadPair(Read r1, Read r2){
|
jpayne@68
|
923
|
jpayne@68
|
924 if(mode==PER_TAXA){
|
jpayne@68
|
925 assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap;
|
jpayne@68
|
926 assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap;
|
jpayne@68
|
927 }else if(mode==PER_SEQUENCE || mode==PER_IMG){
|
jpayne@68
|
928 assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap;
|
jpayne@68
|
929 assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap;
|
jpayne@68
|
930 }else{
|
jpayne@68
|
931 assert(smm.heap!=null && smm.heap.capacity()>=targetSketchSize) : targetSketchSize+", "+(smm.heap==null ? "null" : ""+smm.heap.capacity());
|
jpayne@68
|
932 }
|
jpayne@68
|
933
|
jpayne@68
|
934 //Track the initial length for statistics
|
jpayne@68
|
935 final int initialLength1=r1.length();
|
jpayne@68
|
936 final int initialLength2=r1.mateLength();
|
jpayne@68
|
937 final String rid=r1.id;
|
jpayne@68
|
938
|
jpayne@68
|
939 //Increment counters
|
jpayne@68
|
940 readsProcessedT+=r1.pairCount();
|
jpayne@68
|
941 basesProcessedT+=initialLength1+initialLength2;
|
jpayne@68
|
942
|
jpayne@68
|
943 if(initialLength1<k && initialLength2<k){return;}
|
jpayne@68
|
944 final long expectedBases;
|
jpayne@68
|
945 final long imgID;
|
jpayne@68
|
946 {
|
jpayne@68
|
947 int taxID=-1;
|
jpayne@68
|
948 TaxNode tn=null;
|
jpayne@68
|
949 if(taxtree!=null && (mode==PER_TAXA || mode==PER_IMG || mode==PER_SEQUENCE || ((mode==ONE_SKETCH/* || mode==PER_HEADER*/) && smm.heap.taxID<0))){
|
jpayne@68
|
950 if(mode==PER_IMG){
|
jpayne@68
|
951 imgID=ImgRecord2.parseImgId(rid, true);
|
jpayne@68
|
952 tn=taxtree.imgToTaxNode(imgID);
|
jpayne@68
|
953 if(tn==null){tn=taxtree.parseNodeFromHeader(rid, bestEffort);}
|
jpayne@68
|
954 // assert(tn!=null || !rid.startsWith("tid")) : imgID+", "+taxID+", "+rid; //123
|
jpayne@68
|
955 }else{
|
jpayne@68
|
956 imgID=ImgRecord2.parseImgId(rid, false);
|
jpayne@68
|
957 tn=taxtree.parseNodeFromHeader(rid, bestEffort);
|
jpayne@68
|
958 // assert(tn!=null || !rid.startsWith("tid")) : imgID+", "+taxID+", "+rid; //123
|
jpayne@68
|
959 }
|
jpayne@68
|
960 // assert(false) : imgID +", "+rid;
|
jpayne@68
|
961 // System.err.println("B: "+bestEffort+", "+tn);//123
|
jpayne@68
|
962 while(tn!=null && tn.pid!=tn.id && tn.level<taxLevel){
|
jpayne@68
|
963 TaxNode temp=taxtree.getNode(tn.pid);
|
jpayne@68
|
964 if(temp==null || temp.level>=TaxTree.LIFE){break;}
|
jpayne@68
|
965 tn=temp;
|
jpayne@68
|
966 }
|
jpayne@68
|
967 // assert(tn!=null) : imgID+", "+taxID+", "+rid; //123
|
jpayne@68
|
968 if(tn!=null){taxID=tn.id;}
|
jpayne@68
|
969 // System.err.println("Node: "+rid+"\n->\n"+tn);
|
jpayne@68
|
970
|
jpayne@68
|
971 // assert(taxID>0) : imgID+", "+taxID+", "+rid; //123
|
jpayne@68
|
972 }else{
|
jpayne@68
|
973 imgID=-1;
|
jpayne@68
|
974 }
|
jpayne@68
|
975
|
jpayne@68
|
976 final long unitSizeBases;
|
jpayne@68
|
977 if(sizeList!=null){
|
jpayne@68
|
978 unitSizeBases=taxID<0 ? -1 : sizeList.get(taxID);
|
jpayne@68
|
979 }else if(sizeMap!=null){
|
jpayne@68
|
980 unitSizeBases=sizeMap.get(imgID);
|
jpayne@68
|
981 }else{
|
jpayne@68
|
982 unitSizeBases=-1;
|
jpayne@68
|
983 }
|
jpayne@68
|
984
|
jpayne@68
|
985
|
jpayne@68
|
986 if(mode==PER_TAXA){
|
jpayne@68
|
987 if(tossJunk && tn==null){return;}
|
jpayne@68
|
988 if(tn!=null){
|
jpayne@68
|
989 if(taxID==0 || (tn.level>taxLevel && tn.level>=TaxTree.PHYLUM)){return;}
|
jpayne@68
|
990 TaxNode parent=taxtree.getNode(tn.pid);
|
jpayne@68
|
991 if(parent.pid==parent.id){return;}
|
jpayne@68
|
992 if(prefilter && unitSizeBases>=0 && unitSizeBases<minSizeBases){return;}
|
jpayne@68
|
993 if(tossJunk){
|
jpayne@68
|
994 if(parent.level==TaxTree.LIFE){return;}
|
jpayne@68
|
995 if(tn.level==TaxTree.NO_RANK){return;}
|
jpayne@68
|
996 }
|
jpayne@68
|
997 }
|
jpayne@68
|
998 }
|
jpayne@68
|
999
|
jpayne@68
|
1000 if(mode==PER_SEQUENCE){
|
jpayne@68
|
1001 expectedBases=initialLength1+initialLength2;
|
jpayne@68
|
1002 if(expectedBases<minSizeBases){return;}
|
jpayne@68
|
1003 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
|
jpayne@68
|
1004 if(expectedSketchSize<minSketchSize){return;}
|
jpayne@68
|
1005 if(smm.heap==null || smm.heap.capacity()<expectedSketchSize){
|
jpayne@68
|
1006 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
|
jpayne@68
|
1007 }
|
jpayne@68
|
1008 }else if(mode==PER_TAXA){
|
jpayne@68
|
1009 if(taxID>=0 && localMap.containsKey((long)taxID)){
|
jpayne@68
|
1010 smm.heap=localMap.get((long)taxID);
|
jpayne@68
|
1011 assert(smm.heap.taxID==taxID);
|
jpayne@68
|
1012 }else if(sizeList==null){
|
jpayne@68
|
1013 if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());}
|
jpayne@68
|
1014 }else{
|
jpayne@68
|
1015 expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2;
|
jpayne@68
|
1016 if(expectedBases<minSizeBases){return;}
|
jpayne@68
|
1017 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
|
jpayne@68
|
1018 if(expectedSketchSize<minSketchSize){return;}
|
jpayne@68
|
1019 if(smm.heap==null || smm.heap.capacity()!=expectedSketchSize){
|
jpayne@68
|
1020 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
|
jpayne@68
|
1021 }
|
jpayne@68
|
1022 // assert(taxID!=96897) : taxID+", "+expectedBases+", "+expectedSketchSize+", "+unitSizeBases+", "+initialLength1+", "+sizeList.get(taxID);
|
jpayne@68
|
1023 }
|
jpayne@68
|
1024 assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID;
|
jpayne@68
|
1025 }else if(mode==PER_IMG){
|
jpayne@68
|
1026 if(sizeMap==null){
|
jpayne@68
|
1027 if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());}
|
jpayne@68
|
1028 }else{
|
jpayne@68
|
1029 expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2;
|
jpayne@68
|
1030 if(expectedBases<minSizeBases){return;}
|
jpayne@68
|
1031 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
|
jpayne@68
|
1032 if(expectedSketchSize<minSketchSize){return;}
|
jpayne@68
|
1033 if(smm.heap==null || smm.heap.capacity()!=expectedSketchSize){
|
jpayne@68
|
1034 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
|
jpayne@68
|
1035 }
|
jpayne@68
|
1036 }
|
jpayne@68
|
1037 }
|
jpayne@68
|
1038
|
jpayne@68
|
1039 assert(smm.heap!=null) : mode+", "+(sizeList==null);
|
jpayne@68
|
1040 assert(taxID<0 || smm.heap.taxID<0 || smm.heap.taxID==taxID); //This is important.
|
jpayne@68
|
1041 assert(imgID<0 || smm.heap.imgID<0 || smm.heap.imgID==imgID);
|
jpayne@68
|
1042
|
jpayne@68
|
1043 if(smm.heap.taxID<0){smm.heap.taxID=taxID;}
|
jpayne@68
|
1044 if(smm.heap.imgID<0){smm.heap.imgID=imgID;}
|
jpayne@68
|
1045 if(smm.heap.name0()==null){
|
jpayne@68
|
1046 smm.heap.setName0(rid);
|
jpayne@68
|
1047 }
|
jpayne@68
|
1048 if(smm.heap.taxName()==null && tn!=null){
|
jpayne@68
|
1049 smm.heap.setTaxName(tn.name);
|
jpayne@68
|
1050 }
|
jpayne@68
|
1051
|
jpayne@68
|
1052 if(!bestEffort && tn==null){//Try to get a higher-level node
|
jpayne@68
|
1053 TaxNode tn2=taxtree.parseNodeFromHeader(rid, true);
|
jpayne@68
|
1054 if(tn2!=null){
|
jpayne@68
|
1055 while(tn2!=null && tn2.pid!=tn2.id && tn2.level<taxLevel){
|
jpayne@68
|
1056 TaxNode temp=taxtree.getNode(tn2.pid);
|
jpayne@68
|
1057 if(temp==null || temp.level>=TaxTree.LIFE){break;}
|
jpayne@68
|
1058 tn2=temp;
|
jpayne@68
|
1059 }
|
jpayne@68
|
1060 if(tn2.level<=taxLevel){
|
jpayne@68
|
1061 taxID=tn2.id;
|
jpayne@68
|
1062 }
|
jpayne@68
|
1063 if(smm.heap.taxID<0){smm.heap.taxID=tn2.id;}
|
jpayne@68
|
1064 if(smm.heap.taxName()==null && tn2!=null){
|
jpayne@68
|
1065 smm.heap.setTaxName(tn2.name);
|
jpayne@68
|
1066 }
|
jpayne@68
|
1067 }
|
jpayne@68
|
1068 }
|
jpayne@68
|
1069 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1070
|
jpayne@68
|
1071 assert(smm.heap.taxID<0 || smm.heap.taxName()!=null) : smm.heap.taxID+", "+smm.heap.taxName()+", "+smm.heap.name()+", "+tn;
|
jpayne@68
|
1072
|
jpayne@68
|
1073 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1074 if(initialLength1>=k){smm.processRead(r1);}
|
jpayne@68
|
1075 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1076 if(initialLength2>=k){smm.processRead(r2);}
|
jpayne@68
|
1077 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1078
|
jpayne@68
|
1079
|
jpayne@68
|
1080 if(mode==PER_SEQUENCE){
|
jpayne@68
|
1081 manageHeap_perSequence();
|
jpayne@68
|
1082 }else if(mode==PER_TAXA || mode==PER_IMG){
|
jpayne@68
|
1083 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1084 manageHeap_perTaxa(taxID, imgID, unitSizeBases);
|
jpayne@68
|
1085 if(localMap.size()>20){dumpLocalMap();}
|
jpayne@68
|
1086 }else if(mode==ONE_SKETCH/* || mode==PER_HEADER*/){
|
jpayne@68
|
1087 //do nothing
|
jpayne@68
|
1088 }else{
|
jpayne@68
|
1089 assert(false) : mode;
|
jpayne@68
|
1090 }
|
jpayne@68
|
1091 }
|
jpayne@68
|
1092 }
|
jpayne@68
|
1093
|
jpayne@68
|
1094 private void manageHeap_perSequence(){
|
jpayne@68
|
1095 assert(mode==PER_SEQUENCE);
|
jpayne@68
|
1096 writeHeap(smm.heap);
|
jpayne@68
|
1097 }
|
jpayne@68
|
1098
|
jpayne@68
|
1099 private void manageHeap_perTaxa(final int taxID, final long imgID, final long unitSizeBases){
|
jpayne@68
|
1100 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1101 assert(mode==PER_TAXA || mode==PER_IMG);
|
jpayne@68
|
1102
|
jpayne@68
|
1103 if(smm.heap.size()<=0 || ((taxID<0 && imgID<0) && smm.heap.genomeSizeKmers<minSizeKmers)){//Discard
|
jpayne@68
|
1104 assert(!localMap.containsKey((long)taxID));
|
jpayne@68
|
1105 smm.heap.clear(true);
|
jpayne@68
|
1106 return;
|
jpayne@68
|
1107 }
|
jpayne@68
|
1108
|
jpayne@68
|
1109 //TODO:
|
jpayne@68
|
1110 //I could, at this point, write to disk if smm.heap.genomeSize==taxSize.
|
jpayne@68
|
1111 //Or if the taxID is unknown.
|
jpayne@68
|
1112
|
jpayne@68
|
1113 final boolean known=(taxID>=0 || imgID>=0);
|
jpayne@68
|
1114 final boolean unknown=!known;
|
jpayne@68
|
1115 final boolean hasSize=(known && (sizeList!=null || sizeMap!=null));
|
jpayne@68
|
1116 boolean finished=(unknown || (hasSize && smm.heap.genomeSizeBases>=unitSizeBases));
|
jpayne@68
|
1117
|
jpayne@68
|
1118 //For some reason, this assertion fired for a single taxID (52271) halfway through sketching RefSeq.
|
jpayne@68
|
1119 //Likely a transient hardware error.
|
jpayne@68
|
1120 assert(!finished || smm.heap.genomeSizeBases==unitSizeBases) : finished+", "+unknown+", "+hasSize+", "+(sizeList==null)+"\n"
|
jpayne@68
|
1121 +taxID+", "+unitSizeBases+", "+smm.heap.genomeSizeBases+", "+smm.heap.genomeSizeKmers;
|
jpayne@68
|
1122 // if(finished && smm.heap.genomeSizeBases!=unitSizeBases){
|
jpayne@68
|
1123 // System.err.println("Warning: tid="+taxID+", finished="+finished+", known="+known+
|
jpayne@68
|
1124 // ", smm.heap.genomeSizeBases="+smm.heap.genomeSizeBases+", unitSizeBases="+unitSizeBases);
|
jpayne@68
|
1125 // }
|
jpayne@68
|
1126
|
jpayne@68
|
1127 smm.heap.taxID=taxID;
|
jpayne@68
|
1128 smm.heap.imgID=imgID;
|
jpayne@68
|
1129
|
jpayne@68
|
1130 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1131
|
jpayne@68
|
1132 final Long key;
|
jpayne@68
|
1133 if(imgID>-1 && (mode==PER_IMG || taxID<1)){key=imgID;}
|
jpayne@68
|
1134 else if(taxID>-1){key=(long)taxID;}
|
jpayne@68
|
1135 else{key=Long.valueOf(nextUnknown.getAndIncrement());}
|
jpayne@68
|
1136
|
jpayne@68
|
1137 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1138
|
jpayne@68
|
1139 if(unknown || finished){
|
jpayne@68
|
1140 writeHeap(smm.heap);
|
jpayne@68
|
1141 smm.heap.clear(true);
|
jpayne@68
|
1142 localMap.remove((long)taxID);
|
jpayne@68
|
1143 return;
|
jpayne@68
|
1144 }
|
jpayne@68
|
1145
|
jpayne@68
|
1146 //At this point, the taxID is known and this heap does not constitute the whole taxSize, or the taxSize is unknown.
|
jpayne@68
|
1147 if(!hasSize){
|
jpayne@68
|
1148 final HashMap<Long, SketchHeap> map=longMaps[(int)(key&MAP_MASK)];
|
jpayne@68
|
1149 final SketchHeap old;
|
jpayne@68
|
1150 synchronized(map){
|
jpayne@68
|
1151 old=map.get(key);
|
jpayne@68
|
1152 if(old==null){
|
jpayne@68
|
1153 map.put(key, smm.heap);
|
jpayne@68
|
1154 }else{
|
jpayne@68
|
1155 old.add(smm.heap);
|
jpayne@68
|
1156 }
|
jpayne@68
|
1157 }
|
jpayne@68
|
1158 if(old==null){
|
jpayne@68
|
1159 smm.heap=null;
|
jpayne@68
|
1160 }else{
|
jpayne@68
|
1161 smm.heap.clear(true);
|
jpayne@68
|
1162 assert(!localMap.containsKey((long)taxID));
|
jpayne@68
|
1163 }
|
jpayne@68
|
1164 localMap.remove((long)taxID);
|
jpayne@68
|
1165 return;
|
jpayne@68
|
1166 }
|
jpayne@68
|
1167
|
jpayne@68
|
1168 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
|
jpayne@68
|
1169
|
jpayne@68
|
1170 {
|
jpayne@68
|
1171 //At this point, the taxID is and taxSize are known, and this heap does not constitute the whole taxSize.
|
jpayne@68
|
1172 final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize);
|
jpayne@68
|
1173 assert(expectedHeapSize>=3) : expectedHeapSize;
|
jpayne@68
|
1174 // boolean writeHeap=false;
|
jpayne@68
|
1175 // boolean makeHeap=false;
|
jpayne@68
|
1176
|
jpayne@68
|
1177 SketchHeap local=null;
|
jpayne@68
|
1178 {
|
jpayne@68
|
1179 local=localMap.get(key);
|
jpayne@68
|
1180 assert(local==null || local.taxID==key.intValue());
|
jpayne@68
|
1181 if(local==smm.heap){
|
jpayne@68
|
1182 //do nothing
|
jpayne@68
|
1183 smm.heap=null;
|
jpayne@68
|
1184 }else if(local==null){
|
jpayne@68
|
1185 if(expectedHeapSize==smm.heap.capacity()){//Store the current heap
|
jpayne@68
|
1186 assert(smm.heap.taxID==key.intValue());
|
jpayne@68
|
1187 assert(smm.heap.name()!=null);
|
jpayne@68
|
1188 localMap.put(key, smm.heap);//Safe
|
jpayne@68
|
1189 smm.heap=null;
|
jpayne@68
|
1190 }else{//Store a precisely-sized heap
|
jpayne@68
|
1191 SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
|
jpayne@68
|
1192 temp.add(smm.heap);
|
jpayne@68
|
1193 assert(temp.taxID==key.intValue());
|
jpayne@68
|
1194 assert(temp.name()!=null);
|
jpayne@68
|
1195 localMap.put(key, temp);//Looks safe
|
jpayne@68
|
1196 // smm.heap=null; //Not sure which is better
|
jpayne@68
|
1197 smm.heap.clear(true);
|
jpayne@68
|
1198 }
|
jpayne@68
|
1199 }else{
|
jpayne@68
|
1200 assert(local.taxID==smm.heap.taxID);
|
jpayne@68
|
1201 assert(local!=smm.heap);
|
jpayne@68
|
1202 // assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID+", "+key; //123
|
jpayne@68
|
1203 // assert(false) : taxID+", "+key;
|
jpayne@68
|
1204 local.add(smm.heap); //This WAS the slow line. It should no longer be executed.
|
jpayne@68
|
1205 // if(local.genomeSizeBases>=unitSizeBases){
|
jpayne@68
|
1206 // writeHeap=true;
|
jpayne@68
|
1207 // localMap.remove(key);
|
jpayne@68
|
1208 // }
|
jpayne@68
|
1209 // smm.heap=null; //Not sure which is better
|
jpayne@68
|
1210 smm.heap.clear(true);
|
jpayne@68
|
1211 }
|
jpayne@68
|
1212 }
|
jpayne@68
|
1213 }
|
jpayne@68
|
1214 }
|
jpayne@68
|
1215
|
jpayne@68
|
1216 private void dumpLocalMap(){
|
jpayne@68
|
1217
|
jpayne@68
|
1218 for(Entry<Long, SketchHeap> e : localMap.entrySet()){
|
jpayne@68
|
1219 boolean writeHeap=false;
|
jpayne@68
|
1220 final Long key=e.getKey();
|
jpayne@68
|
1221 final SketchHeap localHeap=e.getValue();
|
jpayne@68
|
1222 final HashMap<Long, SketchHeap> map=longMaps[(int)(key&MAP_MASK)];
|
jpayne@68
|
1223 final SketchHeap old;
|
jpayne@68
|
1224
|
jpayne@68
|
1225 final long unitSizeBases;
|
jpayne@68
|
1226 if(sizeList!=null){
|
jpayne@68
|
1227 unitSizeBases=localHeap.taxID<0 ? -1 : sizeList.get((int)localHeap.taxID);
|
jpayne@68
|
1228 }else if(sizeMap!=null){
|
jpayne@68
|
1229 unitSizeBases=sizeMap.get(localHeap.imgID);
|
jpayne@68
|
1230 }else{
|
jpayne@68
|
1231 unitSizeBases=-1;
|
jpayne@68
|
1232 }
|
jpayne@68
|
1233 final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize);
|
jpayne@68
|
1234
|
jpayne@68
|
1235 synchronized(map){
|
jpayne@68
|
1236 old=map.get(key);
|
jpayne@68
|
1237 if(old==null){
|
jpayne@68
|
1238 if(expectedHeapSize==localHeap.capacity()){//Store the current heap
|
jpayne@68
|
1239 map.put(key, localHeap);
|
jpayne@68
|
1240 }else{//Store a precisely-sized heap
|
jpayne@68
|
1241 // assert(key.intValue()!=96897) : key+", "+unitSizeBases+", "+", "+sizeList.get(key.intValue());
|
jpayne@68
|
1242 assert(expectedHeapSize>0) : expectedHeapSize+", "+key+", "+localHeap.taxID+", "+localHeap.name()+", "+unitSizeBases;
|
jpayne@68
|
1243 SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
|
jpayne@68
|
1244 temp.add(localHeap);
|
jpayne@68
|
1245 map.put(key, temp);
|
jpayne@68
|
1246 }
|
jpayne@68
|
1247 }else{
|
jpayne@68
|
1248 old.add(localHeap); //This was the slow line; should be faster now.
|
jpayne@68
|
1249 if(old.genomeSizeBases>=unitSizeBases){
|
jpayne@68
|
1250 writeHeap=true;
|
jpayne@68
|
1251 map.remove(key);
|
jpayne@68
|
1252 }
|
jpayne@68
|
1253 }
|
jpayne@68
|
1254 }
|
jpayne@68
|
1255
|
jpayne@68
|
1256 if(writeHeap){
|
jpayne@68
|
1257 assert(old!=null); //For compiler
|
jpayne@68
|
1258 assert(old.genomeSizeBases>0) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers;
|
jpayne@68
|
1259 assert(old.genomeSizeBases==unitSizeBases) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers+", "+old.size()+", "+old.taxID;
|
jpayne@68
|
1260 writeHeap(old);
|
jpayne@68
|
1261 }
|
jpayne@68
|
1262 }
|
jpayne@68
|
1263 localMap.clear();
|
jpayne@68
|
1264 }
|
jpayne@68
|
1265
|
jpayne@68
|
1266 private boolean writeHeap(SketchHeap heap){
|
jpayne@68
|
1267 sketchesMadeT++;
|
jpayne@68
|
1268 // assert(heap.size()>0) : heap.size(); //Not really necessary
|
jpayne@68
|
1269 boolean written=false;
|
jpayne@68
|
1270 // assert(heap.heap.size()==heap.set.size()) : heap.heap.size()+", "+heap.set.size();
|
jpayne@68
|
1271 if(heap.size()>0 && heap.genomeSizeKmers>=minSizeKmers && heap.sketchSizeEstimate()>0){
|
jpayne@68
|
1272 Sketch sketch=new Sketch(heap, true, tool.trackCounts, outMeta);
|
jpayne@68
|
1273 if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;}
|
jpayne@68
|
1274 if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);}
|
jpayne@68
|
1275 if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);}
|
jpayne@68
|
1276 if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);}
|
jpayne@68
|
1277 if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;}
|
jpayne@68
|
1278 if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;}
|
jpayne@68
|
1279 if(parseSubunit && sketch.name0()!=null){
|
jpayne@68
|
1280 if(outMeta!=null){
|
jpayne@68
|
1281 sketch.meta=(ArrayList<String>)sketch.meta.clone();
|
jpayne@68
|
1282 }else if(sketch.meta==null){
|
jpayne@68
|
1283 if(sketch.name0().contains("SSU_")){
|
jpayne@68
|
1284 sketch.addMeta("subunit:ssu");
|
jpayne@68
|
1285 }else if(sketch.name0().contains("LSU_")){
|
jpayne@68
|
1286 sketch.addMeta("subunit:lsu");
|
jpayne@68
|
1287 }
|
jpayne@68
|
1288 }
|
jpayne@68
|
1289 }
|
jpayne@68
|
1290 if(tsw!=null){
|
jpayne@68
|
1291 final int choice=(sketch.hashCode()&Integer.MAX_VALUE)%files;
|
jpayne@68
|
1292 sketch.addSSU();
|
jpayne@68
|
1293 synchronized(tsw[choice]){
|
jpayne@68
|
1294 SketchTool.write(sketch, tsw[choice], bb);
|
jpayne@68
|
1295 sketchesWrittenT++;
|
jpayne@68
|
1296 written=true;
|
jpayne@68
|
1297 }
|
jpayne@68
|
1298 }
|
jpayne@68
|
1299 }else{
|
jpayne@68
|
1300 heap.clear(true);
|
jpayne@68
|
1301 }
|
jpayne@68
|
1302 assert(heap.genomeSizeBases==0);
|
jpayne@68
|
1303 assert(heap.genomeSequences==0);
|
jpayne@68
|
1304 return written;
|
jpayne@68
|
1305 }
|
jpayne@68
|
1306
|
jpayne@68
|
1307 /** Number of reads processed by this thread */
|
jpayne@68
|
1308 protected long readsProcessedT=0;
|
jpayne@68
|
1309 /** Number of bases processed by this thread */
|
jpayne@68
|
1310 protected long basesProcessedT=0;
|
jpayne@68
|
1311
|
jpayne@68
|
1312 long sketchesMadeT=0;
|
jpayne@68
|
1313 long sketchesWrittenT=0;
|
jpayne@68
|
1314
|
jpayne@68
|
1315 /** True only if this thread has completed successfully */
|
jpayne@68
|
1316 boolean success=false;
|
jpayne@68
|
1317
|
jpayne@68
|
1318 /** Shared input stream */
|
jpayne@68
|
1319 private final ConcurrentReadInputStream cris;
|
jpayne@68
|
1320 /** Thread ID */
|
jpayne@68
|
1321 final int threadID;
|
jpayne@68
|
1322 private ByteBuilder bb=new ByteBuilder();
|
jpayne@68
|
1323
|
jpayne@68
|
1324 final SketchMakerMini smm;
|
jpayne@68
|
1325 final HashMap<Long, SketchHeap> localMap;
|
jpayne@68
|
1326 }
|
jpayne@68
|
1327
|
jpayne@68
|
1328 /*--------------------------------------------------------------*/
|
jpayne@68
|
1329 /*---------------- Fields ----------------*/
|
jpayne@68
|
1330 /*--------------------------------------------------------------*/
|
jpayne@68
|
1331
|
jpayne@68
|
1332 /** Primary input file path */
|
jpayne@68
|
1333 private String in1=null;
|
jpayne@68
|
1334 /** Secondary input file path */
|
jpayne@68
|
1335 private String in2=null;
|
jpayne@68
|
1336
|
jpayne@68
|
1337 /** Primary output file path */
|
jpayne@68
|
1338 private String out1=null;
|
jpayne@68
|
1339
|
jpayne@68
|
1340 /** Override input file extension */
|
jpayne@68
|
1341 private String extin=null;
|
jpayne@68
|
1342
|
jpayne@68
|
1343 private String giTableFile=null;
|
jpayne@68
|
1344 private String taxTreeFile=null;
|
jpayne@68
|
1345 private String accessionFile=null;
|
jpayne@68
|
1346 private String imgFile=null;
|
jpayne@68
|
1347
|
jpayne@68
|
1348 /*Override metadata */
|
jpayne@68
|
1349 String outTaxName=null;
|
jpayne@68
|
1350 String outFname=null;
|
jpayne@68
|
1351 String outName0=null;
|
jpayne@68
|
1352 int outTaxID=-1;
|
jpayne@68
|
1353 long outSpid=-1;
|
jpayne@68
|
1354 long outImgID=-1;
|
jpayne@68
|
1355 ArrayList<String> outMeta=null;
|
jpayne@68
|
1356 static boolean parseSubunit=false;
|
jpayne@68
|
1357
|
jpayne@68
|
1358 /*--------------------------------------------------------------*/
|
jpayne@68
|
1359
|
jpayne@68
|
1360 /** Number of reads processed */
|
jpayne@68
|
1361 protected long readsProcessed=0;
|
jpayne@68
|
1362 /** Number of bases processed */
|
jpayne@68
|
1363 protected long basesProcessed=0;
|
jpayne@68
|
1364 /** Number of bases processed */
|
jpayne@68
|
1365 protected long kmersProcessed=0;
|
jpayne@68
|
1366 /** Number of sketches started */
|
jpayne@68
|
1367 protected long sketchesMade=0;
|
jpayne@68
|
1368 /** Number of sketches written */
|
jpayne@68
|
1369 protected long sketchesWritten=0;
|
jpayne@68
|
1370
|
jpayne@68
|
1371 /** Quit after processing this many input reads; -1 means no limit */
|
jpayne@68
|
1372 private long maxReads=-1;
|
jpayne@68
|
1373
|
jpayne@68
|
1374 final LongList sizeList;
|
jpayne@68
|
1375 final HashMap<Long, Long> sizeMap;
|
jpayne@68
|
1376
|
jpayne@68
|
1377 private HashMap<Long, SketchHeap> longMaps[];
|
jpayne@68
|
1378 private ByteStreamWriter tsw[];
|
jpayne@68
|
1379
|
jpayne@68
|
1380 /*--------------------------------------------------------------*/
|
jpayne@68
|
1381 /*---------------- Final Fields ----------------*/
|
jpayne@68
|
1382 /*--------------------------------------------------------------*/
|
jpayne@68
|
1383
|
jpayne@68
|
1384 /** Primary input file */
|
jpayne@68
|
1385 private final FileFormat ffin1;
|
jpayne@68
|
1386 /** Secondary input file */
|
jpayne@68
|
1387 private final FileFormat ffin2;
|
jpayne@68
|
1388
|
jpayne@68
|
1389 /** Primary output files */
|
jpayne@68
|
1390 private final FileFormat ffout[];
|
jpayne@68
|
1391 /** Number of output files */
|
jpayne@68
|
1392 private final int files;
|
jpayne@68
|
1393
|
jpayne@68
|
1394 final int mode;
|
jpayne@68
|
1395
|
jpayne@68
|
1396 private final SketchTool tool;
|
jpayne@68
|
1397
|
jpayne@68
|
1398 /** Don't make sketches from sequences smaller than this */
|
jpayne@68
|
1399 final int minSizeBases;
|
jpayne@68
|
1400 /** Don't make sketches from sequences smaller than this */
|
jpayne@68
|
1401 final int minSizeKmers;
|
jpayne@68
|
1402
|
jpayne@68
|
1403 private int taxLevel=1;
|
jpayne@68
|
1404 private boolean prefilter=false;
|
jpayne@68
|
1405 private boolean tossJunk=true;
|
jpayne@68
|
1406 boolean bestEffort=true;
|
jpayne@68
|
1407 // private final HashMap<Integer, byte[]> ssuMap=null;
|
jpayne@68
|
1408
|
jpayne@68
|
1409 private final AtomicInteger nextUnknown=new AtomicInteger(minFakeID);
|
jpayne@68
|
1410
|
jpayne@68
|
1411 private static final int MAP_WAYS=32;
|
jpayne@68
|
1412 private static final int MAP_MASK=MAP_WAYS-1;
|
jpayne@68
|
1413
|
jpayne@68
|
1414
|
jpayne@68
|
1415 /*--------------------------------------------------------------*/
|
jpayne@68
|
1416 /*---------------- Common Fields ----------------*/
|
jpayne@68
|
1417 /*--------------------------------------------------------------*/
|
jpayne@68
|
1418
|
jpayne@68
|
1419 /** Print status messages to this output stream */
|
jpayne@68
|
1420 private PrintStream outstream=System.err;
|
jpayne@68
|
1421 /** Print verbose messages */
|
jpayne@68
|
1422 public static boolean verbose=false;
|
jpayne@68
|
1423 /** True if an error was encountered */
|
jpayne@68
|
1424 public boolean errorState=false;
|
jpayne@68
|
1425 /** Overwrite existing output files */
|
jpayne@68
|
1426 private boolean overwrite=false;
|
jpayne@68
|
1427 /** Append to existing output files */
|
jpayne@68
|
1428 private boolean append=false;
|
jpayne@68
|
1429
|
jpayne@68
|
1430 }
|