jpayne@68: package prok; jpayne@68: jpayne@68: import java.io.File; jpayne@68: import java.io.PrintStream; jpayne@68: import java.util.ArrayList; jpayne@68: import java.util.PriorityQueue; jpayne@68: jpayne@68: import aligner.Alignment; jpayne@68: import dna.AminoAcid; jpayne@68: import fileIO.ByteFile; jpayne@68: import fileIO.FileFormat; jpayne@68: import fileIO.ReadWrite; jpayne@68: import shared.Parse; jpayne@68: import shared.Parser; jpayne@68: import shared.PreParser; jpayne@68: import shared.ReadStats; jpayne@68: import shared.Shared; jpayne@68: import shared.Timer; jpayne@68: import shared.Tools; jpayne@68: import stream.ConcurrentReadInputStream; jpayne@68: import stream.ConcurrentReadOutputStream; jpayne@68: import stream.FASTQ; jpayne@68: import stream.FastaReadInputStream; jpayne@68: import stream.Read; jpayne@68: import stream.ReadInputStream; jpayne@68: import structures.ListNum; jpayne@68: import structures.LongHashSet; jpayne@68: import template.Accumulator; jpayne@68: import template.ThreadWaiter; jpayne@68: jpayne@68: /** jpayne@68: * Makes a consensus ribosomal sequence using raw reads as input. jpayne@68: * jpayne@68: * @author Brian Bushnell jpayne@68: * @date October 10, 2019 jpayne@68: * jpayne@68: */ jpayne@68: public class RiboMaker implements Accumulator { jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** jpayne@68: * Code entrance from the command line. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public static void main(String[] args){ jpayne@68: assert(false) : "TODO"; jpayne@68: jpayne@68: //Start a timer immediately upon code entrance. jpayne@68: Timer t=new Timer(); jpayne@68: jpayne@68: //Create an instance of this class jpayne@68: RiboMaker x=new RiboMaker(args); jpayne@68: jpayne@68: //Run the object jpayne@68: x.process(t); jpayne@68: jpayne@68: //Close the print stream if it was redirected jpayne@68: Shared.closeStream(x.outstream); jpayne@68: } jpayne@68: jpayne@68: /** jpayne@68: * Constructor. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public RiboMaker(String[] args){ jpayne@68: jpayne@68: {//Preparse block for help, config files, and outstream jpayne@68: PreParser pp=new PreParser(args, getClass(), false); jpayne@68: args=pp.args; jpayne@68: outstream=pp.outstream; jpayne@68: } jpayne@68: jpayne@68: //Set shared static variables prior to parsing jpayne@68: ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; jpayne@68: ReadWrite.MAX_ZIP_THREADS=Shared.threads(); jpayne@68: jpayne@68: {//Parse the arguments jpayne@68: final Parser parser=parse(args); jpayne@68: Parser.processQuality(); jpayne@68: jpayne@68: maxReads=parser.maxReads; jpayne@68: overwrite=ReadStats.overwrite=parser.overwrite; jpayne@68: append=ReadStats.append=parser.append; jpayne@68: setInterleaved=parser.setInterleaved; jpayne@68: jpayne@68: in1=parser.in1; jpayne@68: in2=parser.in2; jpayne@68: qfin1=parser.qfin1; jpayne@68: qfin2=parser.qfin2; jpayne@68: extin=parser.extin; jpayne@68: jpayne@68: out1=parser.out1; jpayne@68: qfout1=parser.qfout1; jpayne@68: extout=parser.extout; jpayne@68: } jpayne@68: jpayne@68: validateParams(); jpayne@68: doPoundReplacement(); //Replace # with 1 and 2 jpayne@68: adjustInterleaving(); //Make sure interleaving agrees with number of input and output files jpayne@68: fixExtensions(); //Add or remove .gz or .bz2 as needed jpayne@68: checkFileExistence(); //Ensure files can be read and written jpayne@68: checkStatics(); //Adjust file-related static fields as needed for this program jpayne@68: jpayne@68: //Create output FileFormat objects jpayne@68: ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered); jpayne@68: jpayne@68: fffilter=FileFormat.testInput(filterFile, FileFormat.FASTA, null, true, true); jpayne@68: ffref=FileFormat.testInput(refFile, FileFormat.FASTA, null, true, true); jpayne@68: jpayne@68: //Create input FileFormat objects jpayne@68: ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); jpayne@68: ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); jpayne@68: jpayne@68: if(fffilter==null){ jpayne@68: filter=null; jpayne@68: }else{ jpayne@68: filter=loadFilter(fffilter, k); jpayne@68: } jpayne@68: loadRef(); jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization Helpers ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Parse arguments from the command line */ jpayne@68: private Parser parse(String[] args){ jpayne@68: jpayne@68: //Create a parser object jpayne@68: Parser parser=new Parser(); jpayne@68: jpayne@68: //Set any necessary Parser defaults here jpayne@68: //parser.foo=bar; jpayne@68: jpayne@68: //Parse each argument jpayne@68: for(int i=0; i1 ? split[1] : null; jpayne@68: if(b!=null && b.equalsIgnoreCase("null")){b=null;} jpayne@68: jpayne@68: if(a.equals("verbose")){ jpayne@68: verbose=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("ordered")){ jpayne@68: ordered=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("filter")){ jpayne@68: filterFile=b; jpayne@68: }else if(a.equals("ref")){ jpayne@68: refFile=b; jpayne@68: }else if(a.equals("parse_flag_goes_here")){ jpayne@68: long fake_variable=Parse.parseKMG(b); jpayne@68: //Set a variable here jpayne@68: }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser jpayne@68: //do nothing jpayne@68: }else{ jpayne@68: outstream.println("Unknown parameter "+args[i]); jpayne@68: assert(false) : "Unknown parameter "+args[i]; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: return parser; jpayne@68: } jpayne@68: jpayne@68: /** Replace # with 1 and 2 in headers */ jpayne@68: private void doPoundReplacement(){ jpayne@68: //Do input file # replacement jpayne@68: if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ jpayne@68: in2=in1.replace("#", "2"); jpayne@68: in1=in1.replace("#", "1"); jpayne@68: } jpayne@68: jpayne@68: //Ensure there is an input file jpayne@68: if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} jpayne@68: } jpayne@68: jpayne@68: /** Add or remove .gz or .bz2 as needed */ jpayne@68: private void fixExtensions(){ jpayne@68: in1=Tools.fixExtension(in1); jpayne@68: in2=Tools.fixExtension(in2); jpayne@68: qfin1=Tools.fixExtension(qfin1); jpayne@68: qfin2=Tools.fixExtension(qfin2); jpayne@68: } jpayne@68: jpayne@68: /** Ensure files can be read and written */ jpayne@68: private void checkFileExistence(){ jpayne@68: //Ensure output files can be written jpayne@68: if(!Tools.testOutputFiles(overwrite, append, false, out1)){ jpayne@68: outstream.println((out1==null)+", "+out1); jpayne@68: throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+"\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure input files can be read jpayne@68: if(!Tools.testInputFiles(false, true, in1, in2, filterFile, refFile)){ jpayne@68: throw new RuntimeException("\nCan't read some input files.\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure that no file was specified multiple times jpayne@68: if(!Tools.testForDuplicateFiles(true, in1, in2, out1, filterFile, refFile)){ jpayne@68: throw new RuntimeException("\nSome file names were specified multiple times.\n"); jpayne@68: } jpayne@68: jpayne@68: assert(refFile!=null); jpayne@68: } jpayne@68: jpayne@68: /** Make sure interleaving agrees with number of input and output files */ jpayne@68: private void adjustInterleaving(){ jpayne@68: //Adjust interleaved detection based on the number of input files jpayne@68: if(in2!=null){ jpayne@68: if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} jpayne@68: FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; jpayne@68: } jpayne@68: jpayne@68: //Adjust interleaved settings based on number of output files jpayne@68: if(!setInterleaved){ jpayne@68: assert(in1!=null) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\n"; jpayne@68: if(in2!=null){ //If there are 2 input streams. jpayne@68: FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; jpayne@68: outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: /** Adjust file-related static fields as needed for this program */ jpayne@68: private static void checkStatics(){ jpayne@68: //Adjust the number of threads for input file reading jpayne@68: if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ jpayne@68: ByteFile.FORCE_MODE_BF2=true; jpayne@68: } jpayne@68: jpayne@68: assert(FastaReadInputStream.settingsOK()); jpayne@68: } jpayne@68: jpayne@68: /** Ensure parameter ranges are within bounds and required parameters are set */ jpayne@68: private boolean validateParams(){ jpayne@68: // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo; jpayne@68: assert(false) : "TODO"; jpayne@68: return true; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Outer Methods ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Create read streams and process all data */ jpayne@68: void process(Timer t){ jpayne@68: jpayne@68: //Turn off read validation in the input threads to increase speed jpayne@68: final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; jpayne@68: jpayne@68: //Create a read input stream jpayne@68: final ConcurrentReadInputStream cris=makeCris(); jpayne@68: jpayne@68: //Optionally create a read output stream jpayne@68: final ConcurrentReadOutputStream ros=makeCros(cris.paired()); jpayne@68: jpayne@68: //Reset counters jpayne@68: readsProcessed=readsOut=0; jpayne@68: basesProcessed=basesOut=0; jpayne@68: jpayne@68: //Process the reads in separate threads jpayne@68: spawnThreads(cris, ros); jpayne@68: jpayne@68: if(verbose){outstream.println("Finished; closing streams.");} jpayne@68: jpayne@68: //Write anything that was accumulated by ReadStats jpayne@68: errorState|=ReadStats.writeAll(); jpayne@68: //Close the read streams jpayne@68: errorState|=ReadWrite.closeStreams(cris, ros); jpayne@68: jpayne@68: //Reset read validation jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=vic; jpayne@68: jpayne@68: //Report timing and results jpayne@68: t.stop(); jpayne@68: outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); jpayne@68: outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false)); jpayne@68: jpayne@68: //Throw an exception of there was an error in a thread jpayne@68: if(errorState){ jpayne@68: throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private void loadRef(){ jpayne@68: ArrayList reads=ReadInputStream.toReads(ffref, -1); jpayne@68: ref0=reads.get(0).bases; jpayne@68: ref=new byte[ref0.length+2*padding]; jpayne@68: for(int i=0, j=-padding; i=0 && j(queueLen); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: public static LongHashSet loadFilter(FileFormat ff, int k){ jpayne@68: if(ff==null){return null;} jpayne@68: ArrayList reads=ReadInputStream.toReads(ff, -1); jpayne@68: if(reads==null || reads.size()==0){return null;} jpayne@68: LongHashSet set=new LongHashSet(4096); jpayne@68: jpayne@68: final int shift=2*k; jpayne@68: final int shift2=shift-2; jpayne@68: final long mask=(shift>63 ? -1L : ~((-1L)<>>2)|(x2<=0){ jpayne@68: len++; jpayne@68: if(len>=k){ jpayne@68: set.add(Tools.max(kmer, rkmer)); jpayne@68: } jpayne@68: }else{ jpayne@68: len=0; jpayne@68: kmer=rkmer=0; jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: return set; jpayne@68: } jpayne@68: jpayne@68: public static boolean passesFilter(Read r, int k, LongHashSet set){ jpayne@68: if(r==null) {return false;} jpayne@68: if(set==null){return true;} jpayne@68: jpayne@68: final int shift=2*k; jpayne@68: final int shift2=shift-2; jpayne@68: final long mask=(shift>63 ? -1L : ~((-1L)<>>2)|(x2<=0){ jpayne@68: len++; jpayne@68: if(len>=k){ jpayne@68: long key=Tools.max(kmer, rkmer); jpayne@68: if(set.contains(key)){return true;} jpayne@68: } jpayne@68: }else{ jpayne@68: len=0; jpayne@68: kmer=rkmer=0; jpayne@68: } jpayne@68: } jpayne@68: return false; jpayne@68: } jpayne@68: jpayne@68: private ConcurrentReadInputStream makeCris(){ jpayne@68: ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2); jpayne@68: cris.start(); //Start the stream jpayne@68: if(verbose){outstream.println("Started cris");} jpayne@68: boolean paired=cris.paired(); jpayne@68: if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} jpayne@68: return cris; jpayne@68: } jpayne@68: jpayne@68: private ConcurrentReadOutputStream makeCros(boolean pairedInput){ jpayne@68: if(ffout1==null){return null;} jpayne@68: jpayne@68: //Select output buffer size based on whether it needs to be ordered jpayne@68: final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); jpayne@68: jpayne@68: final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, null, qfout1, null, buff, null, false); jpayne@68: ros.start(); //Start the stream jpayne@68: return ros; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Thread Management ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Spawn process threads */ jpayne@68: private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){ jpayne@68: jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Determine how many threads may be used jpayne@68: final int threads=Shared.threads(); jpayne@68: jpayne@68: //Fill a list with ProcessThreads jpayne@68: ArrayList alpt=new ArrayList(threads); jpayne@68: for(int i=0; i q=queues[i]; jpayne@68: PriorityQueue qt=pt.queuesT[i]; jpayne@68: for(Alignment a : qt){ jpayne@68: addToQueue(a, q); jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: public final boolean success(){return !errorState;} jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Inner Methods ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: boolean addToQueue(Alignment best, PriorityQueue[] queues){ jpayne@68: int start=best.start; jpayne@68: int qnum=start/queueWidth; jpayne@68: PriorityQueue queue=queues[qnum]; jpayne@68: return addToQueue(best, queue); jpayne@68: } jpayne@68: jpayne@68: boolean addToQueue(Alignment best, PriorityQueue queue){ jpayne@68: if(queue.size()=0){return false;} jpayne@68: queue.poll(); jpayne@68: queue.add(best); jpayne@68: } jpayne@68: return true; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Inner Classes ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** This class is static to prevent accidental writing to shared variables. jpayne@68: * It is safe to remove the static modifier. */ jpayne@68: class ProcessThread extends Thread { jpayne@68: jpayne@68: //Constructor jpayne@68: ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){ jpayne@68: cris=cris_; jpayne@68: tid=tid_; jpayne@68: jpayne@68: queuesT=new PriorityQueue[1+ref.length/queueWidth]; jpayne@68: for(int i=0; i(queueLen); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: //Called by start() jpayne@68: @Override jpayne@68: public void run(){ jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Process the reads jpayne@68: processInner(); jpayne@68: jpayne@68: //Do anything necessary after processing jpayne@68: jpayne@68: //Indicate successful exit status jpayne@68: success=true; jpayne@68: } jpayne@68: jpayne@68: /** Iterate through the reads */ jpayne@68: void processInner(){ jpayne@68: jpayne@68: //Grab the first ListNum of reads jpayne@68: ListNum ln=cris.nextList(); jpayne@68: jpayne@68: //Check to ensure pairing is as expected jpayne@68: if(ln!=null && !ln.isEmpty()){ jpayne@68: Read r=ln.get(0); jpayne@68: // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access jpayne@68: } jpayne@68: jpayne@68: //As long as there is a nonempty read list... jpayne@68: while(ln!=null && ln.size()>0){ jpayne@68: // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access jpayne@68: jpayne@68: processList(ln); jpayne@68: jpayne@68: //Notify the input stream that the list was used jpayne@68: cris.returnList(ln); jpayne@68: // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access jpayne@68: jpayne@68: //Fetch a new list jpayne@68: ln=cris.nextList(); jpayne@68: } jpayne@68: jpayne@68: //Notify the input stream that the final list was used jpayne@68: if(ln!=null){ jpayne@68: cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: void processList(ListNum ln){ jpayne@68: jpayne@68: //Grab the actual read list from the ListNum jpayne@68: final ArrayList reads=ln.list; jpayne@68: jpayne@68: //Loop through each read in the list jpayne@68: for(int idx=0; idx=minus.id){ jpayne@68: r.reverseComplement(); jpayne@68: best=plus; jpayne@68: }else{ jpayne@68: best=minus; jpayne@68: } jpayne@68: if(best.id[] queuesT; jpayne@68: jpayne@68: /** Shared input stream */ jpayne@68: private final ConcurrentReadInputStream cris; jpayne@68: /** Thread ID */ jpayne@68: final int tid; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Primary input file path */ jpayne@68: private String in1=null; jpayne@68: /** Secondary input file path */ jpayne@68: private String in2=null; jpayne@68: jpayne@68: private String qfin1=null; jpayne@68: private String qfin2=null; jpayne@68: jpayne@68: /** Primary output file path */ jpayne@68: private String out1=null; jpayne@68: jpayne@68: private String qfout1=null; jpayne@68: jpayne@68: private String filterFile; jpayne@68: private String refFile; jpayne@68: jpayne@68: /** Override input file extension */ jpayne@68: private String extin=null; jpayne@68: /** Override output file extension */ jpayne@68: private String extout=null; jpayne@68: jpayne@68: /** Whether interleaved was explicitly set. */ jpayne@68: private boolean setInterleaved=false; jpayne@68: jpayne@68: /** Original ref */ jpayne@68: private byte[] ref0; jpayne@68: /** Padded ref */ jpayne@68: private byte[] ref; jpayne@68: jpayne@68: private int padding=100; jpayne@68: private int queueLen=20; jpayne@68: private int queueWidth=20; jpayne@68: private float minID=0.4f; jpayne@68: jpayne@68: private PriorityQueue[] queues; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Number of reads processed */ jpayne@68: protected long readsProcessed=0; jpayne@68: /** Number of bases processed */ jpayne@68: protected long basesProcessed=0; jpayne@68: jpayne@68: /** Number of reads retained */ jpayne@68: protected long readsOut=0; jpayne@68: /** Number of bases retained */ jpayne@68: protected long basesOut=0; jpayne@68: jpayne@68: /** Quit after processing this many input reads; -1 means no limit */ jpayne@68: private long maxReads=-1; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Final Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Primary input file */ jpayne@68: private final FileFormat ffin1; jpayne@68: /** Secondary input file */ jpayne@68: private final FileFormat ffin2; jpayne@68: /** Filter input file */ jpayne@68: private final FileFormat fffilter; jpayne@68: /** Ref input file */ jpayne@68: private final FileFormat ffref; jpayne@68: jpayne@68: /** Primary output file */ jpayne@68: private final FileFormat ffout1; jpayne@68: jpayne@68: private final LongHashSet filter; jpayne@68: private final int k=31; jpayne@68: jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Common Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Print status messages to this output stream */ jpayne@68: private PrintStream outstream=System.err; jpayne@68: /** Print verbose messages */ jpayne@68: public static boolean verbose=false; jpayne@68: /** True if an error was encountered */ jpayne@68: public boolean errorState=false; jpayne@68: /** Overwrite existing output files */ jpayne@68: private boolean overwrite=false; jpayne@68: /** Append to existing output files */ jpayne@68: private boolean append=false; jpayne@68: /** Reads are output in input order */ jpayne@68: private boolean ordered=false; jpayne@68: jpayne@68: }