jpayne@68: package prok; jpayne@68: jpayne@68: import java.io.File; jpayne@68: import java.io.PrintStream; jpayne@68: import java.util.ArrayList; jpayne@68: import java.util.Collections; jpayne@68: import java.util.Comparator; jpayne@68: import java.util.HashMap; jpayne@68: import java.util.Map.Entry; jpayne@68: import java.util.concurrent.ConcurrentLinkedQueue; jpayne@68: jpayne@68: import aligner.SingleStateAlignerFlat2; jpayne@68: import consensus.BaseGraph; jpayne@68: import fileIO.ByteFile; jpayne@68: import fileIO.FileFormat; jpayne@68: import fileIO.ReadWrite; jpayne@68: import shared.Parse; jpayne@68: import shared.Parser; jpayne@68: import shared.PreParser; jpayne@68: import shared.ReadStats; jpayne@68: import shared.Shared; jpayne@68: import shared.Timer; jpayne@68: import shared.Tools; jpayne@68: import stream.ConcurrentReadInputStream; jpayne@68: import stream.ConcurrentReadOutputStream; jpayne@68: import stream.FASTQ; jpayne@68: import stream.FastaReadInputStream; jpayne@68: import stream.Read; jpayne@68: import structures.IntHashSet; jpayne@68: import structures.ListNum; jpayne@68: import tax.GiToTaxid; jpayne@68: import template.Accumulator; jpayne@68: import template.ThreadWaiter; jpayne@68: jpayne@68: /** jpayne@68: * Picks one ribosomal (16S) sequence per taxID. jpayne@68: * jpayne@68: * @author Brian Bushnell jpayne@68: * @date November 19, 2015 jpayne@68: * jpayne@68: */ jpayne@68: public class MergeRibo implements Accumulator { jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** jpayne@68: * Code entrance from the command line. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public static void main(String[] args){ jpayne@68: //Start a timer immediately upon code entrance. jpayne@68: Timer t=new Timer(); jpayne@68: jpayne@68: //Create an instance of this class jpayne@68: MergeRibo x=new MergeRibo(args); jpayne@68: jpayne@68: //Run the object jpayne@68: x.process(t); jpayne@68: jpayne@68: //Close the print stream if it was redirected jpayne@68: Shared.closeStream(x.outstream); jpayne@68: } jpayne@68: jpayne@68: /** jpayne@68: * Constructor. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public MergeRibo(String[] args){ jpayne@68: jpayne@68: {//Preparse block for help, config files, and outstream jpayne@68: PreParser pp=new PreParser(args, getClass(), false); jpayne@68: args=pp.args; jpayne@68: outstream=pp.outstream; jpayne@68: } jpayne@68: jpayne@68: //Set shared static variables prior to parsing jpayne@68: ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; jpayne@68: ReadWrite.MAX_ZIP_THREADS=Shared.threads(); jpayne@68: // Shared.capBufferLen(40);//This does not help; the slowness comes from unevenness in list length during pickBest. jpayne@68: //To fix it, long lists should be sorted to be first. jpayne@68: jpayne@68: BaseGraph.MAF_sub=0.251f; jpayne@68: BaseGraph.MAF_del=0.0f; jpayne@68: BaseGraph.MAF_ins=0.0f; jpayne@68: BaseGraph.MAF_noref=0.0f; jpayne@68: BaseGraph.trimDepthFraction=0.3f; jpayne@68: BaseGraph.trimNs=true; jpayne@68: jpayne@68: {//Parse the arguments jpayne@68: final Parser parser=parse(args); jpayne@68: Parser.processQuality(); jpayne@68: jpayne@68: maxReads=parser.maxReads; jpayne@68: overwrite=ReadStats.overwrite=parser.overwrite; jpayne@68: append=ReadStats.append=parser.append; jpayne@68: jpayne@68: extin=parser.extin; jpayne@68: jpayne@68: out1=parser.out1; jpayne@68: extout=parser.extout; jpayne@68: } jpayne@68: jpayne@68: validateParams(); jpayne@68: adjustInterleaving(); //Make sure interleaving agrees with number of input and output files jpayne@68: checkFileExistence(); //Ensure files can be read and written jpayne@68: checkStatics(); //Adjust file-related static fields as needed for this program jpayne@68: jpayne@68: //Create output FileFormat objects jpayne@68: ffout1=FileFormat.testOutput(out1, FileFormat.FASTA, extout, true, overwrite, append, ordered); jpayne@68: jpayne@68: //Create input FileFormat objects jpayne@68: ffin=new ArrayList(in.size()); jpayne@68: ffalt=FileFormat.testInput(alt, FileFormat.FASTA, extin, true, true); jpayne@68: for(String s : in){ jpayne@68: FileFormat ff=FileFormat.testInput(s, FileFormat.FASTA, extin, true, true); jpayne@68: ffin.add(ff); jpayne@68: } jpayne@68: jpayne@68: //Determine how many threads may be used jpayne@68: threads=Shared.threads(); jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization Helpers ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Parse arguments from the command line */ jpayne@68: private Parser parse(String[] args){ jpayne@68: jpayne@68: //Create a parser object jpayne@68: Parser parser=new Parser(); jpayne@68: jpayne@68: //Set any necessary Parser defaults here jpayne@68: //parser.foo=bar; jpayne@68: jpayne@68: //Parse each argument jpayne@68: for(int i=0; i1 ? split[1] : null; jpayne@68: if(b!=null && b.equalsIgnoreCase("null")){b=null;} jpayne@68: jpayne@68: if(a.equals("verbose")){ jpayne@68: verbose=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("ordered")){ jpayne@68: ordered=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("consensus")){ jpayne@68: useConsensus=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("best")){ jpayne@68: useConsensus=!Parse.parseBoolean(b); jpayne@68: }else if(a.equals("fast")){ jpayne@68: fast=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("minid")){ jpayne@68: minID=Float.parseFloat(b); jpayne@68: }else if(a.equals("maxns")){ jpayne@68: maxns=Integer.parseInt(b); jpayne@68: }else if(a.equals("minlen")){ jpayne@68: minlen=Integer.parseInt(b); jpayne@68: }else if(a.equals("maxlen")){ jpayne@68: maxlen=Integer.parseInt(b); jpayne@68: }else if(a.equals("in")){ jpayne@68: Tools.addFiles(b, in); jpayne@68: }else if(a.equals("alt")){ jpayne@68: alt=b; jpayne@68: }else if(a.equalsIgnoreCase("process16S") || a.equalsIgnoreCase("16S")){ jpayne@68: process16S=Parse.parseBoolean(b); jpayne@68: process18S=!process16S; jpayne@68: }else if(a.equalsIgnoreCase("process18S") || a.equalsIgnoreCase("18S")){ jpayne@68: process18S=Parse.parseBoolean(b); jpayne@68: process16S=!process18S; jpayne@68: }else if(a.equals("parse_flag_goes_here")){ jpayne@68: long fake_variable=Parse.parseKMG(b); jpayne@68: //Set a variable here jpayne@68: }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser jpayne@68: //do nothing jpayne@68: }else if(b==null && new File(arg).exists()){ jpayne@68: in.add(arg); jpayne@68: }else{ jpayne@68: outstream.println("Unknown parameter "+args[i]); jpayne@68: assert(false) : "Unknown parameter "+args[i]; jpayne@68: } jpayne@68: } jpayne@68: assert(!in.isEmpty()) : "No input file."; jpayne@68: return parser; jpayne@68: } jpayne@68: jpayne@68: /** Ensure files can be read and written */ jpayne@68: private void checkFileExistence(){ jpayne@68: //Ensure output files can be written jpayne@68: if(!Tools.testOutputFiles(overwrite, append, false, out1)){ jpayne@68: outstream.println((out1==null)+", "+out1); jpayne@68: throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+"\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure input files can be read jpayne@68: if(!Tools.testInputFiles(false, true, in)){ jpayne@68: throw new RuntimeException("\nCan't read some input files.\n"); jpayne@68: } jpayne@68: jpayne@68: // //Ensure that no file was specified multiple times jpayne@68: // if(!Tools.testForDuplicateFiles(true, out1, in.toArray(new String[0]))){ jpayne@68: // throw new RuntimeException("\nSome file names were specified multiple times.\n"); jpayne@68: // } jpayne@68: } jpayne@68: jpayne@68: /** Make sure interleaving agrees with number of input and output files */ jpayne@68: private void adjustInterleaving(){ jpayne@68: FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; jpayne@68: } jpayne@68: jpayne@68: /** Adjust file-related static fields as needed for this program */ jpayne@68: private static void checkStatics(){ jpayne@68: //Adjust the number of threads for input file reading jpayne@68: if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ jpayne@68: ByteFile.FORCE_MODE_BF2=true; jpayne@68: } jpayne@68: jpayne@68: assert(FastaReadInputStream.settingsOK()); jpayne@68: } jpayne@68: jpayne@68: /** Ensure parameter ranges are within bounds and required parameters are set */ jpayne@68: private boolean validateParams(){ jpayne@68: // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo; jpayne@68: // assert(false) : "TODO"; jpayne@68: assert(process16S || process18S) : "16S or 18S must be selected."; jpayne@68: assert(!process16S || !process18S) : "16S or 18S are both selected; only one may be active."; jpayne@68: return true; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Outer Methods ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Create read streams and process all data */ jpayne@68: void process(Timer t){ jpayne@68: jpayne@68: if(process16S){ jpayne@68: Read[] data=ProkObject.loadConsensusSequenceType("16S", true, true); jpayne@68: consensus16S=data[0].bases; jpayne@68: if(verbose){System.err.println("process16S: Loaded 16S consensus, length "+consensus16S.length+": "+new String(consensus16S));} jpayne@68: } jpayne@68: if(process18S){ jpayne@68: Read[] data=ProkObject.loadConsensusSequenceType("18S", true, true); jpayne@68: consensus18S=data[0].bases; jpayne@68: if(verbose){System.err.println("process18S: Loaded 18S consensus, length "+consensus18S.length+": "+new String(consensus18S));} jpayne@68: } jpayne@68: jpayne@68: //Turn off read validation in the input threads to increase speed jpayne@68: final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; jpayne@68: jpayne@68: //Reset counters jpayne@68: readsProcessed=readsOut=0; jpayne@68: basesProcessed=basesOut=0; jpayne@68: jpayne@68: //Align everything to global consensus jpayne@68: for(FileFormat ff : ffin) { jpayne@68: //Create a read input stream jpayne@68: final ConcurrentReadInputStream cris=makeCris(ff); jpayne@68: jpayne@68: //Process the reads in separate threads jpayne@68: spawnThreads(cris, false); jpayne@68: errorState|=ReadWrite.closeStream(cris); jpayne@68: } jpayne@68: jpayne@68: if(ffalt!=null){ jpayne@68: //Create a read input stream jpayne@68: final ConcurrentReadInputStream cris=makeCris(ffalt); jpayne@68: jpayne@68: //Process the reads in separate threads jpayne@68: spawnThreads(cris, true); jpayne@68: errorState|=ReadWrite.closeStream(cris); jpayne@68: } jpayne@68: jpayne@68: // queue=new ConcurrentLinkedQueue>(); jpayne@68: // for(Entry> e : listMap.entrySet()){ jpayne@68: // queue.add(e.getValue()); jpayne@68: // } jpayne@68: // listMap=null; jpayne@68: queue=makeQueue(); jpayne@68: jpayne@68: //Run a second pass to pick the best SSU per taxID jpayne@68: spawnThreads(null, false); jpayne@68: jpayne@68: //Do anything necessary after processing jpayne@68: if(ffout1!=null){ jpayne@68: //Optionally create a read output stream jpayne@68: final ConcurrentReadOutputStream ros=makeCros(); jpayne@68: long num=0; jpayne@68: for(Ribo ribo : bestList){ jpayne@68: Read r=ribo.r; jpayne@68: readsOut++; jpayne@68: basesOut+=r.length(); jpayne@68: ArrayList list=new ArrayList(1); jpayne@68: list.add(r); jpayne@68: ros.add(list, num); jpayne@68: num++; jpayne@68: } jpayne@68: //Close the read streams jpayne@68: errorState|=ReadWrite.closeStream(ros); jpayne@68: } jpayne@68: jpayne@68: if(verbose){outstream.println("Finished; closing streams.");} jpayne@68: jpayne@68: //Write anything that was accumulated by ReadStats jpayne@68: errorState|=ReadStats.writeAll(); jpayne@68: jpayne@68: //Reset read validation jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=vic; jpayne@68: jpayne@68: //Report timing and results jpayne@68: t.stop(); jpayne@68: outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); jpayne@68: outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false)); jpayne@68: jpayne@68: //Throw an exception of there was an error in a thread jpayne@68: if(errorState){ jpayne@68: throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private ConcurrentLinkedQueue> makeQueue(){ jpayne@68: ArrayList> listList=new ArrayList>(listMap.size()); jpayne@68: for(Entry> e : listMap.entrySet()){ jpayne@68: listList.add(e.getValue()); jpayne@68: } jpayne@68: listMap=null; jpayne@68: Collections.sort(listList, new ListComparator()); jpayne@68: assert(listList.isEmpty() || listList.get(0).size()>=listList.get(listList.size()-1).size()); jpayne@68: ConcurrentLinkedQueue> q=new ConcurrentLinkedQueue>(); jpayne@68: for(ArrayList x : listList){ jpayne@68: q.add(x); jpayne@68: } jpayne@68: return q; jpayne@68: } jpayne@68: jpayne@68: private ConcurrentReadInputStream makeCris(FileFormat ff){ jpayne@68: ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null); jpayne@68: cris.start(); //Start the stream jpayne@68: if(verbose){outstream.println("Started cris");} jpayne@68: boolean paired=cris.paired(); jpayne@68: assert(!paired) : "This should not be paired input."; jpayne@68: return cris; jpayne@68: } jpayne@68: jpayne@68: private ConcurrentReadOutputStream makeCros(){ jpayne@68: if(ffout1==null){return null;} jpayne@68: jpayne@68: //Select output buffer size based on whether it needs to be ordered jpayne@68: final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); jpayne@68: jpayne@68: final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, null, buff, null, false); jpayne@68: ros.start(); //Start the stream jpayne@68: return ros; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Thread Management ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Spawn process threads */ jpayne@68: private void spawnThreads(final ConcurrentReadInputStream cris, boolean altData){ jpayne@68: jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Fill a list with ProcessThreads jpayne@68: if(verbose){System.err.println("Spawning "+threads+" threads.");} jpayne@68: ArrayList alpt=new ArrayList(threads); jpayne@68: for(int i=0; i ln){ jpayne@68: if(verbose && tid==0){System.err.println("processInput() for tid="+tid);} jpayne@68: jpayne@68: //Grab the actual read list from the ListNum jpayne@68: final ArrayList reads=ln.list; jpayne@68: jpayne@68: //Loop through each read in the list jpayne@68: for(int idx=0; idxmaxlen){return;} jpayne@68: if(maxns>=0 && r.countNocalls()>maxns){return;} jpayne@68: Integer key=GiToTaxid.parseTaxidNumber(r.id, '|'); jpayne@68: if(verbose && tid==0){System.err.println("key="+key);} jpayne@68: if(key==null || key==-1 || (altData && seenTaxID.contains(key))){return;} jpayne@68: float id=align(r); jpayne@68: if(id list=listMap.get(key); jpayne@68: if(list==null){ jpayne@68: list=new ArrayList(8); jpayne@68: listMap.put(key, list); jpayne@68: } jpayne@68: list.add(ribo); jpayne@68: if(!altData){seenTaxID.add(key);} jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: float align(Read r){ jpayne@68: float a=(process16S ? align(r.bases, consensus16S) : 0); jpayne@68: float b=(process18S ? align(r.bases, consensus18S) : 0); jpayne@68: if(verbose && tid==0){System.err.println("Aligned; a="+a+", b="+b);} jpayne@68: return Tools.max(a, b); jpayne@68: } jpayne@68: jpayne@68: float align(byte[] query, byte[] ref){ jpayne@68: int a=0, b=ref.length-1; jpayne@68: int[] max=ssa.fillUnlimited(query, ref, a, b, -9999); jpayne@68: if(max==null){return 0;} jpayne@68: jpayne@68: final int rows=max[0]; jpayne@68: final int maxCol=max[1]; jpayne@68: final int maxState=max[2]; jpayne@68: final float id=ssa.tracebackIdentity(query, ref, a, b, rows, maxCol, maxState, null); jpayne@68: return id; jpayne@68: } jpayne@68: jpayne@68: SingleStateAlignerFlat2 ssa=new SingleStateAlignerFlat2(); jpayne@68: jpayne@68: /** Number of reads processed by this thread */ jpayne@68: protected long readsProcessedT=0; jpayne@68: /** Number of bases processed by this thread */ jpayne@68: protected long basesProcessedT=0; jpayne@68: jpayne@68: /** True only if this thread has completed successfully */ jpayne@68: boolean success=false; jpayne@68: jpayne@68: /** Shared input stream */ jpayne@68: private final ConcurrentReadInputStream cris; jpayne@68: /** Thread ID */ jpayne@68: final int tid; jpayne@68: jpayne@68: //Run mode jpayne@68: final boolean processInput; jpayne@68: final boolean altData; jpayne@68: } jpayne@68: jpayne@68: private class Ribo implements Comparable{ jpayne@68: Ribo(Read r_, int tid_, float identity_){ jpayne@68: r=r_; jpayne@68: tid=tid_; jpayne@68: identity=identity_; jpayne@68: product=score(r.length(), identity); jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: public int compareTo(Ribo b) { jpayne@68: if(b.product>product){return -1;} jpayne@68: else if(b.productr.length()){return -1;} jpayne@68: else if(b.r.length()> { jpayne@68: jpayne@68: @Override jpayne@68: public int compare(ArrayList a, ArrayList b) { jpayne@68: return a.size()>b.size() ? -1 : a.size() in=new ArrayList(); jpayne@68: jpayne@68: /** Alternate input file path */ jpayne@68: private String alt=null; jpayne@68: jpayne@68: /** Primary output file path */ jpayne@68: private String out1=null; jpayne@68: jpayne@68: /** Override input file extension */ jpayne@68: private String extin=null; jpayne@68: /** Override output file extension */ jpayne@68: private String extout=null; jpayne@68: jpayne@68: ArrayList bestList=new ArrayList(); jpayne@68: HashMap> listMap=new HashMap>(100000); jpayne@68: ConcurrentLinkedQueue> queue; jpayne@68: jpayne@68: jpayne@68: IntHashSet seenTaxID=new IntHashSet(1000000); jpayne@68: jpayne@68: byte[] consensus16S; jpayne@68: byte[] consensus18S; jpayne@68: jpayne@68: int idealLength(){ jpayne@68: if(process16S){return consensus16S.length;} jpayne@68: return consensus18S.length; jpayne@68: } jpayne@68: jpayne@68: boolean useConsensus=false; jpayne@68: boolean fast=false; jpayne@68: int maxns=-1; jpayne@68: int minlen=1; jpayne@68: int maxlen=4000; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Number of reads processed */ jpayne@68: protected long readsProcessed=0; jpayne@68: /** Number of bases processed */ jpayne@68: protected long basesProcessed=0; jpayne@68: jpayne@68: /** Number of reads retained */ jpayne@68: protected long readsOut=0; jpayne@68: /** Number of bases retained */ jpayne@68: protected long basesOut=0; jpayne@68: jpayne@68: /** Quit after processing this many input reads; -1 means no limit */ jpayne@68: private long maxReads=-1; jpayne@68: jpayne@68: private float minID=0.62f; jpayne@68: jpayne@68: private boolean process16S=true; jpayne@68: private boolean process18S=false; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Final Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Primary input file */ jpayne@68: private final ArrayList ffin; jpayne@68: private final FileFormat ffalt; jpayne@68: jpayne@68: /** Primary output file */ jpayne@68: private final FileFormat ffout1; jpayne@68: jpayne@68: final int threads; jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Common Fields ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Print status messages to this output stream */ jpayne@68: private PrintStream outstream=System.err; jpayne@68: /** Print verbose messages */ jpayne@68: public static boolean verbose=false; jpayne@68: /** True if an error was encountered */ jpayne@68: public boolean errorState=false; jpayne@68: /** Overwrite existing output files */ jpayne@68: private boolean overwrite=false; jpayne@68: /** Append to existing output files */ jpayne@68: private boolean append=false; jpayne@68: /** Reads are output in input order */ jpayne@68: private boolean ordered=false; jpayne@68: jpayne@68: }