annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/SketchMaker.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package sketch;
jpayne@68 2
jpayne@68 3 import java.io.File;
jpayne@68 4 import java.io.PrintStream;
jpayne@68 5 import java.util.ArrayDeque;
jpayne@68 6 import java.util.ArrayList;
jpayne@68 7 import java.util.Arrays;
jpayne@68 8 import java.util.HashMap;
jpayne@68 9 import java.util.Map.Entry;
jpayne@68 10 import java.util.concurrent.atomic.AtomicInteger;
jpayne@68 11
jpayne@68 12 import fileIO.ByteFile;
jpayne@68 13 import fileIO.ByteStreamWriter;
jpayne@68 14 import fileIO.FileFormat;
jpayne@68 15 import fileIO.ReadWrite;
jpayne@68 16 import shared.Parse;
jpayne@68 17 import shared.Parser;
jpayne@68 18 import shared.PreParser;
jpayne@68 19 import shared.ReadStats;
jpayne@68 20 import shared.Shared;
jpayne@68 21 import shared.Timer;
jpayne@68 22 import shared.Tools;
jpayne@68 23 import stream.ConcurrentReadInputStream;
jpayne@68 24 import stream.FASTQ;
jpayne@68 25 import stream.FastaReadInputStream;
jpayne@68 26 import stream.Read;
jpayne@68 27 import structures.ByteBuilder;
jpayne@68 28 import structures.ListNum;
jpayne@68 29 import structures.LongList;
jpayne@68 30 import tax.AccessionToTaxid;
jpayne@68 31 import tax.GiToTaxid;
jpayne@68 32 import tax.ImgRecord2;
jpayne@68 33 import tax.TaxNode;
jpayne@68 34 import tax.TaxTree;
jpayne@68 35
jpayne@68 36 /**
jpayne@68 37 * Creates MinHashSketches rapidly.
jpayne@68 38 *
jpayne@68 39 * @author Brian Bushnell
jpayne@68 40 * @date July 6, 2016
jpayne@68 41 *
jpayne@68 42 */
jpayne@68 43 public class SketchMaker extends SketchObject {
jpayne@68 44
jpayne@68 45 /*--------------------------------------------------------------*/
jpayne@68 46 /*---------------- Initialization ----------------*/
jpayne@68 47 /*--------------------------------------------------------------*/
jpayne@68 48
jpayne@68 49 /**
jpayne@68 50 * Code entrance from the command line.
jpayne@68 51 * @param args Command line arguments
jpayne@68 52 */
jpayne@68 53 public static void main(String[] args){
jpayne@68 54 //Start a timer immediately upon code entrance.
jpayne@68 55 Timer t=new Timer();
jpayne@68 56
jpayne@68 57 final int mode=parseMode(args);
jpayne@68 58 if(mode==PER_FILE/* || mode==ONE_SKETCH || mode==PER_HEADER*/ || mode==PER_SEQUENCE){//ONE_SKETCH does not work for multiple input files.
jpayne@68 59 recallCompareSketch(args);
jpayne@68 60 return;
jpayne@68 61 }
jpayne@68 62
jpayne@68 63 final int oldBufLen=Shared.bufferLen();
jpayne@68 64
jpayne@68 65 //Create an instance of this class
jpayne@68 66 SketchMaker x=new SketchMaker(args);
jpayne@68 67
jpayne@68 68 //Run the object
jpayne@68 69 x.process(t);
jpayne@68 70
jpayne@68 71 Shared.setBufferLen(oldBufLen);
jpayne@68 72
jpayne@68 73 //Close the print stream if it was redirected
jpayne@68 74 Shared.closeStream(x.outstream);
jpayne@68 75 }
jpayne@68 76
jpayne@68 77 private static void recallCompareSketch(String[] args){
jpayne@68 78 ArrayList<String> list=new ArrayList<String>(args.length+1);
jpayne@68 79 for(int i=0; i<args.length; i++){
jpayne@68 80 if(args[i].startsWith("out=")){
jpayne@68 81 args[i]=args[i].replaceFirst("out=", "outsketch=");
jpayne@68 82 }
jpayne@68 83 list.add(args[i]);
jpayne@68 84 }
jpayne@68 85 list.add("sketchonly");
jpayne@68 86 CompareSketch.main(list.toArray(new String[0]));
jpayne@68 87 }
jpayne@68 88
jpayne@68 89 /**
jpayne@68 90 * Constructor.
jpayne@68 91 * @param args Command line arguments
jpayne@68 92 */
jpayne@68 93 public SketchMaker(String[] args){
jpayne@68 94
jpayne@68 95 {//Preparse block for help, config files, and outstream
jpayne@68 96 PreParser pp=new PreParser(args, getClass(), false);
jpayne@68 97 args=pp.args;
jpayne@68 98 outstream=pp.outstream;
jpayne@68 99 }
jpayne@68 100
jpayne@68 101 //Set shared static variables
jpayne@68 102 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
jpayne@68 103 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
jpayne@68 104
jpayne@68 105 //Create a parser object
jpayne@68 106 Parser parser=new Parser();
jpayne@68 107
jpayne@68 108 int minSizeKmers_=100;
jpayne@68 109 int files_=1;
jpayne@68 110 int mode_=ONE_SKETCH;
jpayne@68 111 hashNames=true;
jpayne@68 112 boolean setPrefilter=false;
jpayne@68 113 defaultParams.printDepth=defaultParams.printDepth2=defaultParams.printVolume=false;
jpayne@68 114
jpayne@68 115 //Parse each argument
jpayne@68 116 for(int i=0; i<args.length; i++){
jpayne@68 117 String arg=args[i];
jpayne@68 118
jpayne@68 119 //Break arguments into their constituent parts, in the form of "a=b"
jpayne@68 120 String[] split=arg.split("=");
jpayne@68 121 String a=split[0].toLowerCase();
jpayne@68 122 String b=split.length>1 ? split[1] : null;
jpayne@68 123
jpayne@68 124 if(a.equals("verbose")){
jpayne@68 125 verbose=Parse.parseBoolean(b);
jpayne@68 126 }else if(a.equals("files")){
jpayne@68 127 files_=Integer.parseInt(b);
jpayne@68 128 }else if(a.equals("minsize")){
jpayne@68 129 minSizeKmers_=Parse.parseIntKMG(b);
jpayne@68 130 }else if(a.equals("prefilter")){
jpayne@68 131 prefilter=Parse.parseBoolean(b);
jpayne@68 132 setPrefilter=true;
jpayne@68 133 }
jpayne@68 134
jpayne@68 135 else if(a.equals("name") || a.equals("taxname")){
jpayne@68 136 outTaxName=b;
jpayne@68 137 }else if(a.equals("name0")){
jpayne@68 138 outName0=b;
jpayne@68 139 }else if(a.equals("fname")){
jpayne@68 140 outFname=b;
jpayne@68 141 }else if(a.equals("taxid") || a.equals("tid")){
jpayne@68 142 outTaxID=Integer.parseInt(b);
jpayne@68 143 }else if(a.equals("spid")){
jpayne@68 144 outSpid=Integer.parseInt(b);
jpayne@68 145 }else if(a.equals("imgid")){
jpayne@68 146 outImgID=Integer.parseInt(b);
jpayne@68 147 }else if((a.startsWith("meta_") || a.startsWith("mt_")) && b!=null){
jpayne@68 148 if(outMeta==null){outMeta=new ArrayList<String>();}
jpayne@68 149 int underscore=a.indexOf('_', 0);
jpayne@68 150 outMeta.add(a.substring(underscore+1)+":"+b);
jpayne@68 151 }else if(a.equals("parsesubunit")){
jpayne@68 152 parseSubunit=Parse.parseBoolean(b);
jpayne@68 153 }
jpayne@68 154
jpayne@68 155 else if(parseMode(arg, a, b)>-1){
jpayne@68 156 mode_=parseMode(arg, a, b);
jpayne@68 157 }else if(a.equals("parse_flag_goes_here")){
jpayne@68 158 long fake_variable=Parse.parseKMG(b);
jpayne@68 159 //Set a variable here
jpayne@68 160 }
jpayne@68 161
jpayne@68 162 else if(a.equals("table") || a.equals("gi") || a.equals("gitable")){
jpayne@68 163 giTableFile=b;
jpayne@68 164 }else if(a.equals("taxtree") || a.equals("tree")){
jpayne@68 165 taxTreeFile=b;
jpayne@68 166 }else if(a.equals("accession")){
jpayne@68 167 accessionFile=b;
jpayne@68 168 }else if(a.equalsIgnoreCase("img") || a.equals("imgfile") || a.equals("imgdump")){
jpayne@68 169 imgFile=b;
jpayne@68 170 }
jpayne@68 171
jpayne@68 172 else if(a.equals("tossjunk")){
jpayne@68 173 tossJunk=Parse.parseBoolean(b);
jpayne@68 174 }
jpayne@68 175
jpayne@68 176 // else if(a.equals("silva")){//Handled by parser
jpayne@68 177 // TaxTree.SILVA_MODE=Parse.parseBoolean(b);
jpayne@68 178 // }
jpayne@68 179
jpayne@68 180 else if(a.equals("taxlevel") || a.equals("tl") || a.equals("level") || a.equals("lv")){
jpayne@68 181 taxLevel=TaxTree.parseLevel(b);
jpayne@68 182 }
jpayne@68 183
jpayne@68 184 else if(parseSketchFlags(arg, a, b)){
jpayne@68 185 //do nothing
jpayne@68 186 // System.err.println("a: "+arg);
jpayne@68 187 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
jpayne@68 188 //do nothing
jpayne@68 189 // System.err.println("b: "+arg);
jpayne@68 190 }else if(defaultParams.parse(arg, a, b)){
jpayne@68 191 //do nothing
jpayne@68 192 // System.err.println("c: "+arg);
jpayne@68 193 // System.err.println(defaultParams.printDepth);
jpayne@68 194 }
jpayne@68 195
jpayne@68 196 else{
jpayne@68 197 outstream.println("Unknown parameter "+args[i]);
jpayne@68 198 assert(false) : "Unknown parameter "+args[i];
jpayne@68 199 }
jpayne@68 200 }
jpayne@68 201 if("auto".equalsIgnoreCase(imgFile)){imgFile=TaxTree.defaultImgFile();}
jpayne@68 202 if("auto".equalsIgnoreCase(taxTreeFile)){taxTreeFile=TaxTree.defaultTreeFile();}
jpayne@68 203 if("auto".equalsIgnoreCase(giTableFile)){giTableFile=TaxTree.defaultTableFile();}
jpayne@68 204 if("auto".equalsIgnoreCase(accessionFile)){accessionFile=TaxTree.defaultAccessionFile();}
jpayne@68 205
jpayne@68 206 outMeta=SketchObject.fixMeta(outMeta);
jpayne@68 207
jpayne@68 208 postParse();
jpayne@68 209 minSizeKmers=minSizeKmers_;
jpayne@68 210 mode=mode_;
jpayne@68 211
jpayne@68 212 minSizeBases=minSizeKmers_+k-1;
jpayne@68 213
jpayne@68 214 {//Process parser fields
jpayne@68 215 Parser.processQuality();
jpayne@68 216
jpayne@68 217 maxReads=parser.maxReads;
jpayne@68 218
jpayne@68 219 overwrite=ReadStats.overwrite=parser.overwrite;
jpayne@68 220 append=ReadStats.append=parser.append;
jpayne@68 221
jpayne@68 222 in1=parser.in1;
jpayne@68 223 in2=parser.in2;
jpayne@68 224
jpayne@68 225 out1=parser.out1;
jpayne@68 226
jpayne@68 227 extin=parser.extin;
jpayne@68 228 }
jpayne@68 229 files=(out1==null ? 0 : files_);
jpayne@68 230
jpayne@68 231 if(!setPrefilter && !prefilter && (mode==PER_TAXA || mode==PER_IMG) && in1!=null && !in1.startsWith("stdin") && (AUTOSIZE || AUTOSIZE_LINEAR || targetSketchSize>200)){
jpayne@68 232 prefilter=true;
jpayne@68 233 System.err.println("Enabled prefilter due to running in per-"+(mode==PER_TAXA ? "taxa" : "IMG")+" mode; override with 'prefilter=f'.");
jpayne@68 234 }
jpayne@68 235
jpayne@68 236 assert(mode!=ONE_SKETCH || files<2) : "Multiple output files are not allowed in single-sketch mode.";
jpayne@68 237
jpayne@68 238 //Do input file # replacement
jpayne@68 239 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
jpayne@68 240 in2=in1.replace("#", "2");
jpayne@68 241 in1=in1.replace("#", "1");
jpayne@68 242 }
jpayne@68 243
jpayne@68 244 //Adjust interleaved detection based on the number of input files
jpayne@68 245 if(in2!=null){
jpayne@68 246 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
jpayne@68 247 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
jpayne@68 248 }
jpayne@68 249
jpayne@68 250 assert(FastaReadInputStream.settingsOK());
jpayne@68 251
jpayne@68 252 //Ensure there is an input file
jpayne@68 253 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
jpayne@68 254
jpayne@68 255 //Adjust the number of threads for input file reading
jpayne@68 256 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
jpayne@68 257 ByteFile.FORCE_MODE_BF2=true;
jpayne@68 258 }
jpayne@68 259
jpayne@68 260 ffout=makeFFArray(out1, files, overwrite, append);
jpayne@68 261 if(ffout==null || ffout.length<1){
jpayne@68 262 System.err.println("WARNING: No output files were specified; no sketches will be written.\n");
jpayne@68 263 }
jpayne@68 264
jpayne@68 265 // //Ensure output files can be written
jpayne@68 266 // if(!Tools.testOutputFiles(overwrite, append, false, out1)){
jpayne@68 267 // outstream.println((out1==null)+", "+out1);
jpayne@68 268 // throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n");
jpayne@68 269 // }
jpayne@68 270
jpayne@68 271 //Ensure input files can be read
jpayne@68 272 if(!Tools.testInputFiles(false, true, in1, in2, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){
jpayne@68 273 throw new RuntimeException("\nCan't read some input files.\n");
jpayne@68 274 }
jpayne@68 275
jpayne@68 276 //Ensure that no file was specified multiple times
jpayne@68 277 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, taxTreeFile, giTableFile, imgFile, SSUMap.r16SFile, SSUMap.r18SFile)){
jpayne@68 278 throw new RuntimeException("\nSome file names were specified multiple times.\n");
jpayne@68 279 }
jpayne@68 280
jpayne@68 281 //Create input FileFormat objects
jpayne@68 282 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
jpayne@68 283 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
jpayne@68 284
jpayne@68 285 // assert(false) : defaultParams.trackCounts();
jpayne@68 286 tool=new SketchTool(targetSketchSize, defaultParams);
jpayne@68 287
jpayne@68 288 if(taxTreeFile!=null){setTaxtree(taxTreeFile, outstream);}
jpayne@68 289
jpayne@68 290 if(giTableFile!=null){
jpayne@68 291 loadGiToTaxid();
jpayne@68 292 }
jpayne@68 293 if(accessionFile!=null){
jpayne@68 294 AccessionToTaxid.tree=taxtree;
jpayne@68 295 outstream.println("Loading accession table.");
jpayne@68 296 AccessionToTaxid.load(accessionFile);
jpayne@68 297 System.gc();
jpayne@68 298 }
jpayne@68 299 if(imgFile!=null){
jpayne@68 300 TaxTree.loadIMG(imgFile, true, outstream);
jpayne@68 301 }
jpayne@68 302 SSUMap.load(outstream);
jpayne@68 303
jpayne@68 304 if(prefilter){
jpayne@68 305 if(mode==PER_TAXA){sizeList=sizeList(); sizeMap=null;}
jpayne@68 306 else if(mode==PER_IMG){sizeMap=sizeMap(); sizeList=null;}
jpayne@68 307 else{
jpayne@68 308 assert(false) : "Wrong mode for prefilter; should be taxa or img.";
jpayne@68 309 sizeList=null; sizeMap=null;
jpayne@68 310 }
jpayne@68 311 }else{
jpayne@68 312 sizeList=null; sizeMap=null;
jpayne@68 313 }
jpayne@68 314 }
jpayne@68 315
jpayne@68 316 private static FileFormat[] makeFFArray(String fname0, int files, boolean overwrite, boolean append){
jpayne@68 317 if(files<1 || fname0==null){return null;}
jpayne@68 318 String[] fnames=new String[files];
jpayne@68 319 FileFormat[] ff=new FileFormat[files];
jpayne@68 320 for(int i=0; i<files; i++){
jpayne@68 321 String fname=fname0;
jpayne@68 322 if(files>1){
jpayne@68 323 assert(fname.indexOf('#')>-1) : "Output name requires # symbol for multiple files.";
jpayne@68 324 fname=fname.replaceFirst("#", ""+i);
jpayne@68 325 }
jpayne@68 326 fnames[i]=fname;
jpayne@68 327 ff[i]=FileFormat.testOutput(fname, FileFormat.TEXT, null, true, overwrite, append, false);
jpayne@68 328 }
jpayne@68 329
jpayne@68 330 if(!Tools.testOutputFiles(overwrite, append, false, fnames)){
jpayne@68 331 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+Arrays.toString(fnames)+"\n");
jpayne@68 332 }
jpayne@68 333
jpayne@68 334 return ff;
jpayne@68 335 }
jpayne@68 336
jpayne@68 337 // private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){
jpayne@68 338 // if(ff==null || ff.length==0){return null;}
jpayne@68 339 // ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length];
jpayne@68 340 // for(int i=0; i<ff.length; i++){
jpayne@68 341 // tsw[i]=new ByteStreamWriter(ff[i]);
jpayne@68 342 // tsw[i].start();
jpayne@68 343 // }
jpayne@68 344 // return tsw;
jpayne@68 345 // }
jpayne@68 346
jpayne@68 347 private static ByteStreamWriter[] makeTSWArray(FileFormat[] ff){
jpayne@68 348 if(ff==null || ff.length==0){return null;}
jpayne@68 349 ByteStreamWriter[] tsw=new ByteStreamWriter[ff.length];
jpayne@68 350 for(int i=0; i<ff.length; i++){
jpayne@68 351 tsw[i]=new ByteStreamWriter(ff[i]);
jpayne@68 352 tsw[i].start();
jpayne@68 353 }
jpayne@68 354 return tsw;
jpayne@68 355 }
jpayne@68 356
jpayne@68 357 /*--------------------------------------------------------------*/
jpayne@68 358 /*---------------- Outer Methods ----------------*/
jpayne@68 359 /*--------------------------------------------------------------*/
jpayne@68 360
jpayne@68 361 //Makes a list of genome sizes (bases, not kmers) per taxa.
jpayne@68 362 private LongList sizeList(){
jpayne@68 363 Timer t=new Timer();
jpayne@68 364 Shared.GC_BEFORE_PRINT_MEMORY=true;
jpayne@68 365 t.start("Making taxa prefilter.");
jpayne@68 366 //For some reason this function is very slow, using only ~95% java and ~95% bgzip CPU.
jpayne@68 367 //But that's about the same speed as reformat... BBDuk is only a little faster.
jpayne@68 368
jpayne@68 369 // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads();
jpayne@68 370
jpayne@68 371 LongList sizes=new LongList();
jpayne@68 372
jpayne@68 373 //Create a read input stream
jpayne@68 374 final ConcurrentReadInputStream cris;
jpayne@68 375 {
jpayne@68 376 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
jpayne@68 377 // if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);} //Why would I ever want this here?
jpayne@68 378 cris.start(); //Start the stream
jpayne@68 379 if(verbose){outstream.println("Started cris");}
jpayne@68 380 }
jpayne@68 381
jpayne@68 382 //Grab the first ListNum of reads
jpayne@68 383 ListNum<Read> ln=cris.nextList();
jpayne@68 384 //Grab the actual read list from the ListNum
jpayne@68 385 ArrayList<Read> reads=(ln!=null ? ln.list : null);
jpayne@68 386
jpayne@68 387 //As long as there is a nonempty read list...
jpayne@68 388 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
jpayne@68 389 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
jpayne@68 390
jpayne@68 391 //Loop through each read in the list
jpayne@68 392 for(int idx=0; idx<reads.size(); idx++){
jpayne@68 393 final Read r1=reads.get(idx);
jpayne@68 394
jpayne@68 395 int taxID=-1;
jpayne@68 396 TaxNode tn=null;
jpayne@68 397 if(taxtree!=null){
jpayne@68 398 tn=taxtree.parseNodeFromHeader(r1.id, bestEffort);
jpayne@68 399 // System.err.println("A: "+bestEffort+", "+tn);//123
jpayne@68 400 while(tn!=null && tn.pid!=tn.id && tn.level<taxLevel){
jpayne@68 401 TaxNode temp=taxtree.getNode(tn.pid);
jpayne@68 402 if(temp==null || temp.level>=TaxTree.LIFE){break;}
jpayne@68 403 tn=temp;
jpayne@68 404 }
jpayne@68 405 if(tn!=null){taxID=tn.id;}
jpayne@68 406 }
jpayne@68 407
jpayne@68 408 if(taxID>0){
jpayne@68 409 long a=r1.length();
jpayne@68 410 long b=r1.mateLength();
jpayne@68 411 if(a<k){a=0;}
jpayne@68 412 if(b<k){b=0;}
jpayne@68 413 sizes.increment(taxID, a+b);
jpayne@68 414 }
jpayne@68 415 }
jpayne@68 416
jpayne@68 417 //Notify the input stream that the list was used
jpayne@68 418 cris.returnList(ln);
jpayne@68 419 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
jpayne@68 420
jpayne@68 421 //Fetch a new list
jpayne@68 422 ln=cris.nextList();
jpayne@68 423 reads=(ln!=null ? ln.list : null);
jpayne@68 424 }
jpayne@68 425
jpayne@68 426 //Notify the input stream that the final list was used
jpayne@68 427 if(ln!=null){
jpayne@68 428 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
jpayne@68 429 }
jpayne@68 430
jpayne@68 431 errorState|=ReadWrite.closeStream(cris);
jpayne@68 432 // outstream.println("Created prefilter.");
jpayne@68 433 t.stop("Created prefilter:");
jpayne@68 434 Shared.printMemory();
jpayne@68 435 System.err.println();
jpayne@68 436
jpayne@68 437 return sizes;
jpayne@68 438 }
jpayne@68 439
jpayne@68 440 //Makes a list of genome sizes (bases, not kmers) per img.
jpayne@68 441 private HashMap<Long, Long> sizeMap(){
jpayne@68 442 Timer t=new Timer();
jpayne@68 443 t.start("Making img prefilter.");
jpayne@68 444
jpayne@68 445 // assert(false) : ReadWrite.USE_PIGZ+", "+ReadWrite.USE_UNPIGZ+", "+ReadWrite.MAX_ZIP_THREADS+", "+Shared.threads();
jpayne@68 446
jpayne@68 447 HashMap<Long, Long> sizes=new HashMap<Long, Long>();
jpayne@68 448
jpayne@68 449 //Create a read input stream
jpayne@68 450 final ConcurrentReadInputStream cris;
jpayne@68 451 {
jpayne@68 452 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
jpayne@68 453 if(defaultParams.samplerate!=1){cris.setSampleRate(defaultParams.samplerate, sampleseed);}
jpayne@68 454 cris.start(); //Start the stream
jpayne@68 455 if(verbose){outstream.println("Started cris");}
jpayne@68 456 }
jpayne@68 457
jpayne@68 458 //Grab the first ListNum of reads
jpayne@68 459 ListNum<Read> ln=cris.nextList();
jpayne@68 460 //Grab the actual read list from the ListNum
jpayne@68 461 ArrayList<Read> reads=(ln!=null ? ln.list : null);
jpayne@68 462
jpayne@68 463 //As long as there is a nonempty read list...
jpayne@68 464 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
jpayne@68 465 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
jpayne@68 466
jpayne@68 467 //Loop through each read in the list
jpayne@68 468 for(int idx=0; idx<reads.size(); idx++){
jpayne@68 469 final Read r1=reads.get(idx);
jpayne@68 470
jpayne@68 471 long imgID=ImgRecord2.parseImgId(r1.id, true);
jpayne@68 472 assert(imgID>-1) : "IMG records must start with IMG number followed by a space: "+r1.id;
jpayne@68 473
jpayne@68 474 if(imgID>0){
jpayne@68 475 long a=r1.length();
jpayne@68 476 long b=r1.mateLength();
jpayne@68 477 if(a<k){a=0;}
jpayne@68 478 if(b<k){b=0;}
jpayne@68 479
jpayne@68 480 if(a+b>0){
jpayne@68 481 Long old=sizes.get(imgID);
jpayne@68 482 if(old==null){sizes.put(imgID, a+b);}
jpayne@68 483 else{sizes.put(imgID, a+b+old);}
jpayne@68 484 }
jpayne@68 485 }
jpayne@68 486 }
jpayne@68 487
jpayne@68 488 //Notify the input stream that the list was used
jpayne@68 489 cris.returnList(ln);
jpayne@68 490 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
jpayne@68 491
jpayne@68 492 //Fetch a new list
jpayne@68 493 ln=cris.nextList();
jpayne@68 494 reads=(ln!=null ? ln.list : null);
jpayne@68 495 }
jpayne@68 496
jpayne@68 497 //Notify the input stream that the final list was used
jpayne@68 498 if(ln!=null){
jpayne@68 499 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
jpayne@68 500 }
jpayne@68 501
jpayne@68 502 errorState|=ReadWrite.closeStream(cris);
jpayne@68 503 // outstream.println("Created prefilter.");
jpayne@68 504 t.stop("Created prefilter:");
jpayne@68 505 Shared.printMemory();
jpayne@68 506 System.err.println();
jpayne@68 507
jpayne@68 508 return sizes;
jpayne@68 509 }
jpayne@68 510
jpayne@68 511 /** Create read streams and process all data */
jpayne@68 512 void process(Timer t){
jpayne@68 513
jpayne@68 514 //Reset counters
jpayne@68 515 readsProcessed=0;
jpayne@68 516 basesProcessed=0;
jpayne@68 517
jpayne@68 518 if(mode==ONE_SKETCH && !forceDisableMultithreadedFastq && Shared.threads()>2 && ffin1.fastq()){
jpayne@68 519 // Shared.setBufferLen(2);
jpayne@68 520 singleSketchMT();
jpayne@68 521 }else{
jpayne@68 522 final int oldLen=Shared.bufferLen();
jpayne@68 523 Shared.capBufferLen(ffin1.fastq() ? 40 : 4);
jpayne@68 524
jpayne@68 525 //Turn off read validation in the input threads to increase speed
jpayne@68 526 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
jpayne@68 527 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
jpayne@68 528
jpayne@68 529 //Create a read input stream
jpayne@68 530 final ConcurrentReadInputStream cris;
jpayne@68 531 {
jpayne@68 532 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null);
jpayne@68 533 cris.start(); //Start the stream
jpayne@68 534 if(verbose){outstream.println("Started cris");}
jpayne@68 535 }
jpayne@68 536
jpayne@68 537 //Process the reads in separate threads
jpayne@68 538 spawnThreads(cris);
jpayne@68 539
jpayne@68 540 if(verbose){outstream.println("Finished; closing streams.");}
jpayne@68 541
jpayne@68 542 //Write anything that was accumulated by ReadStats
jpayne@68 543 errorState|=ReadStats.writeAll();
jpayne@68 544 //Close the read streams
jpayne@68 545 errorState|=ReadWrite.closeStream(cris);
jpayne@68 546
jpayne@68 547 //TODO: Write sketch
jpayne@68 548
jpayne@68 549 //Reset read validation
jpayne@68 550 Read.VALIDATE_IN_CONSTRUCTOR=vic;
jpayne@68 551 Shared.setBufferLen(oldLen);
jpayne@68 552 }
jpayne@68 553
jpayne@68 554 //Report timing and results
jpayne@68 555 t.stop();
jpayne@68 556 outstream.println("Wrote "+sketchesWritten+" of "+sketchesMade+" sketches.\n");
jpayne@68 557 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
jpayne@68 558
jpayne@68 559 //Throw an exception of there was an error in a thread
jpayne@68 560 if(errorState){
jpayne@68 561 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
jpayne@68 562 }
jpayne@68 563 }
jpayne@68 564
jpayne@68 565 private void singleSketchMT(){
jpayne@68 566 Timer t=new Timer();
jpayne@68 567 Sketch sketch=tool.processReadsMT(ffin1, ffin2, Shared.threads(),
jpayne@68 568 maxReads, mode, defaultParams.samplerate, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual, false);
jpayne@68 569
jpayne@68 570 if(outTaxID>=0){sketch.taxID=outTaxID;}
jpayne@68 571 if(outTaxName!=null){sketch.setTaxName(outTaxName);}
jpayne@68 572 if(outFname!=null){sketch.setFname(outFname);}
jpayne@68 573 if(outName0!=null){sketch.setName0(outName0);}
jpayne@68 574 if(outSpid>=0){sketch.spid=outSpid;}
jpayne@68 575 if(outImgID>=0){sketch.imgID=outImgID;}
jpayne@68 576 sketch.setMeta(outMeta);
jpayne@68 577
jpayne@68 578 //Accumulate per-thread statistics
jpayne@68 579 readsProcessed+=sketch.genomeSequences;
jpayne@68 580 basesProcessed+=sketch.genomeSizeBases;
jpayne@68 581 kmersProcessed+=sketch.genomeSizeKmers;
jpayne@68 582
jpayne@68 583 sketchesMade+=1;
jpayne@68 584
jpayne@68 585 t.stop("Finished sketching: ");
jpayne@68 586 Shared.printMemory();
jpayne@68 587
jpayne@68 588 if(ffout!=null && ffout.length>0){
jpayne@68 589 sketch.addSSU();
jpayne@68 590 SketchTool.write(sketch, ffout[0]);
jpayne@68 591 sketchesWritten+=1;
jpayne@68 592 }
jpayne@68 593 }
jpayne@68 594
jpayne@68 595 /** Spawn process threads */
jpayne@68 596 @SuppressWarnings("unchecked")
jpayne@68 597 private void spawnThreads(final ConcurrentReadInputStream cris){
jpayne@68 598
jpayne@68 599 //Do anything necessary prior to processing
jpayne@68 600 Timer t=new Timer();
jpayne@68 601
jpayne@68 602 //Determine how many threads may be used
jpayne@68 603 final int threads=Tools.mid(1, Shared.threads(), 14);//Probably capped for memory reasons. Rarely hits 1200% anyway (was 12, bumped to 14).
jpayne@68 604
jpayne@68 605 //Fill a list with ProcessThreads
jpayne@68 606 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 607
jpayne@68 608 if(mode==PER_TAXA || mode==PER_IMG){
jpayne@68 609 longMaps=new HashMap[MAP_WAYS];
jpayne@68 610 for(int i=0; i<longMaps.length; i++){
jpayne@68 611 longMaps[i]=new HashMap<Long, SketchHeap>();
jpayne@68 612 }
jpayne@68 613 }
jpayne@68 614
jpayne@68 615 if(mode!=ONE_SKETCH){tsw=makeTSWArray(ffout);}
jpayne@68 616
jpayne@68 617 for(int i=0; i<threads; i++){
jpayne@68 618 alpt.add(new ProcessThread(cris, i));
jpayne@68 619 }
jpayne@68 620
jpayne@68 621 //Start the threads
jpayne@68 622 for(ProcessThread pt : alpt){
jpayne@68 623 pt.start();
jpayne@68 624 }
jpayne@68 625
jpayne@68 626 //Wait for completion of all threads
jpayne@68 627 boolean success=true;
jpayne@68 628 SketchHeap singleHeap=null;
jpayne@68 629 for(ProcessThread pt : alpt){
jpayne@68 630
jpayne@68 631 //Wait until this thread has terminated
jpayne@68 632 while(pt.getState()!=Thread.State.TERMINATED){
jpayne@68 633 try {
jpayne@68 634 //Attempt a join operation
jpayne@68 635 pt.join();
jpayne@68 636 } catch (InterruptedException e) {
jpayne@68 637 //Potentially handle this, if it is expected to occur
jpayne@68 638 e.printStackTrace();
jpayne@68 639 }
jpayne@68 640 }
jpayne@68 641
jpayne@68 642 //Accumulate per-thread statistics
jpayne@68 643 readsProcessed+=pt.readsProcessedT;
jpayne@68 644 basesProcessed+=pt.basesProcessedT;
jpayne@68 645 kmersProcessed+=pt.smm.kmersProcessed;
jpayne@68 646
jpayne@68 647 sketchesMade+=pt.sketchesMadeT;
jpayne@68 648 sketchesWritten+=pt.sketchesWrittenT;
jpayne@68 649
jpayne@68 650 // System.err.println(pt.sketchesMadeT+" "+pt.sketchesWrittenT);
jpayne@68 651
jpayne@68 652 // System.err.println("pt.readsProcessedT="+pt.readsProcessedT);
jpayne@68 653 if(mode==ONE_SKETCH){
jpayne@68 654 SketchHeap temp=pt.smm.heap;
jpayne@68 655
jpayne@68 656 if(temp==null){
jpayne@68 657 //do nothing
jpayne@68 658 }else if(singleHeap==null){singleHeap=pt.smm.heap;}
jpayne@68 659 else{singleHeap.add(pt.smm.heap);}
jpayne@68 660
jpayne@68 661 if(singleHeap!=null){
jpayne@68 662 if(outTaxID>=0){singleHeap.taxID=outTaxID;}
jpayne@68 663 if(outTaxName!=null){singleHeap.setTaxName(outTaxName);}
jpayne@68 664 if(outFname!=null){singleHeap.setFname(outFname);}
jpayne@68 665 if(outName0!=null){singleHeap.setName0(outName0);}
jpayne@68 666 if(outImgID>=0){singleHeap.imgID=outImgID;}
jpayne@68 667
jpayne@68 668 singleHeap.genomeSizeBases=basesProcessed;
jpayne@68 669 singleHeap.genomeSizeKmers=kmersProcessed;
jpayne@68 670 singleHeap.genomeSequences=readsProcessed;
jpayne@68 671 }
jpayne@68 672 }
jpayne@68 673 success&=pt.success;
jpayne@68 674 }
jpayne@68 675
jpayne@68 676 if(singleHeap!=null){
jpayne@68 677 singleHeap.setFname(ffin1.simpleName());
jpayne@68 678 if(singleHeap.name0()==null){
jpayne@68 679 singleHeap.setName0(ffin1.simpleName());
jpayne@68 680 }
jpayne@68 681 }
jpayne@68 682
jpayne@68 683 t.stop("Finished sketching: ");
jpayne@68 684 Shared.printMemory();
jpayne@68 685
jpayne@68 686 if(ffout!=null){
jpayne@68 687 if(mode==PER_TAXA || mode==PER_IMG){
jpayne@68 688 if(tsw==null){tsw=makeTSWArray(ffout);}
jpayne@68 689 success&=writeMap(longMaps);
jpayne@68 690 }else if(mode==ONE_SKETCH){
jpayne@68 691 Sketch sketch=new Sketch(singleHeap, true, tool.trackCounts, outMeta);
jpayne@68 692 if(outTaxID>=0){sketch.taxID=outTaxID;}
jpayne@68 693 if(outTaxName!=null){sketch.setTaxName(outTaxName);}
jpayne@68 694 if(outFname!=null){sketch.setFname(outFname);}
jpayne@68 695 if(outName0!=null){sketch.setName0(outName0);}
jpayne@68 696 if(outSpid>=0){sketch.spid=outSpid;}
jpayne@68 697 if(outImgID>=0){sketch.imgID=outImgID;}
jpayne@68 698 if(ffout!=null && ffout.length>0){
jpayne@68 699 sketch.addSSU();
jpayne@68 700 SketchTool.write(sketch, ffout[0]);
jpayne@68 701 }
jpayne@68 702 sketchesMade++;
jpayne@68 703 sketchesWritten++;
jpayne@68 704 }
jpayne@68 705 }
jpayne@68 706
jpayne@68 707 if(tsw!=null){
jpayne@68 708 for(int i=0; i<tsw.length; i++){tsw[i].poisonAndWait();}
jpayne@68 709 }
jpayne@68 710
jpayne@68 711 //Track whether any threads failed
jpayne@68 712 if(!success){errorState=true;}
jpayne@68 713
jpayne@68 714 //Do anything necessary after processing
jpayne@68 715
jpayne@68 716 }
jpayne@68 717
jpayne@68 718 /*--------------------------------------------------------------*/
jpayne@68 719 /*---------------- I/O Methods ----------------*/
jpayne@68 720 /*--------------------------------------------------------------*/
jpayne@68 721
jpayne@68 722 private boolean writeMap(HashMap<Long, SketchHeap>[] maps){
jpayne@68 723
jpayne@68 724 //Determine how many threads may be used
jpayne@68 725 final int threads=files;
jpayne@68 726
jpayne@68 727 //Fill a list with WriteThreads
jpayne@68 728 ArrayList<WriteThread> alwt=new ArrayList<WriteThread>(threads);
jpayne@68 729
jpayne@68 730 @SuppressWarnings("unchecked")
jpayne@68 731 ArrayDeque<SketchHeap>[] heaps=new ArrayDeque[threads];
jpayne@68 732 for(int i=0; i<threads; i++){
jpayne@68 733 heaps[i]=new ArrayDeque<SketchHeap>();
jpayne@68 734 WriteThread wt=new WriteThread(i, heaps[i]);
jpayne@68 735 alwt.add(wt);
jpayne@68 736 }
jpayne@68 737
jpayne@68 738 for(int i=0; i<maps.length; i++){
jpayne@68 739 HashMap<Long, SketchHeap> longMap=maps[i];
jpayne@68 740 for(Entry<Long, SketchHeap> entry : longMap.entrySet()){
jpayne@68 741 // set.remove(entry); This will probably not work
jpayne@68 742 SketchHeap entryHeap=entry.getValue();
jpayne@68 743 sketchesMade++;
jpayne@68 744 if(entryHeap.size()>0 && entryHeap.genomeSizeKmers>=minSizeKmers){
jpayne@68 745 heaps[(entry.hashCode()&Integer.MAX_VALUE)%threads].add(entryHeap);
jpayne@68 746 }
jpayne@68 747 }
jpayne@68 748 // intMap.clear();
jpayne@68 749 maps[i]=null;
jpayne@68 750 }
jpayne@68 751
jpayne@68 752 //Start the threads
jpayne@68 753 for(WriteThread wt : alwt){wt.start();}
jpayne@68 754
jpayne@68 755 //Wait for completion of all threads
jpayne@68 756 boolean success=true;
jpayne@68 757 for(WriteThread wt : alwt){
jpayne@68 758
jpayne@68 759 //Wait until this thread has terminated
jpayne@68 760 while(wt.getState()!=Thread.State.TERMINATED){
jpayne@68 761 try {
jpayne@68 762 //Attempt a join operation
jpayne@68 763 wt.join();
jpayne@68 764 } catch (InterruptedException e) {
jpayne@68 765 //Potentially handle this, if it is expected to occur
jpayne@68 766 e.printStackTrace();
jpayne@68 767 }
jpayne@68 768 }
jpayne@68 769 // sketchesMade+=wt.sketchesMadeT;
jpayne@68 770 sketchesWritten+=wt.sketchesWrittenT;
jpayne@68 771 success&=wt.success;
jpayne@68 772 }
jpayne@68 773 return success;
jpayne@68 774 }
jpayne@68 775
jpayne@68 776 private class WriteThread extends Thread{
jpayne@68 777
jpayne@68 778 WriteThread(int tnum_, ArrayDeque<SketchHeap> queue_){
jpayne@68 779 tnum=tnum_;
jpayne@68 780 queue=queue_;
jpayne@68 781 }
jpayne@68 782
jpayne@68 783 @Override
jpayne@68 784 public void run(){
jpayne@68 785 success=false;
jpayne@68 786 for(SketchHeap polledHeap=queue.poll(); polledHeap!=null; polledHeap=queue.poll()){
jpayne@68 787 if(polledHeap.sketchSizeEstimate()>0){
jpayne@68 788 Sketch sketch=new Sketch(polledHeap, true, tool.trackCounts, outMeta);
jpayne@68 789 if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;}
jpayne@68 790 if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);}
jpayne@68 791 if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);}
jpayne@68 792 if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);}
jpayne@68 793 if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;}
jpayne@68 794 if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;}
jpayne@68 795 sketch.addSSU();
jpayne@68 796 SketchTool.write(sketch, tsw[tnum], bb);
jpayne@68 797 sketchesWrittenT++;
jpayne@68 798 }
jpayne@68 799 }
jpayne@68 800 bb=null;
jpayne@68 801 success=true;
jpayne@68 802 queue=null;
jpayne@68 803 }
jpayne@68 804
jpayne@68 805 ArrayDeque<SketchHeap> queue;
jpayne@68 806 final int tnum;
jpayne@68 807 private ByteBuilder bb=new ByteBuilder();
jpayne@68 808 // long sketchesMadeT=0;
jpayne@68 809 long sketchesWrittenT=0;
jpayne@68 810 boolean success=false;
jpayne@68 811 }
jpayne@68 812
jpayne@68 813 // private void writeOutput(ConcurrentHashMap<Integer, SketchHeap> map){
jpayne@68 814 // ByteStreamWriter tsw=new ByteStreamWriter(ffout);
jpayne@68 815 // tsw.start();
jpayne@68 816 // KeySetView<Integer, SketchHeap> y=map.keySet();
jpayne@68 817 // for(Integer x : map.keySet()){
jpayne@68 818 // SketchHeap smm.heap=map.get(x);
jpayne@68 819 // Sketch s=tool.toSketch(smm.heap);
jpayne@68 820 // tool.write(s, tsw);
jpayne@68 821 // }
jpayne@68 822 // tsw.poisonAndWait();
jpayne@68 823 // }
jpayne@68 824
jpayne@68 825 /*--------------------------------------------------------------*/
jpayne@68 826 /*---------------- Tax Methods ----------------*/
jpayne@68 827 /*--------------------------------------------------------------*/
jpayne@68 828
jpayne@68 829 private void loadGiToTaxid(){
jpayne@68 830 Timer t=new Timer();
jpayne@68 831 outstream.println("Loading gi to taxa translation table.");
jpayne@68 832 GiToTaxid.initialize(giTableFile);
jpayne@68 833 t.stop();
jpayne@68 834 if(true){
jpayne@68 835 outstream.println("Time: \t"+t);
jpayne@68 836 Shared.printMemory();
jpayne@68 837 outstream.println();
jpayne@68 838 }
jpayne@68 839 }
jpayne@68 840
jpayne@68 841 /*--------------------------------------------------------------*/
jpayne@68 842 /*---------------- Inner Methods ----------------*/
jpayne@68 843 /*--------------------------------------------------------------*/
jpayne@68 844
jpayne@68 845 /*--------------------------------------------------------------*/
jpayne@68 846 /*---------------- Inner Classes ----------------*/
jpayne@68 847 /*--------------------------------------------------------------*/
jpayne@68 848
jpayne@68 849 private class ProcessThread extends Thread {
jpayne@68 850
jpayne@68 851 //Constructor
jpayne@68 852 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){
jpayne@68 853 cris=cris_;
jpayne@68 854 threadID=tid_;
jpayne@68 855
jpayne@68 856 smm=new SketchMakerMini(tool, mode, defaultParams.minEntropy, defaultParams.minProb, defaultParams.minQual);
jpayne@68 857 localMap=(mode==PER_TAXA ? new HashMap<Long, SketchHeap>() : null);
jpayne@68 858 }
jpayne@68 859
jpayne@68 860 //Called by start()
jpayne@68 861 @Override
jpayne@68 862 public void run(){
jpayne@68 863 //Do anything necessary prior to processing
jpayne@68 864
jpayne@68 865 //Process the reads
jpayne@68 866 processInner();
jpayne@68 867
jpayne@68 868 //Do anything necessary after processing
jpayne@68 869 bb=null;
jpayne@68 870
jpayne@68 871 //Indicate successful exit status
jpayne@68 872 success=true;
jpayne@68 873 }
jpayne@68 874
jpayne@68 875 /** Iterate through the reads */
jpayne@68 876 void processInner(){
jpayne@68 877
jpayne@68 878 //Grab the first ListNum of reads
jpayne@68 879 ListNum<Read> ln=cris.nextList();
jpayne@68 880 //Grab the actual read list from the ListNum
jpayne@68 881 ArrayList<Read> reads=(ln!=null ? ln.list : null);
jpayne@68 882
jpayne@68 883 //Check to ensure pairing is as expected
jpayne@68 884 if(reads!=null && !reads.isEmpty()){
jpayne@68 885 Read r=reads.get(0);
jpayne@68 886 assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
jpayne@68 887 }
jpayne@68 888
jpayne@68 889 // long cntr1=0, cntr2=0, cntr3=0, cntr4=0;
jpayne@68 890
jpayne@68 891 //As long as there is a nonempty read list...
jpayne@68 892 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
jpayne@68 893 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
jpayne@68 894
jpayne@68 895 //Loop through each read in the list
jpayne@68 896 for(int idx=0; idx<reads.size(); idx++){
jpayne@68 897 final Read r1=reads.get(idx);
jpayne@68 898 final Read r2=r1.mate;
jpayne@68 899
jpayne@68 900 processReadPair(r1, r2);
jpayne@68 901 }
jpayne@68 902
jpayne@68 903 //Notify the input stream that the list was used
jpayne@68 904 cris.returnList(ln);
jpayne@68 905 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
jpayne@68 906
jpayne@68 907 //Fetch a new list
jpayne@68 908 ln=cris.nextList();
jpayne@68 909 reads=(ln!=null ? ln.list : null);
jpayne@68 910 }
jpayne@68 911
jpayne@68 912 //Notify the input stream that the final list was used
jpayne@68 913 if(ln!=null){
jpayne@68 914 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
jpayne@68 915 }
jpayne@68 916
jpayne@68 917 if(mode==PER_TAXA && localMap.size()>0){dumpLocalMap();}
jpayne@68 918
jpayne@68 919 // System.out.println(cntr1+", "+cntr2+", "+cntr3+", "+cntr4);
jpayne@68 920 }
jpayne@68 921
jpayne@68 922 void processReadPair(Read r1, Read r2){
jpayne@68 923
jpayne@68 924 if(mode==PER_TAXA){
jpayne@68 925 assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap;
jpayne@68 926 assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap;
jpayne@68 927 }else if(mode==PER_SEQUENCE || mode==PER_IMG){
jpayne@68 928 assert(smm.heap==null || smm.heap.size()==0) : smm.heap.genomeSizeBases+", "+smm.heap;
jpayne@68 929 assert(smm.heap==null || smm.heap.genomeSizeBases==0) : smm.heap.genomeSizeBases+", "+smm.heap;
jpayne@68 930 }else{
jpayne@68 931 assert(smm.heap!=null && smm.heap.capacity()>=targetSketchSize) : targetSketchSize+", "+(smm.heap==null ? "null" : ""+smm.heap.capacity());
jpayne@68 932 }
jpayne@68 933
jpayne@68 934 //Track the initial length for statistics
jpayne@68 935 final int initialLength1=r1.length();
jpayne@68 936 final int initialLength2=r1.mateLength();
jpayne@68 937 final String rid=r1.id;
jpayne@68 938
jpayne@68 939 //Increment counters
jpayne@68 940 readsProcessedT+=r1.pairCount();
jpayne@68 941 basesProcessedT+=initialLength1+initialLength2;
jpayne@68 942
jpayne@68 943 if(initialLength1<k && initialLength2<k){return;}
jpayne@68 944 final long expectedBases;
jpayne@68 945 final long imgID;
jpayne@68 946 {
jpayne@68 947 int taxID=-1;
jpayne@68 948 TaxNode tn=null;
jpayne@68 949 if(taxtree!=null && (mode==PER_TAXA || mode==PER_IMG || mode==PER_SEQUENCE || ((mode==ONE_SKETCH/* || mode==PER_HEADER*/) && smm.heap.taxID<0))){
jpayne@68 950 if(mode==PER_IMG){
jpayne@68 951 imgID=ImgRecord2.parseImgId(rid, true);
jpayne@68 952 tn=taxtree.imgToTaxNode(imgID);
jpayne@68 953 if(tn==null){tn=taxtree.parseNodeFromHeader(rid, bestEffort);}
jpayne@68 954 // assert(tn!=null || !rid.startsWith("tid")) : imgID+", "+taxID+", "+rid; //123
jpayne@68 955 }else{
jpayne@68 956 imgID=ImgRecord2.parseImgId(rid, false);
jpayne@68 957 tn=taxtree.parseNodeFromHeader(rid, bestEffort);
jpayne@68 958 // assert(tn!=null || !rid.startsWith("tid")) : imgID+", "+taxID+", "+rid; //123
jpayne@68 959 }
jpayne@68 960 // assert(false) : imgID +", "+rid;
jpayne@68 961 // System.err.println("B: "+bestEffort+", "+tn);//123
jpayne@68 962 while(tn!=null && tn.pid!=tn.id && tn.level<taxLevel){
jpayne@68 963 TaxNode temp=taxtree.getNode(tn.pid);
jpayne@68 964 if(temp==null || temp.level>=TaxTree.LIFE){break;}
jpayne@68 965 tn=temp;
jpayne@68 966 }
jpayne@68 967 // assert(tn!=null) : imgID+", "+taxID+", "+rid; //123
jpayne@68 968 if(tn!=null){taxID=tn.id;}
jpayne@68 969 // System.err.println("Node: "+rid+"\n->\n"+tn);
jpayne@68 970
jpayne@68 971 // assert(taxID>0) : imgID+", "+taxID+", "+rid; //123
jpayne@68 972 }else{
jpayne@68 973 imgID=-1;
jpayne@68 974 }
jpayne@68 975
jpayne@68 976 final long unitSizeBases;
jpayne@68 977 if(sizeList!=null){
jpayne@68 978 unitSizeBases=taxID<0 ? -1 : sizeList.get(taxID);
jpayne@68 979 }else if(sizeMap!=null){
jpayne@68 980 unitSizeBases=sizeMap.get(imgID);
jpayne@68 981 }else{
jpayne@68 982 unitSizeBases=-1;
jpayne@68 983 }
jpayne@68 984
jpayne@68 985
jpayne@68 986 if(mode==PER_TAXA){
jpayne@68 987 if(tossJunk && tn==null){return;}
jpayne@68 988 if(tn!=null){
jpayne@68 989 if(taxID==0 || (tn.level>taxLevel && tn.level>=TaxTree.PHYLUM)){return;}
jpayne@68 990 TaxNode parent=taxtree.getNode(tn.pid);
jpayne@68 991 if(parent.pid==parent.id){return;}
jpayne@68 992 if(prefilter && unitSizeBases>=0 && unitSizeBases<minSizeBases){return;}
jpayne@68 993 if(tossJunk){
jpayne@68 994 if(parent.level==TaxTree.LIFE){return;}
jpayne@68 995 if(tn.level==TaxTree.NO_RANK){return;}
jpayne@68 996 }
jpayne@68 997 }
jpayne@68 998 }
jpayne@68 999
jpayne@68 1000 if(mode==PER_SEQUENCE){
jpayne@68 1001 expectedBases=initialLength1+initialLength2;
jpayne@68 1002 if(expectedBases<minSizeBases){return;}
jpayne@68 1003 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
jpayne@68 1004 if(expectedSketchSize<minSketchSize){return;}
jpayne@68 1005 if(smm.heap==null || smm.heap.capacity()<expectedSketchSize){
jpayne@68 1006 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
jpayne@68 1007 }
jpayne@68 1008 }else if(mode==PER_TAXA){
jpayne@68 1009 if(taxID>=0 && localMap.containsKey((long)taxID)){
jpayne@68 1010 smm.heap=localMap.get((long)taxID);
jpayne@68 1011 assert(smm.heap.taxID==taxID);
jpayne@68 1012 }else if(sizeList==null){
jpayne@68 1013 if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());}
jpayne@68 1014 }else{
jpayne@68 1015 expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2;
jpayne@68 1016 if(expectedBases<minSizeBases){return;}
jpayne@68 1017 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
jpayne@68 1018 if(expectedSketchSize<minSketchSize){return;}
jpayne@68 1019 if(smm.heap==null || smm.heap.capacity()!=expectedSketchSize){
jpayne@68 1020 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
jpayne@68 1021 }
jpayne@68 1022 // assert(taxID!=96897) : taxID+", "+expectedBases+", "+expectedSketchSize+", "+unitSizeBases+", "+initialLength1+", "+sizeList.get(taxID);
jpayne@68 1023 }
jpayne@68 1024 assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID;
jpayne@68 1025 }else if(mode==PER_IMG){
jpayne@68 1026 if(sizeMap==null){
jpayne@68 1027 if(smm.heap==null){smm.heap=new SketchHeap(targetSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());}
jpayne@68 1028 }else{
jpayne@68 1029 expectedBases=unitSizeBases>-1 ? unitSizeBases : initialLength1+initialLength2;
jpayne@68 1030 if(expectedBases<minSizeBases){return;}
jpayne@68 1031 int expectedSketchSize=toSketchSize(expectedBases, -1, -1, targetSketchSize);
jpayne@68 1032 if(expectedSketchSize<minSketchSize){return;}
jpayne@68 1033 if(smm.heap==null || smm.heap.capacity()!=expectedSketchSize){
jpayne@68 1034 smm.heap=new SketchHeap(expectedSketchSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
jpayne@68 1035 }
jpayne@68 1036 }
jpayne@68 1037 }
jpayne@68 1038
jpayne@68 1039 assert(smm.heap!=null) : mode+", "+(sizeList==null);
jpayne@68 1040 assert(taxID<0 || smm.heap.taxID<0 || smm.heap.taxID==taxID); //This is important.
jpayne@68 1041 assert(imgID<0 || smm.heap.imgID<0 || smm.heap.imgID==imgID);
jpayne@68 1042
jpayne@68 1043 if(smm.heap.taxID<0){smm.heap.taxID=taxID;}
jpayne@68 1044 if(smm.heap.imgID<0){smm.heap.imgID=imgID;}
jpayne@68 1045 if(smm.heap.name0()==null){
jpayne@68 1046 smm.heap.setName0(rid);
jpayne@68 1047 }
jpayne@68 1048 if(smm.heap.taxName()==null && tn!=null){
jpayne@68 1049 smm.heap.setTaxName(tn.name);
jpayne@68 1050 }
jpayne@68 1051
jpayne@68 1052 if(!bestEffort && tn==null){//Try to get a higher-level node
jpayne@68 1053 TaxNode tn2=taxtree.parseNodeFromHeader(rid, true);
jpayne@68 1054 if(tn2!=null){
jpayne@68 1055 while(tn2!=null && tn2.pid!=tn2.id && tn2.level<taxLevel){
jpayne@68 1056 TaxNode temp=taxtree.getNode(tn2.pid);
jpayne@68 1057 if(temp==null || temp.level>=TaxTree.LIFE){break;}
jpayne@68 1058 tn2=temp;
jpayne@68 1059 }
jpayne@68 1060 if(tn2.level<=taxLevel){
jpayne@68 1061 taxID=tn2.id;
jpayne@68 1062 }
jpayne@68 1063 if(smm.heap.taxID<0){smm.heap.taxID=tn2.id;}
jpayne@68 1064 if(smm.heap.taxName()==null && tn2!=null){
jpayne@68 1065 smm.heap.setTaxName(tn2.name);
jpayne@68 1066 }
jpayne@68 1067 }
jpayne@68 1068 }
jpayne@68 1069 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1070
jpayne@68 1071 assert(smm.heap.taxID<0 || smm.heap.taxName()!=null) : smm.heap.taxID+", "+smm.heap.taxName()+", "+smm.heap.name()+", "+tn;
jpayne@68 1072
jpayne@68 1073 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1074 if(initialLength1>=k){smm.processRead(r1);}
jpayne@68 1075 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1076 if(initialLength2>=k){smm.processRead(r2);}
jpayne@68 1077 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1078
jpayne@68 1079
jpayne@68 1080 if(mode==PER_SEQUENCE){
jpayne@68 1081 manageHeap_perSequence();
jpayne@68 1082 }else if(mode==PER_TAXA || mode==PER_IMG){
jpayne@68 1083 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1084 manageHeap_perTaxa(taxID, imgID, unitSizeBases);
jpayne@68 1085 if(localMap.size()>20){dumpLocalMap();}
jpayne@68 1086 }else if(mode==ONE_SKETCH/* || mode==PER_HEADER*/){
jpayne@68 1087 //do nothing
jpayne@68 1088 }else{
jpayne@68 1089 assert(false) : mode;
jpayne@68 1090 }
jpayne@68 1091 }
jpayne@68 1092 }
jpayne@68 1093
jpayne@68 1094 private void manageHeap_perSequence(){
jpayne@68 1095 assert(mode==PER_SEQUENCE);
jpayne@68 1096 writeHeap(smm.heap);
jpayne@68 1097 }
jpayne@68 1098
jpayne@68 1099 private void manageHeap_perTaxa(final int taxID, final long imgID, final long unitSizeBases){
jpayne@68 1100 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1101 assert(mode==PER_TAXA || mode==PER_IMG);
jpayne@68 1102
jpayne@68 1103 if(smm.heap.size()<=0 || ((taxID<0 && imgID<0) && smm.heap.genomeSizeKmers<minSizeKmers)){//Discard
jpayne@68 1104 assert(!localMap.containsKey((long)taxID));
jpayne@68 1105 smm.heap.clear(true);
jpayne@68 1106 return;
jpayne@68 1107 }
jpayne@68 1108
jpayne@68 1109 //TODO:
jpayne@68 1110 //I could, at this point, write to disk if smm.heap.genomeSize==taxSize.
jpayne@68 1111 //Or if the taxID is unknown.
jpayne@68 1112
jpayne@68 1113 final boolean known=(taxID>=0 || imgID>=0);
jpayne@68 1114 final boolean unknown=!known;
jpayne@68 1115 final boolean hasSize=(known && (sizeList!=null || sizeMap!=null));
jpayne@68 1116 boolean finished=(unknown || (hasSize && smm.heap.genomeSizeBases>=unitSizeBases));
jpayne@68 1117
jpayne@68 1118 //For some reason, this assertion fired for a single taxID (52271) halfway through sketching RefSeq.
jpayne@68 1119 //Likely a transient hardware error.
jpayne@68 1120 assert(!finished || smm.heap.genomeSizeBases==unitSizeBases) : finished+", "+unknown+", "+hasSize+", "+(sizeList==null)+"\n"
jpayne@68 1121 +taxID+", "+unitSizeBases+", "+smm.heap.genomeSizeBases+", "+smm.heap.genomeSizeKmers;
jpayne@68 1122 // if(finished && smm.heap.genomeSizeBases!=unitSizeBases){
jpayne@68 1123 // System.err.println("Warning: tid="+taxID+", finished="+finished+", known="+known+
jpayne@68 1124 // ", smm.heap.genomeSizeBases="+smm.heap.genomeSizeBases+", unitSizeBases="+unitSizeBases);
jpayne@68 1125 // }
jpayne@68 1126
jpayne@68 1127 smm.heap.taxID=taxID;
jpayne@68 1128 smm.heap.imgID=imgID;
jpayne@68 1129
jpayne@68 1130 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1131
jpayne@68 1132 final Long key;
jpayne@68 1133 if(imgID>-1 && (mode==PER_IMG || taxID<1)){key=imgID;}
jpayne@68 1134 else if(taxID>-1){key=(long)taxID;}
jpayne@68 1135 else{key=Long.valueOf(nextUnknown.getAndIncrement());}
jpayne@68 1136
jpayne@68 1137 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1138
jpayne@68 1139 if(unknown || finished){
jpayne@68 1140 writeHeap(smm.heap);
jpayne@68 1141 smm.heap.clear(true);
jpayne@68 1142 localMap.remove((long)taxID);
jpayne@68 1143 return;
jpayne@68 1144 }
jpayne@68 1145
jpayne@68 1146 //At this point, the taxID is known and this heap does not constitute the whole taxSize, or the taxSize is unknown.
jpayne@68 1147 if(!hasSize){
jpayne@68 1148 final HashMap<Long, SketchHeap> map=longMaps[(int)(key&MAP_MASK)];
jpayne@68 1149 final SketchHeap old;
jpayne@68 1150 synchronized(map){
jpayne@68 1151 old=map.get(key);
jpayne@68 1152 if(old==null){
jpayne@68 1153 map.put(key, smm.heap);
jpayne@68 1154 }else{
jpayne@68 1155 old.add(smm.heap);
jpayne@68 1156 }
jpayne@68 1157 }
jpayne@68 1158 if(old==null){
jpayne@68 1159 smm.heap=null;
jpayne@68 1160 }else{
jpayne@68 1161 smm.heap.clear(true);
jpayne@68 1162 assert(!localMap.containsKey((long)taxID));
jpayne@68 1163 }
jpayne@68 1164 localMap.remove((long)taxID);
jpayne@68 1165 return;
jpayne@68 1166 }
jpayne@68 1167
jpayne@68 1168 //assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID; //123
jpayne@68 1169
jpayne@68 1170 {
jpayne@68 1171 //At this point, the taxID is and taxSize are known, and this heap does not constitute the whole taxSize.
jpayne@68 1172 final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize);
jpayne@68 1173 assert(expectedHeapSize>=3) : expectedHeapSize;
jpayne@68 1174 // boolean writeHeap=false;
jpayne@68 1175 // boolean makeHeap=false;
jpayne@68 1176
jpayne@68 1177 SketchHeap local=null;
jpayne@68 1178 {
jpayne@68 1179 local=localMap.get(key);
jpayne@68 1180 assert(local==null || local.taxID==key.intValue());
jpayne@68 1181 if(local==smm.heap){
jpayne@68 1182 //do nothing
jpayne@68 1183 smm.heap=null;
jpayne@68 1184 }else if(local==null){
jpayne@68 1185 if(expectedHeapSize==smm.heap.capacity()){//Store the current heap
jpayne@68 1186 assert(smm.heap.taxID==key.intValue());
jpayne@68 1187 assert(smm.heap.name()!=null);
jpayne@68 1188 localMap.put(key, smm.heap);//Safe
jpayne@68 1189 smm.heap=null;
jpayne@68 1190 }else{//Store a precisely-sized heap
jpayne@68 1191 SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
jpayne@68 1192 temp.add(smm.heap);
jpayne@68 1193 assert(temp.taxID==key.intValue());
jpayne@68 1194 assert(temp.name()!=null);
jpayne@68 1195 localMap.put(key, temp);//Looks safe
jpayne@68 1196 // smm.heap=null; //Not sure which is better
jpayne@68 1197 smm.heap.clear(true);
jpayne@68 1198 }
jpayne@68 1199 }else{
jpayne@68 1200 assert(local.taxID==smm.heap.taxID);
jpayne@68 1201 assert(local!=smm.heap);
jpayne@68 1202 // assert(!localMap.containsKey((long)taxID) || localMap.get((long)taxID)==smm.heap) : taxID+", "+key; //123
jpayne@68 1203 // assert(false) : taxID+", "+key;
jpayne@68 1204 local.add(smm.heap); //This WAS the slow line. It should no longer be executed.
jpayne@68 1205 // if(local.genomeSizeBases>=unitSizeBases){
jpayne@68 1206 // writeHeap=true;
jpayne@68 1207 // localMap.remove(key);
jpayne@68 1208 // }
jpayne@68 1209 // smm.heap=null; //Not sure which is better
jpayne@68 1210 smm.heap.clear(true);
jpayne@68 1211 }
jpayne@68 1212 }
jpayne@68 1213 }
jpayne@68 1214 }
jpayne@68 1215
jpayne@68 1216 private void dumpLocalMap(){
jpayne@68 1217
jpayne@68 1218 for(Entry<Long, SketchHeap> e : localMap.entrySet()){
jpayne@68 1219 boolean writeHeap=false;
jpayne@68 1220 final Long key=e.getKey();
jpayne@68 1221 final SketchHeap localHeap=e.getValue();
jpayne@68 1222 final HashMap<Long, SketchHeap> map=longMaps[(int)(key&MAP_MASK)];
jpayne@68 1223 final SketchHeap old;
jpayne@68 1224
jpayne@68 1225 final long unitSizeBases;
jpayne@68 1226 if(sizeList!=null){
jpayne@68 1227 unitSizeBases=localHeap.taxID<0 ? -1 : sizeList.get((int)localHeap.taxID);
jpayne@68 1228 }else if(sizeMap!=null){
jpayne@68 1229 unitSizeBases=sizeMap.get(localHeap.imgID);
jpayne@68 1230 }else{
jpayne@68 1231 unitSizeBases=-1;
jpayne@68 1232 }
jpayne@68 1233 final int expectedHeapSize=toSketchSize(unitSizeBases, -1, -1, targetSketchSize);
jpayne@68 1234
jpayne@68 1235 synchronized(map){
jpayne@68 1236 old=map.get(key);
jpayne@68 1237 if(old==null){
jpayne@68 1238 if(expectedHeapSize==localHeap.capacity()){//Store the current heap
jpayne@68 1239 map.put(key, localHeap);
jpayne@68 1240 }else{//Store a precisely-sized heap
jpayne@68 1241 // assert(key.intValue()!=96897) : key+", "+unitSizeBases+", "+", "+sizeList.get(key.intValue());
jpayne@68 1242 assert(expectedHeapSize>0) : expectedHeapSize+", "+key+", "+localHeap.taxID+", "+localHeap.name()+", "+unitSizeBases;
jpayne@68 1243 SketchHeap temp=new SketchHeap(expectedHeapSize, defaultParams.minKeyOccuranceCount, defaultParams.trackCounts());
jpayne@68 1244 temp.add(localHeap);
jpayne@68 1245 map.put(key, temp);
jpayne@68 1246 }
jpayne@68 1247 }else{
jpayne@68 1248 old.add(localHeap); //This was the slow line; should be faster now.
jpayne@68 1249 if(old.genomeSizeBases>=unitSizeBases){
jpayne@68 1250 writeHeap=true;
jpayne@68 1251 map.remove(key);
jpayne@68 1252 }
jpayne@68 1253 }
jpayne@68 1254 }
jpayne@68 1255
jpayne@68 1256 if(writeHeap){
jpayne@68 1257 assert(old!=null); //For compiler
jpayne@68 1258 assert(old.genomeSizeBases>0) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers;
jpayne@68 1259 assert(old.genomeSizeBases==unitSizeBases) : unitSizeBases+", "+old.genomeSizeBases+", "+old.genomeSizeKmers+", "+old.size()+", "+old.taxID;
jpayne@68 1260 writeHeap(old);
jpayne@68 1261 }
jpayne@68 1262 }
jpayne@68 1263 localMap.clear();
jpayne@68 1264 }
jpayne@68 1265
jpayne@68 1266 private boolean writeHeap(SketchHeap heap){
jpayne@68 1267 sketchesMadeT++;
jpayne@68 1268 // assert(heap.size()>0) : heap.size(); //Not really necessary
jpayne@68 1269 boolean written=false;
jpayne@68 1270 // assert(heap.heap.size()==heap.set.size()) : heap.heap.size()+", "+heap.set.size();
jpayne@68 1271 if(heap.size()>0 && heap.genomeSizeKmers>=minSizeKmers && heap.sketchSizeEstimate()>0){
jpayne@68 1272 Sketch sketch=new Sketch(heap, true, tool.trackCounts, outMeta);
jpayne@68 1273 if(outTaxID>=0 && sketch.taxID<0){sketch.taxID=outTaxID;}
jpayne@68 1274 if(outTaxName!=null && sketch.taxName()==null){sketch.setTaxName(outTaxName);}
jpayne@68 1275 if(outFname!=null && sketch.fname()==null){sketch.setFname(outFname);}
jpayne@68 1276 if(outName0!=null && sketch.name0()==null){sketch.setName0(outName0);}
jpayne@68 1277 if(outSpid>=0 && sketch.spid<0){sketch.spid=outSpid;}
jpayne@68 1278 if(outImgID>=0 && sketch.imgID<0){sketch.imgID=outImgID;}
jpayne@68 1279 if(parseSubunit && sketch.name0()!=null){
jpayne@68 1280 if(outMeta!=null){
jpayne@68 1281 sketch.meta=(ArrayList<String>)sketch.meta.clone();
jpayne@68 1282 }else if(sketch.meta==null){
jpayne@68 1283 if(sketch.name0().contains("SSU_")){
jpayne@68 1284 sketch.addMeta("subunit:ssu");
jpayne@68 1285 }else if(sketch.name0().contains("LSU_")){
jpayne@68 1286 sketch.addMeta("subunit:lsu");
jpayne@68 1287 }
jpayne@68 1288 }
jpayne@68 1289 }
jpayne@68 1290 if(tsw!=null){
jpayne@68 1291 final int choice=(sketch.hashCode()&Integer.MAX_VALUE)%files;
jpayne@68 1292 sketch.addSSU();
jpayne@68 1293 synchronized(tsw[choice]){
jpayne@68 1294 SketchTool.write(sketch, tsw[choice], bb);
jpayne@68 1295 sketchesWrittenT++;
jpayne@68 1296 written=true;
jpayne@68 1297 }
jpayne@68 1298 }
jpayne@68 1299 }else{
jpayne@68 1300 heap.clear(true);
jpayne@68 1301 }
jpayne@68 1302 assert(heap.genomeSizeBases==0);
jpayne@68 1303 assert(heap.genomeSequences==0);
jpayne@68 1304 return written;
jpayne@68 1305 }
jpayne@68 1306
jpayne@68 1307 /** Number of reads processed by this thread */
jpayne@68 1308 protected long readsProcessedT=0;
jpayne@68 1309 /** Number of bases processed by this thread */
jpayne@68 1310 protected long basesProcessedT=0;
jpayne@68 1311
jpayne@68 1312 long sketchesMadeT=0;
jpayne@68 1313 long sketchesWrittenT=0;
jpayne@68 1314
jpayne@68 1315 /** True only if this thread has completed successfully */
jpayne@68 1316 boolean success=false;
jpayne@68 1317
jpayne@68 1318 /** Shared input stream */
jpayne@68 1319 private final ConcurrentReadInputStream cris;
jpayne@68 1320 /** Thread ID */
jpayne@68 1321 final int threadID;
jpayne@68 1322 private ByteBuilder bb=new ByteBuilder();
jpayne@68 1323
jpayne@68 1324 final SketchMakerMini smm;
jpayne@68 1325 final HashMap<Long, SketchHeap> localMap;
jpayne@68 1326 }
jpayne@68 1327
jpayne@68 1328 /*--------------------------------------------------------------*/
jpayne@68 1329 /*---------------- Fields ----------------*/
jpayne@68 1330 /*--------------------------------------------------------------*/
jpayne@68 1331
jpayne@68 1332 /** Primary input file path */
jpayne@68 1333 private String in1=null;
jpayne@68 1334 /** Secondary input file path */
jpayne@68 1335 private String in2=null;
jpayne@68 1336
jpayne@68 1337 /** Primary output file path */
jpayne@68 1338 private String out1=null;
jpayne@68 1339
jpayne@68 1340 /** Override input file extension */
jpayne@68 1341 private String extin=null;
jpayne@68 1342
jpayne@68 1343 private String giTableFile=null;
jpayne@68 1344 private String taxTreeFile=null;
jpayne@68 1345 private String accessionFile=null;
jpayne@68 1346 private String imgFile=null;
jpayne@68 1347
jpayne@68 1348 /*Override metadata */
jpayne@68 1349 String outTaxName=null;
jpayne@68 1350 String outFname=null;
jpayne@68 1351 String outName0=null;
jpayne@68 1352 int outTaxID=-1;
jpayne@68 1353 long outSpid=-1;
jpayne@68 1354 long outImgID=-1;
jpayne@68 1355 ArrayList<String> outMeta=null;
jpayne@68 1356 static boolean parseSubunit=false;
jpayne@68 1357
jpayne@68 1358 /*--------------------------------------------------------------*/
jpayne@68 1359
jpayne@68 1360 /** Number of reads processed */
jpayne@68 1361 protected long readsProcessed=0;
jpayne@68 1362 /** Number of bases processed */
jpayne@68 1363 protected long basesProcessed=0;
jpayne@68 1364 /** Number of bases processed */
jpayne@68 1365 protected long kmersProcessed=0;
jpayne@68 1366 /** Number of sketches started */
jpayne@68 1367 protected long sketchesMade=0;
jpayne@68 1368 /** Number of sketches written */
jpayne@68 1369 protected long sketchesWritten=0;
jpayne@68 1370
jpayne@68 1371 /** Quit after processing this many input reads; -1 means no limit */
jpayne@68 1372 private long maxReads=-1;
jpayne@68 1373
jpayne@68 1374 final LongList sizeList;
jpayne@68 1375 final HashMap<Long, Long> sizeMap;
jpayne@68 1376
jpayne@68 1377 private HashMap<Long, SketchHeap> longMaps[];
jpayne@68 1378 private ByteStreamWriter tsw[];
jpayne@68 1379
jpayne@68 1380 /*--------------------------------------------------------------*/
jpayne@68 1381 /*---------------- Final Fields ----------------*/
jpayne@68 1382 /*--------------------------------------------------------------*/
jpayne@68 1383
jpayne@68 1384 /** Primary input file */
jpayne@68 1385 private final FileFormat ffin1;
jpayne@68 1386 /** Secondary input file */
jpayne@68 1387 private final FileFormat ffin2;
jpayne@68 1388
jpayne@68 1389 /** Primary output files */
jpayne@68 1390 private final FileFormat ffout[];
jpayne@68 1391 /** Number of output files */
jpayne@68 1392 private final int files;
jpayne@68 1393
jpayne@68 1394 final int mode;
jpayne@68 1395
jpayne@68 1396 private final SketchTool tool;
jpayne@68 1397
jpayne@68 1398 /** Don't make sketches from sequences smaller than this */
jpayne@68 1399 final int minSizeBases;
jpayne@68 1400 /** Don't make sketches from sequences smaller than this */
jpayne@68 1401 final int minSizeKmers;
jpayne@68 1402
jpayne@68 1403 private int taxLevel=1;
jpayne@68 1404 private boolean prefilter=false;
jpayne@68 1405 private boolean tossJunk=true;
jpayne@68 1406 boolean bestEffort=true;
jpayne@68 1407 // private final HashMap<Integer, byte[]> ssuMap=null;
jpayne@68 1408
jpayne@68 1409 private final AtomicInteger nextUnknown=new AtomicInteger(minFakeID);
jpayne@68 1410
jpayne@68 1411 private static final int MAP_WAYS=32;
jpayne@68 1412 private static final int MAP_MASK=MAP_WAYS-1;
jpayne@68 1413
jpayne@68 1414
jpayne@68 1415 /*--------------------------------------------------------------*/
jpayne@68 1416 /*---------------- Common Fields ----------------*/
jpayne@68 1417 /*--------------------------------------------------------------*/
jpayne@68 1418
jpayne@68 1419 /** Print status messages to this output stream */
jpayne@68 1420 private PrintStream outstream=System.err;
jpayne@68 1421 /** Print verbose messages */
jpayne@68 1422 public static boolean verbose=false;
jpayne@68 1423 /** True if an error was encountered */
jpayne@68 1424 public boolean errorState=false;
jpayne@68 1425 /** Overwrite existing output files */
jpayne@68 1426 private boolean overwrite=false;
jpayne@68 1427 /** Append to existing output files */
jpayne@68 1428 private boolean append=false;
jpayne@68 1429
jpayne@68 1430 }