comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/aligner/AllToAll.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package aligner;
2
3 import java.io.PrintStream;
4 import java.util.ArrayList;
5 import java.util.concurrent.atomic.AtomicInteger;
6
7 import fileIO.ByteFile;
8 import fileIO.ByteStreamWriter;
9 import fileIO.FileFormat;
10 import fileIO.ReadWrite;
11 import shared.Parse;
12 import shared.Parser;
13 import shared.PreParser;
14 import shared.ReadStats;
15 import shared.Shared;
16 import shared.Timer;
17 import shared.Tools;
18 import sketch.SketchObject;
19 import stream.ConcurrentReadInputStream;
20 import stream.FastaReadInputStream;
21 import stream.Read;
22 import template.Accumulator;
23 import template.ThreadWaiter;
24
25 /**
26 * Aligns all sequences to all sequences and produces an identity matrix.
27 *
28 * @author Brian Bushnell
29 * @date January 27, 2020
30 *
31 */
32 public class AllToAll implements Accumulator<AllToAll.ProcessThread> {
33
34 /*--------------------------------------------------------------*/
35 /*---------------- Initialization ----------------*/
36 /*--------------------------------------------------------------*/
37
38 /**
39 * Code entrance from the command line.
40 * @param args Command line arguments
41 */
42 public static void main(String[] args){
43 //Start a timer immediately upon code entrance.
44 Timer t=new Timer();
45
46 //Create an instance of this class
47 AllToAll x=new AllToAll(args);
48
49 //Run the object
50 x.process(t);
51
52 //Close the print stream if it was redirected
53 Shared.closeStream(x.outstream);
54 }
55
/**
 * Constructor.
 * Preparses and parses the command line, copies the parsed settings into fields,
 * validates them, and creates the input and output FileFormat objects.
 * @param args Command line arguments
 */
public AllToAll(String[] args){
	
	//Preparse step for help, config files, and outstream
	final PreParser preparser=new PreParser(args, getClass(), false);
	args=preparser.args;
	outstream=preparser.outstream;
	
	//Shared static variables must be set prior to parsing
	ReadWrite.USE_UNPIGZ=true;
	ReadWrite.USE_PIGZ=true;
	ReadWrite.MAX_ZIP_THREADS=Shared.threads();
	
	//Parse the arguments, then copy what this class needs out of the parser
	final Parser parser=parse(args);
	Parser.processQuality();
	
	maxReads=parser.maxReads;
	
	overwrite=parser.overwrite;
	ReadStats.overwrite=overwrite;
	
	append=parser.append;
	ReadStats.append=append;
	
	in1=parser.in1;
	qfin1=parser.qfin1;
	extin=parser.extin;
	
	out1=parser.out1;
	
	//Sanity checks, in the same order every time
	validateParams();
	fixExtensions(); //Add or remove .gz or .bz2 as needed
	checkFileExistence(); //Ensure files can be read and written
	checkStatics(); //Adjust file-related static fields as needed for this program
	
	//Output FileFormat (plain text)
	ffout1=FileFormat.testOutput(out1, FileFormat.TXT, null, true, overwrite, append, ordered);
	
	//Input FileFormat (defaults to fastq; extin may override the extension)
	ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
}
98
99 /*--------------------------------------------------------------*/
100 /*---------------- Initialization Helpers ----------------*/
101 /*--------------------------------------------------------------*/
102
103 /** Parse arguments from the command line */
104 private Parser parse(String[] args){
105
106 //Create a parser object
107 Parser parser=new Parser();
108 parser.out1="stdout.txt";
109
110 //Set any necessary Parser defaults here
111 //parser.foo=bar;
112
113 //Parse each argument
114 for(int i=0; i<args.length; i++){
115 String arg=args[i];
116
117 //Break arguments into their constituent parts, in the form of "a=b"
118 String[] split=arg.split("=");
119 String a=split[0].toLowerCase();
120 String b=split.length>1 ? split[1] : null;
121 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
122
123 if(a.equals("verbose")){
124 verbose=Parse.parseBoolean(b);
125 }else if(a.equals("ordered")){
126 ordered=Parse.parseBoolean(b);
127 }else if(a.equals("parse_flag_goes_here")){
128 long fake_variable=Parse.parseKMG(b);
129 //Set a variable here
130 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
131 //do nothing
132 }else{
133 outstream.println("Unknown parameter "+args[i]);
134 assert(false) : "Unknown parameter "+args[i];
135 }
136 }
137
138 return parser;
139 }
140
141 /** Add or remove .gz or .bz2 as needed */
142 private void fixExtensions(){
143 in1=Tools.fixExtension(in1);
144 qfin1=Tools.fixExtension(qfin1);
145 }
146
147 /** Ensure files can be read and written */
148 private void checkFileExistence(){
149
150 //Ensure output files can be written
151 if(!Tools.testOutputFiles(overwrite, append, false, out1)){
152 outstream.println((out1==null)+", "+out1);
153 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n");
154 }
155
156 //Ensure input files can be read
157 if(!Tools.testInputFiles(false, true, in1)){
158 throw new RuntimeException("\nCan't read some input files.\n");
159 }
160
161 //Ensure that no file was specified multiple times
162 if(!Tools.testForDuplicateFiles(true, in1, out1)){
163 throw new RuntimeException("\nSome file names were specified multiple times.\n");
164 }
165 }
166
167 /** Adjust file-related static fields as needed for this program */
168 private static void checkStatics(){
169 //Adjust the number of threads for input file reading
170 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
171 ByteFile.FORCE_MODE_BF2=true;
172 }
173
174 assert(FastaReadInputStream.settingsOK());
175 }
176
177 /** Ensure parameter ranges are within bounds and required parameters are set */
178 private boolean validateParams(){
179 //Ensure there is an input file
180 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
181 return true;
182 }
183
184 /*--------------------------------------------------------------*/
185 /*---------------- Outer Methods ----------------*/
186 /*--------------------------------------------------------------*/
187
188 /** Create read streams and process all reads */
189 void process(Timer t){
190
191 //Turn off read validation in the input threads to increase speed
192 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
193 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
194
195 //Reset counters
196 readsProcessed=alignments=0;
197 basesProcessed=0;
198
199 //Fetch data
200 reads=ConcurrentReadInputStream.getReads(maxReads, true, ffin1, null, qfin1, null); //TODO: Note that this does not return the error state
201 results=new float[reads.size()][];
202
203 outstream.println("Loaded "+reads.size()+" sequences.");
204
205 //Process the reads in separate threads
206 spawnThreads();
207 mirrorMatrix(results);
208 if(verbose){outstream.println("Finished alignment.");}
209
210 printResults();
211
212 //Reset read validation
213 Read.VALIDATE_IN_CONSTRUCTOR=vic;
214
215 //Report timing and results
216 t.stop();
217 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
218 outstream.println(Tools.number("Alignments:", alignments, 8));
219
220 //Throw an exception of there was an error in a thread
221 if(errorState){
222 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
223 }
224 }
225
226 private static void mirrorMatrix(float[][] matrix){
227 for(int i=0; i<matrix.length; i++) {
228 for(int j=i; j<matrix.length; j++) {
229 assert(matrix[i][j]==0) : matrix[i][j];
230 matrix[i][j]=(i==j ? 1 : matrix[j][i]);
231 }
232 }
233 }
234
235 private void printResults(){
236 if(ffout1==null){return;}
237 ByteStreamWriter bsw=new ByteStreamWriter(ffout1);
238 bsw.start();
239 final int max=reads.size();
240 bsw.print("Name");
241 for(int rnum=0; rnum<max; rnum++){
242 bsw.tab().print(reads.get(rnum).id);
243 }
244 bsw.println();
245 for(int qnum=0; qnum<max; qnum++){
246 bsw.print(reads.get(qnum).id);
247 final float[] scores=results[qnum];
248 for(int rnum=0; rnum<max; rnum++){
249 bsw.tab().print(100*scores[rnum], 2);
250 }
251 bsw.println();
252 }
253 bsw.poisonAndWait();
254 }
255
256 /*--------------------------------------------------------------*/
257 /*---------------- Thread Management ----------------*/
258 /*--------------------------------------------------------------*/
259
260 /** Spawn process threads */
261 private void spawnThreads(){
262
263 //Do anything necessary prior to processing
264
265 //Determine how many threads may be used
266 final int threads=Shared.threads();
267
268 AtomicInteger atom=new AtomicInteger(0);
269
270 //Fill a list with ProcessThreads
271 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
272 for(int i=0; i<threads; i++){
273 alpt.add(new ProcessThread(reads, results, atom, i));
274 }
275
276 //Start the threads and wait for them to finish
277 boolean success=ThreadWaiter.startAndWait(alpt, this);
278 errorState&=!success;
279
280 //Do anything necessary after processing
281
282 }
283
284 @Override
285 public final void accumulate(ProcessThread pt){
286 readsProcessed+=pt.readsProcessedT;
287 basesProcessed+=pt.basesProcessedT;
288 alignments+=pt.alignmentsT;
289 errorState|=(!pt.success);
290 }
291
292 @Override
293 public final boolean success(){return !errorState;}
294
295 /*--------------------------------------------------------------*/
296 /*---------------- Inner Methods ----------------*/
297 /*--------------------------------------------------------------*/
298
299 /*--------------------------------------------------------------*/
300 /*---------------- Inner Classes ----------------*/
301 /*--------------------------------------------------------------*/
302
303 /** This class is static to prevent accidental writing to shared variables.
304 * It is safe to remove the static modifier. */
305 static class ProcessThread extends Thread {
306
307 //Constructor
308 ProcessThread(final ArrayList<Read> reads_, float[][] results_, final AtomicInteger atom_, final int tid_){
309 reads=reads_;
310 results=results_;
311 atom=atom_;
312 tid=tid_;
313 }
314
315 //Called by start()
316 @Override
317 public void run(){
318 //Do anything necessary prior to processing
319
320 //Process the reads
321 processInner();
322
323 //Do anything necessary after processing
324
325 //Indicate successful exit status
326 success=true;
327 }
328
329 /** Iterate through the reads */
330 void processInner(){
331
332 for(int next=atom.getAndIncrement(); next<reads.size(); next=atom.getAndIncrement()){
333 processQuery(next);
334 }
335
336 }
337
338 void processQuery(final int qnum){
339 final Read query=reads.get(qnum);
340 final float[] scores=new float[reads.size()];
341 readsProcessedT++;
342 basesProcessedT+=query.length();
343 for(int rnum=0; rnum<qnum; rnum++){
344 final Read ref=reads.get(rnum);
345 float identity=SketchObject.align(query.bases, ref.bases);
346 scores[rnum]=identity;
347 alignmentsT++;
348 }
349 synchronized(results){
350 results[qnum]=scores;
351 }
352 }
353
354 /**
355 * Process a read or a read pair.
356 * @param r1 Read 1
357 * @param r2 Read 2 (may be null)
358 * @return True if the reads should be kept, false if they should be discarded.
359 */
360 boolean processReadPair(final Read r1, final Read r2){
361 throw new RuntimeException("TODO: Implement this method."); //TODO
362 // return true;
363 }
364
365 /** Number of reads processed by this thread */
366 protected long readsProcessedT=0;
367 /** Number of bases processed by this thread */
368 protected long basesProcessedT=0;
369
370 /** Number of reads retained by this thread */
371 protected long alignmentsT=0;
372 /** Number of bases retained by this thread */
373 protected long basesOutT=0;
374
375 /** True only if this thread has completed successfully */
376 boolean success=false;
377
378 /** Thread ID */
379 final int tid;
380
381 final ArrayList<Read> reads;
382 final float[][] results;
383 final AtomicInteger atom;
384 }
385
386 /*--------------------------------------------------------------*/
387 /*---------------- Fields ----------------*/
388 /*--------------------------------------------------------------*/
389
390 /** Primary input file path */
391 private String in1=null;
392
393 private String qfin1=null;
394
395 /** Primary output file path */
396 private String out1=null;
397
398 /** Override input file extension */
399 private String extin=null;
400
401 /*--------------------------------------------------------------*/
402
403 ArrayList<Read> reads;
404 float[][] results;
405
406 /** Number of reads processed */
407 protected long readsProcessed=0;
408 /** Number of bases processed */
409 protected long basesProcessed=0;
410
411 /** Number of reads retained */
412 protected long alignments=0;
413
414 /** Quit after processing this many input reads; -1 means no limit */
415 private long maxReads=-1;
416
417 /*--------------------------------------------------------------*/
418 /*---------------- Final Fields ----------------*/
419 /*--------------------------------------------------------------*/
420
421 /** Primary input file */
422 private final FileFormat ffin1;
423
424 /** Primary output file */
425 private final FileFormat ffout1;
426
427 /*--------------------------------------------------------------*/
428 /*---------------- Common Fields ----------------*/
429 /*--------------------------------------------------------------*/
430
431 /** Print status messages to this output stream */
432 private PrintStream outstream=System.err;
433 /** Print verbose messages */
434 public static boolean verbose=false;
435 /** True if an error was encountered */
436 public boolean errorState=false;
437 /** Overwrite existing output files */
438 private boolean overwrite=false;
439 /** Append to existing output files */
440 private boolean append=false;
441 /** Reads are output in input order */
442 private boolean ordered=false;
443
444 }