annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/consensus/ConsensusMaker.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package consensus;
jpayne@68 2
jpayne@68 3 import java.io.PrintStream;
jpayne@68 4 import java.util.ArrayList;
jpayne@68 5 import java.util.LinkedHashMap;
jpayne@68 6 import java.util.Map.Entry;
jpayne@68 7
jpayne@68 8 import fileIO.ByteFile;
jpayne@68 9 import fileIO.ByteStreamWriter;
jpayne@68 10 import fileIO.FileFormat;
jpayne@68 11 import fileIO.ReadWrite;
jpayne@68 12 import prok.ProkObject;
jpayne@68 13 import shared.Parse;
jpayne@68 14 import shared.Parser;
jpayne@68 15 import shared.PreParser;
jpayne@68 16 import shared.ReadStats;
jpayne@68 17 import shared.Shared;
jpayne@68 18 import shared.Timer;
jpayne@68 19 import shared.Tools;
jpayne@68 20 import sketch.SketchObject;
jpayne@68 21 import stream.ConcurrentReadInputStream;
jpayne@68 22 import stream.ConcurrentReadOutputStream;
jpayne@68 23 import stream.FASTQ;
jpayne@68 24 import stream.FastaReadInputStream;
jpayne@68 25 import stream.Read;
jpayne@68 26 import stream.SamLine;
jpayne@68 27 import stream.SamReadStreamer;
jpayne@68 28 import stream.SamStreamer;
jpayne@68 29 import structures.ByteBuilder;
jpayne@68 30 import structures.ListNum;
jpayne@68 31 import template.Accumulator;
jpayne@68 32 import template.ThreadWaiter;
jpayne@68 33 import var2.Realigner;
jpayne@68 34 import var2.SamFilter;
jpayne@68 35
jpayne@68 36 /**
jpayne@68 37 * Alters a reference to represent the consensus of aligned reads.
jpayne@68 38 *
jpayne@68 39 * @author Brian Bushnell
jpayne@68 40 * @date September 6, 1019
jpayne@68 41 *
jpayne@68 42 */
jpayne@68 43 public class ConsensusMaker extends ConsensusObject implements Accumulator<ConsensusMaker.ProcessThread> {
jpayne@68 44
jpayne@68 45 /*--------------------------------------------------------------*/
jpayne@68 46 /*---------------- Initialization ----------------*/
jpayne@68 47 /*--------------------------------------------------------------*/
jpayne@68 48
jpayne@68 49 /**
jpayne@68 50 * Code entrance from the command line.
jpayne@68 51 * @param args Command line arguments
jpayne@68 52 */
jpayne@68 53 public static void main(String[] args){
jpayne@68 54 //Start a timer immediately upon code entrance.
jpayne@68 55 Timer t=new Timer();
jpayne@68 56
jpayne@68 57 //Create an instance of this class
jpayne@68 58 ConsensusMaker x=new ConsensusMaker(args);
jpayne@68 59
jpayne@68 60 //Run the object
jpayne@68 61 x.process(t);
jpayne@68 62
jpayne@68 63 //Close the print stream if it was redirected
jpayne@68 64 Shared.closeStream(x.outstream);
jpayne@68 65 }
jpayne@68 66
jpayne@68 67 /**
jpayne@68 68 * Constructor.
jpayne@68 69 * @param args Command line arguments
jpayne@68 70 */
jpayne@68 71 public ConsensusMaker(String[] args){
jpayne@68 72
jpayne@68 73 {//Preparse block for help, config files, and outstream
jpayne@68 74 PreParser pp=new PreParser(args, getClass(), false);
jpayne@68 75 args=pp.args;
jpayne@68 76 outstream=pp.outstream;
jpayne@68 77 }
jpayne@68 78
jpayne@68 79 //Set shared static variables prior to parsing
jpayne@68 80 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
jpayne@68 81 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
jpayne@68 82 FASTQ.TEST_INTERLEAVED=FASTQ.FORCE_INTERLEAVED=false;
jpayne@68 83
jpayne@68 84 samFilter.includeUnmapped=false;
jpayne@68 85 // samFilter.includeSupplimentary=false;
jpayne@68 86 // samFilter.includeDuplicate=false;
jpayne@68 87 samFilter.includeNonPrimary=false;
jpayne@68 88 samFilter.includeQfail=false;
jpayne@68 89 // samFilter.minMapq=4;
jpayne@68 90
jpayne@68 91 {//Parse the arguments
jpayne@68 92 final Parser parser=parse(args);
jpayne@68 93
jpayne@68 94 Parser.processQuality();
jpayne@68 95
jpayne@68 96 maxReads=parser.maxReads;
jpayne@68 97 overwrite=ReadStats.overwrite=parser.overwrite;
jpayne@68 98 append=ReadStats.append=parser.append;
jpayne@68 99
jpayne@68 100 in=parser.in1;
jpayne@68 101 extin=parser.extin;
jpayne@68 102
jpayne@68 103 out=parser.out1;
jpayne@68 104 extout=parser.extout;
jpayne@68 105 silent=Parser.silent;
jpayne@68 106 }
jpayne@68 107
jpayne@68 108 {
jpayne@68 109 // if("auto".equalsIgnoreCase(atomic)){Scaffold.setCA3A(Shared.threads()>8);}
jpayne@68 110 // else{Scaffold.setCA3A(Parse.parseBoolean(atomic));}
jpayne@68 111
jpayne@68 112 if(ploidy<1){System.err.println("WARNING: ploidy not set; assuming ploidy=1."); ploidy=1;}
jpayne@68 113 samFilter.setSamtoolsFilter();
jpayne@68 114
jpayne@68 115 streamerThreads=Tools.max(1, Tools.min(streamerThreads, Shared.threads()));
jpayne@68 116 assert(streamerThreads>0) : streamerThreads;
jpayne@68 117 }
jpayne@68 118
jpayne@68 119 validateParams();
jpayne@68 120 fixExtensions(); //Add or remove .gz or .bz2 as needed
jpayne@68 121 checkFileExistence(); //Ensure files can be read and written
jpayne@68 122 checkStatics(); //Adjust file-related static fields as needed for this program
jpayne@68 123
jpayne@68 124 //Create output FileFormat objects
jpayne@68 125 ffout=FileFormat.testOutput(out, FileFormat.FASTA, extout, true, overwrite, append, ordered);
jpayne@68 126 ffmodel=FileFormat.testOutput(outModel, FileFormat.ALM, null, true, overwrite, false, ordered);
jpayne@68 127
jpayne@68 128 //Create input FileFormat objects
jpayne@68 129 ffin=FileFormat.testInput(in, FileFormat.SAM, extin, true, true);
jpayne@68 130 ffref=FileFormat.testInput(ref, FileFormat.FASTA, null, true, true);
jpayne@68 131
jpayne@68 132 if(inModelFile!=null){
jpayne@68 133 ArrayList<BaseGraph> list=(ArrayList<BaseGraph>)ReadWrite.readObject(inModelFile, false);
jpayne@68 134 inModel=list.get(0);
jpayne@68 135 inModel.calcProbs();
jpayne@68 136 inModel.makeWeights();
jpayne@68 137 }
jpayne@68 138 }
jpayne@68 139
jpayne@68 140 /*--------------------------------------------------------------*/
jpayne@68 141 /*---------------- Initialization Helpers ----------------*/
jpayne@68 142 /*--------------------------------------------------------------*/
jpayne@68 143
jpayne@68 144 /** Parse arguments from the command line */
jpayne@68 145 private Parser parse(String[] args){
jpayne@68 146
jpayne@68 147 //Create a parser object
jpayne@68 148 Parser parser=new Parser();
jpayne@68 149
jpayne@68 150 //Set any necessary Parser defaults here
jpayne@68 151 //parser.foo=bar;
jpayne@68 152
jpayne@68 153 //Parse each argument
jpayne@68 154 for(int i=0; i<args.length; i++){
jpayne@68 155 String arg=args[i];
jpayne@68 156
jpayne@68 157 //Break arguments into their constituent parts, in the form of "a=b"
jpayne@68 158 String[] split=arg.split("=");
jpayne@68 159 String a=split[0].toLowerCase();
jpayne@68 160 String b=split.length>1 ? split[1] : null;
jpayne@68 161 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
jpayne@68 162
jpayne@68 163 if(a.equals("verbose")){
jpayne@68 164 verbose=Parse.parseBoolean(b);
jpayne@68 165 BaseGraph.verbose=verbose;
jpayne@68 166 }else if(a.equals("ref")){
jpayne@68 167 ref=b;
jpayne@68 168 }else if(a.equals("outm") || a.equals("outmodel") || a.equals("model") || a.equals("alm")){
jpayne@68 169 outModel=b;
jpayne@68 170 }else if(a.equals("inm") || a.equals("inmodel")){
jpayne@68 171 inModelFile=b;
jpayne@68 172 }else if(a.equals("hist") || a.equals("histogram")){
jpayne@68 173 outHistFile=b;
jpayne@68 174 }else if(a.equals("realign")){
jpayne@68 175 realign=Parse.parseBoolean(b);
jpayne@68 176 }else if(a.equals("printscores")){
jpayne@68 177 printScores=Parse.parseBoolean(b);
jpayne@68 178 }else if(a.equalsIgnoreCase("useMapq")){
jpayne@68 179 useMapq=Parse.parseBoolean(b);
jpayne@68 180 }else if(a.equalsIgnoreCase("identityCeiling") || a.equalsIgnoreCase("idceiling") || a.equalsIgnoreCase("ceiling")){
jpayne@68 181 double d=Double.parseDouble(b);
jpayne@68 182 if(d<=2){d=d*100;}
jpayne@68 183 identityCeiling=(int)d;
jpayne@68 184 invertIdentity=true;
jpayne@68 185 }else if(a.equalsIgnoreCase("invertIdentity")){
jpayne@68 186 invertIdentity=Parse.parseBoolean(b);
jpayne@68 187 }else if(a.equals("name") || a.equals("rename") || a.equals("header")){
jpayne@68 188 name=b;
jpayne@68 189 }else if(a.equals("noindels")){
jpayne@68 190 noIndels=Parse.parseBoolean(b);
jpayne@68 191 }else if(a.equalsIgnoreCase("onlyConvertNs") || a.equalsIgnoreCase("nOnly") || a.equalsIgnoreCase("onlyN")){
jpayne@68 192 onlyConvertNs=Parse.parseBoolean(b);
jpayne@68 193 }else if(a.equals("ploidy")){
jpayne@68 194 ploidy=Integer.parseInt(b);
jpayne@68 195 }else if(a.equals("mindepth")){
jpayne@68 196 minDepth=Integer.parseInt(b);
jpayne@68 197 }else if(a.equals("trimdepth") || a.equals("trimdepthfraction")){
jpayne@68 198 trimDepthFraction=Float.parseFloat(b);
jpayne@68 199 }else if(a.equalsIgnoreCase("trimNs")){
jpayne@68 200 trimNs=Parse.parseBoolean(b);
jpayne@68 201 }else if(a.equalsIgnoreCase("mafn") || a.equals("mafnoref")){
jpayne@68 202 MAF_noref=Float.parseFloat(b);
jpayne@68 203 }else if(a.equalsIgnoreCase("mafsub")){
jpayne@68 204 MAF_sub=Float.parseFloat(b);
jpayne@68 205 }else if(a.equalsIgnoreCase("mafdel")){
jpayne@68 206 MAF_del=Float.parseFloat(b);
jpayne@68 207 }else if(a.equalsIgnoreCase("mafins")){
jpayne@68 208 MAF_ins=Float.parseFloat(b);
jpayne@68 209 }else if(a.equalsIgnoreCase("maf")){
jpayne@68 210 MAF_ins=MAF_noref=MAF_del=MAF_sub=Float.parseFloat(b);
jpayne@68 211 }else if(a.equalsIgnoreCase("mafindel")){
jpayne@68 212 MAF_ins=MAF_del=Float.parseFloat(b);
jpayne@68 213 }else if(a.equals("clearfilters")){
jpayne@68 214 if(Parse.parseBoolean(b)){
jpayne@68 215 samFilter.clear();
jpayne@68 216 }
jpayne@68 217 }else if(a.equals("parse_flag_goes_here")){
jpayne@68 218 long fake_variable=Parse.parseKMG(b);
jpayne@68 219 //Set a variable here
jpayne@68 220 }else if(samFilter.parse(arg, a, b)){
jpayne@68 221 //do nothing
jpayne@68 222 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
jpayne@68 223 //do nothing
jpayne@68 224 }else{
jpayne@68 225 outstream.println("Unknown parameter "+args[i]);
jpayne@68 226 assert(false) : "Unknown parameter "+args[i];
jpayne@68 227 }
jpayne@68 228 }
jpayne@68 229
jpayne@68 230 if(ref!=null && !Tools.existsInput(ref)){
jpayne@68 231 specialRef=ProkObject.isSpecialType(ref);
jpayne@68 232 }
jpayne@68 233
jpayne@68 234 return parser;
jpayne@68 235 }
jpayne@68 236
jpayne@68 237 /** Add or remove .gz or .bz2 as needed */
jpayne@68 238 private void fixExtensions(){
jpayne@68 239 in=Tools.fixExtension(in);
jpayne@68 240 if(!specialRef){ref=Tools.fixExtension(ref);}
jpayne@68 241 }
jpayne@68 242
jpayne@68 243 /** Ensure files can be read and written */
jpayne@68 244 private void checkFileExistence(){
jpayne@68 245
jpayne@68 246 //Ensure there is an input file
jpayne@68 247 if(in==null){throw new RuntimeException("Error - an input file is required.");}
jpayne@68 248
jpayne@68 249 //Ensure there is an input file
jpayne@68 250 if(ref==null){throw new RuntimeException("Error - a reference file is required.");}
jpayne@68 251
jpayne@68 252 //Ensure output files can be written
jpayne@68 253 if(!Tools.testOutputFiles(overwrite, append, false, out, outModel, outHistFile)){
jpayne@68 254 outstream.println((out==null)+", "+out);
jpayne@68 255 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+" or "+outModel+"\n");
jpayne@68 256 }
jpayne@68 257
jpayne@68 258 //Ensure input files can be read
jpayne@68 259 if(!Tools.testInputFiles(true, true, in, inModelFile, (specialRef ? null : ref))){
jpayne@68 260 throw new RuntimeException("\nCan't read some input files.\n");
jpayne@68 261 }
jpayne@68 262
jpayne@68 263 //Ensure that no file was specified multiple times
jpayne@68 264 if(!Tools.testForDuplicateFiles(true, in, ref, out, outModel, outHistFile, inModelFile)){
jpayne@68 265 throw new RuntimeException("\nSome file names were specified multiple times.\n");
jpayne@68 266 }
jpayne@68 267 }
jpayne@68 268
jpayne@68 269 /** Adjust file-related static fields as needed for this program */
jpayne@68 270 private static void checkStatics(){
jpayne@68 271 //Adjust the number of threads for input file reading
jpayne@68 272 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
jpayne@68 273 ByteFile.FORCE_MODE_BF2=true;
jpayne@68 274 }
jpayne@68 275
jpayne@68 276 assert(FastaReadInputStream.settingsOK());
jpayne@68 277 }
jpayne@68 278
jpayne@68 279 /** Ensure parameter ranges are within bounds and required parameters are set */
jpayne@68 280 private boolean validateParams(){
jpayne@68 281 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
jpayne@68 282 return true;
jpayne@68 283 }
jpayne@68 284
jpayne@68 285 private void writeHist(String fname){
jpayne@68 286 ByteStreamWriter bsw=new ByteStreamWriter(fname, overwrite, append, false);
jpayne@68 287 bsw.start();
jpayne@68 288 bsw.print("#Value\tIdentity");
jpayne@68 289 if(inModel!=null){
jpayne@68 290 bsw.print("\tScore");
jpayne@68 291 }
jpayne@68 292 bsw.nl();
jpayne@68 293 for(int i=0; i<idHist.length; i++){
jpayne@68 294 bsw.print(0.01f*i, 2).tab().print(idHist[i]);
jpayne@68 295 if(inModel!=null){
jpayne@68 296 bsw.tab().print(scoreHist[i]);
jpayne@68 297 }
jpayne@68 298 bsw.nl();
jpayne@68 299 }
jpayne@68 300 errorState=bsw.poisonAndWait()|errorState;
jpayne@68 301 }
jpayne@68 302
jpayne@68 303 /*--------------------------------------------------------------*/
jpayne@68 304 /*---------------- Outer Methods ----------------*/
jpayne@68 305 /*--------------------------------------------------------------*/
jpayne@68 306
jpayne@68 307 /** Create read streams and process all data */
jpayne@68 308 void process(Timer t){
jpayne@68 309
jpayne@68 310 //Turn off read validation in the input threads to increase speed
jpayne@68 311 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
jpayne@68 312 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
jpayne@68 313
jpayne@68 314 //Load reference;
jpayne@68 315 refMap=loadReferenceCustom();
jpayne@68 316 makeRefMap2();
jpayne@68 317
jpayne@68 318 //Reset counters
jpayne@68 319 readsProcessed=readsOut=0;
jpayne@68 320 basesProcessed=basesOut=0;
jpayne@68 321 alignedReads=0;
jpayne@68 322
jpayne@68 323 if(ffin.samOrBam()){
jpayne@68 324 //Create a read input stream
jpayne@68 325 final SamStreamer ss=makeStreamer(ffin);
jpayne@68 326 //Process the reads in separate threads
jpayne@68 327 spawnThreads(ss);
jpayne@68 328 }else{
jpayne@68 329 Shared.capBufferLen(40);
jpayne@68 330 //Create a read input stream
jpayne@68 331 final ConcurrentReadInputStream cris=makeCris(ffin);
jpayne@68 332 //Process the reads in separate threads
jpayne@68 333 spawnThreads(cris);
jpayne@68 334 }
jpayne@68 335
jpayne@68 336 //Optionally create a read output stream
jpayne@68 337 final ConcurrentReadOutputStream ros=makeCros();
jpayne@68 338
jpayne@68 339 outputConsensus(ros);
jpayne@68 340
jpayne@68 341 if(outHistFile!=null){
jpayne@68 342 writeHist(outHistFile);
jpayne@68 343 }
jpayne@68 344
jpayne@68 345 if(verbose){outstream.println("Finished; closing streams.");}
jpayne@68 346
jpayne@68 347 //Write anything that was accumulated by ReadStats
jpayne@68 348 errorState|=ReadStats.writeAll();
jpayne@68 349 //Close the read streams
jpayne@68 350 errorState|=ReadWrite.closeStream(ros);
jpayne@68 351
jpayne@68 352 //Reset read validation
jpayne@68 353 Read.VALIDATE_IN_CONSTRUCTOR=vic;
jpayne@68 354
jpayne@68 355 //Report timing and results
jpayne@68 356 t.stop();
jpayne@68 357 if(!silent){
jpayne@68 358 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
jpayne@68 359 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
jpayne@68 360 outstream.println();
jpayne@68 361 }
jpayne@68 362 outstream.println(Tools.number("Ref Count:", refCount, 8));
jpayne@68 363 outstream.println(Tools.number("Sub Count:", subCount, 8));
jpayne@68 364 outstream.println(Tools.number("Del Count:", delCount, 8));
jpayne@68 365 outstream.println(Tools.number("Ins Count:", insCount, 8));
jpayne@68 366 outstream.println(Tools.number("Avg Identity:", 100*identitySum/alignedReads, 3, 8));
jpayne@68 367 if(scoreSum>0){
jpayne@68 368 outstream.println(Tools.number("Avg Score:", 100*scoreSum/alignedReads, 3, 8));
jpayne@68 369 }
jpayne@68 370
jpayne@68 371 //Throw an exception of there was an error in a thread
jpayne@68 372 if(errorState){
jpayne@68 373 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
jpayne@68 374 }
jpayne@68 375 }
jpayne@68 376
jpayne@68 377 private synchronized LinkedHashMap<String, BaseGraph> loadReferenceCustom(){
jpayne@68 378 assert(!loadedRef);
jpayne@68 379 if(specialRef){return loadReferenceSpecial();}
jpayne@68 380 ConcurrentReadInputStream cris=makeRefCris();
jpayne@68 381 LinkedHashMap<String, BaseGraph> map=new LinkedHashMap<String, BaseGraph>();
jpayne@68 382 ListNum<Read> ln=cris.nextList();
jpayne@68 383 for(; ln!=null && !ln.isEmpty(); ln=cris.nextList()){
jpayne@68 384 if(verbose){outstream.println("Fetched "+ln.size()+" reference sequences.");}
jpayne@68 385 for(Read r : ln){
jpayne@68 386 if(r.bases!=null && r.bases.length>0){
jpayne@68 387 BaseGraph bg=new BaseGraph(r.id, r.bases, r.quality, r.numericID, 0);
jpayne@68 388 map.put(r.name(), bg);
jpayne@68 389 // if(inModel!=null && inModel.name.equals(bg.name)){
jpayne@68 390 //// assert(false) : Arrays.toString(bg.refWeights)+"\n"+Arrays.toString(inModel.refWeights);
jpayne@68 391 // bg.refWeights=inModel.refWeights;
jpayne@68 392 // bg.insWeights=inModel.insWeights;
jpayne@68 393 // bg.delWeights=inModel.delWeights;
jpayne@68 394 // }else{
jpayne@68 395 // bg.insWeights=new float[bg.ref.length];
jpayne@68 396 // bg.delWeights=new float[bg.ref.length];
jpayne@68 397 // Arrays.fill(bg.insWeights, 1);
jpayne@68 398 // Arrays.fill(bg.delWeights, 1);
jpayne@68 399 //// assert(false) : Arrays.toString(bg.delWeights);
jpayne@68 400 // }
jpayne@68 401 }
jpayne@68 402 }
jpayne@68 403 if(ln!=null){
jpayne@68 404 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
jpayne@68 405 }
jpayne@68 406 }
jpayne@68 407 if(verbose){outstream.println("Loaded "+map.size()+" reference sequences.");}
jpayne@68 408 loadedRef=true;
jpayne@68 409 return map;
jpayne@68 410 }
jpayne@68 411
jpayne@68 412 //For ribo subunits in resources directory
jpayne@68 413 private synchronized LinkedHashMap<String, BaseGraph> loadReferenceSpecial(){
jpayne@68 414 assert(!loadedRef);
jpayne@68 415 Read[] array=ProkObject.loadConsensusSequenceType(ref, false, false);
jpayne@68 416 Read r=array[0];
jpayne@68 417 BaseGraph bg=new BaseGraph(r.id, r.bases, r.quality, r.numericID, 0);
jpayne@68 418 LinkedHashMap<String, BaseGraph> map=new LinkedHashMap<String, BaseGraph>();
jpayne@68 419 map.put(r.name(), bg);
jpayne@68 420 return map;
jpayne@68 421 }
jpayne@68 422
jpayne@68 423 private synchronized void makeRefMap2(){
jpayne@68 424 assert(refMap!=null && refMap2==null);
jpayne@68 425 if(verbose){outstream.println("Making refMap2.");}
jpayne@68 426 refMap2=new LinkedHashMap<String, BaseGraph>((refMap.size()*3)/2);
jpayne@68 427 for(Entry<String, BaseGraph> e : refMap.entrySet()){
jpayne@68 428 String key=e.getKey();
jpayne@68 429 if(verbose){outstream.println("Considering "+key);}
jpayne@68 430 BaseGraph value=e.getValue();
jpayne@68 431 String key2=Tools.trimToWhitespace(key);
jpayne@68 432 if(verbose){outstream.println("key2="+key2);}
jpayne@68 433 // if(verbose){outstream.println("put "+key2+", "+value);}
jpayne@68 434 refMap2.put(key2, value);
jpayne@68 435 // if(verbose){outstream.println("putted "+key2+", "+value);}
jpayne@68 436
jpayne@68 437 if(defaultRname==null){defaultRname=key;}
jpayne@68 438 }
jpayne@68 439 // assert(false) : refMap+"\n"+refMap2;
jpayne@68 440 if(verbose){outstream.println("Made refMap2.");}
jpayne@68 441 }
jpayne@68 442
jpayne@68 443 private ConcurrentReadInputStream makeRefCris(){
jpayne@68 444 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffref, null);
jpayne@68 445 cris.start(); //Start the stream
jpayne@68 446 if(verbose){outstream.println("Started cris");}
jpayne@68 447 boolean paired=cris.paired();
jpayne@68 448 assert(!paired) : "References should not be paired.";
jpayne@68 449 return cris;
jpayne@68 450 }
jpayne@68 451
jpayne@68 452 private SamStreamer makeStreamer(FileFormat ff){
jpayne@68 453 if(ff==null){return null;}
jpayne@68 454 SamStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
jpayne@68 455 ss.start(); //Start the stream
jpayne@68 456 if(verbose){outstream.println("Started Streamer");}
jpayne@68 457 return ss;
jpayne@68 458 }
jpayne@68 459
jpayne@68 460 private ConcurrentReadInputStream makeCris(FileFormat ff){
jpayne@68 461 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null);
jpayne@68 462 cris.start(); //Start the stream
jpayne@68 463 if(verbose){outstream.println("Started cris");}
jpayne@68 464 boolean paired=cris.paired();
jpayne@68 465 assert(!paired);
jpayne@68 466 return cris;
jpayne@68 467 }
jpayne@68 468
jpayne@68 469 private ConcurrentReadOutputStream makeCros(){
jpayne@68 470 if(ffout==null){return null;}
jpayne@68 471
jpayne@68 472 //Select output buffer size based on whether it needs to be ordered
jpayne@68 473 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
jpayne@68 474
jpayne@68 475 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false);
jpayne@68 476 ros.start(); //Start the stream
jpayne@68 477 return ros;
jpayne@68 478 }
jpayne@68 479
jpayne@68 480 /*--------------------------------------------------------------*/
jpayne@68 481 /*---------------- Thread Management ----------------*/
jpayne@68 482 /*--------------------------------------------------------------*/
jpayne@68 483
jpayne@68 484 /** Spawn process threads */
jpayne@68 485 private void spawnThreads(final SamStreamer ss){
jpayne@68 486
jpayne@68 487 //Do anything necessary prior to processing
jpayne@68 488
jpayne@68 489 //Determine how many threads may be used
jpayne@68 490 final int threads=Shared.threads();
jpayne@68 491
jpayne@68 492 //Fill a list with ProcessThreads
jpayne@68 493 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 494 for(int i=0; i<threads; i++){
jpayne@68 495 alpt.add(new ProcessThread(ss, i));
jpayne@68 496 }
jpayne@68 497
jpayne@68 498 //Start the threads
jpayne@68 499 for(ProcessThread pt : alpt){
jpayne@68 500 pt.start();
jpayne@68 501 }
jpayne@68 502 if(verbose){outstream.println("Started worker threads.");}
jpayne@68 503
jpayne@68 504 //Wait for threads to finish
jpayne@68 505 boolean success=ThreadWaiter.waitForThreads(alpt, this);
jpayne@68 506 errorState&=!success;
jpayne@68 507
jpayne@68 508 //Do anything necessary after processing
jpayne@68 509
jpayne@68 510 }
jpayne@68 511
jpayne@68 512 /** Spawn process threads */
jpayne@68 513 private void spawnThreads(final ConcurrentReadInputStream cris){
jpayne@68 514
jpayne@68 515 //Do anything necessary prior to processing
jpayne@68 516
jpayne@68 517 //Determine how many threads may be used
jpayne@68 518 final int threads=Shared.threads();
jpayne@68 519
jpayne@68 520 //Fill a list with ProcessThreads
jpayne@68 521 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 522 for(int i=0; i<threads; i++){
jpayne@68 523 alpt.add(new ProcessThread(cris, i));
jpayne@68 524 }
jpayne@68 525
jpayne@68 526 //Start the threads
jpayne@68 527 for(ProcessThread pt : alpt){
jpayne@68 528 pt.start();
jpayne@68 529 }
jpayne@68 530 if(verbose){outstream.println("Started worker threads.");}
jpayne@68 531
jpayne@68 532 //Wait for threads to finish
jpayne@68 533 boolean success=ThreadWaiter.waitForThreads(alpt, this);
jpayne@68 534 errorState&=!success;
jpayne@68 535
jpayne@68 536 //Do anything necessary after processing
jpayne@68 537 ReadWrite.closeStreams(cris);
jpayne@68 538 }
jpayne@68 539
jpayne@68 540 private void outputConsensus(ConcurrentReadOutputStream ros){
jpayne@68 541 if(verbose){outstream.println("Making consensus.");}
jpayne@68 542 ArrayList<Read> consensusList=new ArrayList<Read>(200);
jpayne@68 543 ArrayList<BaseGraph> graphList=new ArrayList<BaseGraph>(200);
jpayne@68 544 long num=0;
jpayne@68 545 for(Entry<String, BaseGraph> e : refMap.entrySet()){
jpayne@68 546 BaseGraph bg=e.getValue();
jpayne@68 547 graphList.add(bg);
jpayne@68 548
jpayne@68 549 Read r=bg.traverse();
jpayne@68 550 if(name!=null){r.id=name;}
jpayne@68 551 consensusList.add(r);
jpayne@68 552
jpayne@68 553 refCount+=bg.refCount;
jpayne@68 554 subCount+=bg.subCount;
jpayne@68 555 delCount+=bg.delCount;
jpayne@68 556 insCount+=bg.insCount;
jpayne@68 557
jpayne@68 558 readsOut++;
jpayne@68 559 basesOut+=r.length();
jpayne@68 560
jpayne@68 561 if(consensusList.size()>=200){
jpayne@68 562 if(ros!=null){ros.add(consensusList, num);}
jpayne@68 563 consensusList=new ArrayList<Read>(200);
jpayne@68 564 num++;
jpayne@68 565 }
jpayne@68 566 }
jpayne@68 567 if(consensusList.size()>0){
jpayne@68 568 if(ros!=null){ros.add(consensusList, num);}
jpayne@68 569 consensusList=new ArrayList<Read>(200);
jpayne@68 570 num++;
jpayne@68 571 }
jpayne@68 572 if(graphList.size()>0 && ffmodel!=null){
jpayne@68 573 // ReadWrite.writeObjectInThread(graphList, ffmodel.name(), true);
jpayne@68 574 ReadWrite.writeAsync(graphList, ffmodel.name(), true);
jpayne@68 575 }
jpayne@68 576 }
jpayne@68 577
jpayne@68 578 @Override
jpayne@68 579 public final void accumulate(ProcessThread pt){
jpayne@68 580 readsProcessed+=pt.readsProcessedT;
jpayne@68 581 basesProcessed+=pt.basesProcessedT;
jpayne@68 582 alignedReads+=pt.alignedReadsT;
jpayne@68 583 identitySum+=pt.identitySumT;
jpayne@68 584 scoreSum+=pt.scoreSumT;
jpayne@68 585 for(int i=0; i<idHist.length; i++){
jpayne@68 586 idHist[i]+=pt.idHistT[i];
jpayne@68 587 scoreHist[i]+=pt.scoreHistT[i];
jpayne@68 588 }
jpayne@68 589 errorState|=(!pt.success);
jpayne@68 590 }
jpayne@68 591
jpayne@68 592 @Override
jpayne@68 593 public final boolean success(){return !errorState;}
jpayne@68 594
jpayne@68 595 /*--------------------------------------------------------------*/
jpayne@68 596 /*---------------- Inner Methods ----------------*/
jpayne@68 597 /*--------------------------------------------------------------*/
jpayne@68 598
jpayne@68 599 @Override
jpayne@68 600 public ByteBuilder toText() {
jpayne@68 601 // TODO Auto-generated method stub
jpayne@68 602 return null;
jpayne@68 603 }
jpayne@68 604
jpayne@68 605 /*--------------------------------------------------------------*/
jpayne@68 606 /*---------------- Inner Classes ----------------*/
jpayne@68 607 /*--------------------------------------------------------------*/
jpayne@68 608
jpayne@68 609 /** This class is static to prevent accidental writing to shared variables.
jpayne@68 610 * It is safe to remove the static modifier. */
jpayne@68 611 class ProcessThread extends Thread {
jpayne@68 612
jpayne@68 613 //Constructor
jpayne@68 614 ProcessThread(final SamStreamer ss_, final int tid_){
jpayne@68 615 ss=ss_;
jpayne@68 616 cris=null;
jpayne@68 617 tid=tid_;
jpayne@68 618 realigner=(realign ? new Realigner() : null);
jpayne@68 619 }
jpayne@68 620
jpayne@68 621 //Constructor
jpayne@68 622 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){
jpayne@68 623 ss=null;
jpayne@68 624 cris=cris_;
jpayne@68 625 tid=tid_;
jpayne@68 626 realigner=null;//(realign ? new Realigner() : null);
jpayne@68 627 }
jpayne@68 628
jpayne@68 629 //Called by start()
jpayne@68 630 @Override
jpayne@68 631 public void run(){
jpayne@68 632 //Do anything necessary prior to processing
jpayne@68 633
jpayne@68 634 //Process the reads
jpayne@68 635 processInner();
jpayne@68 636
jpayne@68 637 //Do anything necessary after processing
jpayne@68 638
jpayne@68 639 //Indicate successful exit status
jpayne@68 640 success=true;
jpayne@68 641 }
jpayne@68 642
jpayne@68 643 /** Iterate through the reads */
jpayne@68 644 void processInner(){
jpayne@68 645
jpayne@68 646
jpayne@68 647 //Grab and process all lists
jpayne@68 648
jpayne@68 649 if(ss!=null){
jpayne@68 650 for(ListNum<Read> ln=ss.nextReads(); ln!=null; ln=ss.nextReads()){
jpayne@68 651 processList(ln);
jpayne@68 652 }
jpayne@68 653 }else{
jpayne@68 654 for(ListNum<Read> ln=cris.nextList(); ln!=null && ln.size()>0; ln=cris.nextList()){
jpayne@68 655 processList(ln);
jpayne@68 656 cris.returnList(ln);
jpayne@68 657 }
jpayne@68 658 }
jpayne@68 659
jpayne@68 660 }
jpayne@68 661
jpayne@68 662 void processList(ListNum<Read> ln){
jpayne@68 663
jpayne@68 664 //Loop through each read in the list
jpayne@68 665 for(Read r : ln){
jpayne@68 666
jpayne@68 667 //Validate reads in worker threads
jpayne@68 668 if(!r.validated()){r.validate(true);}
jpayne@68 669
jpayne@68 670 //Track the initial length for statistics
jpayne@68 671 final int initialLength=r.length();
jpayne@68 672
jpayne@68 673 //Increment counters
jpayne@68 674 readsProcessedT++;
jpayne@68 675 basesProcessedT+=initialLength;
jpayne@68 676
jpayne@68 677 {
jpayne@68 678 //Reads are processed in this block.
jpayne@68 679 processRead(r);
jpayne@68 680 }
jpayne@68 681 }
jpayne@68 682 }
jpayne@68 683
jpayne@68 684 /**
jpayne@68 685 * Process a read or a read pair.
jpayne@68 686 * @param r Read 1
jpayne@68 687 * @param r2 Read 2 (may be null)
jpayne@68 688 * @return True if the reads should be kept, false if they should be discarded.
jpayne@68 689 */
jpayne@68 690 void processRead(final Read r){
jpayne@68 691 if(r.bases==null || r.length()<=1){return;}
jpayne@68 692
jpayne@68 693 final SamLine sl=r.samline;
jpayne@68 694 if(sl!=null && !sl.mapped()){return;}
jpayne@68 695 final String rname=(sl==null ? defaultRname : sl.rnameS());
jpayne@68 696 BaseGraph bg=refMap.get(rname);
jpayne@68 697 if(bg==null){bg=refMap2.get(Tools.trimToWhitespace(rname));}
jpayne@68 698 assert(bg!=null) : "Can't find graph for "+rname;
jpayne@68 699 float identity;
jpayne@68 700 float score=0;//unused
jpayne@68 701
jpayne@68 702 // assert(false) : r;
jpayne@68 703 if(sl==null){
jpayne@68 704 // identity=SketchObject.alignAndMakeMatch(r, bg.original);
jpayne@68 705 // assert(false) : bg.refWeights[0];
jpayne@68 706 // identity=SketchObject.alignAndMakeMatch(r, bg.original, bg.refWeights, bg.insWeights, bg.delWeights);
jpayne@68 707 identity=SketchObject.alignAndMakeMatch(r, bg.original, bg.refWeights);
jpayne@68 708 if(identity<=0){return;}
jpayne@68 709 assert(r.match!=null) : identity+", "+r;
jpayne@68 710 }else{
jpayne@68 711 assert(sl!=null && sl.mapped() && sl.seq!=null) : sl;
jpayne@68 712 assert(r.match!=null);
jpayne@68 713 if(samFilter!=null && !samFilter.passesFilter(sl)){return;}
jpayne@68 714 identity=sl.calcIdentity();
jpayne@68 715 }
jpayne@68 716 assert(r.match!=null && identity>0) : identity+", "+r;
jpayne@68 717
jpayne@68 718 if(inModel!=null){
jpayne@68 719
jpayne@68 720 if(verbose){System.err.println(r.start+"\t"+r.stop+"\t"+new String(r.match));}
jpayne@68 721 score=inModel.score(r, false, true);
jpayne@68 722 if(printScores){
jpayne@68 723 // System.err.println(identity+"\t"+score+"\t"+inModel.score(r, false, true));
jpayne@68 724 System.err.println(String.format("%.5f\t%.5f", identity, score));
jpayne@68 725 }
jpayne@68 726 // assert(false);
jpayne@68 727 // if(score<0){
jpayne@68 728 // verbose=true;
jpayne@68 729 // inModel.score(r, false, false);
jpayne@68 730 // }
jpayne@68 731 // assert(!verbose);
jpayne@68 732 }
jpayne@68 733
jpayne@68 734 alignedReadsT++;
jpayne@68 735 identitySumT+=identity;
jpayne@68 736 scoreSumT+=score;
jpayne@68 737 idHistT[Tools.mid(0, Math.round(100*identity), 100)]++;
jpayne@68 738 scoreHistT[Tools.mid(0, Math.round(100*score), 100)]++;
jpayne@68 739
jpayne@68 740
jpayne@68 741
jpayne@68 742 // assert(sl.calcIdentity()<=0.78) : sl.calcIdentity()+", "+samFilter.maxId;
jpayne@68 743 // assert(false) : sl.cigar+", "+sl.calcIdentity();
jpayne@68 744
jpayne@68 745 // System.err.println(rname);
jpayne@68 746
jpayne@68 747 if(realigner!=null) {
jpayne@68 748 assert(false);
jpayne@68 749 realigner.realign(r, sl, bg.original, true);
jpayne@68 750 }
jpayne@68 751
jpayne@68 752 bg.add(r);
jpayne@68 753 }
jpayne@68 754
jpayne@68 755 long[] idHistT=new long[101];
jpayne@68 756 long[] scoreHistT=new long[101];
jpayne@68 757
jpayne@68 758 /** Number of reads processed by this thread */
jpayne@68 759 protected long readsProcessedT=0;
jpayne@68 760 /** Number of bases processed by this thread */
jpayne@68 761 protected long basesProcessedT=0;
jpayne@68 762
jpayne@68 763 protected long alignedReadsT=0;
jpayne@68 764
jpayne@68 765 double identitySumT=0;
jpayne@68 766 double scoreSumT=0;
jpayne@68 767
jpayne@68 768 /** True only if this thread has completed successfully */
jpayne@68 769 boolean success=false;
jpayne@68 770
jpayne@68 771 /** Shared input stream */
jpayne@68 772 private final SamStreamer ss;
jpayne@68 773 /** Alternate input stream */
jpayne@68 774 private final ConcurrentReadInputStream cris;
jpayne@68 775 /** Thread ID */
jpayne@68 776 final int tid;
jpayne@68 777 /** For realigning reads */
jpayne@68 778 final Realigner realigner;
jpayne@68 779 }
jpayne@68 780
jpayne@68 781 /*--------------------------------------------------------------*/
jpayne@68 782 /*---------------- Fields ----------------*/
jpayne@68 783 /*--------------------------------------------------------------*/
jpayne@68 784
jpayne@68 785 /** Read input file path */
jpayne@68 786 private String in=null;
jpayne@68 787 /** Reference input file path */
jpayne@68 788 private String ref=null;
jpayne@68 789
jpayne@68 790 private String inModelFile;
jpayne@68 791 private BaseGraph inModel;
jpayne@68 792 private String outHistFile;
jpayne@68 793
jpayne@68 794 /** Consensus output file path */
jpayne@68 795 private String out=null;
jpayne@68 796 /** Model output file path */
jpayne@68 797 private String outModel=null;
jpayne@68 798
jpayne@68 799 /** Override input file extension */
jpayne@68 800 private String extin=null;
jpayne@68 801 /** Override output file extension */
jpayne@68 802 private String extout=null;
jpayne@68 803
jpayne@68 804 /*--------------------------------------------------------------*/
jpayne@68 805
jpayne@68 806 /** Number of reads processed */
jpayne@68 807 protected long readsProcessed=0;
jpayne@68 808 /** Number of bases processed */
jpayne@68 809 protected long basesProcessed=0;
jpayne@68 810
jpayne@68 811 protected long alignedReads=0;
jpayne@68 812
jpayne@68 813 protected double identitySum=0;
jpayne@68 814 protected double scoreSum=0;
jpayne@68 815
jpayne@68 816 /** Number of reads retained */
jpayne@68 817 protected long readsOut=0;
jpayne@68 818 /** Number of bases retained */
jpayne@68 819 protected long basesOut=0;
jpayne@68 820
jpayne@68 821 /** Quit after processing this many input reads; -1 means no limit */
jpayne@68 822 private long maxReads=-1;
jpayne@68 823
jpayne@68 824 public long subCount=0;
jpayne@68 825 public long refCount=0;
jpayne@68 826 public long delCount=0;
jpayne@68 827 public long insCount=0;
jpayne@68 828
jpayne@68 829 long[] idHist=new long[101];
jpayne@68 830 long[] scoreHist=new long[101];
jpayne@68 831 boolean printScores=false;
jpayne@68 832
jpayne@68 833 /*--------------------------------------------------------------*/
jpayne@68 834
jpayne@68 835 /** Threads dedicated to reading the sam file */
jpayne@68 836 private int streamerThreads=SamStreamer.DEFAULT_THREADS;
jpayne@68 837
jpayne@68 838 private String name=null;
jpayne@68 839
jpayne@68 840 private boolean loadedRef=false;
jpayne@68 841 private boolean specialRef=false;
jpayne@68 842
jpayne@68 843 private boolean realign=false;
jpayne@68 844
jpayne@68 845 private int ploidy=1;
jpayne@68 846
jpayne@68 847 private final boolean silent;
jpayne@68 848
jpayne@68 849 public final SamFilter samFilter=new SamFilter();
jpayne@68 850 /** Uses full ref names */
jpayne@68 851 public LinkedHashMap<String, BaseGraph> refMap;
jpayne@68 852 /** Uses truncated ref names */
jpayne@68 853 public LinkedHashMap<String, BaseGraph> refMap2;
jpayne@68 854
jpayne@68 855 private String defaultRname=null;
jpayne@68 856
jpayne@68 857 /*--------------------------------------------------------------*/
jpayne@68 858 /*---------------- Final Fields ----------------*/
jpayne@68 859 /*--------------------------------------------------------------*/
jpayne@68 860
jpayne@68 861 /** Read input file */
jpayne@68 862 private final FileFormat ffin;
jpayne@68 863 /** Reference input file */
jpayne@68 864 private final FileFormat ffref;
jpayne@68 865
jpayne@68 866 /** Consensus output file */
jpayne@68 867 private final FileFormat ffout;
jpayne@68 868 /** Model output file */
jpayne@68 869 private final FileFormat ffmodel;
jpayne@68 870
jpayne@68 871 /*--------------------------------------------------------------*/
jpayne@68 872 /*---------------- Common Fields ----------------*/
jpayne@68 873 /*--------------------------------------------------------------*/
jpayne@68 874
jpayne@68 875 /** Print status messages to this output stream */
jpayne@68 876 private PrintStream outstream=System.err;
jpayne@68 877 /** True if an error was encountered */
jpayne@68 878 public boolean errorState=false;
jpayne@68 879 /** Overwrite existing output files */
jpayne@68 880 private boolean overwrite=false;
jpayne@68 881 /** Append to existing output files */
jpayne@68 882 private boolean append=false;
jpayne@68 883 /** Reads ARE output in input order, even though this is false */
jpayne@68 884 private final boolean ordered=false;
jpayne@68 885
jpayne@68 886 }