jpayne@68
|
1 package prok;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.io.File;
|
jpayne@68
|
4 import java.io.PrintStream;
|
jpayne@68
|
5 import java.util.ArrayList;
|
jpayne@68
|
6 import java.util.Collections;
|
jpayne@68
|
7 import java.util.Comparator;
|
jpayne@68
|
8 import java.util.HashMap;
|
jpayne@68
|
9 import java.util.Map.Entry;
|
jpayne@68
|
10 import java.util.concurrent.ConcurrentLinkedQueue;
|
jpayne@68
|
11
|
jpayne@68
|
12 import aligner.SingleStateAlignerFlat2;
|
jpayne@68
|
13 import consensus.BaseGraph;
|
jpayne@68
|
14 import fileIO.ByteFile;
|
jpayne@68
|
15 import fileIO.FileFormat;
|
jpayne@68
|
16 import fileIO.ReadWrite;
|
jpayne@68
|
17 import shared.Parse;
|
jpayne@68
|
18 import shared.Parser;
|
jpayne@68
|
19 import shared.PreParser;
|
jpayne@68
|
20 import shared.ReadStats;
|
jpayne@68
|
21 import shared.Shared;
|
jpayne@68
|
22 import shared.Timer;
|
jpayne@68
|
23 import shared.Tools;
|
jpayne@68
|
24 import stream.ConcurrentReadInputStream;
|
jpayne@68
|
25 import stream.ConcurrentReadOutputStream;
|
jpayne@68
|
26 import stream.FASTQ;
|
jpayne@68
|
27 import stream.FastaReadInputStream;
|
jpayne@68
|
28 import stream.Read;
|
jpayne@68
|
29 import structures.IntHashSet;
|
jpayne@68
|
30 import structures.ListNum;
|
jpayne@68
|
31 import tax.GiToTaxid;
|
jpayne@68
|
32 import template.Accumulator;
|
jpayne@68
|
33 import template.ThreadWaiter;
|
jpayne@68
|
34
|
jpayne@68
|
35 /**
|
jpayne@68
|
36 * Picks one ribosomal (16S) sequence per taxID.
|
jpayne@68
|
37 *
|
jpayne@68
|
38 * @author Brian Bushnell
|
jpayne@68
|
39 * @date November 19, 2015
|
jpayne@68
|
40 *
|
jpayne@68
|
41 */
|
jpayne@68
|
42 public class MergeRibo implements Accumulator<MergeRibo.ProcessThread> {
|
jpayne@68
|
43
|
jpayne@68
|
44 /*--------------------------------------------------------------*/
|
jpayne@68
|
45 /*---------------- Initialization ----------------*/
|
jpayne@68
|
46 /*--------------------------------------------------------------*/
|
jpayne@68
|
47
|
jpayne@68
|
48 /**
|
jpayne@68
|
49 * Code entrance from the command line.
|
jpayne@68
|
50 * @param args Command line arguments
|
jpayne@68
|
51 */
|
jpayne@68
|
52 public static void main(String[] args){
|
jpayne@68
|
53 //Start a timer immediately upon code entrance.
|
jpayne@68
|
54 Timer t=new Timer();
|
jpayne@68
|
55
|
jpayne@68
|
56 //Create an instance of this class
|
jpayne@68
|
57 MergeRibo x=new MergeRibo(args);
|
jpayne@68
|
58
|
jpayne@68
|
59 //Run the object
|
jpayne@68
|
60 x.process(t);
|
jpayne@68
|
61
|
jpayne@68
|
62 //Close the print stream if it was redirected
|
jpayne@68
|
63 Shared.closeStream(x.outstream);
|
jpayne@68
|
64 }
|
jpayne@68
|
65
|
jpayne@68
|
66 /**
|
jpayne@68
|
67 * Constructor.
|
jpayne@68
|
68 * @param args Command line arguments
|
jpayne@68
|
69 */
|
jpayne@68
|
70 public MergeRibo(String[] args){
|
jpayne@68
|
71
|
jpayne@68
|
72 {//Preparse block for help, config files, and outstream
|
jpayne@68
|
73 PreParser pp=new PreParser(args, getClass(), false);
|
jpayne@68
|
74 args=pp.args;
|
jpayne@68
|
75 outstream=pp.outstream;
|
jpayne@68
|
76 }
|
jpayne@68
|
77
|
jpayne@68
|
78 //Set shared static variables prior to parsing
|
jpayne@68
|
79 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
|
jpayne@68
|
80 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
|
jpayne@68
|
81 // Shared.capBufferLen(40);//This does not help; the slowness comes from unevenness in list length during pickBest.
|
jpayne@68
|
82 //To fix it, long lists should be sorted to be first.
|
jpayne@68
|
83
|
jpayne@68
|
84 BaseGraph.MAF_sub=0.251f;
|
jpayne@68
|
85 BaseGraph.MAF_del=0.0f;
|
jpayne@68
|
86 BaseGraph.MAF_ins=0.0f;
|
jpayne@68
|
87 BaseGraph.MAF_noref=0.0f;
|
jpayne@68
|
88 BaseGraph.trimDepthFraction=0.3f;
|
jpayne@68
|
89 BaseGraph.trimNs=true;
|
jpayne@68
|
90
|
jpayne@68
|
91 {//Parse the arguments
|
jpayne@68
|
92 final Parser parser=parse(args);
|
jpayne@68
|
93 Parser.processQuality();
|
jpayne@68
|
94
|
jpayne@68
|
95 maxReads=parser.maxReads;
|
jpayne@68
|
96 overwrite=ReadStats.overwrite=parser.overwrite;
|
jpayne@68
|
97 append=ReadStats.append=parser.append;
|
jpayne@68
|
98
|
jpayne@68
|
99 extin=parser.extin;
|
jpayne@68
|
100
|
jpayne@68
|
101 out1=parser.out1;
|
jpayne@68
|
102 extout=parser.extout;
|
jpayne@68
|
103 }
|
jpayne@68
|
104
|
jpayne@68
|
105 validateParams();
|
jpayne@68
|
106 adjustInterleaving(); //Make sure interleaving agrees with number of input and output files
|
jpayne@68
|
107 checkFileExistence(); //Ensure files can be read and written
|
jpayne@68
|
108 checkStatics(); //Adjust file-related static fields as needed for this program
|
jpayne@68
|
109
|
jpayne@68
|
110 //Create output FileFormat objects
|
jpayne@68
|
111 ffout1=FileFormat.testOutput(out1, FileFormat.FASTA, extout, true, overwrite, append, ordered);
|
jpayne@68
|
112
|
jpayne@68
|
113 //Create input FileFormat objects
|
jpayne@68
|
114 ffin=new ArrayList<FileFormat>(in.size());
|
jpayne@68
|
115 ffalt=FileFormat.testInput(alt, FileFormat.FASTA, extin, true, true);
|
jpayne@68
|
116 for(String s : in){
|
jpayne@68
|
117 FileFormat ff=FileFormat.testInput(s, FileFormat.FASTA, extin, true, true);
|
jpayne@68
|
118 ffin.add(ff);
|
jpayne@68
|
119 }
|
jpayne@68
|
120
|
jpayne@68
|
121 //Determine how many threads may be used
|
jpayne@68
|
122 threads=Shared.threads();
|
jpayne@68
|
123 }
|
jpayne@68
|
124
|
jpayne@68
|
125 /*--------------------------------------------------------------*/
|
jpayne@68
|
126 /*---------------- Initialization Helpers ----------------*/
|
jpayne@68
|
127 /*--------------------------------------------------------------*/
|
jpayne@68
|
128
|
jpayne@68
|
129 /** Parse arguments from the command line */
|
jpayne@68
|
130 private Parser parse(String[] args){
|
jpayne@68
|
131
|
jpayne@68
|
132 //Create a parser object
|
jpayne@68
|
133 Parser parser=new Parser();
|
jpayne@68
|
134
|
jpayne@68
|
135 //Set any necessary Parser defaults here
|
jpayne@68
|
136 //parser.foo=bar;
|
jpayne@68
|
137
|
jpayne@68
|
138 //Parse each argument
|
jpayne@68
|
139 for(int i=0; i<args.length; i++){
|
jpayne@68
|
140 String arg=args[i];
|
jpayne@68
|
141
|
jpayne@68
|
142 //Break arguments into their constituent parts, in the form of "a=b"
|
jpayne@68
|
143 String[] split=arg.split("=");
|
jpayne@68
|
144 String a=split[0].toLowerCase();
|
jpayne@68
|
145 String b=split.length>1 ? split[1] : null;
|
jpayne@68
|
146 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
|
jpayne@68
|
147
|
jpayne@68
|
148 if(a.equals("verbose")){
|
jpayne@68
|
149 verbose=Parse.parseBoolean(b);
|
jpayne@68
|
150 }else if(a.equals("ordered")){
|
jpayne@68
|
151 ordered=Parse.parseBoolean(b);
|
jpayne@68
|
152 }else if(a.equals("consensus")){
|
jpayne@68
|
153 useConsensus=Parse.parseBoolean(b);
|
jpayne@68
|
154 }else if(a.equals("best")){
|
jpayne@68
|
155 useConsensus=!Parse.parseBoolean(b);
|
jpayne@68
|
156 }else if(a.equals("fast")){
|
jpayne@68
|
157 fast=Parse.parseBoolean(b);
|
jpayne@68
|
158 }else if(a.equals("minid")){
|
jpayne@68
|
159 minID=Float.parseFloat(b);
|
jpayne@68
|
160 }else if(a.equals("maxns")){
|
jpayne@68
|
161 maxns=Integer.parseInt(b);
|
jpayne@68
|
162 }else if(a.equals("minlen")){
|
jpayne@68
|
163 minlen=Integer.parseInt(b);
|
jpayne@68
|
164 }else if(a.equals("maxlen")){
|
jpayne@68
|
165 maxlen=Integer.parseInt(b);
|
jpayne@68
|
166 }else if(a.equals("in")){
|
jpayne@68
|
167 Tools.addFiles(b, in);
|
jpayne@68
|
168 }else if(a.equals("alt")){
|
jpayne@68
|
169 alt=b;
|
jpayne@68
|
170 }else if(a.equalsIgnoreCase("process16S") || a.equalsIgnoreCase("16S")){
|
jpayne@68
|
171 process16S=Parse.parseBoolean(b);
|
jpayne@68
|
172 process18S=!process16S;
|
jpayne@68
|
173 }else if(a.equalsIgnoreCase("process18S") || a.equalsIgnoreCase("18S")){
|
jpayne@68
|
174 process18S=Parse.parseBoolean(b);
|
jpayne@68
|
175 process16S=!process18S;
|
jpayne@68
|
176 }else if(a.equals("parse_flag_goes_here")){
|
jpayne@68
|
177 long fake_variable=Parse.parseKMG(b);
|
jpayne@68
|
178 //Set a variable here
|
jpayne@68
|
179 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
|
jpayne@68
|
180 //do nothing
|
jpayne@68
|
181 }else if(b==null && new File(arg).exists()){
|
jpayne@68
|
182 in.add(arg);
|
jpayne@68
|
183 }else{
|
jpayne@68
|
184 outstream.println("Unknown parameter "+args[i]);
|
jpayne@68
|
185 assert(false) : "Unknown parameter "+args[i];
|
jpayne@68
|
186 }
|
jpayne@68
|
187 }
|
jpayne@68
|
188 assert(!in.isEmpty()) : "No input file.";
|
jpayne@68
|
189 return parser;
|
jpayne@68
|
190 }
|
jpayne@68
|
191
|
jpayne@68
|
192 /** Ensure files can be read and written */
|
jpayne@68
|
193 private void checkFileExistence(){
|
jpayne@68
|
194 //Ensure output files can be written
|
jpayne@68
|
195 if(!Tools.testOutputFiles(overwrite, append, false, out1)){
|
jpayne@68
|
196 outstream.println((out1==null)+", "+out1);
|
jpayne@68
|
197 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+"\n");
|
jpayne@68
|
198 }
|
jpayne@68
|
199
|
jpayne@68
|
200 //Ensure input files can be read
|
jpayne@68
|
201 if(!Tools.testInputFiles(false, true, in)){
|
jpayne@68
|
202 throw new RuntimeException("\nCan't read some input files.\n");
|
jpayne@68
|
203 }
|
jpayne@68
|
204
|
jpayne@68
|
205 // //Ensure that no file was specified multiple times
|
jpayne@68
|
206 // if(!Tools.testForDuplicateFiles(true, out1, in.toArray(new String[0]))){
|
jpayne@68
|
207 // throw new RuntimeException("\nSome file names were specified multiple times.\n");
|
jpayne@68
|
208 // }
|
jpayne@68
|
209 }
|
jpayne@68
|
210
|
jpayne@68
|
211 /** Make sure interleaving agrees with number of input and output files */
|
jpayne@68
|
212 private void adjustInterleaving(){
|
jpayne@68
|
213 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
|
jpayne@68
|
214 }
|
jpayne@68
|
215
|
jpayne@68
|
216 /** Adjust file-related static fields as needed for this program */
|
jpayne@68
|
217 private static void checkStatics(){
|
jpayne@68
|
218 //Adjust the number of threads for input file reading
|
jpayne@68
|
219 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
|
jpayne@68
|
220 ByteFile.FORCE_MODE_BF2=true;
|
jpayne@68
|
221 }
|
jpayne@68
|
222
|
jpayne@68
|
223 assert(FastaReadInputStream.settingsOK());
|
jpayne@68
|
224 }
|
jpayne@68
|
225
|
jpayne@68
|
226 /** Ensure parameter ranges are within bounds and required parameters are set */
|
jpayne@68
|
227 private boolean validateParams(){
|
jpayne@68
|
228 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
|
jpayne@68
|
229 // assert(false) : "TODO";
|
jpayne@68
|
230 assert(process16S || process18S) : "16S or 18S must be selected.";
|
jpayne@68
|
231 assert(!process16S || !process18S) : "16S or 18S are both selected; only one may be active.";
|
jpayne@68
|
232 return true;
|
jpayne@68
|
233 }
|
jpayne@68
|
234
|
jpayne@68
|
235 /*--------------------------------------------------------------*/
|
jpayne@68
|
236 /*---------------- Outer Methods ----------------*/
|
jpayne@68
|
237 /*--------------------------------------------------------------*/
|
jpayne@68
|
238
|
jpayne@68
|
239 /** Create read streams and process all data */
|
jpayne@68
|
240 void process(Timer t){
|
jpayne@68
|
241
|
jpayne@68
|
242 if(process16S){
|
jpayne@68
|
243 Read[] data=ProkObject.loadConsensusSequenceType("16S", true, true);
|
jpayne@68
|
244 consensus16S=data[0].bases;
|
jpayne@68
|
245 if(verbose){System.err.println("process16S: Loaded 16S consensus, length "+consensus16S.length+": "+new String(consensus16S));}
|
jpayne@68
|
246 }
|
jpayne@68
|
247 if(process18S){
|
jpayne@68
|
248 Read[] data=ProkObject.loadConsensusSequenceType("18S", true, true);
|
jpayne@68
|
249 consensus18S=data[0].bases;
|
jpayne@68
|
250 if(verbose){System.err.println("process18S: Loaded 18S consensus, length "+consensus18S.length+": "+new String(consensus18S));}
|
jpayne@68
|
251 }
|
jpayne@68
|
252
|
jpayne@68
|
253 //Turn off read validation in the input threads to increase speed
|
jpayne@68
|
254 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
|
jpayne@68
|
255 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
|
jpayne@68
|
256
|
jpayne@68
|
257 //Reset counters
|
jpayne@68
|
258 readsProcessed=readsOut=0;
|
jpayne@68
|
259 basesProcessed=basesOut=0;
|
jpayne@68
|
260
|
jpayne@68
|
261 //Align everything to global consensus
|
jpayne@68
|
262 for(FileFormat ff : ffin) {
|
jpayne@68
|
263 //Create a read input stream
|
jpayne@68
|
264 final ConcurrentReadInputStream cris=makeCris(ff);
|
jpayne@68
|
265
|
jpayne@68
|
266 //Process the reads in separate threads
|
jpayne@68
|
267 spawnThreads(cris, false);
|
jpayne@68
|
268 errorState|=ReadWrite.closeStream(cris);
|
jpayne@68
|
269 }
|
jpayne@68
|
270
|
jpayne@68
|
271 if(ffalt!=null){
|
jpayne@68
|
272 //Create a read input stream
|
jpayne@68
|
273 final ConcurrentReadInputStream cris=makeCris(ffalt);
|
jpayne@68
|
274
|
jpayne@68
|
275 //Process the reads in separate threads
|
jpayne@68
|
276 spawnThreads(cris, true);
|
jpayne@68
|
277 errorState|=ReadWrite.closeStream(cris);
|
jpayne@68
|
278 }
|
jpayne@68
|
279
|
jpayne@68
|
280 // queue=new ConcurrentLinkedQueue<ArrayList<Ribo>>();
|
jpayne@68
|
281 // for(Entry<Integer, ArrayList<Ribo>> e : listMap.entrySet()){
|
jpayne@68
|
282 // queue.add(e.getValue());
|
jpayne@68
|
283 // }
|
jpayne@68
|
284 // listMap=null;
|
jpayne@68
|
285 queue=makeQueue();
|
jpayne@68
|
286
|
jpayne@68
|
287 //Run a second pass to pick the best SSU per taxID
|
jpayne@68
|
288 spawnThreads(null, false);
|
jpayne@68
|
289
|
jpayne@68
|
290 //Do anything necessary after processing
|
jpayne@68
|
291 if(ffout1!=null){
|
jpayne@68
|
292 //Optionally create a read output stream
|
jpayne@68
|
293 final ConcurrentReadOutputStream ros=makeCros();
|
jpayne@68
|
294 long num=0;
|
jpayne@68
|
295 for(Ribo ribo : bestList){
|
jpayne@68
|
296 Read r=ribo.r;
|
jpayne@68
|
297 readsOut++;
|
jpayne@68
|
298 basesOut+=r.length();
|
jpayne@68
|
299 ArrayList<Read> list=new ArrayList<Read>(1);
|
jpayne@68
|
300 list.add(r);
|
jpayne@68
|
301 ros.add(list, num);
|
jpayne@68
|
302 num++;
|
jpayne@68
|
303 }
|
jpayne@68
|
304 //Close the read streams
|
jpayne@68
|
305 errorState|=ReadWrite.closeStream(ros);
|
jpayne@68
|
306 }
|
jpayne@68
|
307
|
jpayne@68
|
308 if(verbose){outstream.println("Finished; closing streams.");}
|
jpayne@68
|
309
|
jpayne@68
|
310 //Write anything that was accumulated by ReadStats
|
jpayne@68
|
311 errorState|=ReadStats.writeAll();
|
jpayne@68
|
312
|
jpayne@68
|
313 //Reset read validation
|
jpayne@68
|
314 Read.VALIDATE_IN_CONSTRUCTOR=vic;
|
jpayne@68
|
315
|
jpayne@68
|
316 //Report timing and results
|
jpayne@68
|
317 t.stop();
|
jpayne@68
|
318 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
|
jpayne@68
|
319 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
|
jpayne@68
|
320
|
jpayne@68
|
321 //Throw an exception of there was an error in a thread
|
jpayne@68
|
322 if(errorState){
|
jpayne@68
|
323 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
|
jpayne@68
|
324 }
|
jpayne@68
|
325 }
|
jpayne@68
|
326
|
jpayne@68
|
327 private ConcurrentLinkedQueue<ArrayList<Ribo>> makeQueue(){
|
jpayne@68
|
328 ArrayList<ArrayList<Ribo>> listList=new ArrayList<ArrayList<Ribo>>(listMap.size());
|
jpayne@68
|
329 for(Entry<Integer, ArrayList<Ribo>> e : listMap.entrySet()){
|
jpayne@68
|
330 listList.add(e.getValue());
|
jpayne@68
|
331 }
|
jpayne@68
|
332 listMap=null;
|
jpayne@68
|
333 Collections.sort(listList, new ListComparator());
|
jpayne@68
|
334 assert(listList.isEmpty() || listList.get(0).size()>=listList.get(listList.size()-1).size());
|
jpayne@68
|
335 ConcurrentLinkedQueue<ArrayList<Ribo>> q=new ConcurrentLinkedQueue<ArrayList<Ribo>>();
|
jpayne@68
|
336 for(ArrayList<Ribo> x : listList){
|
jpayne@68
|
337 q.add(x);
|
jpayne@68
|
338 }
|
jpayne@68
|
339 return q;
|
jpayne@68
|
340 }
|
jpayne@68
|
341
|
jpayne@68
|
342 private ConcurrentReadInputStream makeCris(FileFormat ff){
|
jpayne@68
|
343 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null);
|
jpayne@68
|
344 cris.start(); //Start the stream
|
jpayne@68
|
345 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
346 boolean paired=cris.paired();
|
jpayne@68
|
347 assert(!paired) : "This should not be paired input.";
|
jpayne@68
|
348 return cris;
|
jpayne@68
|
349 }
|
jpayne@68
|
350
|
jpayne@68
|
351 private ConcurrentReadOutputStream makeCros(){
|
jpayne@68
|
352 if(ffout1==null){return null;}
|
jpayne@68
|
353
|
jpayne@68
|
354 //Select output buffer size based on whether it needs to be ordered
|
jpayne@68
|
355 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
|
jpayne@68
|
356
|
jpayne@68
|
357 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, null, buff, null, false);
|
jpayne@68
|
358 ros.start(); //Start the stream
|
jpayne@68
|
359 return ros;
|
jpayne@68
|
360 }
|
jpayne@68
|
361
|
jpayne@68
|
362 /*--------------------------------------------------------------*/
|
jpayne@68
|
363 /*---------------- Thread Management ----------------*/
|
jpayne@68
|
364 /*--------------------------------------------------------------*/
|
jpayne@68
|
365
|
jpayne@68
|
366 /** Spawn process threads */
|
jpayne@68
|
367 private void spawnThreads(final ConcurrentReadInputStream cris, boolean altData){
|
jpayne@68
|
368
|
jpayne@68
|
369 //Do anything necessary prior to processing
|
jpayne@68
|
370
|
jpayne@68
|
371 //Fill a list with ProcessThreads
|
jpayne@68
|
372 if(verbose){System.err.println("Spawning "+threads+" threads.");}
|
jpayne@68
|
373 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
|
jpayne@68
|
374 for(int i=0; i<threads; i++){
|
jpayne@68
|
375 alpt.add(new ProcessThread(cris, i, altData));
|
jpayne@68
|
376 }
|
jpayne@68
|
377
|
jpayne@68
|
378 //Start the threads and wait for them to finish
|
jpayne@68
|
379 boolean success=ThreadWaiter.startAndWait(alpt, this);
|
jpayne@68
|
380 if(verbose){System.err.println("Threads finished with success="+success+".");}
|
jpayne@68
|
381 errorState&=!success;
|
jpayne@68
|
382 }
|
jpayne@68
|
383
|
jpayne@68
|
384 @Override
|
jpayne@68
|
385 public final void accumulate(ProcessThread pt){
|
jpayne@68
|
386 readsProcessed+=pt.readsProcessedT;
|
jpayne@68
|
387 basesProcessed+=pt.basesProcessedT;
|
jpayne@68
|
388 errorState|=(!pt.success);
|
jpayne@68
|
389 }
|
jpayne@68
|
390
|
jpayne@68
|
391 @Override
|
jpayne@68
|
392 public final boolean success(){return !errorState;}
|
jpayne@68
|
393
|
jpayne@68
|
394 /*--------------------------------------------------------------*/
|
jpayne@68
|
395 /*---------------- Inner Methods ----------------*/
|
jpayne@68
|
396 /*--------------------------------------------------------------*/
|
jpayne@68
|
397
|
jpayne@68
|
398 /*--------------------------------------------------------------*/
|
jpayne@68
|
399 /*---------------- Inner Classes ----------------*/
|
jpayne@68
|
400 /*--------------------------------------------------------------*/
|
jpayne@68
|
401
|
jpayne@68
|
402 /** This class is static to prevent accidental writing to shared variables.
|
jpayne@68
|
403 * It is safe to remove the static modifier. */
|
jpayne@68
|
404 class ProcessThread extends Thread {
|
jpayne@68
|
405
|
jpayne@68
|
406 //Constructor
|
jpayne@68
|
407 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_, boolean alt_){
|
jpayne@68
|
408 cris=cris_;
|
jpayne@68
|
409 tid=tid_;
|
jpayne@68
|
410 processInput=(cris!=null);
|
jpayne@68
|
411 altData=alt_;
|
jpayne@68
|
412 }
|
jpayne@68
|
413
|
jpayne@68
|
414 //Called by start()
|
jpayne@68
|
415 @Override
|
jpayne@68
|
416 public void run(){
|
jpayne@68
|
417 //Do anything necessary prior to processing
|
jpayne@68
|
418
|
jpayne@68
|
419 if(processInput){
|
jpayne@68
|
420 //Process the reads
|
jpayne@68
|
421 processInner();
|
jpayne@68
|
422 }else{
|
jpayne@68
|
423 pickBest();
|
jpayne@68
|
424 }
|
jpayne@68
|
425
|
jpayne@68
|
426 //Do anything necessary after processing
|
jpayne@68
|
427
|
jpayne@68
|
428 //Indicate successful exit status
|
jpayne@68
|
429 success=true;
|
jpayne@68
|
430 }
|
jpayne@68
|
431
|
jpayne@68
|
432 /** Iterate through the reads */
|
jpayne@68
|
433 void processInner(){
|
jpayne@68
|
434 if(verbose && tid==0){System.err.println("processInner() for tid="+tid);}
|
jpayne@68
|
435
|
jpayne@68
|
436 //Grab the first ListNum of reads
|
jpayne@68
|
437 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
438
|
jpayne@68
|
439 //Check to ensure pairing is as expected
|
jpayne@68
|
440 if(ln!=null && !ln.isEmpty()){
|
jpayne@68
|
441 Read r=ln.get(0);
|
jpayne@68
|
442 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
|
jpayne@68
|
443 }
|
jpayne@68
|
444
|
jpayne@68
|
445 //As long as there is a nonempty read list...
|
jpayne@68
|
446 while(ln!=null && ln.size()>0){
|
jpayne@68
|
447 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
|
jpayne@68
|
448
|
jpayne@68
|
449 processInput(ln);
|
jpayne@68
|
450
|
jpayne@68
|
451 //Notify the input stream that the list was used
|
jpayne@68
|
452 cris.returnList(ln);
|
jpayne@68
|
453 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
|
jpayne@68
|
454
|
jpayne@68
|
455 //Fetch a new list
|
jpayne@68
|
456 ln=cris.nextList();
|
jpayne@68
|
457 }
|
jpayne@68
|
458
|
jpayne@68
|
459 //Notify the input stream that the final list was used
|
jpayne@68
|
460 if(ln!=null){
|
jpayne@68
|
461 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
|
jpayne@68
|
462 }
|
jpayne@68
|
463 }
|
jpayne@68
|
464
|
jpayne@68
|
465 void processInput(ListNum<Read> ln){
|
jpayne@68
|
466 if(verbose && tid==0){System.err.println("processInput() for tid="+tid);}
|
jpayne@68
|
467
|
jpayne@68
|
468 //Grab the actual read list from the ListNum
|
jpayne@68
|
469 final ArrayList<Read> reads=ln.list;
|
jpayne@68
|
470
|
jpayne@68
|
471 //Loop through each read in the list
|
jpayne@68
|
472 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
473 final Read r1=reads.get(idx);
|
jpayne@68
|
474
|
jpayne@68
|
475 //Validate reads in worker threads
|
jpayne@68
|
476 if(!r1.validated()){r1.validate(true);}
|
jpayne@68
|
477
|
jpayne@68
|
478 //Track the initial length for statistics
|
jpayne@68
|
479 final int initialLength1=r1.length();
|
jpayne@68
|
480
|
jpayne@68
|
481 //Increment counters
|
jpayne@68
|
482 readsProcessedT++;
|
jpayne@68
|
483 basesProcessedT+=initialLength1;
|
jpayne@68
|
484
|
jpayne@68
|
485 processRead(r1);
|
jpayne@68
|
486 }
|
jpayne@68
|
487 }
|
jpayne@68
|
488
|
jpayne@68
|
489 void pickBest(){
|
jpayne@68
|
490 if(verbose && tid==0){System.err.println("pickBest() for tid="+tid);}
|
jpayne@68
|
491 for(ArrayList<Ribo> list=queue.poll(); list!=null; list=queue.poll()){
|
jpayne@68
|
492 Ribo best=pickBest(list);
|
jpayne@68
|
493 list.clear();
|
jpayne@68
|
494 synchronized(bestList){
|
jpayne@68
|
495 bestList.add(best);
|
jpayne@68
|
496 }
|
jpayne@68
|
497 }
|
jpayne@68
|
498 }
|
jpayne@68
|
499
|
jpayne@68
|
500 Ribo pickBest(ArrayList<Ribo> list){
|
jpayne@68
|
501 if(verbose && tid==0){System.err.println("pickBest(list[="+list.size()+"]) for tid="+tid);}
|
jpayne@68
|
502 assert(list!=null && list.size()>0);
|
jpayne@68
|
503 if(list.size()==1){return list.get(0);}
|
jpayne@68
|
504 Collections.sort(list);
|
jpayne@68
|
505 Collections.reverse(list);
|
jpayne@68
|
506 assert(list.get(0).product>=list.get(1).product);
|
jpayne@68
|
507 if(list.size()<3 || fast){return list.get(0);}
|
jpayne@68
|
508
|
jpayne@68
|
509 Ribo base=list.get(0);
|
jpayne@68
|
510 int pad=Tools.max(10, (1600-base.r.length()));
|
jpayne@68
|
511 BaseGraph bg=new BaseGraph(base.r.name(), base.r.bases, base.r.quality, base.r.numericID, pad);
|
jpayne@68
|
512 for(Ribo r : list){
|
jpayne@68
|
513 bg.alignAndGenerateMatch(r.r, ssa);
|
jpayne@68
|
514 }
|
jpayne@68
|
515 Read consensus=bg.traverse();
|
jpayne@68
|
516 Ribo best;
|
jpayne@68
|
517 if(useConsensus){
|
jpayne@68
|
518 best=new Ribo(consensus, base.tid, 1);
|
jpayne@68
|
519 }else{
|
jpayne@68
|
520 for(Ribo r : list){
|
jpayne@68
|
521 float id=align(r.r.bases, consensus.bases);
|
jpayne@68
|
522 r.identity=id;
|
jpayne@68
|
523 r.product=score(r.length(), r.identity);
|
jpayne@68
|
524 }
|
jpayne@68
|
525 Collections.sort(list);
|
jpayne@68
|
526 Collections.reverse(list);
|
jpayne@68
|
527 assert(list.get(0).product>=list.get(1).product);
|
jpayne@68
|
528 best=list.get(0);
|
jpayne@68
|
529 }
|
jpayne@68
|
530 return best;
|
jpayne@68
|
531 }
|
jpayne@68
|
532
|
jpayne@68
|
533 /**
|
jpayne@68
|
534 * Process a read or a read pair.
|
jpayne@68
|
535 * @return True if the reads should be kept, false if they should be discarded.
|
jpayne@68
|
536 */
|
jpayne@68
|
537 void processRead(final Read r){
|
jpayne@68
|
538 if(verbose && tid==0){System.err.println("processRead()");}
|
jpayne@68
|
539 if(r.length()<minlen || r.length()>maxlen){return;}
|
jpayne@68
|
540 if(maxns>=0 && r.countNocalls()>maxns){return;}
|
jpayne@68
|
541 Integer key=GiToTaxid.parseTaxidNumber(r.id, '|');
|
jpayne@68
|
542 if(verbose && tid==0){System.err.println("key="+key);}
|
jpayne@68
|
543 if(key==null || key==-1 || (altData && seenTaxID.contains(key))){return;}
|
jpayne@68
|
544 float id=align(r);
|
jpayne@68
|
545 if(id<minID){return;}
|
jpayne@68
|
546 Ribo ribo=new Ribo(r, key, id);
|
jpayne@68
|
547
|
jpayne@68
|
548 synchronized(listMap){
|
jpayne@68
|
549 ArrayList<Ribo> list=listMap.get(key);
|
jpayne@68
|
550 if(list==null){
|
jpayne@68
|
551 list=new ArrayList<Ribo>(8);
|
jpayne@68
|
552 listMap.put(key, list);
|
jpayne@68
|
553 }
|
jpayne@68
|
554 list.add(ribo);
|
jpayne@68
|
555 if(!altData){seenTaxID.add(key);}
|
jpayne@68
|
556 }
|
jpayne@68
|
557 }
|
jpayne@68
|
558
|
jpayne@68
|
559 float align(Read r){
|
jpayne@68
|
560 float a=(process16S ? align(r.bases, consensus16S) : 0);
|
jpayne@68
|
561 float b=(process18S ? align(r.bases, consensus18S) : 0);
|
jpayne@68
|
562 if(verbose && tid==0){System.err.println("Aligned; a="+a+", b="+b);}
|
jpayne@68
|
563 return Tools.max(a, b);
|
jpayne@68
|
564 }
|
jpayne@68
|
565
|
jpayne@68
|
566 float align(byte[] query, byte[] ref){
|
jpayne@68
|
567 int a=0, b=ref.length-1;
|
jpayne@68
|
568 int[] max=ssa.fillUnlimited(query, ref, a, b, -9999);
|
jpayne@68
|
569 if(max==null){return 0;}
|
jpayne@68
|
570
|
jpayne@68
|
571 final int rows=max[0];
|
jpayne@68
|
572 final int maxCol=max[1];
|
jpayne@68
|
573 final int maxState=max[2];
|
jpayne@68
|
574 final float id=ssa.tracebackIdentity(query, ref, a, b, rows, maxCol, maxState, null);
|
jpayne@68
|
575 return id;
|
jpayne@68
|
576 }
|
jpayne@68
|
577
|
jpayne@68
|
578 SingleStateAlignerFlat2 ssa=new SingleStateAlignerFlat2();
|
jpayne@68
|
579
|
jpayne@68
|
580 /** Number of reads processed by this thread */
|
jpayne@68
|
581 protected long readsProcessedT=0;
|
jpayne@68
|
582 /** Number of bases processed by this thread */
|
jpayne@68
|
583 protected long basesProcessedT=0;
|
jpayne@68
|
584
|
jpayne@68
|
585 /** True only if this thread has completed successfully */
|
jpayne@68
|
586 boolean success=false;
|
jpayne@68
|
587
|
jpayne@68
|
588 /** Shared input stream */
|
jpayne@68
|
589 private final ConcurrentReadInputStream cris;
|
jpayne@68
|
590 /** Thread ID */
|
jpayne@68
|
591 final int tid;
|
jpayne@68
|
592
|
jpayne@68
|
593 //Run mode
|
jpayne@68
|
594 final boolean processInput;
|
jpayne@68
|
595 final boolean altData;
|
jpayne@68
|
596 }
|
jpayne@68
|
597
|
jpayne@68
|
598 private class Ribo implements Comparable<Ribo>{
|
jpayne@68
|
599 Ribo(Read r_, int tid_, float identity_){
|
jpayne@68
|
600 r=r_;
|
jpayne@68
|
601 tid=tid_;
|
jpayne@68
|
602 identity=identity_;
|
jpayne@68
|
603 product=score(r.length(), identity);
|
jpayne@68
|
604 }
|
jpayne@68
|
605
|
jpayne@68
|
606 @Override
|
jpayne@68
|
607 public int compareTo(Ribo b) {
|
jpayne@68
|
608 if(b.product>product){return -1;}
|
jpayne@68
|
609 else if(b.product<product){return 1;}
|
jpayne@68
|
610 else if(b.r.length()>r.length()){return -1;}
|
jpayne@68
|
611 else if(b.r.length()<r.length()){return 1;}
|
jpayne@68
|
612 return 0;
|
jpayne@68
|
613 }
|
jpayne@68
|
614
|
jpayne@68
|
615 int length(){return r.length();}
|
jpayne@68
|
616
|
jpayne@68
|
617 Read r;
|
jpayne@68
|
618 int tid;
|
jpayne@68
|
619 float identity;
|
jpayne@68
|
620 float product;
|
jpayne@68
|
621 }
|
jpayne@68
|
622
|
jpayne@68
|
623 private class ListComparator implements Comparator<ArrayList<Ribo>> {
|
jpayne@68
|
624
|
jpayne@68
|
625 @Override
|
jpayne@68
|
626 public int compare(ArrayList<Ribo> a, ArrayList<Ribo> b) {
|
jpayne@68
|
627 return a.size()>b.size() ? -1 : a.size()<b.size() ? 1 : 0;
|
jpayne@68
|
628 }
|
jpayne@68
|
629
|
jpayne@68
|
630 }
|
jpayne@68
|
631
|
jpayne@68
|
632 private float lengthMult(int len){
|
jpayne@68
|
633 int idealLength=idealLength();
|
jpayne@68
|
634 int max=Tools.max(len, idealLength, 1);
|
jpayne@68
|
635 int min=Tools.min(len, idealLength);
|
jpayne@68
|
636 return min/(float)max;
|
jpayne@68
|
637 }
|
jpayne@68
|
638
|
jpayne@68
|
639 private float score(int len, float identity){
|
jpayne@68
|
640 return lengthMult(len)*identity;
|
jpayne@68
|
641 }
|
jpayne@68
|
642
|
jpayne@68
|
643 /*--------------------------------------------------------------*/
|
jpayne@68
|
644 /*---------------- Fields ----------------*/
|
jpayne@68
|
645 /*--------------------------------------------------------------*/
|
jpayne@68
|
646
|
jpayne@68
|
647 /** Primary input file path */
|
jpayne@68
|
648 private ArrayList<String> in=new ArrayList<String>();
|
jpayne@68
|
649
|
jpayne@68
|
650 /** Alternate input file path */
|
jpayne@68
|
651 private String alt=null;
|
jpayne@68
|
652
|
jpayne@68
|
653 /** Primary output file path */
|
jpayne@68
|
654 private String out1=null;
|
jpayne@68
|
655
|
jpayne@68
|
656 /** Override input file extension */
|
jpayne@68
|
657 private String extin=null;
|
jpayne@68
|
658 /** Override output file extension */
|
jpayne@68
|
659 private String extout=null;
|
jpayne@68
|
660
|
jpayne@68
|
661 ArrayList<Ribo> bestList=new ArrayList<Ribo>();
|
jpayne@68
|
662 HashMap<Integer, ArrayList<Ribo>> listMap=new HashMap<Integer, ArrayList<Ribo>>(100000);
|
jpayne@68
|
663 ConcurrentLinkedQueue<ArrayList<Ribo>> queue;
|
jpayne@68
|
664
|
jpayne@68
|
665
|
jpayne@68
|
666 IntHashSet seenTaxID=new IntHashSet(1000000);
|
jpayne@68
|
667
|
jpayne@68
|
668 byte[] consensus16S;
|
jpayne@68
|
669 byte[] consensus18S;
|
jpayne@68
|
670
|
jpayne@68
|
671 int idealLength(){
|
jpayne@68
|
672 if(process16S){return consensus16S.length;}
|
jpayne@68
|
673 return consensus18S.length;
|
jpayne@68
|
674 }
|
jpayne@68
|
675
|
jpayne@68
|
676 boolean useConsensus=false;
|
jpayne@68
|
677 boolean fast=false;
|
jpayne@68
|
678 int maxns=-1;
|
jpayne@68
|
679 int minlen=1;
|
jpayne@68
|
680 int maxlen=4000;
|
jpayne@68
|
681
|
jpayne@68
|
682 /*--------------------------------------------------------------*/
|
jpayne@68
|
683
|
jpayne@68
|
684 /** Number of reads processed */
|
jpayne@68
|
685 protected long readsProcessed=0;
|
jpayne@68
|
686 /** Number of bases processed */
|
jpayne@68
|
687 protected long basesProcessed=0;
|
jpayne@68
|
688
|
jpayne@68
|
689 /** Number of reads retained */
|
jpayne@68
|
690 protected long readsOut=0;
|
jpayne@68
|
691 /** Number of bases retained */
|
jpayne@68
|
692 protected long basesOut=0;
|
jpayne@68
|
693
|
jpayne@68
|
694 /** Quit after processing this many input reads; -1 means no limit */
|
jpayne@68
|
695 private long maxReads=-1;
|
jpayne@68
|
696
|
jpayne@68
|
697 private float minID=0.62f;
|
jpayne@68
|
698
|
jpayne@68
|
699 private boolean process16S=true;
|
jpayne@68
|
700 private boolean process18S=false;
|
jpayne@68
|
701
|
jpayne@68
|
702 /*--------------------------------------------------------------*/
|
jpayne@68
|
703 /*---------------- Final Fields ----------------*/
|
jpayne@68
|
704 /*--------------------------------------------------------------*/
|
jpayne@68
|
705
|
jpayne@68
|
706 /** Primary input file */
|
jpayne@68
|
707 private final ArrayList<FileFormat> ffin;
|
jpayne@68
|
708 private final FileFormat ffalt;
|
jpayne@68
|
709
|
jpayne@68
|
710 /** Primary output file */
|
jpayne@68
|
711 private final FileFormat ffout1;
|
jpayne@68
|
712
|
jpayne@68
|
713 final int threads;
|
jpayne@68
|
714
|
jpayne@68
|
715 /*--------------------------------------------------------------*/
|
jpayne@68
|
716 /*---------------- Common Fields ----------------*/
|
jpayne@68
|
717 /*--------------------------------------------------------------*/
|
jpayne@68
|
718
|
jpayne@68
|
719 /** Print status messages to this output stream */
|
jpayne@68
|
720 private PrintStream outstream=System.err;
|
jpayne@68
|
721 /** Print verbose messages */
|
jpayne@68
|
722 public static boolean verbose=false;
|
jpayne@68
|
723 /** True if an error was encountered */
|
jpayne@68
|
724 public boolean errorState=false;
|
jpayne@68
|
725 /** Overwrite existing output files */
|
jpayne@68
|
726 private boolean overwrite=false;
|
jpayne@68
|
727 /** Append to existing output files */
|
jpayne@68
|
728 private boolean append=false;
|
jpayne@68
|
729 /** Reads are output in input order */
|
jpayne@68
|
730 private boolean ordered=false;
|
jpayne@68
|
731
|
jpayne@68
|
732 }
|