Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/aligner/AllToAll.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 package aligner; | |
2 | |
3 import java.io.PrintStream; | |
4 import java.util.ArrayList; | |
5 import java.util.concurrent.atomic.AtomicInteger; | |
6 | |
7 import fileIO.ByteFile; | |
8 import fileIO.ByteStreamWriter; | |
9 import fileIO.FileFormat; | |
10 import fileIO.ReadWrite; | |
11 import shared.Parse; | |
12 import shared.Parser; | |
13 import shared.PreParser; | |
14 import shared.ReadStats; | |
15 import shared.Shared; | |
16 import shared.Timer; | |
17 import shared.Tools; | |
18 import sketch.SketchObject; | |
19 import stream.ConcurrentReadInputStream; | |
20 import stream.FastaReadInputStream; | |
21 import stream.Read; | |
22 import template.Accumulator; | |
23 import template.ThreadWaiter; | |
24 | |
25 /** | |
26 * Aligns all sequences to all sequences and produces an identity matrix. | |
27 * | |
28 * @author Brian Bushnell | |
29 * @date January 27, 2020 | |
30 * | |
31 */ | |
32 public class AllToAll implements Accumulator<AllToAll.ProcessThread> { | |
33 | |
34 /*--------------------------------------------------------------*/ | |
35 /*---------------- Initialization ----------------*/ | |
36 /*--------------------------------------------------------------*/ | |
37 | |
38 /** | |
39 * Code entrance from the command line. | |
40 * @param args Command line arguments | |
41 */ | |
42 public static void main(String[] args){ | |
43 //Start a timer immediately upon code entrance. | |
44 Timer t=new Timer(); | |
45 | |
46 //Create an instance of this class | |
47 AllToAll x=new AllToAll(args); | |
48 | |
49 //Run the object | |
50 x.process(t); | |
51 | |
52 //Close the print stream if it was redirected | |
53 Shared.closeStream(x.outstream); | |
54 } | |
55 | |
/**
 * Constructor.
 * Preparses and parses the arguments, copies the parsed values into fields,
 * validates them, and creates the input and output FileFormat objects.
 * @param args Command line arguments
 */
public AllToAll(String[] args){

    //Preparse for help, config files, and outstream
    final PreParser preParser=new PreParser(args, getClass(), false);
    args=preParser.args;
    outstream=preParser.outstream;

    //Set shared static variables prior to parsing
    ReadWrite.USE_UNPIGZ=true;
    ReadWrite.USE_PIGZ=true;
    ReadWrite.MAX_ZIP_THREADS=Shared.threads();

    //Parse the arguments
    final Parser parsed=parse(args);
    Parser.processQuality();

    //Copy the parsed values into this object (and into ReadStats where it mirrors them)
    maxReads=parsed.maxReads;

    ReadStats.overwrite=parsed.overwrite;
    overwrite=parsed.overwrite;

    ReadStats.append=parsed.append;
    append=parsed.append;

    in1=parsed.in1;
    qfin1=parsed.qfin1;
    extin=parsed.extin;

    out1=parsed.out1;

    validateParams();
    fixExtensions(); //Add or remove .gz or .bz2 as needed
    checkFileExistence(); //Ensure files can be read and written
    checkStatics(); //Adjust file-related static fields as needed for this program

    //Create output FileFormat objects
    ffout1=FileFormat.testOutput(out1, FileFormat.TXT, null, true, overwrite, append, ordered);

    //Create input FileFormat objects
    ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
}
98 | |
99 /*--------------------------------------------------------------*/ | |
100 /*---------------- Initialization Helpers ----------------*/ | |
101 /*--------------------------------------------------------------*/ | |
102 | |
103 /** Parse arguments from the command line */ | |
104 private Parser parse(String[] args){ | |
105 | |
106 //Create a parser object | |
107 Parser parser=new Parser(); | |
108 parser.out1="stdout.txt"; | |
109 | |
110 //Set any necessary Parser defaults here | |
111 //parser.foo=bar; | |
112 | |
113 //Parse each argument | |
114 for(int i=0; i<args.length; i++){ | |
115 String arg=args[i]; | |
116 | |
117 //Break arguments into their constituent parts, in the form of "a=b" | |
118 String[] split=arg.split("="); | |
119 String a=split[0].toLowerCase(); | |
120 String b=split.length>1 ? split[1] : null; | |
121 if(b!=null && b.equalsIgnoreCase("null")){b=null;} | |
122 | |
123 if(a.equals("verbose")){ | |
124 verbose=Parse.parseBoolean(b); | |
125 }else if(a.equals("ordered")){ | |
126 ordered=Parse.parseBoolean(b); | |
127 }else if(a.equals("parse_flag_goes_here")){ | |
128 long fake_variable=Parse.parseKMG(b); | |
129 //Set a variable here | |
130 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser | |
131 //do nothing | |
132 }else{ | |
133 outstream.println("Unknown parameter "+args[i]); | |
134 assert(false) : "Unknown parameter "+args[i]; | |
135 } | |
136 } | |
137 | |
138 return parser; | |
139 } | |
140 | |
141 /** Add or remove .gz or .bz2 as needed */ | |
142 private void fixExtensions(){ | |
143 in1=Tools.fixExtension(in1); | |
144 qfin1=Tools.fixExtension(qfin1); | |
145 } | |
146 | |
147 /** Ensure files can be read and written */ | |
148 private void checkFileExistence(){ | |
149 | |
150 //Ensure output files can be written | |
151 if(!Tools.testOutputFiles(overwrite, append, false, out1)){ | |
152 outstream.println((out1==null)+", "+out1); | |
153 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n"); | |
154 } | |
155 | |
156 //Ensure input files can be read | |
157 if(!Tools.testInputFiles(false, true, in1)){ | |
158 throw new RuntimeException("\nCan't read some input files.\n"); | |
159 } | |
160 | |
161 //Ensure that no file was specified multiple times | |
162 if(!Tools.testForDuplicateFiles(true, in1, out1)){ | |
163 throw new RuntimeException("\nSome file names were specified multiple times.\n"); | |
164 } | |
165 } | |
166 | |
167 /** Adjust file-related static fields as needed for this program */ | |
168 private static void checkStatics(){ | |
169 //Adjust the number of threads for input file reading | |
170 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ | |
171 ByteFile.FORCE_MODE_BF2=true; | |
172 } | |
173 | |
174 assert(FastaReadInputStream.settingsOK()); | |
175 } | |
176 | |
177 /** Ensure parameter ranges are within bounds and required parameters are set */ | |
178 private boolean validateParams(){ | |
179 //Ensure there is an input file | |
180 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} | |
181 return true; | |
182 } | |
183 | |
184 /*--------------------------------------------------------------*/ | |
185 /*---------------- Outer Methods ----------------*/ | |
186 /*--------------------------------------------------------------*/ | |
187 | |
188 /** Create read streams and process all reads */ | |
189 void process(Timer t){ | |
190 | |
191 //Turn off read validation in the input threads to increase speed | |
192 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; | |
193 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; | |
194 | |
195 //Reset counters | |
196 readsProcessed=alignments=0; | |
197 basesProcessed=0; | |
198 | |
199 //Fetch data | |
200 reads=ConcurrentReadInputStream.getReads(maxReads, true, ffin1, null, qfin1, null); //TODO: Note that this does not return the error state | |
201 results=new float[reads.size()][]; | |
202 | |
203 outstream.println("Loaded "+reads.size()+" sequences."); | |
204 | |
205 //Process the reads in separate threads | |
206 spawnThreads(); | |
207 mirrorMatrix(results); | |
208 if(verbose){outstream.println("Finished alignment.");} | |
209 | |
210 printResults(); | |
211 | |
212 //Reset read validation | |
213 Read.VALIDATE_IN_CONSTRUCTOR=vic; | |
214 | |
215 //Report timing and results | |
216 t.stop(); | |
217 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); | |
218 outstream.println(Tools.number("Alignments:", alignments, 8)); | |
219 | |
220 //Throw an exception of there was an error in a thread | |
221 if(errorState){ | |
222 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); | |
223 } | |
224 } | |
225 | |
226 private static void mirrorMatrix(float[][] matrix){ | |
227 for(int i=0; i<matrix.length; i++) { | |
228 for(int j=i; j<matrix.length; j++) { | |
229 assert(matrix[i][j]==0) : matrix[i][j]; | |
230 matrix[i][j]=(i==j ? 1 : matrix[j][i]); | |
231 } | |
232 } | |
233 } | |
234 | |
235 private void printResults(){ | |
236 if(ffout1==null){return;} | |
237 ByteStreamWriter bsw=new ByteStreamWriter(ffout1); | |
238 bsw.start(); | |
239 final int max=reads.size(); | |
240 bsw.print("Name"); | |
241 for(int rnum=0; rnum<max; rnum++){ | |
242 bsw.tab().print(reads.get(rnum).id); | |
243 } | |
244 bsw.println(); | |
245 for(int qnum=0; qnum<max; qnum++){ | |
246 bsw.print(reads.get(qnum).id); | |
247 final float[] scores=results[qnum]; | |
248 for(int rnum=0; rnum<max; rnum++){ | |
249 bsw.tab().print(100*scores[rnum], 2); | |
250 } | |
251 bsw.println(); | |
252 } | |
253 bsw.poisonAndWait(); | |
254 } | |
255 | |
256 /*--------------------------------------------------------------*/ | |
257 /*---------------- Thread Management ----------------*/ | |
258 /*--------------------------------------------------------------*/ | |
259 | |
260 /** Spawn process threads */ | |
261 private void spawnThreads(){ | |
262 | |
263 //Do anything necessary prior to processing | |
264 | |
265 //Determine how many threads may be used | |
266 final int threads=Shared.threads(); | |
267 | |
268 AtomicInteger atom=new AtomicInteger(0); | |
269 | |
270 //Fill a list with ProcessThreads | |
271 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); | |
272 for(int i=0; i<threads; i++){ | |
273 alpt.add(new ProcessThread(reads, results, atom, i)); | |
274 } | |
275 | |
276 //Start the threads and wait for them to finish | |
277 boolean success=ThreadWaiter.startAndWait(alpt, this); | |
278 errorState&=!success; | |
279 | |
280 //Do anything necessary after processing | |
281 | |
282 } | |
283 | |
284 @Override | |
285 public final void accumulate(ProcessThread pt){ | |
286 readsProcessed+=pt.readsProcessedT; | |
287 basesProcessed+=pt.basesProcessedT; | |
288 alignments+=pt.alignmentsT; | |
289 errorState|=(!pt.success); | |
290 } | |
291 | |
292 @Override | |
293 public final boolean success(){return !errorState;} | |
294 | |
295 /*--------------------------------------------------------------*/ | |
296 /*---------------- Inner Methods ----------------*/ | |
297 /*--------------------------------------------------------------*/ | |
298 | |
299 /*--------------------------------------------------------------*/ | |
300 /*---------------- Inner Classes ----------------*/ | |
301 /*--------------------------------------------------------------*/ | |
302 | |
303 /** This class is static to prevent accidental writing to shared variables. | |
304 * It is safe to remove the static modifier. */ | |
305 static class ProcessThread extends Thread { | |
306 | |
307 //Constructor | |
308 ProcessThread(final ArrayList<Read> reads_, float[][] results_, final AtomicInteger atom_, final int tid_){ | |
309 reads=reads_; | |
310 results=results_; | |
311 atom=atom_; | |
312 tid=tid_; | |
313 } | |
314 | |
315 //Called by start() | |
316 @Override | |
317 public void run(){ | |
318 //Do anything necessary prior to processing | |
319 | |
320 //Process the reads | |
321 processInner(); | |
322 | |
323 //Do anything necessary after processing | |
324 | |
325 //Indicate successful exit status | |
326 success=true; | |
327 } | |
328 | |
329 /** Iterate through the reads */ | |
330 void processInner(){ | |
331 | |
332 for(int next=atom.getAndIncrement(); next<reads.size(); next=atom.getAndIncrement()){ | |
333 processQuery(next); | |
334 } | |
335 | |
336 } | |
337 | |
338 void processQuery(final int qnum){ | |
339 final Read query=reads.get(qnum); | |
340 final float[] scores=new float[reads.size()]; | |
341 readsProcessedT++; | |
342 basesProcessedT+=query.length(); | |
343 for(int rnum=0; rnum<qnum; rnum++){ | |
344 final Read ref=reads.get(rnum); | |
345 float identity=SketchObject.align(query.bases, ref.bases); | |
346 scores[rnum]=identity; | |
347 alignmentsT++; | |
348 } | |
349 synchronized(results){ | |
350 results[qnum]=scores; | |
351 } | |
352 } | |
353 | |
354 /** | |
355 * Process a read or a read pair. | |
356 * @param r1 Read 1 | |
357 * @param r2 Read 2 (may be null) | |
358 * @return True if the reads should be kept, false if they should be discarded. | |
359 */ | |
360 boolean processReadPair(final Read r1, final Read r2){ | |
361 throw new RuntimeException("TODO: Implement this method."); //TODO | |
362 // return true; | |
363 } | |
364 | |
365 /** Number of reads processed by this thread */ | |
366 protected long readsProcessedT=0; | |
367 /** Number of bases processed by this thread */ | |
368 protected long basesProcessedT=0; | |
369 | |
370 /** Number of reads retained by this thread */ | |
371 protected long alignmentsT=0; | |
372 /** Number of bases retained by this thread */ | |
373 protected long basesOutT=0; | |
374 | |
375 /** True only if this thread has completed successfully */ | |
376 boolean success=false; | |
377 | |
378 /** Thread ID */ | |
379 final int tid; | |
380 | |
381 final ArrayList<Read> reads; | |
382 final float[][] results; | |
383 final AtomicInteger atom; | |
384 } | |
385 | |
386 /*--------------------------------------------------------------*/ | |
387 /*---------------- Fields ----------------*/ | |
388 /*--------------------------------------------------------------*/ | |
389 | |
390 /** Primary input file path */ | |
391 private String in1=null; | |
392 | |
393 private String qfin1=null; | |
394 | |
395 /** Primary output file path */ | |
396 private String out1=null; | |
397 | |
398 /** Override input file extension */ | |
399 private String extin=null; | |
400 | |
401 /*--------------------------------------------------------------*/ | |
402 | |
403 ArrayList<Read> reads; | |
404 float[][] results; | |
405 | |
406 /** Number of reads processed */ | |
407 protected long readsProcessed=0; | |
408 /** Number of bases processed */ | |
409 protected long basesProcessed=0; | |
410 | |
411 /** Number of reads retained */ | |
412 protected long alignments=0; | |
413 | |
414 /** Quit after processing this many input reads; -1 means no limit */ | |
415 private long maxReads=-1; | |
416 | |
417 /*--------------------------------------------------------------*/ | |
418 /*---------------- Final Fields ----------------*/ | |
419 /*--------------------------------------------------------------*/ | |
420 | |
421 /** Primary input file */ | |
422 private final FileFormat ffin1; | |
423 | |
424 /** Primary output file */ | |
425 private final FileFormat ffout1; | |
426 | |
427 /*--------------------------------------------------------------*/ | |
428 /*---------------- Common Fields ----------------*/ | |
429 /*--------------------------------------------------------------*/ | |
430 | |
431 /** Print status messages to this output stream */ | |
432 private PrintStream outstream=System.err; | |
433 /** Print verbose messages */ | |
434 public static boolean verbose=false; | |
435 /** True if an error was encountered */ | |
436 public boolean errorState=false; | |
437 /** Overwrite existing output files */ | |
438 private boolean overwrite=false; | |
439 /** Append to existing output files */ | |
440 private boolean append=false; | |
441 /** Reads are output in input order */ | |
442 private boolean ordered=false; | |
443 | |
444 } |