annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/aligner/AllToAll.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package aligner;
jpayne@68 2
jpayne@68 3 import java.io.PrintStream;
jpayne@68 4 import java.util.ArrayList;
jpayne@68 5 import java.util.concurrent.atomic.AtomicInteger;
jpayne@68 6
jpayne@68 7 import fileIO.ByteFile;
jpayne@68 8 import fileIO.ByteStreamWriter;
jpayne@68 9 import fileIO.FileFormat;
jpayne@68 10 import fileIO.ReadWrite;
jpayne@68 11 import shared.Parse;
jpayne@68 12 import shared.Parser;
jpayne@68 13 import shared.PreParser;
jpayne@68 14 import shared.ReadStats;
jpayne@68 15 import shared.Shared;
jpayne@68 16 import shared.Timer;
jpayne@68 17 import shared.Tools;
jpayne@68 18 import sketch.SketchObject;
jpayne@68 19 import stream.ConcurrentReadInputStream;
jpayne@68 20 import stream.FastaReadInputStream;
jpayne@68 21 import stream.Read;
jpayne@68 22 import template.Accumulator;
jpayne@68 23 import template.ThreadWaiter;
jpayne@68 24
jpayne@68 25 /**
jpayne@68 26 * Aligns all sequences to all sequences and produces an identity matrix.
jpayne@68 27 *
jpayne@68 28 * @author Brian Bushnell
jpayne@68 29 * @date January 27, 2020
jpayne@68 30 *
jpayne@68 31 */
jpayne@68 32 public class AllToAll implements Accumulator<AllToAll.ProcessThread> {
jpayne@68 33
jpayne@68 34 /*--------------------------------------------------------------*/
jpayne@68 35 /*---------------- Initialization ----------------*/
jpayne@68 36 /*--------------------------------------------------------------*/
jpayne@68 37
jpayne@68 38 /**
jpayne@68 39 * Code entrance from the command line.
jpayne@68 40 * @param args Command line arguments
jpayne@68 41 */
jpayne@68 42 public static void main(String[] args){
jpayne@68 43 //Start a timer immediately upon code entrance.
jpayne@68 44 Timer t=new Timer();
jpayne@68 45
jpayne@68 46 //Create an instance of this class
jpayne@68 47 AllToAll x=new AllToAll(args);
jpayne@68 48
jpayne@68 49 //Run the object
jpayne@68 50 x.process(t);
jpayne@68 51
jpayne@68 52 //Close the print stream if it was redirected
jpayne@68 53 Shared.closeStream(x.outstream);
jpayne@68 54 }
jpayne@68 55
jpayne@68 56 /**
jpayne@68 57 * Constructor.
jpayne@68 58 * @param args Command line arguments
jpayne@68 59 */
jpayne@68 60 public AllToAll(String[] args){
jpayne@68 61
jpayne@68 62 {//Preparse block for help, config files, and outstream
jpayne@68 63 PreParser pp=new PreParser(args, getClass(), false);
jpayne@68 64 args=pp.args;
jpayne@68 65 outstream=pp.outstream;
jpayne@68 66 }
jpayne@68 67
jpayne@68 68 //Set shared static variables prior to parsing
jpayne@68 69 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
jpayne@68 70 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
jpayne@68 71
jpayne@68 72 {//Parse the arguments
jpayne@68 73 final Parser parser=parse(args);
jpayne@68 74 Parser.processQuality();
jpayne@68 75
jpayne@68 76 maxReads=parser.maxReads;
jpayne@68 77 overwrite=ReadStats.overwrite=parser.overwrite;
jpayne@68 78 append=ReadStats.append=parser.append;
jpayne@68 79
jpayne@68 80 in1=parser.in1;
jpayne@68 81 qfin1=parser.qfin1;
jpayne@68 82 extin=parser.extin;
jpayne@68 83
jpayne@68 84 out1=parser.out1;
jpayne@68 85 }
jpayne@68 86
jpayne@68 87 validateParams();
jpayne@68 88 fixExtensions(); //Add or remove .gz or .bz2 as needed
jpayne@68 89 checkFileExistence(); //Ensure files can be read and written
jpayne@68 90 checkStatics(); //Adjust file-related static fields as needed for this program
jpayne@68 91
jpayne@68 92 //Create output FileFormat objects
jpayne@68 93 ffout1=FileFormat.testOutput(out1, FileFormat.TXT, null, true, overwrite, append, ordered);
jpayne@68 94
jpayne@68 95 //Create input FileFormat objects
jpayne@68 96 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
jpayne@68 97 }
jpayne@68 98
jpayne@68 99 /*--------------------------------------------------------------*/
jpayne@68 100 /*---------------- Initialization Helpers ----------------*/
jpayne@68 101 /*--------------------------------------------------------------*/
jpayne@68 102
jpayne@68 103 /** Parse arguments from the command line */
jpayne@68 104 private Parser parse(String[] args){
jpayne@68 105
jpayne@68 106 //Create a parser object
jpayne@68 107 Parser parser=new Parser();
jpayne@68 108 parser.out1="stdout.txt";
jpayne@68 109
jpayne@68 110 //Set any necessary Parser defaults here
jpayne@68 111 //parser.foo=bar;
jpayne@68 112
jpayne@68 113 //Parse each argument
jpayne@68 114 for(int i=0; i<args.length; i++){
jpayne@68 115 String arg=args[i];
jpayne@68 116
jpayne@68 117 //Break arguments into their constituent parts, in the form of "a=b"
jpayne@68 118 String[] split=arg.split("=");
jpayne@68 119 String a=split[0].toLowerCase();
jpayne@68 120 String b=split.length>1 ? split[1] : null;
jpayne@68 121 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
jpayne@68 122
jpayne@68 123 if(a.equals("verbose")){
jpayne@68 124 verbose=Parse.parseBoolean(b);
jpayne@68 125 }else if(a.equals("ordered")){
jpayne@68 126 ordered=Parse.parseBoolean(b);
jpayne@68 127 }else if(a.equals("parse_flag_goes_here")){
jpayne@68 128 long fake_variable=Parse.parseKMG(b);
jpayne@68 129 //Set a variable here
jpayne@68 130 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
jpayne@68 131 //do nothing
jpayne@68 132 }else{
jpayne@68 133 outstream.println("Unknown parameter "+args[i]);
jpayne@68 134 assert(false) : "Unknown parameter "+args[i];
jpayne@68 135 }
jpayne@68 136 }
jpayne@68 137
jpayne@68 138 return parser;
jpayne@68 139 }
jpayne@68 140
jpayne@68 141 /** Add or remove .gz or .bz2 as needed */
jpayne@68 142 private void fixExtensions(){
jpayne@68 143 in1=Tools.fixExtension(in1);
jpayne@68 144 qfin1=Tools.fixExtension(qfin1);
jpayne@68 145 }
jpayne@68 146
jpayne@68 147 /** Ensure files can be read and written */
jpayne@68 148 private void checkFileExistence(){
jpayne@68 149
jpayne@68 150 //Ensure output files can be written
jpayne@68 151 if(!Tools.testOutputFiles(overwrite, append, false, out1)){
jpayne@68 152 outstream.println((out1==null)+", "+out1);
jpayne@68 153 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n");
jpayne@68 154 }
jpayne@68 155
jpayne@68 156 //Ensure input files can be read
jpayne@68 157 if(!Tools.testInputFiles(false, true, in1)){
jpayne@68 158 throw new RuntimeException("\nCan't read some input files.\n");
jpayne@68 159 }
jpayne@68 160
jpayne@68 161 //Ensure that no file was specified multiple times
jpayne@68 162 if(!Tools.testForDuplicateFiles(true, in1, out1)){
jpayne@68 163 throw new RuntimeException("\nSome file names were specified multiple times.\n");
jpayne@68 164 }
jpayne@68 165 }
jpayne@68 166
jpayne@68 167 /** Adjust file-related static fields as needed for this program */
jpayne@68 168 private static void checkStatics(){
jpayne@68 169 //Adjust the number of threads for input file reading
jpayne@68 170 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
jpayne@68 171 ByteFile.FORCE_MODE_BF2=true;
jpayne@68 172 }
jpayne@68 173
jpayne@68 174 assert(FastaReadInputStream.settingsOK());
jpayne@68 175 }
jpayne@68 176
jpayne@68 177 /** Ensure parameter ranges are within bounds and required parameters are set */
jpayne@68 178 private boolean validateParams(){
jpayne@68 179 //Ensure there is an input file
jpayne@68 180 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
jpayne@68 181 return true;
jpayne@68 182 }
jpayne@68 183
jpayne@68 184 /*--------------------------------------------------------------*/
jpayne@68 185 /*---------------- Outer Methods ----------------*/
jpayne@68 186 /*--------------------------------------------------------------*/
jpayne@68 187
jpayne@68 188 /** Create read streams and process all reads */
jpayne@68 189 void process(Timer t){
jpayne@68 190
jpayne@68 191 //Turn off read validation in the input threads to increase speed
jpayne@68 192 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
jpayne@68 193 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
jpayne@68 194
jpayne@68 195 //Reset counters
jpayne@68 196 readsProcessed=alignments=0;
jpayne@68 197 basesProcessed=0;
jpayne@68 198
jpayne@68 199 //Fetch data
jpayne@68 200 reads=ConcurrentReadInputStream.getReads(maxReads, true, ffin1, null, qfin1, null); //TODO: Note that this does not return the error state
jpayne@68 201 results=new float[reads.size()][];
jpayne@68 202
jpayne@68 203 outstream.println("Loaded "+reads.size()+" sequences.");
jpayne@68 204
jpayne@68 205 //Process the reads in separate threads
jpayne@68 206 spawnThreads();
jpayne@68 207 mirrorMatrix(results);
jpayne@68 208 if(verbose){outstream.println("Finished alignment.");}
jpayne@68 209
jpayne@68 210 printResults();
jpayne@68 211
jpayne@68 212 //Reset read validation
jpayne@68 213 Read.VALIDATE_IN_CONSTRUCTOR=vic;
jpayne@68 214
jpayne@68 215 //Report timing and results
jpayne@68 216 t.stop();
jpayne@68 217 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
jpayne@68 218 outstream.println(Tools.number("Alignments:", alignments, 8));
jpayne@68 219
jpayne@68 220 //Throw an exception of there was an error in a thread
jpayne@68 221 if(errorState){
jpayne@68 222 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
jpayne@68 223 }
jpayne@68 224 }
jpayne@68 225
jpayne@68 226 private static void mirrorMatrix(float[][] matrix){
jpayne@68 227 for(int i=0; i<matrix.length; i++) {
jpayne@68 228 for(int j=i; j<matrix.length; j++) {
jpayne@68 229 assert(matrix[i][j]==0) : matrix[i][j];
jpayne@68 230 matrix[i][j]=(i==j ? 1 : matrix[j][i]);
jpayne@68 231 }
jpayne@68 232 }
jpayne@68 233 }
jpayne@68 234
jpayne@68 235 private void printResults(){
jpayne@68 236 if(ffout1==null){return;}
jpayne@68 237 ByteStreamWriter bsw=new ByteStreamWriter(ffout1);
jpayne@68 238 bsw.start();
jpayne@68 239 final int max=reads.size();
jpayne@68 240 bsw.print("Name");
jpayne@68 241 for(int rnum=0; rnum<max; rnum++){
jpayne@68 242 bsw.tab().print(reads.get(rnum).id);
jpayne@68 243 }
jpayne@68 244 bsw.println();
jpayne@68 245 for(int qnum=0; qnum<max; qnum++){
jpayne@68 246 bsw.print(reads.get(qnum).id);
jpayne@68 247 final float[] scores=results[qnum];
jpayne@68 248 for(int rnum=0; rnum<max; rnum++){
jpayne@68 249 bsw.tab().print(100*scores[rnum], 2);
jpayne@68 250 }
jpayne@68 251 bsw.println();
jpayne@68 252 }
jpayne@68 253 bsw.poisonAndWait();
jpayne@68 254 }
jpayne@68 255
jpayne@68 256 /*--------------------------------------------------------------*/
jpayne@68 257 /*---------------- Thread Management ----------------*/
jpayne@68 258 /*--------------------------------------------------------------*/
jpayne@68 259
jpayne@68 260 /** Spawn process threads */
jpayne@68 261 private void spawnThreads(){
jpayne@68 262
jpayne@68 263 //Do anything necessary prior to processing
jpayne@68 264
jpayne@68 265 //Determine how many threads may be used
jpayne@68 266 final int threads=Shared.threads();
jpayne@68 267
jpayne@68 268 AtomicInteger atom=new AtomicInteger(0);
jpayne@68 269
jpayne@68 270 //Fill a list with ProcessThreads
jpayne@68 271 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
jpayne@68 272 for(int i=0; i<threads; i++){
jpayne@68 273 alpt.add(new ProcessThread(reads, results, atom, i));
jpayne@68 274 }
jpayne@68 275
jpayne@68 276 //Start the threads and wait for them to finish
jpayne@68 277 boolean success=ThreadWaiter.startAndWait(alpt, this);
jpayne@68 278 errorState&=!success;
jpayne@68 279
jpayne@68 280 //Do anything necessary after processing
jpayne@68 281
jpayne@68 282 }
jpayne@68 283
jpayne@68 284 @Override
jpayne@68 285 public final void accumulate(ProcessThread pt){
jpayne@68 286 readsProcessed+=pt.readsProcessedT;
jpayne@68 287 basesProcessed+=pt.basesProcessedT;
jpayne@68 288 alignments+=pt.alignmentsT;
jpayne@68 289 errorState|=(!pt.success);
jpayne@68 290 }
jpayne@68 291
jpayne@68 292 @Override
jpayne@68 293 public final boolean success(){return !errorState;}
jpayne@68 294
jpayne@68 295 /*--------------------------------------------------------------*/
jpayne@68 296 /*---------------- Inner Methods ----------------*/
jpayne@68 297 /*--------------------------------------------------------------*/
jpayne@68 298
jpayne@68 299 /*--------------------------------------------------------------*/
jpayne@68 300 /*---------------- Inner Classes ----------------*/
jpayne@68 301 /*--------------------------------------------------------------*/
jpayne@68 302
jpayne@68 303 /** This class is static to prevent accidental writing to shared variables.
jpayne@68 304 * It is safe to remove the static modifier. */
jpayne@68 305 static class ProcessThread extends Thread {
jpayne@68 306
jpayne@68 307 //Constructor
jpayne@68 308 ProcessThread(final ArrayList<Read> reads_, float[][] results_, final AtomicInteger atom_, final int tid_){
jpayne@68 309 reads=reads_;
jpayne@68 310 results=results_;
jpayne@68 311 atom=atom_;
jpayne@68 312 tid=tid_;
jpayne@68 313 }
jpayne@68 314
jpayne@68 315 //Called by start()
jpayne@68 316 @Override
jpayne@68 317 public void run(){
jpayne@68 318 //Do anything necessary prior to processing
jpayne@68 319
jpayne@68 320 //Process the reads
jpayne@68 321 processInner();
jpayne@68 322
jpayne@68 323 //Do anything necessary after processing
jpayne@68 324
jpayne@68 325 //Indicate successful exit status
jpayne@68 326 success=true;
jpayne@68 327 }
jpayne@68 328
jpayne@68 329 /** Iterate through the reads */
jpayne@68 330 void processInner(){
jpayne@68 331
jpayne@68 332 for(int next=atom.getAndIncrement(); next<reads.size(); next=atom.getAndIncrement()){
jpayne@68 333 processQuery(next);
jpayne@68 334 }
jpayne@68 335
jpayne@68 336 }
jpayne@68 337
jpayne@68 338 void processQuery(final int qnum){
jpayne@68 339 final Read query=reads.get(qnum);
jpayne@68 340 final float[] scores=new float[reads.size()];
jpayne@68 341 readsProcessedT++;
jpayne@68 342 basesProcessedT+=query.length();
jpayne@68 343 for(int rnum=0; rnum<qnum; rnum++){
jpayne@68 344 final Read ref=reads.get(rnum);
jpayne@68 345 float identity=SketchObject.align(query.bases, ref.bases);
jpayne@68 346 scores[rnum]=identity;
jpayne@68 347 alignmentsT++;
jpayne@68 348 }
jpayne@68 349 synchronized(results){
jpayne@68 350 results[qnum]=scores;
jpayne@68 351 }
jpayne@68 352 }
jpayne@68 353
jpayne@68 354 /**
jpayne@68 355 * Process a read or a read pair.
jpayne@68 356 * @param r1 Read 1
jpayne@68 357 * @param r2 Read 2 (may be null)
jpayne@68 358 * @return True if the reads should be kept, false if they should be discarded.
jpayne@68 359 */
jpayne@68 360 boolean processReadPair(final Read r1, final Read r2){
jpayne@68 361 throw new RuntimeException("TODO: Implement this method."); //TODO
jpayne@68 362 // return true;
jpayne@68 363 }
jpayne@68 364
jpayne@68 365 /** Number of reads processed by this thread */
jpayne@68 366 protected long readsProcessedT=0;
jpayne@68 367 /** Number of bases processed by this thread */
jpayne@68 368 protected long basesProcessedT=0;
jpayne@68 369
jpayne@68 370 /** Number of reads retained by this thread */
jpayne@68 371 protected long alignmentsT=0;
jpayne@68 372 /** Number of bases retained by this thread */
jpayne@68 373 protected long basesOutT=0;
jpayne@68 374
jpayne@68 375 /** True only if this thread has completed successfully */
jpayne@68 376 boolean success=false;
jpayne@68 377
jpayne@68 378 /** Thread ID */
jpayne@68 379 final int tid;
jpayne@68 380
jpayne@68 381 final ArrayList<Read> reads;
jpayne@68 382 final float[][] results;
jpayne@68 383 final AtomicInteger atom;
jpayne@68 384 }
jpayne@68 385
jpayne@68 386 /*--------------------------------------------------------------*/
jpayne@68 387 /*---------------- Fields ----------------*/
jpayne@68 388 /*--------------------------------------------------------------*/
jpayne@68 389
jpayne@68 390 /** Primary input file path */
jpayne@68 391 private String in1=null;
jpayne@68 392
jpayne@68 393 private String qfin1=null;
jpayne@68 394
jpayne@68 395 /** Primary output file path */
jpayne@68 396 private String out1=null;
jpayne@68 397
jpayne@68 398 /** Override input file extension */
jpayne@68 399 private String extin=null;
jpayne@68 400
jpayne@68 401 /*--------------------------------------------------------------*/
jpayne@68 402
jpayne@68 403 ArrayList<Read> reads;
jpayne@68 404 float[][] results;
jpayne@68 405
jpayne@68 406 /** Number of reads processed */
jpayne@68 407 protected long readsProcessed=0;
jpayne@68 408 /** Number of bases processed */
jpayne@68 409 protected long basesProcessed=0;
jpayne@68 410
jpayne@68 411 /** Number of reads retained */
jpayne@68 412 protected long alignments=0;
jpayne@68 413
jpayne@68 414 /** Quit after processing this many input reads; -1 means no limit */
jpayne@68 415 private long maxReads=-1;
jpayne@68 416
jpayne@68 417 /*--------------------------------------------------------------*/
jpayne@68 418 /*---------------- Final Fields ----------------*/
jpayne@68 419 /*--------------------------------------------------------------*/
jpayne@68 420
jpayne@68 421 /** Primary input file */
jpayne@68 422 private final FileFormat ffin1;
jpayne@68 423
jpayne@68 424 /** Primary output file */
jpayne@68 425 private final FileFormat ffout1;
jpayne@68 426
jpayne@68 427 /*--------------------------------------------------------------*/
jpayne@68 428 /*---------------- Common Fields ----------------*/
jpayne@68 429 /*--------------------------------------------------------------*/
jpayne@68 430
jpayne@68 431 /** Print status messages to this output stream */
jpayne@68 432 private PrintStream outstream=System.err;
jpayne@68 433 /** Print verbose messages */
jpayne@68 434 public static boolean verbose=false;
jpayne@68 435 /** True if an error was encountered */
jpayne@68 436 public boolean errorState=false;
jpayne@68 437 /** Overwrite existing output files */
jpayne@68 438 private boolean overwrite=false;
jpayne@68 439 /** Append to existing output files */
jpayne@68 440 private boolean append=false;
jpayne@68 441 /** Reads are output in input order */
jpayne@68 442 private boolean ordered=false;
jpayne@68 443
jpayne@68 444 }