jpayne@68
|
1 package prok;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.io.PrintStream;
|
jpayne@68
|
4 import java.util.ArrayList;
|
jpayne@68
|
5 import java.util.Arrays;
|
jpayne@68
|
6
|
jpayne@68
|
7 import aligner.SingleStateAlignerFlat2;
|
jpayne@68
|
8 import fileIO.ByteFile;
|
jpayne@68
|
9 import fileIO.FileFormat;
|
jpayne@68
|
10 import fileIO.ReadWrite;
|
jpayne@68
|
11 import shared.Parse;
|
jpayne@68
|
12 import shared.Parser;
|
jpayne@68
|
13 import shared.PreParser;
|
jpayne@68
|
14 import shared.ReadStats;
|
jpayne@68
|
15 import shared.Shared;
|
jpayne@68
|
16 import shared.Timer;
|
jpayne@68
|
17 import shared.Tools;
|
jpayne@68
|
18 import stream.ConcurrentReadInputStream;
|
jpayne@68
|
19 import stream.ConcurrentReadOutputStream;
|
jpayne@68
|
20 import stream.FastaReadInputStream;
|
jpayne@68
|
21 import stream.Read;
|
jpayne@68
|
22 import structures.ListNum;
|
jpayne@68
|
23 import template.Accumulator;
|
jpayne@68
|
24 import template.ThreadWaiter;
|
jpayne@68
|
25
|
jpayne@68
|
26 /**
|
jpayne@68
|
27 * Splits a mix of ribosomal sequences (such as Silva) into different files per type (16S, 18S, etc).
|
jpayne@68
|
28 *
|
jpayne@68
|
29 * @author Brian Bushnell
|
jpayne@68
|
30 * @date November 19, 2015
|
jpayne@68
|
31 *
|
jpayne@68
|
32 */
|
jpayne@68
|
33 public class SplitRibo implements Accumulator<SplitRibo.ProcessThread> {
|
jpayne@68
|
34
|
jpayne@68
|
35 /*--------------------------------------------------------------*/
|
jpayne@68
|
36 /*---------------- Initialization ----------------*/
|
jpayne@68
|
37 /*--------------------------------------------------------------*/
|
jpayne@68
|
38
|
jpayne@68
|
39 /**
|
jpayne@68
|
40 * Code entrance from the command line.
|
jpayne@68
|
41 * @param args Command line arguments
|
jpayne@68
|
42 */
|
jpayne@68
|
43 public static void main(String[] args){
|
jpayne@68
|
44 //Start a timer immediately upon code entrance.
|
jpayne@68
|
45 Timer t=new Timer();
|
jpayne@68
|
46
|
jpayne@68
|
47 //Create an instance of this class
|
jpayne@68
|
48 SplitRibo x=new SplitRibo(args);
|
jpayne@68
|
49
|
jpayne@68
|
50 //Run the object
|
jpayne@68
|
51 x.process(t);
|
jpayne@68
|
52
|
jpayne@68
|
53 //Close the print stream if it was redirected
|
jpayne@68
|
54 Shared.closeStream(x.outstream);
|
jpayne@68
|
55 }
|
jpayne@68
|
56
|
jpayne@68
|
57 /**
|
jpayne@68
|
58 * Constructor.
|
jpayne@68
|
59 * @param args Command line arguments
|
jpayne@68
|
60 */
|
jpayne@68
|
61 public SplitRibo(String[] args){
|
jpayne@68
|
62
|
jpayne@68
|
63 {//Preparse block for help, config files, and outstream
|
jpayne@68
|
64 PreParser pp=new PreParser(args, getClass(), false);
|
jpayne@68
|
65 args=pp.args;
|
jpayne@68
|
66 outstream=pp.outstream;
|
jpayne@68
|
67 }
|
jpayne@68
|
68
|
jpayne@68
|
69 //Set shared static variables prior to parsing
|
jpayne@68
|
70 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
|
jpayne@68
|
71 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
|
jpayne@68
|
72 Shared.capBufferLen(50);
|
jpayne@68
|
73 ReadWrite.ZIPLEVEL=9;
|
jpayne@68
|
74
|
jpayne@68
|
75 {//Parse the arguments
|
jpayne@68
|
76 final Parser parser=parse(args);
|
jpayne@68
|
77 Parser.processQuality();
|
jpayne@68
|
78
|
jpayne@68
|
79 maxReads=parser.maxReads;
|
jpayne@68
|
80 overwrite=ReadStats.overwrite=parser.overwrite;
|
jpayne@68
|
81 append=ReadStats.append=parser.append;
|
jpayne@68
|
82
|
jpayne@68
|
83 in1=parser.in1;
|
jpayne@68
|
84 qfin1=parser.qfin1;
|
jpayne@68
|
85 extin=parser.extin;
|
jpayne@68
|
86
|
jpayne@68
|
87 outPattern=parser.out1;
|
jpayne@68
|
88 extout=parser.extout;
|
jpayne@68
|
89 }
|
jpayne@68
|
90
|
jpayne@68
|
91 validateParams();
|
jpayne@68
|
92 fixExtensions(); //Add or remove .gz or .bz2 as needed
|
jpayne@68
|
93 checkFileExistence(); //Ensure files can be read and written
|
jpayne@68
|
94 checkStatics(); //Adjust file-related static fields as needed for this program
|
jpayne@68
|
95
|
jpayne@68
|
96 //Create input FileFormat objects
|
jpayne@68
|
97 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
|
jpayne@68
|
98
|
jpayne@68
|
99 numTypes=sequenceTypes.length;
|
jpayne@68
|
100 readsOut=new long[numTypes];
|
jpayne@68
|
101 basesOut=new long[numTypes];
|
jpayne@68
|
102 consensusSequences=loadConsensusSequenceFromFile();
|
jpayne@68
|
103 }
|
jpayne@68
|
104
|
jpayne@68
|
105 /*--------------------------------------------------------------*/
|
jpayne@68
|
106 /*---------------- Initialization Helpers ----------------*/
|
jpayne@68
|
107 /*--------------------------------------------------------------*/
|
jpayne@68
|
108
|
jpayne@68
|
109 /** Parse arguments from the command line */
|
jpayne@68
|
110 private Parser parse(String[] args){
|
jpayne@68
|
111
|
jpayne@68
|
112 //Create a parser object
|
jpayne@68
|
113 Parser parser=new Parser();
|
jpayne@68
|
114
|
jpayne@68
|
115 //Set any necessary Parser defaults here
|
jpayne@68
|
116 //parser.foo=bar;
|
jpayne@68
|
117
|
jpayne@68
|
118 //Parse each argument
|
jpayne@68
|
119 for(int i=0; i<args.length; i++){
|
jpayne@68
|
120 String arg=args[i];
|
jpayne@68
|
121
|
jpayne@68
|
122 //Break arguments into their constituent parts, in the form of "a=b"
|
jpayne@68
|
123 String[] split=arg.split("=");
|
jpayne@68
|
124 String a=split[0].toLowerCase();
|
jpayne@68
|
125 String b=split.length>1 ? split[1] : null;
|
jpayne@68
|
126 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
|
jpayne@68
|
127
|
jpayne@68
|
128 if(a.equals("verbose")){
|
jpayne@68
|
129 verbose=Parse.parseBoolean(b);
|
jpayne@68
|
130 }else if(a.equals("ordered")){
|
jpayne@68
|
131 ordered=Parse.parseBoolean(b);
|
jpayne@68
|
132 }else if(a.equalsIgnoreCase("minid")){
|
jpayne@68
|
133 minID=Float.parseFloat(b);
|
jpayne@68
|
134 }else if(a.equalsIgnoreCase("minid2") || a.equalsIgnoreCase("refineid")){
|
jpayne@68
|
135 refineID=Float.parseFloat(b);
|
jpayne@68
|
136 }else if(a.equals("out") || a.equals("pattern") || a.equals("outpattern")){
|
jpayne@68
|
137 parser.out1=b;
|
jpayne@68
|
138 }else if(a.equals("type") || a.equals("types")){
|
jpayne@68
|
139 parseTypes(b);
|
jpayne@68
|
140 }else if(a.equals("parse_flag_goes_here")){
|
jpayne@68
|
141 long fake_variable=Parse.parseKMG(b);
|
jpayne@68
|
142 //Set a variable here
|
jpayne@68
|
143 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
|
jpayne@68
|
144 //do nothing
|
jpayne@68
|
145 }else{
|
jpayne@68
|
146 outstream.println("Unknown parameter "+args[i]);
|
jpayne@68
|
147 assert(false) : "Unknown parameter "+args[i];
|
jpayne@68
|
148 }
|
jpayne@68
|
149 }
|
jpayne@68
|
150
|
jpayne@68
|
151 return parser;
|
jpayne@68
|
152 }
|
jpayne@68
|
153
|
jpayne@68
|
154 private void parseTypes(String b){
|
jpayne@68
|
155 sequenceTypes=null;
|
jpayne@68
|
156 if(b==null){
|
jpayne@68
|
157 assert(false) : "'types' flag requires a list of types, such as 'types=16S,18S'";
|
jpayne@68
|
158 sequenceTypes=new String[] {"Other"};
|
jpayne@68
|
159 }else{
|
jpayne@68
|
160 String[] split=b.split(",");
|
jpayne@68
|
161 sequenceTypes=new String[split.length+1];
|
jpayne@68
|
162 sequenceTypes[0]="Other";
|
jpayne@68
|
163 for(int i=0; i<split.length; i++){
|
jpayne@68
|
164 String s=split[i].replace('s', 'S');
|
jpayne@68
|
165 if(s.startsWith("its")){s=s.replaceFirst("its", "ITS");}
|
jpayne@68
|
166 sequenceTypes[i+1]=s;
|
jpayne@68
|
167 }
|
jpayne@68
|
168 }
|
jpayne@68
|
169 }
|
jpayne@68
|
170
|
jpayne@68
|
171 /** Add or remove .gz or .bz2 as needed */
|
jpayne@68
|
172 private void fixExtensions(){
|
jpayne@68
|
173 in1=Tools.fixExtension(in1);
|
jpayne@68
|
174 qfin1=Tools.fixExtension(qfin1);
|
jpayne@68
|
175 }
|
jpayne@68
|
176
|
jpayne@68
|
177 /** Ensure files can be read and written */
|
jpayne@68
|
178 private void checkFileExistence(){
|
jpayne@68
|
179
|
jpayne@68
|
180 //Ensure input files can be read
|
jpayne@68
|
181 if(!Tools.testInputFiles(false, true, in1)){
|
jpayne@68
|
182 throw new RuntimeException("\nCan't read some input files.\n");
|
jpayne@68
|
183 }
|
jpayne@68
|
184
|
jpayne@68
|
185 if(outPattern==null){return;}
|
jpayne@68
|
186
|
jpayne@68
|
187 if(!outPattern.contains("#")){
|
jpayne@68
|
188 throw new RuntimeException("OutPattern must contain '#' symbol: "+outPattern);
|
jpayne@68
|
189 }
|
jpayne@68
|
190
|
jpayne@68
|
191 for(String type : sequenceTypes) {
|
jpayne@68
|
192 String out=outPattern.replaceFirst("#", type);
|
jpayne@68
|
193
|
jpayne@68
|
194 //Ensure output files can be written
|
jpayne@68
|
195 if(!Tools.testOutputFiles(overwrite, append, false, out)){
|
jpayne@68
|
196 outstream.println((outPattern==null)+", "+(out==null)+", "+outPattern+", "+out);
|
jpayne@68
|
197 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n");
|
jpayne@68
|
198 }
|
jpayne@68
|
199
|
jpayne@68
|
200 //Ensure that no file was specified multiple times
|
jpayne@68
|
201 if(!Tools.testForDuplicateFiles(true, in1, out)){
|
jpayne@68
|
202 throw new RuntimeException("\nSome file names were specified multiple times.\n");
|
jpayne@68
|
203 }
|
jpayne@68
|
204 }
|
jpayne@68
|
205 }
|
jpayne@68
|
206
|
jpayne@68
|
207 /** Adjust file-related static fields as needed for this program */
|
jpayne@68
|
208 private static void checkStatics(){
|
jpayne@68
|
209 //Adjust the number of threads for input file reading
|
jpayne@68
|
210 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
|
jpayne@68
|
211 ByteFile.FORCE_MODE_BF2=true;
|
jpayne@68
|
212 }
|
jpayne@68
|
213
|
jpayne@68
|
214 assert(FastaReadInputStream.settingsOK());
|
jpayne@68
|
215 }
|
jpayne@68
|
216
|
jpayne@68
|
217 /** Ensure parameter ranges are within bounds and required parameters are set */
|
jpayne@68
|
218 private boolean validateParams(){
|
jpayne@68
|
219 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
|
jpayne@68
|
220 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
|
jpayne@68
|
221 return true;
|
jpayne@68
|
222 }
|
jpayne@68
|
223
|
jpayne@68
|
224 private final Read[][] loadConsensusSequenceFromFile(){
|
jpayne@68
|
225 Read[][] seqs=new Read[numTypes][];
|
jpayne@68
|
226 m16S_index=Tools.find("m16S", sequenceTypes);
|
jpayne@68
|
227 m18S_index=Tools.find("m18S", sequenceTypes);
|
jpayne@68
|
228 p16S_index=Tools.find("p16S", sequenceTypes);
|
jpayne@68
|
229 boolean stripM16S=(m16S_index>=0);
|
jpayne@68
|
230 boolean stripM18S=(m18S_index>=0);
|
jpayne@68
|
231 boolean stripP16S=(p16S_index>=0);
|
jpayne@68
|
232 for(int st=1; st<numTypes; st++){
|
jpayne@68
|
233 String name=sequenceTypes[st];
|
jpayne@68
|
234 boolean is16S=name.equalsIgnoreCase("16S");
|
jpayne@68
|
235 boolean is18S=name.equalsIgnoreCase("18S");
|
jpayne@68
|
236 seqs[st]=ProkObject.loadConsensusSequenceType(name, ((is16S && stripM16S) || (is18S && stripM18S)), (is16S && stripP16S));
|
jpayne@68
|
237 }
|
jpayne@68
|
238 return seqs;
|
jpayne@68
|
239 }
|
jpayne@68
|
240
|
jpayne@68
|
241 /*--------------------------------------------------------------*/
|
jpayne@68
|
242 /*---------------- Outer Methods ----------------*/
|
jpayne@68
|
243 /*--------------------------------------------------------------*/
|
jpayne@68
|
244
|
jpayne@68
|
245 /** Create read streams and process all data */
|
jpayne@68
|
246 void process(Timer t){
|
jpayne@68
|
247
|
jpayne@68
|
248 //Turn off read validation in the input threads to increase speed
|
jpayne@68
|
249 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
|
jpayne@68
|
250 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
|
jpayne@68
|
251
|
jpayne@68
|
252 //Create a read input stream
|
jpayne@68
|
253 final ConcurrentReadInputStream cris=makeCris();
|
jpayne@68
|
254
|
jpayne@68
|
255 //Optionally create a read output stream
|
jpayne@68
|
256 final ConcurrentReadOutputStream[] rosa=makeCrosArray();
|
jpayne@68
|
257
|
jpayne@68
|
258 //Reset counters
|
jpayne@68
|
259 readsProcessed=0;
|
jpayne@68
|
260 basesProcessed=0;
|
jpayne@68
|
261 Arrays.fill(readsOut, 0);
|
jpayne@68
|
262 Arrays.fill(basesOut, 0);
|
jpayne@68
|
263
|
jpayne@68
|
264 //Process the reads in separate threads
|
jpayne@68
|
265 spawnThreads(cris, rosa);
|
jpayne@68
|
266
|
jpayne@68
|
267 if(verbose){outstream.println("Finished; closing streams.");}
|
jpayne@68
|
268
|
jpayne@68
|
269 //Write anything that was accumulated by ReadStats
|
jpayne@68
|
270 errorState|=ReadStats.writeAll();
|
jpayne@68
|
271 //assert(!errorState);
|
jpayne@68
|
272 //Close the read streams
|
jpayne@68
|
273 errorState|=ReadWrite.closeStreams(cris, rosa);
|
jpayne@68
|
274 //assert(!errorState);
|
jpayne@68
|
275
|
jpayne@68
|
276 //Reset read validation
|
jpayne@68
|
277 Read.VALIDATE_IN_CONSTRUCTOR=vic;
|
jpayne@68
|
278
|
jpayne@68
|
279 long readsOut2=Tools.sum(readsOut)-readsOut[0];
|
jpayne@68
|
280 long basesOut2=Tools.sum(basesOut)-basesOut[0];
|
jpayne@68
|
281
|
jpayne@68
|
282 //Report timing and results
|
jpayne@68
|
283 t.stop();
|
jpayne@68
|
284 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
|
jpayne@68
|
285 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut2, basesOut2, 8, true));
|
jpayne@68
|
286
|
jpayne@68
|
287 outstream.println();
|
jpayne@68
|
288 outstream.println(Tools.string("Type", "Count", 8));
|
jpayne@68
|
289 for(int type=0; type<numTypes; type++){
|
jpayne@68
|
290 outstream.println(Tools.number(sequenceTypes[type], readsOut[type], 8));
|
jpayne@68
|
291 }
|
jpayne@68
|
292
|
jpayne@68
|
293 //Throw an exception of there was an error in a thread
|
jpayne@68
|
294 if(errorState){
|
jpayne@68
|
295 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
|
jpayne@68
|
296 }
|
jpayne@68
|
297 }
|
jpayne@68
|
298
|
jpayne@68
|
299 private ConcurrentReadInputStream makeCris(){
|
jpayne@68
|
300 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, null, qfin1, null);
|
jpayne@68
|
301 cris.start(); //Start the stream
|
jpayne@68
|
302 if(verbose){outstream.println("Started cris");}
|
jpayne@68
|
303 return cris;
|
jpayne@68
|
304 }
|
jpayne@68
|
305
|
jpayne@68
|
306 private ConcurrentReadOutputStream[] makeCrosArray(){
|
jpayne@68
|
307 ConcurrentReadOutputStream[] rosa=new ConcurrentReadOutputStream[numTypes];
|
jpayne@68
|
308 for(int i=0; i<numTypes; i++){
|
jpayne@68
|
309 String type=sequenceTypes[i];
|
jpayne@68
|
310 final ConcurrentReadOutputStream ros=makeCros(type);
|
jpayne@68
|
311 rosa[i]=ros;
|
jpayne@68
|
312 }
|
jpayne@68
|
313 return rosa;
|
jpayne@68
|
314 }
|
jpayne@68
|
315
|
jpayne@68
|
316 private ConcurrentReadOutputStream makeCros(String type){
|
jpayne@68
|
317 if(outPattern==null){return null;}
|
jpayne@68
|
318
|
jpayne@68
|
319 //Select output buffer size based on whether it needs to be ordered
|
jpayne@68
|
320 final int buff=(ordered ? Tools.mid(2, 16, (Shared.threads()*2)/3) : 4);
|
jpayne@68
|
321 final String fname=outPattern.replaceFirst("#", type);
|
jpayne@68
|
322 FileFormat ff=FileFormat.testOutput(fname, FileFormat.FASTA, extout, true, overwrite, append, ordered);
|
jpayne@68
|
323
|
jpayne@68
|
324 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ff, null, buff, null, false);
|
jpayne@68
|
325 ros.start(); //Start the stream
|
jpayne@68
|
326 return ros;
|
jpayne@68
|
327 }
|
jpayne@68
|
328
|
jpayne@68
|
329 /*--------------------------------------------------------------*/
|
jpayne@68
|
330 /*---------------- Thread Management ----------------*/
|
jpayne@68
|
331 /*--------------------------------------------------------------*/
|
jpayne@68
|
332
|
jpayne@68
|
333 /** Spawn process threads */
|
jpayne@68
|
334 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream[] rosa){
|
jpayne@68
|
335
|
jpayne@68
|
336 //Do anything necessary prior to processing
|
jpayne@68
|
337
|
jpayne@68
|
338 //Determine how many threads may be used
|
jpayne@68
|
339 final int threads=Shared.threads();
|
jpayne@68
|
340
|
jpayne@68
|
341 //Fill a list with ProcessThreads
|
jpayne@68
|
342 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
|
jpayne@68
|
343 for(int i=0; i<threads; i++){
|
jpayne@68
|
344 alpt.add(new ProcessThread(cris, rosa, i));
|
jpayne@68
|
345 }
|
jpayne@68
|
346
|
jpayne@68
|
347 //Start the threads and wait for them to finish
|
jpayne@68
|
348 boolean success=ThreadWaiter.startAndWait(alpt, this);
|
jpayne@68
|
349 errorState&=!success;
|
jpayne@68
|
350 //assert(!errorState);
|
jpayne@68
|
351
|
jpayne@68
|
352 //Do anything necessary after processing
|
jpayne@68
|
353
|
jpayne@68
|
354 }
|
jpayne@68
|
355
|
jpayne@68
|
356 @Override
|
jpayne@68
|
357 public final void accumulate(ProcessThread pt){
|
jpayne@68
|
358 readsProcessed+=pt.readsProcessedT;
|
jpayne@68
|
359 basesProcessed+=pt.basesProcessedT;
|
jpayne@68
|
360 Tools.add(readsOut, pt.readsOutT);
|
jpayne@68
|
361 Tools.add(basesOut, pt.basesOutT);
|
jpayne@68
|
362 errorState|=(!pt.success);
|
jpayne@68
|
363 //assert(!errorState);
|
jpayne@68
|
364 }
|
jpayne@68
|
365
|
jpayne@68
|
366 @Override
|
jpayne@68
|
367 public final boolean success(){return !errorState;}
|
jpayne@68
|
368
|
jpayne@68
|
369 /*--------------------------------------------------------------*/
|
jpayne@68
|
370 /*---------------- Inner Methods ----------------*/
|
jpayne@68
|
371 /*--------------------------------------------------------------*/
|
jpayne@68
|
372
|
jpayne@68
|
373 /*--------------------------------------------------------------*/
|
jpayne@68
|
374 /*---------------- Inner Classes ----------------*/
|
jpayne@68
|
375 /*--------------------------------------------------------------*/
|
jpayne@68
|
376
|
jpayne@68
|
377 /** This class is static to prevent accidental writing to shared variables.
|
jpayne@68
|
378 * It is safe to remove the static modifier. */
|
jpayne@68
|
379 class ProcessThread extends Thread {
|
jpayne@68
|
380
|
jpayne@68
|
381 //Constructor
|
jpayne@68
|
382 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream[] rosa_, final int tid_){
|
jpayne@68
|
383 cris=cris_;
|
jpayne@68
|
384 rosa=rosa_;
|
jpayne@68
|
385 tid=tid_;
|
jpayne@68
|
386 }
|
jpayne@68
|
387
|
jpayne@68
|
388 //Called by start()
|
jpayne@68
|
389 @Override
|
jpayne@68
|
390 public void run(){
|
jpayne@68
|
391 //Do anything necessary prior to processing
|
jpayne@68
|
392
|
jpayne@68
|
393 //Process the reads
|
jpayne@68
|
394 processInner();
|
jpayne@68
|
395
|
jpayne@68
|
396 //Do anything necessary after processing
|
jpayne@68
|
397
|
jpayne@68
|
398 //Indicate successful exit status
|
jpayne@68
|
399 success=true;
|
jpayne@68
|
400 }
|
jpayne@68
|
401
|
jpayne@68
|
402 /** Iterate through the reads */
|
jpayne@68
|
403 void processInner(){
|
jpayne@68
|
404
|
jpayne@68
|
405 //Grab the first ListNum of reads
|
jpayne@68
|
406 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
407
|
jpayne@68
|
408 //Check to ensure pairing is as expected
|
jpayne@68
|
409 if(ln!=null && !ln.isEmpty()){
|
jpayne@68
|
410 Read r=ln.get(0);
|
jpayne@68
|
411 assert(r.mate==null);
|
jpayne@68
|
412 }
|
jpayne@68
|
413
|
jpayne@68
|
414 //As long as there is a nonempty read list...
|
jpayne@68
|
415 while(ln!=null && ln.size()>0){
|
jpayne@68
|
416 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
|
jpayne@68
|
417
|
jpayne@68
|
418 processList(ln);
|
jpayne@68
|
419
|
jpayne@68
|
420 //Notify the input stream that the list was used
|
jpayne@68
|
421 cris.returnList(ln);
|
jpayne@68
|
422 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
|
jpayne@68
|
423
|
jpayne@68
|
424 //Fetch a new list
|
jpayne@68
|
425 ln=cris.nextList();
|
jpayne@68
|
426 }
|
jpayne@68
|
427
|
jpayne@68
|
428 //Notify the input stream that the final list was used
|
jpayne@68
|
429 if(ln!=null){
|
jpayne@68
|
430 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
|
jpayne@68
|
431 }
|
jpayne@68
|
432 }
|
jpayne@68
|
433
|
jpayne@68
|
434 void processList(ListNum<Read> ln){
|
jpayne@68
|
435
|
jpayne@68
|
436 //Grab the actual read list from the ListNum
|
jpayne@68
|
437 final ArrayList<Read> reads=ln.list;
|
jpayne@68
|
438
|
jpayne@68
|
439 @SuppressWarnings("unchecked")
|
jpayne@68
|
440 final ArrayList<Read>[] out=new ArrayList[numTypes];
|
jpayne@68
|
441 for(int i=0; i<numTypes; i++){
|
jpayne@68
|
442 ArrayList<Read> list=new ArrayList<Read>(50);
|
jpayne@68
|
443 out[i]=list;
|
jpayne@68
|
444 }
|
jpayne@68
|
445
|
jpayne@68
|
446 //Loop through each read in the list
|
jpayne@68
|
447 for(int idx=0; idx<reads.size(); idx++){
|
jpayne@68
|
448 final Read r1=reads.get(idx);
|
jpayne@68
|
449
|
jpayne@68
|
450 //Validate reads in worker threads
|
jpayne@68
|
451 if(!r1.validated()){r1.validate(true);}
|
jpayne@68
|
452
|
jpayne@68
|
453 //Track the initial length for statistics
|
jpayne@68
|
454 final int initialLength1=r1.length();
|
jpayne@68
|
455 final int initialLength2=r1.mateLength();
|
jpayne@68
|
456
|
jpayne@68
|
457 //Increment counters
|
jpayne@68
|
458 readsProcessedT+=r1.pairCount();
|
jpayne@68
|
459 basesProcessedT+=initialLength1+initialLength2;
|
jpayne@68
|
460
|
jpayne@68
|
461 {
|
jpayne@68
|
462 //Reads are processed in this block.
|
jpayne@68
|
463 final int type=processRead(r1);
|
jpayne@68
|
464 readsOutT[type]+=r1.pairCount();
|
jpayne@68
|
465 basesOutT[type]+=r1.pairLength();
|
jpayne@68
|
466 out[type].add(r1);
|
jpayne@68
|
467 }
|
jpayne@68
|
468 }
|
jpayne@68
|
469
|
jpayne@68
|
470 //Output reads to the output stream
|
jpayne@68
|
471 if(rosa!=null){
|
jpayne@68
|
472 for(int type=0; type<numTypes; type++){
|
jpayne@68
|
473 rosa[type].add(out[type], ln.id);
|
jpayne@68
|
474 }
|
jpayne@68
|
475 }
|
jpayne@68
|
476 }
|
jpayne@68
|
477
|
jpayne@68
|
478 /**
|
jpayne@68
|
479 * Process a read.
|
jpayne@68
|
480 * @param r1 Read 1
|
jpayne@68
|
481 * @return The best-matching type, or 0 for no matches.
|
jpayne@68
|
482 */
|
jpayne@68
|
483 private int processRead(final Read r){
|
jpayne@68
|
484 int bestType=0;
|
jpayne@68
|
485 float bestID=-1;
|
jpayne@68
|
486 for(int type=1; type<numTypes; type++){//Align to only the overall consensus
|
jpayne@68
|
487 Read[] refs=consensusSequences[type];
|
jpayne@68
|
488 float id=align(r, refs, 0, 1);
|
jpayne@68
|
489 if(id>bestID && id>=minID){
|
jpayne@68
|
490 bestType=type;
|
jpayne@68
|
491 bestID=id;
|
jpayne@68
|
492 }
|
jpayne@68
|
493 }
|
jpayne@68
|
494 if(bestType<1 || bestID<refineID || bestType==p16S_index){//If nothing met minID, or if it matched chloro, align to clade-specific consensuses
|
jpayne@68
|
495 for(int type=1; type<numTypes; type++){
|
jpayne@68
|
496 Read[] refs=consensusSequences[type];
|
jpayne@68
|
497 float id=align(r, refs, 1, refs.length);
|
jpayne@68
|
498 if(id>bestID && id>=minID){
|
jpayne@68
|
499 bestType=type;
|
jpayne@68
|
500 bestID=id;
|
jpayne@68
|
501 }
|
jpayne@68
|
502 }
|
jpayne@68
|
503 }
|
jpayne@68
|
504 r.obj=bestID;//If desired... in actuality, more info might be useful, like alignment length
|
jpayne@68
|
505 return bestID<minID ? 0 : bestType;
|
jpayne@68
|
506 }
|
jpayne@68
|
507
|
jpayne@68
|
508 private float align(Read r, Read[] refs, int minRef, int maxRef){
|
jpayne@68
|
509 float bestID=-1;
|
jpayne@68
|
510 if(refs!=null){
|
jpayne@68
|
511 for(int i=minRef; i<maxRef; i++){
|
jpayne@68
|
512 Read ref=refs[i];
|
jpayne@68
|
513 float id=align(r.bases, ref.bases);
|
jpayne@68
|
514 bestID=Tools.max(id, bestID);
|
jpayne@68
|
515 }
|
jpayne@68
|
516 }
|
jpayne@68
|
517 return bestID;
|
jpayne@68
|
518 }
|
jpayne@68
|
519
|
jpayne@68
|
520 private float align(byte[] query, byte[] ref){
|
jpayne@68
|
521 int a=0, b=ref.length-1;
|
jpayne@68
|
522 int[] max=ssa.fillUnlimited(query, ref, a, b, -9999);
|
jpayne@68
|
523 if(max==null){return 0;}
|
jpayne@68
|
524
|
jpayne@68
|
525 final int rows=max[0];
|
jpayne@68
|
526 final int maxCol=max[1];
|
jpayne@68
|
527 final int maxState=max[2];
|
jpayne@68
|
528 final float id=ssa.tracebackIdentity(query, ref, a, b, rows, maxCol, maxState, null);
|
jpayne@68
|
529 return id;
|
jpayne@68
|
530 }
|
jpayne@68
|
531
|
jpayne@68
|
532 SingleStateAlignerFlat2 ssa=new SingleStateAlignerFlat2();
|
jpayne@68
|
533
|
jpayne@68
|
534 /** Number of reads processed by this thread */
|
jpayne@68
|
535 protected long readsProcessedT=0;
|
jpayne@68
|
536 /** Number of bases processed by this thread */
|
jpayne@68
|
537 protected long basesProcessedT=0;
|
jpayne@68
|
538
|
jpayne@68
|
539 /** Number of reads retained by this thread */
|
jpayne@68
|
540 protected long[] readsOutT=new long[numTypes];
|
jpayne@68
|
541 /** Number of bases retained by this thread */
|
jpayne@68
|
542 protected long[] basesOutT=new long[numTypes];
|
jpayne@68
|
543
|
jpayne@68
|
544 /** True only if this thread has completed successfully */
|
jpayne@68
|
545 boolean success=false;
|
jpayne@68
|
546
|
jpayne@68
|
547 /** Shared input stream */
|
jpayne@68
|
548 private final ConcurrentReadInputStream cris;
|
jpayne@68
|
549 /** Shared output stream */
|
jpayne@68
|
550 private final ConcurrentReadOutputStream[] rosa;
|
jpayne@68
|
551 /** Thread ID */
|
jpayne@68
|
552 final int tid;
|
jpayne@68
|
553 }
|
jpayne@68
|
554
|
jpayne@68
|
555 /*--------------------------------------------------------------*/
|
jpayne@68
|
556 /*---------------- Fields ----------------*/
|
jpayne@68
|
557 /*--------------------------------------------------------------*/
|
jpayne@68
|
558
|
jpayne@68
|
559 /** Primary input file path */
|
jpayne@68
|
560 private String in1=null;
|
jpayne@68
|
561
|
jpayne@68
|
562 private String qfin1=null;
|
jpayne@68
|
563
|
jpayne@68
|
564 /** Primary output file path */
|
jpayne@68
|
565 private String outPattern=null;
|
jpayne@68
|
566
|
jpayne@68
|
567 /** Override input file extension */
|
jpayne@68
|
568 private String extin=null;
|
jpayne@68
|
569 /** Override output file extension */
|
jpayne@68
|
570 private String extout=null;
|
jpayne@68
|
571
|
jpayne@68
|
572 float minID=0.59f; //This could be a per-type value
|
jpayne@68
|
573 float refineID=0.70f; //Refine alignment if best is less than this
|
jpayne@68
|
574
|
jpayne@68
|
575 private int m16S_index=-2;
|
jpayne@68
|
576 private int m18S_index=-2;
|
jpayne@68
|
577 private int p16S_index=-2;
|
jpayne@68
|
578
|
jpayne@68
|
579 /*--------------------------------------------------------------*/
|
jpayne@68
|
580
|
jpayne@68
|
581 /** Number of reads processed */
|
jpayne@68
|
582 protected long readsProcessed=0;
|
jpayne@68
|
583 /** Number of bases processed */
|
jpayne@68
|
584 protected long basesProcessed=0;
|
jpayne@68
|
585
|
jpayne@68
|
586 /** Quit after processing this many input reads; -1 means no limit */
|
jpayne@68
|
587 private long maxReads=-1;
|
jpayne@68
|
588
|
jpayne@68
|
589 private String[] sequenceTypes=new String[] {"Other", "16S", "18S", "23S", "5S", "m16S", "m18S", "p16S"};
|
jpayne@68
|
590 private final int numTypes;//=sequenceTypes.length;
|
jpayne@68
|
591 final Read[][] consensusSequences;
|
jpayne@68
|
592
|
jpayne@68
|
593 /** Number of reads retained */
|
jpayne@68
|
594 final long[] readsOut;
|
jpayne@68
|
595 /** Number of bases retained */
|
jpayne@68
|
596 final long[] basesOut;
|
jpayne@68
|
597
|
jpayne@68
|
598 /*--------------------------------------------------------------*/
|
jpayne@68
|
599 /*---------------- Final Fields ----------------*/
|
jpayne@68
|
600 /*--------------------------------------------------------------*/
|
jpayne@68
|
601
|
jpayne@68
|
602 /** Primary input file */
|
jpayne@68
|
603 private final FileFormat ffin1;
|
jpayne@68
|
604
|
jpayne@68
|
605 /*--------------------------------------------------------------*/
|
jpayne@68
|
606 /*---------------- Static Fields ----------------*/
|
jpayne@68
|
607 /*--------------------------------------------------------------*/
|
jpayne@68
|
608
|
jpayne@68
|
609 /*--------------------------------------------------------------*/
|
jpayne@68
|
610 /*---------------- Common Fields ----------------*/
|
jpayne@68
|
611 /*--------------------------------------------------------------*/
|
jpayne@68
|
612
|
jpayne@68
|
613 /** Print status messages to this output stream */
|
jpayne@68
|
614 private PrintStream outstream=System.err;
|
jpayne@68
|
615 /** Print verbose messages */
|
jpayne@68
|
616 public static boolean verbose=false;
|
jpayne@68
|
617 /** True if an error was encountered */
|
jpayne@68
|
618 public boolean errorState=false;
|
jpayne@68
|
619 /** Overwrite existing output files */
|
jpayne@68
|
620 private boolean overwrite=false;
|
jpayne@68
|
621 /** Append to existing output files */
|
jpayne@68
|
622 private boolean append=false;
|
jpayne@68
|
623 /** Reads are output in input order */
|
jpayne@68
|
624 private boolean ordered=true;
|
jpayne@68
|
625
|
jpayne@68
|
626 }
|