jpayne@68: package aligner; jpayne@68: jpayne@68: import java.io.PrintStream; jpayne@68: import java.util.ArrayList; jpayne@68: import java.util.concurrent.atomic.AtomicInteger; jpayne@68: jpayne@68: import fileIO.ByteFile; jpayne@68: import fileIO.ByteStreamWriter; jpayne@68: import fileIO.FileFormat; jpayne@68: import fileIO.ReadWrite; jpayne@68: import shared.Parse; jpayne@68: import shared.Parser; jpayne@68: import shared.PreParser; jpayne@68: import shared.ReadStats; jpayne@68: import shared.Shared; jpayne@68: import shared.Timer; jpayne@68: import shared.Tools; jpayne@68: import sketch.SketchObject; jpayne@68: import stream.ConcurrentReadInputStream; jpayne@68: import stream.FastaReadInputStream; jpayne@68: import stream.Read; jpayne@68: import template.Accumulator; jpayne@68: import template.ThreadWaiter; jpayne@68: jpayne@68: /** jpayne@68: * Aligns all sequences to all sequences and produces an identity matrix. jpayne@68: * jpayne@68: * @author Brian Bushnell jpayne@68: * @date January 27, 2020 jpayne@68: * jpayne@68: */ jpayne@68: public class AllToAll implements Accumulator { jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** jpayne@68: * Code entrance from the command line. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public static void main(String[] args){ jpayne@68: //Start a timer immediately upon code entrance. jpayne@68: Timer t=new Timer(); jpayne@68: jpayne@68: //Create an instance of this class jpayne@68: AllToAll x=new AllToAll(args); jpayne@68: jpayne@68: //Run the object jpayne@68: x.process(t); jpayne@68: jpayne@68: //Close the print stream if it was redirected jpayne@68: Shared.closeStream(x.outstream); jpayne@68: } jpayne@68: jpayne@68: /** jpayne@68: * Constructor. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public AllToAll(String[] args){ jpayne@68: jpayne@68: {//Preparse block for help, config files, and outstream jpayne@68: PreParser pp=new PreParser(args, getClass(), false); jpayne@68: args=pp.args; jpayne@68: outstream=pp.outstream; jpayne@68: } jpayne@68: jpayne@68: //Set shared static variables prior to parsing jpayne@68: ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; jpayne@68: ReadWrite.MAX_ZIP_THREADS=Shared.threads(); jpayne@68: jpayne@68: {//Parse the arguments jpayne@68: final Parser parser=parse(args); jpayne@68: Parser.processQuality(); jpayne@68: jpayne@68: maxReads=parser.maxReads; jpayne@68: overwrite=ReadStats.overwrite=parser.overwrite; jpayne@68: append=ReadStats.append=parser.append; jpayne@68: jpayne@68: in1=parser.in1; jpayne@68: qfin1=parser.qfin1; jpayne@68: extin=parser.extin; jpayne@68: jpayne@68: out1=parser.out1; jpayne@68: } jpayne@68: jpayne@68: validateParams(); jpayne@68: fixExtensions(); //Add or remove .gz or .bz2 as needed jpayne@68: checkFileExistence(); //Ensure files can be read and written jpayne@68: checkStatics(); //Adjust file-related static fields as needed for this program jpayne@68: jpayne@68: //Create output FileFormat objects jpayne@68: ffout1=FileFormat.testOutput(out1, FileFormat.TXT, null, true, overwrite, append, ordered); jpayne@68: jpayne@68: //Create input FileFormat objects jpayne@68: ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization Helpers ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Parse arguments from the command line */ jpayne@68: private Parser parse(String[] args){ jpayne@68: jpayne@68: //Create a parser object jpayne@68: Parser parser=new Parser(); jpayne@68: parser.out1="stdout.txt"; jpayne@68: jpayne@68: //Set any necessary Parser defaults here jpayne@68: //parser.foo=bar; jpayne@68: jpayne@68: //Parse each argument jpayne@68: for(int i=0; i1 ? split[1] : null; jpayne@68: if(b!=null && b.equalsIgnoreCase("null")){b=null;} jpayne@68: jpayne@68: if(a.equals("verbose")){ jpayne@68: verbose=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("ordered")){ jpayne@68: ordered=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("parse_flag_goes_here")){ jpayne@68: long fake_variable=Parse.parseKMG(b); jpayne@68: //Set a variable here jpayne@68: }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser jpayne@68: //do nothing jpayne@68: }else{ jpayne@68: outstream.println("Unknown parameter "+args[i]); jpayne@68: assert(false) : "Unknown parameter "+args[i]; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: return parser; jpayne@68: } jpayne@68: jpayne@68: /** Add or remove .gz or .bz2 as needed */ jpayne@68: private void fixExtensions(){ jpayne@68: in1=Tools.fixExtension(in1); jpayne@68: qfin1=Tools.fixExtension(qfin1); jpayne@68: } jpayne@68: jpayne@68: /** Ensure files can be read and written */ jpayne@68: private void checkFileExistence(){ jpayne@68: jpayne@68: //Ensure output files can be written jpayne@68: if(!Tools.testOutputFiles(overwrite, append, false, out1)){ jpayne@68: outstream.println((out1==null)+", "+out1); jpayne@68: throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure input files can be read jpayne@68: if(!Tools.testInputFiles(false, true, in1)){ jpayne@68: throw new RuntimeException("\nCan't read some input files.\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure that no file was specified multiple times jpayne@68: if(!Tools.testForDuplicateFiles(true, in1, out1)){ jpayne@68: throw new RuntimeException("\nSome file names were specified multiple times.\n"); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: /** Adjust file-related static fields as needed for this program */ jpayne@68: private static void checkStatics(){ jpayne@68: //Adjust the number of threads for input file reading jpayne@68: if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ jpayne@68: ByteFile.FORCE_MODE_BF2=true; jpayne@68: } jpayne@68: jpayne@68: assert(FastaReadInputStream.settingsOK()); jpayne@68: } jpayne@68: jpayne@68: /** Ensure parameter ranges are within bounds and required parameters are set */ jpayne@68: private boolean validateParams(){ jpayne@68: //Ensure there is an input file jpayne@68: if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} jpayne@68: return true; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Outer Methods ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Create read streams and process all reads */ jpayne@68: void process(Timer t){ jpayne@68: jpayne@68: //Turn off read validation in the input threads to increase speed jpayne@68: final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; jpayne@68: jpayne@68: //Reset counters jpayne@68: readsProcessed=alignments=0; jpayne@68: basesProcessed=0; jpayne@68: jpayne@68: //Fetch data jpayne@68: reads=ConcurrentReadInputStream.getReads(maxReads, true, ffin1, null, qfin1, null); //TODO: Note that this does not return the error state jpayne@68: results=new float[reads.size()][]; jpayne@68: jpayne@68: outstream.println("Loaded "+reads.size()+" sequences."); jpayne@68: jpayne@68: //Process the reads in separate threads jpayne@68: spawnThreads(); jpayne@68: mirrorMatrix(results); jpayne@68: if(verbose){outstream.println("Finished alignment.");} jpayne@68: jpayne@68: printResults(); jpayne@68: jpayne@68: //Reset read validation jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=vic; jpayne@68: jpayne@68: //Report timing and results jpayne@68: t.stop(); jpayne@68: outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); jpayne@68: outstream.println(Tools.number("Alignments:", alignments, 8)); jpayne@68: jpayne@68: //Throw an exception of there was an error in a thread jpayne@68: if(errorState){ jpayne@68: throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private static void mirrorMatrix(float[][] matrix){ jpayne@68: for(int i=0; i alpt=new ArrayList(threads); jpayne@68: for(int i=0; i reads_, float[][] results_, final AtomicInteger atom_, final int tid_){ jpayne@68: reads=reads_; jpayne@68: results=results_; jpayne@68: atom=atom_; jpayne@68: tid=tid_; jpayne@68: } jpayne@68: jpayne@68: //Called by start() jpayne@68: @Override jpayne@68: public void run(){ jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Process the reads jpayne@68: processInner(); jpayne@68: jpayne@68: //Do anything necessary after processing jpayne@68: jpayne@68: //Indicate successful exit status jpayne@68: success=true; jpayne@68: } jpayne@68: jpayne@68: /** Iterate through the reads */ jpayne@68: void processInner(){ jpayne@68: jpayne@68: for(int next=atom.getAndIncrement(); next reads; jpayne@68: final float[][] results; jpayne@68: final AtomicInteger atom; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Primary input file path */ jpayne@68: private String in1=null; jpayne@68: jpayne@68: private String qfin1=null; jpayne@68: jpayne@68: /** Primary output file path */ jpayne@68: private String out1=null; jpayne@68: jpayne@68: /** Override input file extension */ jpayne@68: private String extin=null; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: ArrayList reads; jpayne@68: float[][] results; jpayne@68: jpayne@68: /** Number of reads processed */ jpayne@68: protected long readsProcessed=0; jpayne@68: /** Number of bases processed */ jpayne@68: protected long basesProcessed=0; jpayne@68: jpayne@68: /** Number of reads retained */ jpayne@68: protected long alignments=0; jpayne@68: jpayne@68: /** Quit after processing this many input reads; -1 means no limit */ jpayne@68: private long maxReads=-1; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Final Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Primary input file */ jpayne@68: private final FileFormat ffin1; jpayne@68: jpayne@68: /** Primary output file */ jpayne@68: private final FileFormat ffout1; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Common Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Print status messages to this output stream */ jpayne@68: private PrintStream outstream=System.err; jpayne@68: /** Print verbose messages */ jpayne@68: public static boolean verbose=false; jpayne@68: /** True if an error was encountered */ jpayne@68: public boolean errorState=false; jpayne@68: /** Overwrite existing output files */ jpayne@68: private boolean overwrite=false; jpayne@68: /** Append to existing output files */ jpayne@68: private boolean append=false; jpayne@68: /** Reads are output in input order */ jpayne@68: private boolean ordered=false; jpayne@68: jpayne@68: }