annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/consensus/FixScaffoldGaps.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package consensus;
jpayne@68 2
jpayne@68 3 import java.io.PrintStream;
jpayne@68 4 import java.util.ArrayList;
jpayne@68 5 import java.util.LinkedHashMap;
jpayne@68 6 import java.util.Map.Entry;
jpayne@68 7 import java.util.concurrent.atomic.AtomicIntegerArray;
jpayne@68 8 import java.util.concurrent.atomic.AtomicLongArray;
jpayne@68 9
jpayne@68 10 import fileIO.ByteFile;
jpayne@68 11 import fileIO.FileFormat;
jpayne@68 12 import fileIO.ReadWrite;
jpayne@68 13 import shared.Parse;
jpayne@68 14 import shared.Parser;
jpayne@68 15 import shared.PreParser;
jpayne@68 16 import shared.ReadStats;
jpayne@68 17 import shared.Shared;
jpayne@68 18 import shared.Timer;
jpayne@68 19 import shared.Tools;
jpayne@68 20 import stream.ConcurrentReadInputStream;
jpayne@68 21 import stream.ConcurrentReadOutputStream;
jpayne@68 22 import stream.FastaReadInputStream;
jpayne@68 23 import stream.Read;
jpayne@68 24 import stream.SamLine;
jpayne@68 25 import stream.SamReadStreamer;
jpayne@68 26 import stream.SamStreamer;
jpayne@68 27 import structures.ByteBuilder;
jpayne@68 28 import structures.ListNum;
jpayne@68 29 import template.Accumulator;
jpayne@68 30 import template.ThreadWaiter;
jpayne@68 31 import var2.SamFilter;
jpayne@68 32
jpayne@68 33 /**
jpayne@68 34 * Resizes scaffold gaps to represent the best estimate
jpayne@68 35 * based on the insert size distribution of paired reads.
jpayne@68 36 *
jpayne@68 37 * @author Brian Bushnell
jpayne@68 38 * @date September 11, 2019
jpayne@68 39 *
jpayne@68 40 */
jpayne@68 41 public class FixScaffoldGaps implements Accumulator<FixScaffoldGaps.ProcessThread> {
jpayne@68 42
jpayne@68 43 /*--------------------------------------------------------------*/
jpayne@68 44 /*---------------- Initialization ----------------*/
jpayne@68 45 /*--------------------------------------------------------------*/
jpayne@68 46
jpayne@68 47 /**
jpayne@68 48 * Code entrance from the command line.
jpayne@68 49 * @param args Command line arguments
jpayne@68 50 */
jpayne@68 51 public static void main(String[] args){
jpayne@68 52 //Start a timer immediately upon code entrance.
jpayne@68 53 Timer t=new Timer();
jpayne@68 54
jpayne@68 55 //Create an instance of this class
jpayne@68 56 FixScaffoldGaps x=new FixScaffoldGaps(args);
jpayne@68 57
jpayne@68 58 //Run the object
jpayne@68 59 x.process(t);
jpayne@68 60
jpayne@68 61 //Close the print stream if it was redirected
jpayne@68 62 Shared.closeStream(x.outstream);
jpayne@68 63 }
jpayne@68 64
jpayne@68 65 /**
jpayne@68 66 * Constructor.
jpayne@68 67 * @param args Command line arguments
jpayne@68 68 */
jpayne@68 69 public FixScaffoldGaps(String[] args){
jpayne@68 70
jpayne@68 71 {//Preparse block for help, config files, and outstream
jpayne@68 72 PreParser pp=new PreParser(args, getClass(), false);
jpayne@68 73 args=pp.args;
jpayne@68 74 outstream=pp.outstream;
jpayne@68 75 }
jpayne@68 76
jpayne@68 77 //Set shared static variables prior to parsing
jpayne@68 78 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
jpayne@68 79 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
jpayne@68 80
jpayne@68 81 samFilter.includeUnmapped=false;
jpayne@68 82 samFilter.includeSupplimentary=false;
jpayne@68 83 // samFilter.includeDuplicate=false;
jpayne@68 84 samFilter.includeNonPrimary=false;
jpayne@68 85 samFilter.includeQfail=false;
jpayne@68 86 // samFilter.minMapq=4;
jpayne@68 87
jpayne@68 88 {//Parse the arguments
jpayne@68 89 final Parser parser=parse(args);
jpayne@68 90
jpayne@68 91 Parser.processQuality();
jpayne@68 92
jpayne@68 93 maxReads=parser.maxReads;
jpayne@68 94 overwrite=ReadStats.overwrite=parser.overwrite;
jpayne@68 95 append=ReadStats.append=parser.append;
jpayne@68 96
jpayne@68 97 in=parser.in1;
jpayne@68 98 extin=parser.extin;
jpayne@68 99
jpayne@68 100 out=parser.out1;
jpayne@68 101 extout=parser.extout;
jpayne@68 102 }
jpayne@68 103
jpayne@68 104 {
jpayne@68 105 // if("auto".equalsIgnoreCase(atomic)){Scaffold.setCA3A(Shared.threads()>8);}
jpayne@68 106 // else{Scaffold.setCA3A(Parse.parseBoolean(atomic));}
jpayne@68 107 samFilter.setSamtoolsFilter();
jpayne@68 108
jpayne@68 109 streamerThreads=Tools.max(1, Tools.min(streamerThreads, Shared.threads()));
jpayne@68 110 assert(streamerThreads>0) : streamerThreads;
jpayne@68 111 }
jpayne@68 112
jpayne@68 113 validateParams();
jpayne@68 114 fixExtensions(); //Add or remove .gz or .bz2 as needed
jpayne@68 115 checkFileExistence(); //Ensure files can be read and written
jpayne@68 116 checkStatics(); //Adjust file-related static fields as needed for this program
jpayne@68 117
jpayne@68 118 //Create output FileFormat objects
jpayne@68 119 ffout=FileFormat.testOutput(out, FileFormat.FASTA, extout, true, overwrite, append, ordered);
jpayne@68 120
jpayne@68 121 //Create input FileFormat objects
jpayne@68 122 ffin=FileFormat.testInput(in, FileFormat.SAM, extin, true, true);
jpayne@68 123 ffref=FileFormat.testInput(ref, FileFormat.FASTA, null, true, true);
jpayne@68 124 }
jpayne@68 125
jpayne@68 126 /*--------------------------------------------------------------*/
jpayne@68 127 /*---------------- Initialization Helpers ----------------*/
jpayne@68 128 /*--------------------------------------------------------------*/
jpayne@68 129
jpayne@68 130 /** Parse arguments from the command line */
jpayne@68 131 private Parser parse(String[] args){
jpayne@68 132
jpayne@68 133 //Create a parser object
jpayne@68 134 Parser parser=new Parser();
jpayne@68 135
jpayne@68 136 //Set any necessary Parser defaults here
jpayne@68 137 //parser.foo=bar;
jpayne@68 138
jpayne@68 139 //Parse each argument
jpayne@68 140 for(int i=0; i<args.length; i++){
jpayne@68 141 String arg=args[i];
jpayne@68 142
jpayne@68 143 //Break arguments into their constituent parts, in the form of "a=b"
jpayne@68 144 String[] split=arg.split("=");
jpayne@68 145 String a=split[0].toLowerCase();
jpayne@68 146 String b=split.length>1 ? split[1] : null;
jpayne@68 147 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
jpayne@68 148
jpayne@68 149 if(a.equals("verbose")){
jpayne@68 150 verbose=Parse.parseBoolean(b);
jpayne@68 151 }else if(a.equals("ref") || a.equals("scaffolds")){
jpayne@68 152 ref=b;
jpayne@68 153 }else if(a.equals("insertlist")){
jpayne@68 154 insertList=b;
jpayne@68 155 }else if(a.equals("ordered")){
jpayne@68 156 ordered=Parse.parseBoolean(b);
jpayne@68 157 }else if(a.equalsIgnoreCase("ns") || a.equalsIgnoreCase("n") || a.equalsIgnoreCase("scaffoldbreak") || a.equalsIgnoreCase("gap")){
jpayne@68 158 scaffoldBreakNs=Integer.parseInt(b);
jpayne@68 159 assert(scaffoldBreakNs>0);
jpayne@68 160 }else if(a.equalsIgnoreCase("mindepth")){
jpayne@68 161 minDepth=Integer.parseInt(b);
jpayne@68 162 assert(minDepth>0);
jpayne@68 163 }else if(a.equalsIgnoreCase("trim") || a.equalsIgnoreCase("trimFraction") || a.equalsIgnoreCase("border")){
jpayne@68 164 trimFraction=Float.parseFloat(b);
jpayne@68 165 assert(trimFraction>=0 && trimFraction<=1) : "trimFraction should be between 0 and 1";
jpayne@68 166 }else if(a.equals("clearfilters") || a.equals("clearfilter")){
jpayne@68 167 if(Parse.parseBoolean(b)){
jpayne@68 168 samFilter.clear();
jpayne@68 169 }
jpayne@68 170 }else if(a.equals("parse_flag_goes_here")){
jpayne@68 171 long fake_variable=Parse.parseKMG(b);
jpayne@68 172 //Set a variable here
jpayne@68 173 }else if(samFilter.parse(arg, a, b)){
jpayne@68 174 //do nothing
jpayne@68 175 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
jpayne@68 176 //do nothing
jpayne@68 177 }else{
jpayne@68 178 outstream.println("Unknown parameter "+args[i]);
jpayne@68 179 assert(false) : "Unknown parameter "+args[i];
jpayne@68 180 }
jpayne@68 181 }
jpayne@68 182
jpayne@68 183 return parser;
jpayne@68 184 }
jpayne@68 185
jpayne@68 186 /** Add or remove .gz or .bz2 as needed */
jpayne@68 187 private void fixExtensions(){
jpayne@68 188 in=Tools.fixExtension(in);
jpayne@68 189 ref=Tools.fixExtension(ref);
jpayne@68 190 }
jpayne@68 191
jpayne@68 192 /** Ensure files can be read and written */
jpayne@68 193 private void checkFileExistence(){
jpayne@68 194
jpayne@68 195 //Ensure there is an input file
jpayne@68 196 if(in==null){throw new RuntimeException("Error - an input file is required.");}
jpayne@68 197
jpayne@68 198 //Ensure there is an input file
jpayne@68 199 if(ref==null){throw new RuntimeException("Error - a reference file is required.");}
jpayne@68 200
jpayne@68 201 //Ensure output files can be written
jpayne@68 202 if(!Tools.testOutputFiles(overwrite, append, false, out)){
jpayne@68 203 outstream.println((out==null)+", "+out);
jpayne@68 204 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n");
jpayne@68 205 }
jpayne@68 206
jpayne@68 207 //Ensure input files can be read
jpayne@68 208 if(!Tools.testInputFiles(false, true, in, ref)){
jpayne@68 209 throw new RuntimeException("\nCan't read some input files.\n");
jpayne@68 210 }
jpayne@68 211
jpayne@68 212 //Ensure that no file was specified multiple times
jpayne@68 213 if(!Tools.testForDuplicateFiles(true, in, ref, out)){
jpayne@68 214 throw new RuntimeException("\nSome file names were specified multiple times.\n");
jpayne@68 215 }
jpayne@68 216 }
jpayne@68 217
jpayne@68 218 /** Adjust file-related static fields as needed for this program */
jpayne@68 219 private static void checkStatics(){
jpayne@68 220 //Adjust the number of threads for input file reading
jpayne@68 221 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
jpayne@68 222 ByteFile.FORCE_MODE_BF2=true;
jpayne@68 223 }
jpayne@68 224
jpayne@68 225 assert(FastaReadInputStream.settingsOK());
jpayne@68 226 }
jpayne@68 227
jpayne@68 228 /** Ensure parameter ranges are within bounds and required parameters are set */
jpayne@68 229 private boolean validateParams(){
jpayne@68 230 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
jpayne@68 231 return true;
jpayne@68 232 }
jpayne@68 233
jpayne@68 234 /*--------------------------------------------------------------*/
jpayne@68 235 /*---------------- Outer Methods ----------------*/
jpayne@68 236 /*--------------------------------------------------------------*/
jpayne@68 237
jpayne@68 238 /** Create read streams and process all data */
jpayne@68 239 void process(Timer t){
jpayne@68 240
jpayne@68 241 //Turn off read validation in the input threads to increase speed
jpayne@68 242 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
jpayne@68 243 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
jpayne@68 244
jpayne@68 245 //Create a read input stream
jpayne@68 246 final SamStreamer ss=makeStreamer(ffin);
jpayne@68 247
jpayne@68 248 //Load reference
jpayne@68 249 loadReferenceCustom();
jpayne@68 250
jpayne@68 251 //Reset counters
jpayne@68 252 readsProcessed=readsOut=0;
jpayne@68 253 basesProcessed=basesOut=0;
jpayne@68 254
jpayne@68 255 //Process the reads in separate threads
jpayne@68 256 spawnThreads(ss);
jpayne@68 257
jpayne@68 258 //Optionally create a read output stream
jpayne@68 259 final ConcurrentReadOutputStream ros=makeCros();
jpayne@68 260
jpayne@68 261 if(verbose){outstream.println("Fixing reference.");}
jpayne@68 262
jpayne@68 263 fixScaffolds(ros);
jpayne@68 264
jpayne@68 265 if(verbose){outstream.println("Finished; closing streams.");}
jpayne@68 266
jpayne@68 267 //Write anything that was accumulated by ReadStats
jpayne@68 268 errorState|=ReadStats.writeAll();
jpayne@68 269 //Close the read streams
jpayne@68 270 errorState|=ReadWrite.closeStream(ros);
jpayne@68 271
jpayne@68 272 //Reset read validation
jpayne@68 273 Read.VALIDATE_IN_CONSTRUCTOR=vic;
jpayne@68 274
jpayne@68 275 //Report timing and results
jpayne@68 276 t.stop();
jpayne@68 277 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
jpayne@68 278 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, scaffoldsOut, scaffoldLengthOut, 8, false));
jpayne@68 279
jpayne@68 280 outstream.println();
jpayne@68 281 outstream.println(Tools.number("Average Insert", totalAverageInsert, 2, 8));
jpayne@68 282 outstream.println(Tools.number("Gaps Unchanged", gapsUnchanged, 8));
jpayne@68 283 outstream.println(Tools.number("Gaps Widened ", gapsWidened, 8));
jpayne@68 284 outstream.println(Tools.number("Gaps Narrowed ", gapsNarrowed, 8));
jpayne@68 285 outstream.println(Tools.number("Ns Total ", nsTotal, 8));
jpayne@68 286 outstream.println(Tools.number("Ns Added ", nsAdded, 8));
jpayne@68 287 outstream.println(Tools.number("Ns Removed ", nsRemoved, 8));
jpayne@68 288
jpayne@68 289
jpayne@68 290 //Throw an exception of there was an error in a thread
jpayne@68 291 if(errorState){
jpayne@68 292 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
jpayne@68 293 }
jpayne@68 294 }
jpayne@68 295
jpayne@68 296 private synchronized void loadReferenceCustom(){
jpayne@68 297 assert(!loadedRef);
jpayne@68 298 ConcurrentReadInputStream cris=makeRefCris();
jpayne@68 299 for(ListNum<Read> ln=cris.nextList(); ln!=null && ln.size()>0; ln=cris.nextList()) {
jpayne@68 300 for(Read r : ln){
jpayne@68 301 String name=r.id;
jpayne@68 302 String name2=Tools.trimToWhitespace(r.id);
jpayne@68 303 Scaffold scaf=new Scaffold(name, r.bases, r.numericID);
jpayne@68 304 refMap.put(name, scaf);
jpayne@68 305 refMap2.put(name2, scaf);
jpayne@68 306 }
jpayne@68 307 }
jpayne@68 308 loadedRef=true;
jpayne@68 309 }
jpayne@68 310
jpayne@68 311 private ConcurrentReadInputStream makeRefCris(){
jpayne@68 312 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffref, null);
jpayne@68 313 cris.start(); //Start the stream
jpayne@68 314 if(verbose){outstream.println("Started cris");}
jpayne@68 315 boolean paired=cris.paired();
jpayne@68 316 assert(!paired) : "References should not be paired.";
jpayne@68 317 return cris;
jpayne@68 318 }
jpayne@68 319
jpayne@68 320 private SamStreamer makeStreamer(FileFormat ff){
jpayne@68 321 if(ff==null){return null;}
jpayne@68 322 SamStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
jpayne@68 323 ss.start(); //Start the stream
jpayne@68 324 if(verbose){outstream.println("Started Streamer");}
jpayne@68 325 return ss;
jpayne@68 326 }
jpayne@68 327
jpayne@68 328 private ConcurrentReadOutputStream makeCros(){
jpayne@68 329 if(ffout==null){return null;}
jpayne@68 330
jpayne@68 331 //Select output buffer size based on whether it needs to be ordered
jpayne@68 332 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
jpayne@68 333
jpayne@68 334 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false);
jpayne@68 335 ros.start(); //Start the stream
jpayne@68 336 return ros;
jpayne@68 337 }
jpayne@68 338
jpayne@68 339 /*--------------------------------------------------------------*/
jpayne@68 340 /*---------------- Thread Management ----------------*/
jpayne@68 341 /*--------------------------------------------------------------*/
jpayne@68 342
jpayne@68 343 /** Spawn process threads */
jpayne@68 344 private void spawnThreads(final SamStreamer ss){
jpayne@68 345
jpayne@68 346 //Do anything necessary prior to processing
jpayne@68 347
jpayne@68 348 //Determine how many threads may be used
jpayne@68 349 final int threads=Shared.threads();
jpayne@68 350
jpayne@68 351 //Fill a list with ProcessThreads
jpayne@68 352 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 353 for(int i=0; i<threads; i++){
jpayne@68 354 alpt.add(new ProcessThread(ss, i));
jpayne@68 355 }
jpayne@68 356
jpayne@68 357 //Start the threads
jpayne@68 358 for(ProcessThread pt : alpt){
jpayne@68 359 pt.start();
jpayne@68 360 }
jpayne@68 361
jpayne@68 362 //Wait for threads to finish
jpayne@68 363 boolean success=ThreadWaiter.waitForThreads(alpt, this);
jpayne@68 364 errorState&=!success;
jpayne@68 365
jpayne@68 366 //Do anything necessary after processing
jpayne@68 367 totalAverageInsert=totalInsertSum/(double)totalInsertCount;
jpayne@68 368 insertByPercentile=Tools.makeHistogram(insertCounts, buckets);
jpayne@68 369 }
jpayne@68 370
jpayne@68 371 @Override
jpayne@68 372 public final void accumulate(ProcessThread pt){
jpayne@68 373 readsProcessed+=pt.readsProcessedT;
jpayne@68 374 basesProcessed+=pt.basesProcessedT;
jpayne@68 375 readsOut+=pt.readsOutT;
jpayne@68 376 basesOut+=pt.basesOutT;
jpayne@68 377
jpayne@68 378 totalInsertSum+=pt.totalInsertSumT;
jpayne@68 379 totalInsertCount+=pt.totalInsertCountT;
jpayne@68 380
jpayne@68 381 errorState|=(!pt.success);
jpayne@68 382 }
jpayne@68 383
jpayne@68 384 @Override
jpayne@68 385 public final boolean success(){return !errorState;}
jpayne@68 386
jpayne@68 387 /*--------------------------------------------------------------*/
jpayne@68 388 /*---------------- Inner Methods ----------------*/
jpayne@68 389 /*--------------------------------------------------------------*/
jpayne@68 390
jpayne@68 391 private void fixScaffolds(ConcurrentReadOutputStream ros){
jpayne@68 392 ByteBuilder bb=new ByteBuilder(1000000);
jpayne@68 393
jpayne@68 394 ArrayList<Read> list=new ArrayList<Read>(200);
jpayne@68 395 long num=0;
jpayne@68 396 long lengthSum=0;
jpayne@68 397 for(Entry<String, Scaffold> e : refMap.entrySet()){
jpayne@68 398 Scaffold scaf=e.getValue();
jpayne@68 399 Read r=scaf.fixScaffold(bb);
jpayne@68 400 lengthSum+=r.length();
jpayne@68 401 list.add(r);
jpayne@68 402 scaffoldsOut++;
jpayne@68 403 scaffoldLengthOut+=r.length();
jpayne@68 404
jpayne@68 405 if(list.size()>=200 || lengthSum>=100000){
jpayne@68 406 if(ros!=null){ros.add(list, num);}
jpayne@68 407 list=new ArrayList<Read>(200);
jpayne@68 408 num++;
jpayne@68 409 lengthSum=0;
jpayne@68 410 }
jpayne@68 411 }
jpayne@68 412 if(list.size()>0){
jpayne@68 413 if(ros!=null){ros.add(list, num);}
jpayne@68 414 }
jpayne@68 415 }
jpayne@68 416
jpayne@68 417 private static int calcInsertSize(SamLine sl) {
jpayne@68 418 assert(sl.mapped() && sl.pairedOnSameChrom());
jpayne@68 419 assert(sl.primary());
jpayne@68 420 assert(!sl.supplementary());
jpayne@68 421 assert(sl.leftmost());
jpayne@68 422
jpayne@68 423 assert(sl.tlen>0) : sl.tlen+"\n\n"+sl;
jpayne@68 424 return sl.tlen>0 ? sl.tlen : -sl.tlen;
jpayne@68 425
jpayne@68 426 // final int insertSize;
jpayne@68 427 // String insertTag=null;
jpayne@68 428 // if(sl.optional!=null){
jpayne@68 429 // for(String s : sl.optional){
jpayne@68 430 // if(s.startsWith("X8:Z:")){
jpayne@68 431 // insertTag=s;
jpayne@68 432 // break;
jpayne@68 433 // }
jpayne@68 434 // }
jpayne@68 435 // }
jpayne@68 436 // if(insertTag!=null){
jpayne@68 437 // insertSize=Integer.parseInt(insertTag.substring(5));
jpayne@68 438 // }else{
jpayne@68 439 // insertSize=sl.tlen;//This is unsafe due to indels.
jpayne@68 440 // assert(false) : "Reads need insert size tags.";
jpayne@68 441 // }
jpayne@68 442 // assert(insertSize>0) : sl;
jpayne@68 443 //
jpayne@68 444 // return insertSize;
jpayne@68 445 }
jpayne@68 446
jpayne@68 447 /*--------------------------------------------------------------*/
jpayne@68 448 /*---------------- Inner Classes ----------------*/
jpayne@68 449 /*--------------------------------------------------------------*/
jpayne@68 450
jpayne@68 451 /** This class is static to prevent accidental writing to shared variables.
jpayne@68 452 * It is safe to remove the static modifier. */
jpayne@68 453 class ProcessThread extends Thread {
jpayne@68 454
jpayne@68 455 //Constructor
jpayne@68 456 ProcessThread(final SamStreamer ss_, final int tid_){
jpayne@68 457 ss=ss_;
jpayne@68 458 tid=tid_;
jpayne@68 459 }
jpayne@68 460
jpayne@68 461 //Called by start()
jpayne@68 462 @Override
jpayne@68 463 public void run(){
jpayne@68 464 //Do anything necessary prior to processing
jpayne@68 465
jpayne@68 466 //Process the reads
jpayne@68 467 processInner();
jpayne@68 468
jpayne@68 469 //Do anything necessary after processing
jpayne@68 470
jpayne@68 471 //Indicate successful exit status
jpayne@68 472 success=true;
jpayne@68 473 }
jpayne@68 474
jpayne@68 475 /** Iterate through the reads */
jpayne@68 476 void processInner(){
jpayne@68 477
jpayne@68 478 //Grab and process all lists
jpayne@68 479 for(ListNum<Read> ln=ss.nextReads(); ln!=null; ln=ss.nextReads()){
jpayne@68 480 // if(verbose){outstream.println("Got list of size "+list.size());} //Disabled due to non-static access
jpayne@68 481
jpayne@68 482 processList(ln);
jpayne@68 483 }
jpayne@68 484
jpayne@68 485 }
jpayne@68 486
jpayne@68 487 void processList(ListNum<Read> ln){
jpayne@68 488
jpayne@68 489 //Grab the actual read list from the ListNum
jpayne@68 490 final ArrayList<Read> reads=ln.list;
jpayne@68 491
jpayne@68 492 //Loop through each read in the list
jpayne@68 493 for(int idx=0; idx<reads.size(); idx++){
jpayne@68 494 final Read r=reads.get(idx);
jpayne@68 495
jpayne@68 496 //Validate reads in worker threads
jpayne@68 497 if(!r.validated()){r.validate(true);}
jpayne@68 498
jpayne@68 499 //Track the initial length for statistics
jpayne@68 500 final int initialLength=r.length();
jpayne@68 501
jpayne@68 502 //Increment counters
jpayne@68 503 readsProcessedT+=r.pairCount();
jpayne@68 504 basesProcessedT+=initialLength;
jpayne@68 505
jpayne@68 506 processRead(r);
jpayne@68 507 }
jpayne@68 508 }
jpayne@68 509
jpayne@68 510 /**
jpayne@68 511 * Process a read or a read pair.
jpayne@68 512 * @param r Read 1
jpayne@68 513 * @param r2 Read 2 (may be null)
jpayne@68 514 * @return True if the reads should be kept, false if they should be discarded.
jpayne@68 515 */
jpayne@68 516 void processRead(final Read r){
jpayne@68 517 final SamLine sl=r.samline;
jpayne@68 518 assert(sl!=null) : sl;
jpayne@68 519 if(samFilter!=null && !samFilter.passesFilter(sl)){return;}
jpayne@68 520
jpayne@68 521 //sl.nextMapped();
jpayne@68 522 if(sl.mapped() && sl.pairedOnSameChrom() && sl.properPair() && sl.primary() && !sl.supplementary() && sl.leftmost()){
jpayne@68 523 final String rname=sl.rnameS();
jpayne@68 524 Scaffold scaf=refMap.get(rname);
jpayne@68 525 if(scaf==null){scaf=refMap2.get(Tools.trimToWhitespace(rname));}
jpayne@68 526 assert(scaf!=null) : "Can't find graph for "+rname;
jpayne@68 527
jpayne@68 528 if(scaf!=null){
jpayne@68 529 final int insertSize=calcInsertSize(sl);
jpayne@68 530 insertCounts.incrementAndGet(Tools.mid(0, insertSize, insertCounts.length()));
jpayne@68 531 scaf.add(sl, insertSize);
jpayne@68 532
jpayne@68 533 readsOutT++;
jpayne@68 534 basesOutT+=r.length();
jpayne@68 535
jpayne@68 536 totalInsertSumT+=insertSize;
jpayne@68 537 totalInsertCountT++;
jpayne@68 538 }
jpayne@68 539 }
jpayne@68 540 }
jpayne@68 541
jpayne@68 542 /** Number of reads processed by this thread */
jpayne@68 543 protected long readsProcessedT=0;
jpayne@68 544 /** Number of bases processed by this thread */
jpayne@68 545 protected long basesProcessedT=0;
jpayne@68 546
jpayne@68 547 /** Number of reads retained by this thread */
jpayne@68 548 protected long readsOutT=0;
jpayne@68 549 /** Number of bases retained by this thread */
jpayne@68 550 protected long basesOutT=0;
jpayne@68 551
jpayne@68 552 protected long totalInsertSumT=0;
jpayne@68 553 protected long totalInsertCountT=0;
jpayne@68 554
jpayne@68 555 long insertSum=0;
jpayne@68 556
jpayne@68 557 /** True only if this thread has completed successfully */
jpayne@68 558 boolean success=false;
jpayne@68 559
jpayne@68 560 /** Shared input stream */
jpayne@68 561 private final SamStreamer ss;
jpayne@68 562 /** Thread ID */
jpayne@68 563 final int tid;
jpayne@68 564 }
jpayne@68 565
jpayne@68 566 /*--------------------------------------------------------------*/
jpayne@68 567
jpayne@68 568 private class Scaffold {
jpayne@68 569
jpayne@68 570 Scaffold(String name_, byte[] bases_, long numericID_){
jpayne@68 571 name=name_;
jpayne@68 572 bases=bases_;
jpayne@68 573 numericID=(int)numericID_;
jpayne@68 574 depthArray=new AtomicIntegerArray(bases.length);
jpayne@68 575 insertArray=new AtomicLongArray(bases.length);
jpayne@68 576 }
jpayne@68 577
jpayne@68 578 void add(SamLine sl, int insertSize){
jpayne@68 579 assert(sl.mapped() && sl.pairedOnSameChrom());
jpayne@68 580 assert(sl.primary());
jpayne@68 581 assert(!sl.supplementary());
jpayne@68 582 assert(sl.leftmost());
jpayne@68 583
jpayne@68 584 // final int insertSize=calcInsertSize(sl);
jpayne@68 585
jpayne@68 586 int start=sl.pos-1;
jpayne@68 587 int stop=start+sl.tlen;
jpayne@68 588
jpayne@68 589 int trim=(int)(sl.length()*trimFraction);
jpayne@68 590 start+=trim;
jpayne@68 591 stop-=trim;
jpayne@68 592
jpayne@68 593 for(int i=start; i<stop; i++){
jpayne@68 594 if(i>=0 && i<bases.length){
jpayne@68 595 depthArray.incrementAndGet(i);
jpayne@68 596 insertArray.addAndGet(i, insertSize);
jpayne@68 597 }
jpayne@68 598 }
jpayne@68 599
jpayne@68 600 }
jpayne@68 601
jpayne@68 602 Read fixScaffold(ByteBuilder bb){
jpayne@68 603 int streak=0;
jpayne@68 604 bb.clear();
jpayne@68 605
jpayne@68 606 if(insertList!=null){
jpayne@68 607 for(int i=0; i<bases.length; i++) {
jpayne@68 608 bb.append(i).tab().append(depthArray.get(i)).tab().append(insertArray.get(i)/(Tools.max(1, depthArray.get(i)))).nl();
jpayne@68 609 }
jpayne@68 610 ReadWrite.writeString(bb, insertList, false);
jpayne@68 611 bb.clear();
jpayne@68 612 }
jpayne@68 613
jpayne@68 614 for(int i=0; i<bases.length; i++){
jpayne@68 615 byte b=bases[i];
jpayne@68 616 if(b=='N'){
jpayne@68 617 streak++;
jpayne@68 618 }else{
jpayne@68 619 if(streak>=scaffoldBreakNs && i-streak>300 && i<bases.length-300){
jpayne@68 620 int pivot=i-streak/2-1;
jpayne@68 621 long depthSum=depthArray.get(pivot);
jpayne@68 622 long insertSum=insertArray.get(pivot);
jpayne@68 623 double avgInsert=(insertSum/(double)depthSum);
jpayne@68 624
jpayne@68 625 int avgDepth=((depthArray.get(i-200-streak)+depthArray.get(i+200))/2);
jpayne@68 626 int percentile=(int)(buckets*Tools.max(0.5, 1.0-depthSum/(double)(avgDepth+depthSum)));
jpayne@68 627 int insertProxy=insertByPercentile[percentile];
jpayne@68 628
jpayne@68 629 // assert(false) : Arrays.toString(insertByPercentile);
jpayne@68 630
jpayne@68 631 int dif=(int)Math.round(insertProxy-avgInsert);
jpayne@68 632 int toAdd=Tools.max(scaffoldBreakNs, streak+dif);
jpayne@68 633
jpayne@68 634 // System.err.println("totalAverageInsert="+(int)totalAverageInsert+", avg="+(int)avgInsert+", dif="+dif);
jpayne@68 635 // System.err.println("proxy="+insertProxy+", percentile="+percentile+", avgDepth="+(int)avgDepth+", depth="+depthSum);
jpayne@68 636 // System.err.println("pivot="+pivot+", depthSum="+depthSum+", avg="+(int)avgInsert+", streak="+streak+", dif="+dif+", toAdd="+toAdd);
jpayne@68 637
jpayne@68 638 if(dif>0){
jpayne@68 639 //TODO: This is tricky because long-insert reads will be self-selected.
jpayne@68 640 //Should be compared to average coverage, or nearby coverage, and then consult a size distribution histogram.
jpayne@68 641 gapsWidened++;
jpayne@68 642 nsAdded+=dif;
jpayne@68 643 }else if(dif<0){
jpayne@68 644 gapsNarrowed++;
jpayne@68 645 nsRemoved-=dif;
jpayne@68 646 }else{
jpayne@68 647 gapsUnchanged++;
jpayne@68 648 }
jpayne@68 649 nsTotal+=toAdd;
jpayne@68 650 for(int j=0; j<toAdd; j++){
jpayne@68 651 bb.append('N');
jpayne@68 652 }
jpayne@68 653 }
jpayne@68 654 streak=0;
jpayne@68 655 bb.append(b);
jpayne@68 656 }
jpayne@68 657 }
jpayne@68 658
jpayne@68 659 return new Read(bb.toBytes(), null, name, numericID);
jpayne@68 660 }
jpayne@68 661
jpayne@68 662 final int numericID;
jpayne@68 663 final String name;
jpayne@68 664 final byte[] bases;
jpayne@68 665 final AtomicIntegerArray depthArray;
jpayne@68 666 final AtomicLongArray insertArray;
jpayne@68 667
jpayne@68 668 }
jpayne@68 669
jpayne@68 670 /*--------------------------------------------------------------*/
jpayne@68 671 /*---------------- Fields ----------------*/
jpayne@68 672 /*--------------------------------------------------------------*/
jpayne@68 673
jpayne@68 674 /** Primary input file path */
jpayne@68 675 private String in=null;
jpayne@68 676 /** Secondary input file path */
jpayne@68 677 private String ref=null;
jpayne@68 678
jpayne@68 679 /** Primary output file path */
jpayne@68 680 private String out=null;
jpayne@68 681
jpayne@68 682 /** Override input file extension */
jpayne@68 683 private String extin=null;
jpayne@68 684 /** Override output file extension */
jpayne@68 685 private String extout=null;
jpayne@68 686
jpayne@68 687 private String insertList=null;
jpayne@68 688
jpayne@68 689 /*--------------------------------------------------------------*/
jpayne@68 690
jpayne@68 691 /** Number of reads processed */
jpayne@68 692 protected long readsProcessed=0;
jpayne@68 693 /** Number of bases processed */
jpayne@68 694 protected long basesProcessed=0;
jpayne@68 695
jpayne@68 696 /** Number of reads retained */
jpayne@68 697 protected long readsOut=0;
jpayne@68 698 /** Number of bases retained */
jpayne@68 699 protected long basesOut=0;
jpayne@68 700
jpayne@68 701 protected long scaffoldsOut=0;
jpayne@68 702 protected long scaffoldLengthOut=0;
jpayne@68 703
jpayne@68 704 protected long gapsUnchanged=0;
jpayne@68 705 protected long gapsWidened=0;
jpayne@68 706 protected long gapsNarrowed=0;
jpayne@68 707 protected long nsAdded=0;
jpayne@68 708 protected long nsRemoved=0;
jpayne@68 709 protected long nsTotal=0;
jpayne@68 710
jpayne@68 711 protected long totalInsertSum=0;
jpayne@68 712 protected long totalInsertCount=0;
jpayne@68 713 protected double totalAverageInsert;
jpayne@68 714
jpayne@68 715 protected AtomicLongArray insertCounts=new AtomicLongArray(20000);
jpayne@68 716 protected int[] insertByPercentile;
jpayne@68 717
jpayne@68 718 /** Quit after processing this many input reads; -1 means no limit */
jpayne@68 719 private long maxReads=-1;
jpayne@68 720
jpayne@68 721 /*--------------------------------------------------------------*/
jpayne@68 722
jpayne@68 723 /** Threads dedicated to reading the sam file */
jpayne@68 724 private int streamerThreads=SamStreamer.DEFAULT_THREADS;
jpayne@68 725
jpayne@68 726 private boolean loadedRef=false;
jpayne@68 727
jpayne@68 728 private int scaffoldBreakNs=10;
jpayne@68 729
jpayne@68 730 int buckets=1000;
jpayne@68 731
jpayne@68 732 private int minDepth=10;
jpayne@68 733
jpayne@68 734 private float trimFraction=0.4f;
jpayne@68 735
jpayne@68 736 public final SamFilter samFilter=new SamFilter();
jpayne@68 737
jpayne@68 738 /** Uses full ref names */
jpayne@68 739 public LinkedHashMap<String, Scaffold> refMap=new LinkedHashMap<String, Scaffold>();
jpayne@68 740 /** Uses truncated ref names */
jpayne@68 741 public LinkedHashMap<String, Scaffold> refMap2=new LinkedHashMap<String, Scaffold>();;
jpayne@68 742
jpayne@68 743 /*--------------------------------------------------------------*/
jpayne@68 744 /*---------------- Final Fields ----------------*/
jpayne@68 745 /*--------------------------------------------------------------*/
jpayne@68 746
jpayne@68 747 /** Primary input file */
jpayne@68 748 private final FileFormat ffin;
jpayne@68 749 /** Secondary input file */
jpayne@68 750 private final FileFormat ffref;
jpayne@68 751
jpayne@68 752 /** Primary output file */
jpayne@68 753 private final FileFormat ffout;
jpayne@68 754
jpayne@68 755 /*--------------------------------------------------------------*/
jpayne@68 756 /*---------------- Common Fields ----------------*/
jpayne@68 757 /*--------------------------------------------------------------*/
jpayne@68 758
jpayne@68 759 /** Print status messages to this output stream */
jpayne@68 760 private PrintStream outstream=System.err;
jpayne@68 761 /** Print verbose messages */
jpayne@68 762 public static boolean verbose=false;
jpayne@68 763 /** True if an error was encountered */
jpayne@68 764 public boolean errorState=false;
jpayne@68 765 /** Overwrite existing output files */
jpayne@68 766 private boolean overwrite=false;
jpayne@68 767 /** Append to existing output files */
jpayne@68 768 private boolean append=false;
jpayne@68 769 /** Reads are output in input order */
jpayne@68 770 private boolean ordered=false;
jpayne@68 771
jpayne@68 772 }