jpayne@68
|
1 package consensus;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.io.PrintStream;
|
jpayne@68
|
4 import java.util.ArrayList;
|
jpayne@68
|
5 import java.util.LinkedHashMap;
|
jpayne@68
|
6 import java.util.Map.Entry;
|
jpayne@68
|
7 import java.util.concurrent.atomic.AtomicIntegerArray;
|
jpayne@68
|
8 import java.util.concurrent.atomic.AtomicLongArray;
|
jpayne@68
|
9
|
jpayne@68
|
10 import fileIO.ByteFile;
|
jpayne@68
|
11 import fileIO.FileFormat;
|
jpayne@68
|
12 import fileIO.ReadWrite;
|
jpayne@68
|
13 import shared.Parse;
|
jpayne@68
|
14 import shared.Parser;
|
jpayne@68
|
15 import shared.PreParser;
|
jpayne@68
|
16 import shared.ReadStats;
|
jpayne@68
|
17 import shared.Shared;
|
jpayne@68
|
18 import shared.Timer;
|
jpayne@68
|
19 import shared.Tools;
|
jpayne@68
|
20 import stream.ConcurrentReadInputStream;
|
jpayne@68
|
21 import stream.ConcurrentReadOutputStream;
|
jpayne@68
|
22 import stream.FastaReadInputStream;
|
jpayne@68
|
23 import stream.Read;
|
jpayne@68
|
24 import stream.SamLine;
|
jpayne@68
|
25 import stream.SamReadStreamer;
|
jpayne@68
|
26 import stream.SamStreamer;
|
jpayne@68
|
27 import structures.ByteBuilder;
|
jpayne@68
|
28 import structures.ListNum;
|
jpayne@68
|
29 import template.Accumulator;
|
jpayne@68
|
30 import template.ThreadWaiter;
|
jpayne@68
|
31 import var2.SamFilter;
|
jpayne@68
|
32
|
jpayne@68
|
33 /**
|
jpayne@68
|
34 * Resizes scaffold gaps to represent the best estimate
|
jpayne@68
|
35 * based on the insert size distribution of paired reads.
|
jpayne@68
|
36 *
|
jpayne@68
|
37 * @author Brian Bushnell
|
jpayne@68
|
38 * @date September 11, 2019
|
jpayne@68
|
39 *
|
jpayne@68
|
40 */
|
jpayne@68
|
41 public class FixScaffoldGaps implements Accumulator<FixScaffoldGaps.ProcessThread> {
|
jpayne@68
|
42
|
jpayne@68
|
43 /*--------------------------------------------------------------*/
|
jpayne@68
|
44 /*---------------- Initialization ----------------*/
|
jpayne@68
|
45 /*--------------------------------------------------------------*/
|
jpayne@68
|
46
|
jpayne@68
|
47 /**
|
jpayne@68
|
48 * Code entrance from the command line.
|
jpayne@68
|
49 * @param args Command line arguments
|
jpayne@68
|
50 */
|
jpayne@68
|
51 public static void main(String[] args){
|
jpayne@68
|
52 //Start a timer immediately upon code entrance.
|
jpayne@68
|
53 Timer t=new Timer();
|
jpayne@68
|
54
|
jpayne@68
|
55 //Create an instance of this class
|
jpayne@68
|
56 FixScaffoldGaps x=new FixScaffoldGaps(args);
|
jpayne@68
|
57
|
jpayne@68
|
58 //Run the object
|
jpayne@68
|
59 x.process(t);
|
jpayne@68
|
60
|
jpayne@68
|
61 //Close the print stream if it was redirected
|
jpayne@68
|
62 Shared.closeStream(x.outstream);
|
jpayne@68
|
63 }
|
jpayne@68
|
64
|
jpayne@68
|
65 /**
|
jpayne@68
|
66 * Constructor.
|
jpayne@68
|
67 * @param args Command line arguments
|
jpayne@68
|
68 */
|
jpayne@68
|
69 public FixScaffoldGaps(String[] args){
|
jpayne@68
|
70
|
jpayne@68
|
71 {//Preparse block for help, config files, and outstream
|
jpayne@68
|
72 PreParser pp=new PreParser(args, getClass(), false);
|
jpayne@68
|
73 args=pp.args;
|
jpayne@68
|
74 outstream=pp.outstream;
|
jpayne@68
|
75 }
|
jpayne@68
|
76
|
jpayne@68
|
77 //Set shared static variables prior to parsing
|
jpayne@68
|
78 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
|
jpayne@68
|
79 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
|
jpayne@68
|
80
|
jpayne@68
|
81 samFilter.includeUnmapped=false;
|
jpayne@68
|
82 samFilter.includeSupplimentary=false;
|
jpayne@68
|
83 // samFilter.includeDuplicate=false;
|
jpayne@68
|
84 samFilter.includeNonPrimary=false;
|
jpayne@68
|
85 samFilter.includeQfail=false;
|
jpayne@68
|
86 // samFilter.minMapq=4;
|
jpayne@68
|
87
|
jpayne@68
|
88 {//Parse the arguments
|
jpayne@68
|
89 final Parser parser=parse(args);
|
jpayne@68
|
90
|
jpayne@68
|
91 Parser.processQuality();
|
jpayne@68
|
92
|
jpayne@68
|
93 maxReads=parser.maxReads;
|
jpayne@68
|
94 overwrite=ReadStats.overwrite=parser.overwrite;
|
jpayne@68
|
95 append=ReadStats.append=parser.append;
|
jpayne@68
|
96
|
jpayne@68
|
97 in=parser.in1;
|
jpayne@68
|
98 extin=parser.extin;
|
jpayne@68
|
99
|
jpayne@68
|
100 out=parser.out1;
|
jpayne@68
|
101 extout=parser.extout;
|
jpayne@68
|
102 }
|
jpayne@68
|
103
|
jpayne@68
|
104 {
|
jpayne@68
|
105 // if("auto".equalsIgnoreCase(atomic)){Scaffold.setCA3A(Shared.threads()>8);}
|
jpayne@68
|
106 // else{Scaffold.setCA3A(Parse.parseBoolean(atomic));}
|
jpayne@68
|
107 samFilter.setSamtoolsFilter();
|
jpayne@68
|
108
|
jpayne@68
|
109 streamerThreads=Tools.max(1, Tools.min(streamerThreads, Shared.threads()));
|
jpayne@68
|
110 assert(streamerThreads>0) : streamerThreads;
|
jpayne@68
|
111 }
|
jpayne@68
|
112
|
jpayne@68
|
113 validateParams();
|
jpayne@68
|
114 fixExtensions(); //Add or remove .gz or .bz2 as needed
|
jpayne@68
|
115 checkFileExistence(); //Ensure files can be read and written
|
jpayne@68
|
116 checkStatics(); //Adjust file-related static fields as needed for this program
|
jpayne@68
|
117
|
jpayne@68
|
118 //Create output FileFormat objects
|
jpayne@68
|
119 ffout=FileFormat.testOutput(out, FileFormat.FASTA, extout, true, overwrite, append, ordered);
|
jpayne@68
|
120
|
jpayne@68
|
121 //Create input FileFormat objects
|
jpayne@68
|
122 ffin=FileFormat.testInput(in, FileFormat.SAM, extin, true, true);
|
jpayne@68
|
123 ffref=FileFormat.testInput(ref, FileFormat.FASTA, null, true, true);
|
jpayne@68
|
124 }
|
jpayne@68
|
125
|
jpayne@68
|
126 /*--------------------------------------------------------------*/
|
jpayne@68
|
127 /*---------------- Initialization Helpers ----------------*/
|
jpayne@68
|
128 /*--------------------------------------------------------------*/
|
jpayne@68
|
129
|
jpayne@68
|
130 /** Parse arguments from the command line */
|
jpayne@68
|
131 private Parser parse(String[] args){
|
jpayne@68
|
132
|
jpayne@68
|
133 //Create a parser object
|
jpayne@68
|
134 Parser parser=new Parser();
|
jpayne@68
|
135
|
jpayne@68
|
136 //Set any necessary Parser defaults here
|
jpayne@68
|
137 //parser.foo=bar;
|
jpayne@68
|
138
|
jpayne@68
|
139 //Parse each argument
|
jpayne@68
|
140 for(int i=0; i<args.length; i++){
|
jpayne@68
|
141 String arg=args[i];
|
jpayne@68
|
142
|
jpayne@68
|
143 //Break arguments into their constituent parts, in the form of "a=b"
|
jpayne@68
|
144 String[] split=arg.split("=");
|
jpayne@68
|
145 String a=split[0].toLowerCase();
|
jpayne@68
|
146 String b=split.length>1 ? split[1] : null;
|
jpayne@68
|
147 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
|
jpayne@68
|
148
|
jpayne@68
|
149 if(a.equals("verbose")){
|
jpayne@68
|
150 verbose=Parse.parseBoolean(b);
|
jpayne@68
|
151 }else if(a.equals("ref") || a.equals("scaffolds")){
|
jpayne@68
|
152 ref=b;
|
jpayne@68
|
153 }else if(a.equals("insertlist")){
|
jpayne@68
|
154 insertList=b;
|
jpayne@68
|
155 }else if(a.equals("ordered")){
|
jpayne@68
|
156 ordered=Parse.parseBoolean(b);
|
jpayne@68
|
157 }else if(a.equalsIgnoreCase("ns") || a.equalsIgnoreCase("n") || a.equalsIgnoreCase("scaffoldbreak") || a.equalsIgnoreCase("gap")){
|
jpayne@68
|
158 scaffoldBreakNs=Integer.parseInt(b);
|
jpayne@68
|
159 assert(scaffoldBreakNs>0);
|
jpayne@68
|
160 }else if(a.equalsIgnoreCase("mindepth")){
|
jpayne@68
|
161 minDepth=Integer.parseInt(b);
|
jpayne@68
|
162 assert(minDepth>0);
|
jpayne@68
|
163 }else if(a.equalsIgnoreCase("trim") || a.equalsIgnoreCase("trimFraction") || a.equalsIgnoreCase("border")){
|
jpayne@68
|
164 trimFraction=Float.parseFloat(b);
|
jpayne@68
|
165 assert(trimFraction>=0 && trimFraction<=1) : "trimFraction should be between 0 and 1";
|
jpayne@68
|
166 }else if(a.equals("clearfilters") || a.equals("clearfilter")){
|
jpayne@68
|
167 if(Parse.parseBoolean(b)){
|
jpayne@68
|
168 samFilter.clear();
|
jpayne@68
|
169 }
|
jpayne@68
|
170 }else if(a.equals("parse_flag_goes_here")){
|
jpayne@68
|
171 long fake_variable=Parse.parseKMG(b);
|
jpayne@68
|
172 //Set a variable here
|
jpayne@68
|
173 }else if(samFilter.parse(arg, a, b)){
|
jpayne@68
|
174 //do nothing
|
jpayne@68
|
175 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
|
jpayne@68
|
176 //do nothing
|
jpayne@68
|
177 }else{
|
jpayne@68
|
178 outstream.println("Unknown parameter "+args[i]);
|
jpayne@68
|
179 assert(false) : "Unknown parameter "+args[i];
|
jpayne@68
|
180 }
|
jpayne@68
|
181 }
|
jpayne@68
|
182
|
jpayne@68
|
183 return parser;
|
jpayne@68
|
184 }
|
jpayne@68
|
185
|
jpayne@68
|
186 /** Add or remove .gz or .bz2 as needed */
|
jpayne@68
|
187 private void fixExtensions(){
|
jpayne@68
|
188 in=Tools.fixExtension(in);
|
jpayne@68
|
189 ref=Tools.fixExtension(ref);
|
jpayne@68
|
190 }
|
jpayne@68
|
191
|
jpayne@68
|
192 /** Ensure files can be read and written */
|
jpayne@68
|
193 private void checkFileExistence(){
|
jpayne@68
|
194
|
jpayne@68
|
195 //Ensure there is an input file
|
jpayne@68
|
196 if(in==null){throw new RuntimeException("Error - an input file is required.");}
|
jpayne@68
|
197
|
jpayne@68
|
198 //Ensure there is an input file
|
jpayne@68
|
199 if(ref==null){throw new RuntimeException("Error - a reference file is required.");}
|
jpayne@68
|
200
|
jpayne@68
|
201 //Ensure output files can be written
|
jpayne@68
|
202 if(!Tools.testOutputFiles(overwrite, append, false, out)){
|
jpayne@68
|
203 outstream.println((out==null)+", "+out);
|
jpayne@68
|
204 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n");
|
jpayne@68
|
205 }
|
jpayne@68
|
206
|
jpayne@68
|
207 //Ensure input files can be read
|
jpayne@68
|
208 if(!Tools.testInputFiles(false, true, in, ref)){
|
jpayne@68
|
209 throw new RuntimeException("\nCan't read some input files.\n");
|
jpayne@68
|
210 }
|
jpayne@68
|
211
|
jpayne@68
|
212 //Ensure that no file was specified multiple times
|
jpayne@68
|
213 if(!Tools.testForDuplicateFiles(true, in, ref, out)){
|
jpayne@68
|
214 throw new RuntimeException("\nSome file names were specified multiple times.\n");
|
jpayne@68
|
215 }
|
jpayne@68
|
216 }
|
jpayne@68
|
217
|
jpayne@68
|
218 /** Adjust file-related static fields as needed for this program */
|
jpayne@68
|
219 private static void checkStatics(){
|
jpayne@68
|
220 //Adjust the number of threads for input file reading
|
jpayne@68
|
221 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
|
jpayne@68
|
222 ByteFile.FORCE_MODE_BF2=true;
|
jpayne@68
|
223 }
|
jpayne@68
|
224
|
jpayne@68
|
225 assert(FastaReadInputStream.settingsOK());
|
jpayne@68
|
226 }
|
jpayne@68
|
227
|
jpayne@68
|
228 /** Ensure parameter ranges are within bounds and required parameters are set */
|
jpayne@68
|
229 private boolean validateParams(){
|
jpayne@68
|
230 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
|
jpayne@68
|
231 return true;
|
jpayne@68
|
232 }
|
jpayne@68
|
233
|
jpayne@68
|
234 /*--------------------------------------------------------------*/
|
jpayne@68
|
235 /*---------------- Outer Methods ----------------*/
|
jpayne@68
|
236 /*--------------------------------------------------------------*/
|
jpayne@68
|
237
|
jpayne@68
|
238 /** Create read streams and process all data */
|
jpayne@68
|
239 void process(Timer t){
|
jpayne@68
|
240
|
jpayne@68
|
241 //Turn off read validation in the input threads to increase speed
|
jpayne@68
|
242 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
|
jpayne@68
|
243 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
|
jpayne@68
|
244
|
jpayne@68
|
245 //Create a read input stream
|
jpayne@68
|
246 final SamStreamer ss=makeStreamer(ffin);
|
jpayne@68
|
247
|
jpayne@68
|
248 //Load reference
|
jpayne@68
|
249 loadReferenceCustom();
|
jpayne@68
|
250
|
jpayne@68
|
251 //Reset counters
|
jpayne@68
|
252 readsProcessed=readsOut=0;
|
jpayne@68
|
253 basesProcessed=basesOut=0;
|
jpayne@68
|
254
|
jpayne@68
|
255 //Process the reads in separate threads
|
jpayne@68
|
256 spawnThreads(ss);
|
jpayne@68
|
257
|
jpayne@68
|
258 //Optionally create a read output stream
|
jpayne@68
|
259 final ConcurrentReadOutputStream ros=makeCros();
|
jpayne@68
|
260
|
jpayne@68
|
261 if(verbose){outstream.println("Fixing reference.");}
|
jpayne@68
|
262
|
jpayne@68
|
263 fixScaffolds(ros);
|
jpayne@68
|
264
|
jpayne@68
|
265 if(verbose){outstream.println("Finished; closing streams.");}
|
jpayne@68
|
266
|
jpayne@68
|
267 //Write anything that was accumulated by ReadStats
|
jpayne@68
|
268 errorState|=ReadStats.writeAll();
|
jpayne@68
|
269 //Close the read streams
|
jpayne@68
|
270 errorState|=ReadWrite.closeStream(ros);
|
jpayne@68
|
271
|
jpayne@68
|
272 //Reset read validation
|
jpayne@68
|
273 Read.VALIDATE_IN_CONSTRUCTOR=vic;
|
jpayne@68
|
274
|
jpayne@68
|
275 //Report timing and results
|
jpayne@68
|
276 t.stop();
|
jpayne@68
|
277 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
|
jpayne@68
|
278 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, scaffoldsOut, scaffoldLengthOut, 8, false));
|
jpayne@68
|
279
|
jpayne@68
|
280 outstream.println();
|
jpayne@68
|
281 outstream.println(Tools.number("Average Insert", totalAverageInsert, 2, 8));
|
jpayne@68
|
282 outstream.println(Tools.number("Gaps Unchanged", gapsUnchanged, 8));
|
jpayne@68
|
283 outstream.println(Tools.number("Gaps Widened ", gapsWidened, 8));
|
jpayne@68
|
284 outstream.println(Tools.number("Gaps Narrowed ", gapsNarrowed, 8));
|
jpayne@68
|
285 outstream.println(Tools.number("Ns Total ", nsTotal, 8));
|
jpayne@68
|
286 outstream.println(Tools.number("Ns Added ", nsAdded, 8));
|
jpayne@68
|
287 outstream.println(Tools.number("Ns Removed ", nsRemoved, 8));
|
jpayne@68
|
288
|
jpayne@68
|
289
|
jpayne@68
|
290 //Throw an exception of there was an error in a thread
|
jpayne@68
|
291 if(errorState){
|
jpayne@68
|
292 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
|
jpayne@68
|
293 }
|
jpayne@68
|
294 }
|
jpayne@68
|
295
|
jpayne@68
|
296 private synchronized void loadReferenceCustom(){
|
jpayne@68
|
297 assert(!loadedRef);
|
jpayne@68
|
298 ConcurrentReadInputStream cris=makeRefCris();
|
jpayne@68
|
299 for(ListNum<Read> ln=cris.nextList(); ln!=null && ln.size()>0; ln=cris.nextList()) {
|
jpayne@68
|
300 for(Read r : ln){
|
jpayne@68
|
301 String name=r.id;
|
jpayne@68
|
302 String name2=Tools.trimToWhitespace(r.id);
|
jpayne@68
|
303 Scaffold scaf=new Scaffold(name, r.bases, r.numericID);
|
jpayne@68
|
304 refMap.put(name, scaf);
|
jpayne@68
|
305 refMap2.put(name2, scaf);
|
jpayne@68
|
306 }
|
jpayne@68
|
307 }
|
jpayne@68
|
308 loadedRef=true;
|
jpayne@68
|
309 }
|
jpayne@68
|
310
|
jpayne@68
|
311 private ConcurrentReadInputStream makeRefCris(){
|
jpayne@68
|
312 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffref, null);
|
jpayne@68
|
313 cris.start(); //Start the stream
|
jpayne@68
|
314 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
315 boolean paired=cris.paired();
|
jpayne@68
|
316 assert(!paired) : "References should not be paired.";
|
jpayne@68
|
317 return cris;
|
jpayne@68
|
318 }
|
jpayne@68
|
319
|
jpayne@68
|
320 private SamStreamer makeStreamer(FileFormat ff){
|
jpayne@68
|
321 if(ff==null){return null;}
|
jpayne@68
|
322 SamStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
|
jpayne@68
|
323 ss.start(); //Start the stream
|
jpayne@68
|
324 if(verbose){outstream.println("Started Streamer");}
|
jpayne@68
|
325 return ss;
|
jpayne@68
|
326 }
|
jpayne@68
|
327
|
jpayne@68
|
328 private ConcurrentReadOutputStream makeCros(){
|
jpayne@68
|
329 if(ffout==null){return null;}
|
jpayne@68
|
330
|
jpayne@68
|
331 //Select output buffer size based on whether it needs to be ordered
|
jpayne@68
|
332 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
|
jpayne@68
|
333
|
jpayne@68
|
334 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false);
|
jpayne@68
|
335 ros.start(); //Start the stream
|
jpayne@68
|
336 return ros;
|
jpayne@68
|
337 }
|
jpayne@68
|
338
|
jpayne@68
|
339 /*--------------------------------------------------------------*/
|
jpayne@68
|
340 /*---------------- Thread Management ----------------*/
|
jpayne@68
|
341 /*--------------------------------------------------------------*/
|
jpayne@68
|
342
|
jpayne@68
|
343 /** Spawn process threads */
|
jpayne@68
|
344 private void spawnThreads(final SamStreamer ss){
|
jpayne@68
|
345
|
jpayne@68
|
346 //Do anything necessary prior to processing
|
jpayne@68
|
347
|
jpayne@68
|
348 //Determine how many threads may be used
|
jpayne@68
|
349 final int threads=Shared.threads();
|
jpayne@68
|
350
|
jpayne@68
|
351 //Fill a list with ProcessThreads
|
jpayne@68
|
352 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
|
jpayne@68
|
353 for(int i=0; i<threads; i++){
|
jpayne@68
|
354 alpt.add(new ProcessThread(ss, i));
|
jpayne@68
|
355 }
|
jpayne@68
|
356
|
jpayne@68
|
357 //Start the threads
|
jpayne@68
|
358 for(ProcessThread pt : alpt){
|
jpayne@68
|
359 pt.start();
|
jpayne@68
|
360 }
|
jpayne@68
|
361
|
jpayne@68
|
362 //Wait for threads to finish
|
jpayne@68
|
363 boolean success=ThreadWaiter.waitForThreads(alpt, this);
|
jpayne@68
|
364 errorState&=!success;
|
jpayne@68
|
365
|
jpayne@68
|
366 //Do anything necessary after processing
|
jpayne@68
|
367 totalAverageInsert=totalInsertSum/(double)totalInsertCount;
|
jpayne@68
|
368 insertByPercentile=Tools.makeHistogram(insertCounts, buckets);
|
jpayne@68
|
369 }
|
jpayne@68
|
370
|
jpayne@68
|
371 @Override
|
jpayne@68
|
372 public final void accumulate(ProcessThread pt){
|
jpayne@68
|
373 readsProcessed+=pt.readsProcessedT;
|
jpayne@68
|
374 basesProcessed+=pt.basesProcessedT;
|
jpayne@68
|
375 readsOut+=pt.readsOutT;
|
jpayne@68
|
376 basesOut+=pt.basesOutT;
|
jpayne@68
|
377
|
jpayne@68
|
378 totalInsertSum+=pt.totalInsertSumT;
|
jpayne@68
|
379 totalInsertCount+=pt.totalInsertCountT;
|
jpayne@68
|
380
|
jpayne@68
|
381 errorState|=(!pt.success);
|
jpayne@68
|
382 }
|
jpayne@68
|
383
|
jpayne@68
|
384 @Override
|
jpayne@68
|
385 public final boolean success(){return !errorState;}
|
jpayne@68
|
386
|
jpayne@68
|
387 /*--------------------------------------------------------------*/
|
jpayne@68
|
388 /*---------------- Inner Methods ----------------*/
|
jpayne@68
|
389 /*--------------------------------------------------------------*/
|
jpayne@68
|
390
|
jpayne@68
|
391 private void fixScaffolds(ConcurrentReadOutputStream ros){
|
jpayne@68
|
392 ByteBuilder bb=new ByteBuilder(1000000);
|
jpayne@68
|
393
|
jpayne@68
|
394 ArrayList<Read> list=new ArrayList<Read>(200);
|
jpayne@68
|
395 long num=0;
|
jpayne@68
|
396 long lengthSum=0;
|
jpayne@68
|
397 for(Entry<String, Scaffold> e : refMap.entrySet()){
|
jpayne@68
|
398 Scaffold scaf=e.getValue();
|
jpayne@68
|
399 Read r=scaf.fixScaffold(bb);
|
jpayne@68
|
400 lengthSum+=r.length();
|
jpayne@68
|
401 list.add(r);
|
jpayne@68
|
402 scaffoldsOut++;
|
jpayne@68
|
403 scaffoldLengthOut+=r.length();
|
jpayne@68
|
404
|
jpayne@68
|
405 if(list.size()>=200 || lengthSum>=100000){
|
jpayne@68
|
406 if(ros!=null){ros.add(list, num);}
|
jpayne@68
|
407 list=new ArrayList<Read>(200);
|
jpayne@68
|
408 num++;
|
jpayne@68
|
409 lengthSum=0;
|
jpayne@68
|
410 }
|
jpayne@68
|
411 }
|
jpayne@68
|
412 if(list.size()>0){
|
jpayne@68
|
413 if(ros!=null){ros.add(list, num);}
|
jpayne@68
|
414 }
|
jpayne@68
|
415 }
|
jpayne@68
|
416
|
jpayne@68
|
417 private static int calcInsertSize(SamLine sl) {
|
jpayne@68
|
418 assert(sl.mapped() && sl.pairedOnSameChrom());
|
jpayne@68
|
419 assert(sl.primary());
|
jpayne@68
|
420 assert(!sl.supplementary());
|
jpayne@68
|
421 assert(sl.leftmost());
|
jpayne@68
|
422
|
jpayne@68
|
423 assert(sl.tlen>0) : sl.tlen+"\n\n"+sl;
|
jpayne@68
|
424 return sl.tlen>0 ? sl.tlen : -sl.tlen;
|
jpayne@68
|
425
|
jpayne@68
|
426 // final int insertSize;
|
jpayne@68
|
427 // String insertTag=null;
|
jpayne@68
|
428 // if(sl.optional!=null){
|
jpayne@68
|
429 // for(String s : sl.optional){
|
jpayne@68
|
430 // if(s.startsWith("X8:Z:")){
|
jpayne@68
|
431 // insertTag=s;
|
jpayne@68
|
432 // break;
|
jpayne@68
|
433 // }
|
jpayne@68
|
434 // }
|
jpayne@68
|
435 // }
|
jpayne@68
|
436 // if(insertTag!=null){
|
jpayne@68
|
437 // insertSize=Integer.parseInt(insertTag.substring(5));
|
jpayne@68
|
438 // }else{
|
jpayne@68
|
439 // insertSize=sl.tlen;//This is unsafe due to indels.
|
jpayne@68
|
440 // assert(false) : "Reads need insert size tags.";
|
jpayne@68
|
441 // }
|
jpayne@68
|
442 // assert(insertSize>0) : sl;
|
jpayne@68
|
443 //
|
jpayne@68
|
444 // return insertSize;
|
jpayne@68
|
445 }
|
jpayne@68
|
446
|
jpayne@68
|
447 /*--------------------------------------------------------------*/
|
jpayne@68
|
448 /*---------------- Inner Classes ----------------*/
|
jpayne@68
|
449 /*--------------------------------------------------------------*/
|
jpayne@68
|
450
|
jpayne@68
|
451 /** This class is static to prevent accidental writing to shared variables.
|
jpayne@68
|
452 * It is safe to remove the static modifier. */
|
jpayne@68
|
453 class ProcessThread extends Thread {
|
jpayne@68
|
454
|
jpayne@68
|
455 //Constructor
|
jpayne@68
|
456 ProcessThread(final SamStreamer ss_, final int tid_){
|
jpayne@68
|
457 ss=ss_;
|
jpayne@68
|
458 tid=tid_;
|
jpayne@68
|
459 }
|
jpayne@68
|
460
|
jpayne@68
|
461 //Called by start()
|
jpayne@68
|
462 @Override
|
jpayne@68
|
463 public void run(){
|
jpayne@68
|
464 //Do anything necessary prior to processing
|
jpayne@68
|
465
|
jpayne@68
|
466 //Process the reads
|
jpayne@68
|
467 processInner();
|
jpayne@68
|
468
|
jpayne@68
|
469 //Do anything necessary after processing
|
jpayne@68
|
470
|
jpayne@68
|
471 //Indicate successful exit status
|
jpayne@68
|
472 success=true;
|
jpayne@68
|
473 }
|
jpayne@68
|
474
|
jpayne@68
|
475 /** Iterate through the reads */
|
jpayne@68
|
476 void processInner(){
|
jpayne@68
|
477
|
jpayne@68
|
478 //Grab and process all lists
|
jpayne@68
|
479 for(ListNum<Read> ln=ss.nextReads(); ln!=null; ln=ss.nextReads()){
|
jpayne@68
|
480 // if(verbose){outstream.println("Got list of size "+list.size());} //Disabled due to non-static access
|
jpayne@68
|
481
|
jpayne@68
|
482 processList(ln);
|
jpayne@68
|
483 }
|
jpayne@68
|
484
|
jpayne@68
|
485 }
|
jpayne@68
|
486
|
jpayne@68
|
487 void processList(ListNum<Read> ln){
|
jpayne@68
|
488
|
jpayne@68
|
489 //Grab the actual read list from the ListNum
|
jpayne@68
|
490 final ArrayList<Read> reads=ln.list;
|
jpayne@68
|
491
|
jpayne@68
|
492 //Loop through each read in the list
|
jpayne@68
|
493 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
494 final Read r=reads.get(idx);
|
jpayne@68
|
495
|
jpayne@68
|
496 //Validate reads in worker threads
|
jpayne@68
|
497 if(!r.validated()){r.validate(true);}
|
jpayne@68
|
498
|
jpayne@68
|
499 //Track the initial length for statistics
|
jpayne@68
|
500 final int initialLength=r.length();
|
jpayne@68
|
501
|
jpayne@68
|
502 //Increment counters
|
jpayne@68
|
503 readsProcessedT+=r.pairCount();
|
jpayne@68
|
504 basesProcessedT+=initialLength;
|
jpayne@68
|
505
|
jpayne@68
|
506 processRead(r);
|
jpayne@68
|
507 }
|
jpayne@68
|
508 }
|
jpayne@68
|
509
|
jpayne@68
|
510 /**
|
jpayne@68
|
511 * Process a read or a read pair.
|
jpayne@68
|
512 * @param r Read 1
|
jpayne@68
|
513 * @param r2 Read 2 (may be null)
|
jpayne@68
|
514 * @return True if the reads should be kept, false if they should be discarded.
|
jpayne@68
|
515 */
|
jpayne@68
|
516 void processRead(final Read r){
|
jpayne@68
|
517 final SamLine sl=r.samline;
|
jpayne@68
|
518 assert(sl!=null) : sl;
|
jpayne@68
|
519 if(samFilter!=null && !samFilter.passesFilter(sl)){return;}
|
jpayne@68
|
520
|
jpayne@68
|
521 //sl.nextMapped();
|
jpayne@68
|
522 if(sl.mapped() && sl.pairedOnSameChrom() && sl.properPair() && sl.primary() && !sl.supplementary() && sl.leftmost()){
|
jpayne@68
|
523 final String rname=sl.rnameS();
|
jpayne@68
|
524 Scaffold scaf=refMap.get(rname);
|
jpayne@68
|
525 if(scaf==null){scaf=refMap2.get(Tools.trimToWhitespace(rname));}
|
jpayne@68
|
526 assert(scaf!=null) : "Can't find graph for "+rname;
|
jpayne@68
|
527
|
jpayne@68
|
528 if(scaf!=null){
|
jpayne@68
|
529 final int insertSize=calcInsertSize(sl);
|
jpayne@68
|
530 insertCounts.incrementAndGet(Tools.mid(0, insertSize, insertCounts.length()));
|
jpayne@68
|
531 scaf.add(sl, insertSize);
|
jpayne@68
|
532
|
jpayne@68
|
533 readsOutT++;
|
jpayne@68
|
534 basesOutT+=r.length();
|
jpayne@68
|
535
|
jpayne@68
|
536 totalInsertSumT+=insertSize;
|
jpayne@68
|
537 totalInsertCountT++;
|
jpayne@68
|
538 }
|
jpayne@68
|
539 }
|
jpayne@68
|
540 }
|
jpayne@68
|
541
|
jpayne@68
|
542 /** Number of reads processed by this thread */
|
jpayne@68
|
543 protected long readsProcessedT=0;
|
jpayne@68
|
544 /** Number of bases processed by this thread */
|
jpayne@68
|
545 protected long basesProcessedT=0;
|
jpayne@68
|
546
|
jpayne@68
|
547 /** Number of reads retained by this thread */
|
jpayne@68
|
548 protected long readsOutT=0;
|
jpayne@68
|
549 /** Number of bases retained by this thread */
|
jpayne@68
|
550 protected long basesOutT=0;
|
jpayne@68
|
551
|
jpayne@68
|
552 protected long totalInsertSumT=0;
|
jpayne@68
|
553 protected long totalInsertCountT=0;
|
jpayne@68
|
554
|
jpayne@68
|
555 long insertSum=0;
|
jpayne@68
|
556
|
jpayne@68
|
557 /** True only if this thread has completed successfully */
|
jpayne@68
|
558 boolean success=false;
|
jpayne@68
|
559
|
jpayne@68
|
560 /** Shared input stream */
|
jpayne@68
|
561 private final SamStreamer ss;
|
jpayne@68
|
562 /** Thread ID */
|
jpayne@68
|
563 final int tid;
|
jpayne@68
|
564 }
|
jpayne@68
|
565
|
jpayne@68
|
566 /*--------------------------------------------------------------*/
|
jpayne@68
|
567
|
jpayne@68
|
568 private class Scaffold {
|
jpayne@68
|
569
|
jpayne@68
|
570 Scaffold(String name_, byte[] bases_, long numericID_){
|
jpayne@68
|
571 name=name_;
|
jpayne@68
|
572 bases=bases_;
|
jpayne@68
|
573 numericID=(int)numericID_;
|
jpayne@68
|
574 depthArray=new AtomicIntegerArray(bases.length);
|
jpayne@68
|
575 insertArray=new AtomicLongArray(bases.length);
|
jpayne@68
|
576 }
|
jpayne@68
|
577
|
jpayne@68
|
578 void add(SamLine sl, int insertSize){
|
jpayne@68
|
579 assert(sl.mapped() && sl.pairedOnSameChrom());
|
jpayne@68
|
580 assert(sl.primary());
|
jpayne@68
|
581 assert(!sl.supplementary());
|
jpayne@68
|
582 assert(sl.leftmost());
|
jpayne@68
|
583
|
jpayne@68
|
584 // final int insertSize=calcInsertSize(sl);
|
jpayne@68
|
585
|
jpayne@68
|
586 int start=sl.pos-1;
|
jpayne@68
|
587 int stop=start+sl.tlen;
|
jpayne@68
|
588
|
jpayne@68
|
589 int trim=(int)(sl.length()*trimFraction);
|
jpayne@68
|
590 start+=trim;
|
jpayne@68
|
591 stop-=trim;
|
jpayne@68
|
592
|
jpayne@68
|
593 for(int i=start; i<stop; i++){
|
jpayne@68
|
594 if(i>=0 && i<bases.length){
|
jpayne@68
|
595 depthArray.incrementAndGet(i);
|
jpayne@68
|
596 insertArray.addAndGet(i, insertSize);
|
jpayne@68
|
597 }
|
jpayne@68
|
598 }
|
jpayne@68
|
599
|
jpayne@68
|
600 }
|
jpayne@68
|
601
|
jpayne@68
|
602 Read fixScaffold(ByteBuilder bb){
|
jpayne@68
|
603 int streak=0;
|
jpayne@68
|
604 bb.clear();
|
jpayne@68
|
605
|
jpayne@68
|
606 if(insertList!=null){
|
jpayne@68
|
607 for(int i=0; i<bases.length; i++) {
|
jpayne@68
|
608 bb.append(i).tab().append(depthArray.get(i)).tab().append(insertArray.get(i)/(Tools.max(1, depthArray.get(i)))).nl();
|
jpayne@68
|
609 }
|
jpayne@68
|
610 ReadWrite.writeString(bb, insertList, false);
|
jpayne@68
|
611 bb.clear();
|
jpayne@68
|
612 }
|
jpayne@68
|
613
|
jpayne@68
|
614 for(int i=0; i<bases.length; i++){
|
jpayne@68
|
615 byte b=bases[i];
|
jpayne@68
|
616 if(b=='N'){
|
jpayne@68
|
617 streak++;
|
jpayne@68
|
618 }else{
|
jpayne@68
|
619 if(streak>=scaffoldBreakNs && i-streak>300 && i<bases.length-300){
|
jpayne@68
|
620 int pivot=i-streak/2-1;
|
jpayne@68
|
621 long depthSum=depthArray.get(pivot);
|
jpayne@68
|
622 long insertSum=insertArray.get(pivot);
|
jpayne@68
|
623 double avgInsert=(insertSum/(double)depthSum);
|
jpayne@68
|
624
|
jpayne@68
|
625 int avgDepth=((depthArray.get(i-200-streak)+depthArray.get(i+200))/2);
|
jpayne@68
|
626 int percentile=(int)(buckets*Tools.max(0.5, 1.0-depthSum/(double)(avgDepth+depthSum)));
|
jpayne@68
|
627 int insertProxy=insertByPercentile[percentile];
|
jpayne@68
|
628
|
jpayne@68
|
629 // assert(false) : Arrays.toString(insertByPercentile);
|
jpayne@68
|
630
|
jpayne@68
|
631 int dif=(int)Math.round(insertProxy-avgInsert);
|
jpayne@68
|
632 int toAdd=Tools.max(scaffoldBreakNs, streak+dif);
|
jpayne@68
|
633
|
jpayne@68
|
634 // System.err.println("totalAverageInsert="+(int)totalAverageInsert+", avg="+(int)avgInsert+", dif="+dif);
|
jpayne@68
|
635 // System.err.println("proxy="+insertProxy+", percentile="+percentile+", avgDepth="+(int)avgDepth+", depth="+depthSum);
|
jpayne@68
|
636 // System.err.println("pivot="+pivot+", depthSum="+depthSum+", avg="+(int)avgInsert+", streak="+streak+", dif="+dif+", toAdd="+toAdd);
|
jpayne@68
|
637
|
jpayne@68
|
638 if(dif>0){
|
jpayne@68
|
639 //TODO: This is tricky because long-insert reads will be self-selected.
|
jpayne@68
|
640 //Should be compared to average coverage, or nearby coverage, and then consult a size distribution histogram.
|
jpayne@68
|
641 gapsWidened++;
|
jpayne@68
|
642 nsAdded+=dif;
|
jpayne@68
|
643 }else if(dif<0){
|
jpayne@68
|
644 gapsNarrowed++;
|
jpayne@68
|
645 nsRemoved-=dif;
|
jpayne@68
|
646 }else{
|
jpayne@68
|
647 gapsUnchanged++;
|
jpayne@68
|
648 }
|
jpayne@68
|
649 nsTotal+=toAdd;
|
jpayne@68
|
650 for(int j=0; j<toAdd; j++){
|
jpayne@68
|
651 bb.append('N');
|
jpayne@68
|
652 }
|
jpayne@68
|
653 }
|
jpayne@68
|
654 streak=0;
|
jpayne@68
|
655 bb.append(b);
|
jpayne@68
|
656 }
|
jpayne@68
|
657 }
|
jpayne@68
|
658
|
jpayne@68
|
659 return new Read(bb.toBytes(), null, name, numericID);
|
jpayne@68
|
660 }
|
jpayne@68
|
661
|
jpayne@68
|
662 final int numericID;
|
jpayne@68
|
663 final String name;
|
jpayne@68
|
664 final byte[] bases;
|
jpayne@68
|
665 final AtomicIntegerArray depthArray;
|
jpayne@68
|
666 final AtomicLongArray insertArray;
|
jpayne@68
|
667
|
jpayne@68
|
668 }
|
jpayne@68
|
669
|
jpayne@68
|
670 /*--------------------------------------------------------------*/
|
jpayne@68
|
671 /*---------------- Fields ----------------*/
|
jpayne@68
|
672 /*--------------------------------------------------------------*/
|
jpayne@68
|
673
|
jpayne@68
|
674 /** Primary input file path */
|
jpayne@68
|
675 private String in=null;
|
jpayne@68
|
676 /** Secondary input file path */
|
jpayne@68
|
677 private String ref=null;
|
jpayne@68
|
678
|
jpayne@68
|
679 /** Primary output file path */
|
jpayne@68
|
680 private String out=null;
|
jpayne@68
|
681
|
jpayne@68
|
682 /** Override input file extension */
|
jpayne@68
|
683 private String extin=null;
|
jpayne@68
|
684 /** Override output file extension */
|
jpayne@68
|
685 private String extout=null;
|
jpayne@68
|
686
|
jpayne@68
|
687 private String insertList=null;
|
jpayne@68
|
688
|
jpayne@68
|
689 /*--------------------------------------------------------------*/
|
jpayne@68
|
690
|
jpayne@68
|
691 /** Number of reads processed */
|
jpayne@68
|
692 protected long readsProcessed=0;
|
jpayne@68
|
693 /** Number of bases processed */
|
jpayne@68
|
694 protected long basesProcessed=0;
|
jpayne@68
|
695
|
jpayne@68
|
696 /** Number of reads retained */
|
jpayne@68
|
697 protected long readsOut=0;
|
jpayne@68
|
698 /** Number of bases retained */
|
jpayne@68
|
699 protected long basesOut=0;
|
jpayne@68
|
700
|
jpayne@68
|
701 protected long scaffoldsOut=0;
|
jpayne@68
|
702 protected long scaffoldLengthOut=0;
|
jpayne@68
|
703
|
jpayne@68
|
704 protected long gapsUnchanged=0;
|
jpayne@68
|
705 protected long gapsWidened=0;
|
jpayne@68
|
706 protected long gapsNarrowed=0;
|
jpayne@68
|
707 protected long nsAdded=0;
|
jpayne@68
|
708 protected long nsRemoved=0;
|
jpayne@68
|
709 protected long nsTotal=0;
|
jpayne@68
|
710
|
jpayne@68
|
711 protected long totalInsertSum=0;
|
jpayne@68
|
712 protected long totalInsertCount=0;
|
jpayne@68
|
713 protected double totalAverageInsert;
|
jpayne@68
|
714
|
jpayne@68
|
715 protected AtomicLongArray insertCounts=new AtomicLongArray(20000);
|
jpayne@68
|
716 protected int[] insertByPercentile;
|
jpayne@68
|
717
|
jpayne@68
|
718 /** Quit after processing this many input reads; -1 means no limit */
|
jpayne@68
|
719 private long maxReads=-1;
|
jpayne@68
|
720
|
jpayne@68
|
721 /*--------------------------------------------------------------*/
|
jpayne@68
|
722
|
jpayne@68
|
723 /** Threads dedicated to reading the sam file */
|
jpayne@68
|
724 private int streamerThreads=SamStreamer.DEFAULT_THREADS;
|
jpayne@68
|
725
|
jpayne@68
|
726 private boolean loadedRef=false;
|
jpayne@68
|
727
|
jpayne@68
|
728 private int scaffoldBreakNs=10;
|
jpayne@68
|
729
|
jpayne@68
|
730 int buckets=1000;
|
jpayne@68
|
731
|
jpayne@68
|
732 private int minDepth=10;
|
jpayne@68
|
733
|
jpayne@68
|
734 private float trimFraction=0.4f;
|
jpayne@68
|
735
|
jpayne@68
|
736 public final SamFilter samFilter=new SamFilter();
|
jpayne@68
|
737
|
jpayne@68
|
738 /** Uses full ref names */
|
jpayne@68
|
739 public LinkedHashMap<String, Scaffold> refMap=new LinkedHashMap<String, Scaffold>();
|
jpayne@68
|
740 /** Uses truncated ref names */
|
jpayne@68
|
741 public LinkedHashMap<String, Scaffold> refMap2=new LinkedHashMap<String, Scaffold>();;
|
jpayne@68
|
742
|
jpayne@68
|
743 /*--------------------------------------------------------------*/
|
jpayne@68
|
744 /*---------------- Final Fields ----------------*/
|
jpayne@68
|
745 /*--------------------------------------------------------------*/
|
jpayne@68
|
746
|
jpayne@68
|
747 /** Primary input file */
|
jpayne@68
|
748 private final FileFormat ffin;
|
jpayne@68
|
749 /** Secondary input file */
|
jpayne@68
|
750 private final FileFormat ffref;
|
jpayne@68
|
751
|
jpayne@68
|
752 /** Primary output file */
|
jpayne@68
|
753 private final FileFormat ffout;
|
jpayne@68
|
754
|
jpayne@68
|
755 /*--------------------------------------------------------------*/
|
jpayne@68
|
756 /*---------------- Common Fields ----------------*/
|
jpayne@68
|
757 /*--------------------------------------------------------------*/
|
jpayne@68
|
758
|
jpayne@68
|
759 /** Print status messages to this output stream */
|
jpayne@68
|
760 private PrintStream outstream=System.err;
|
jpayne@68
|
761 /** Print verbose messages */
|
jpayne@68
|
762 public static boolean verbose=false;
|
jpayne@68
|
763 /** True if an error was encountered */
|
jpayne@68
|
764 public boolean errorState=false;
|
jpayne@68
|
765 /** Overwrite existing output files */
|
jpayne@68
|
766 private boolean overwrite=false;
|
jpayne@68
|
767 /** Append to existing output files */
|
jpayne@68
|
768 private boolean append=false;
|
jpayne@68
|
769 /** Reads are output in input order */
|
jpayne@68
|
770 private boolean ordered=false;
|
jpayne@68
|
771
|
jpayne@68
|
772 }
|