jpayne@68: package consensus; jpayne@68: jpayne@68: import java.io.PrintStream; jpayne@68: import java.util.ArrayList; jpayne@68: import java.util.LinkedHashMap; jpayne@68: import java.util.Map.Entry; jpayne@68: import java.util.concurrent.atomic.AtomicIntegerArray; jpayne@68: import java.util.concurrent.atomic.AtomicLongArray; jpayne@68: jpayne@68: import fileIO.ByteFile; jpayne@68: import fileIO.FileFormat; jpayne@68: import fileIO.ReadWrite; jpayne@68: import shared.Parse; jpayne@68: import shared.Parser; jpayne@68: import shared.PreParser; jpayne@68: import shared.ReadStats; jpayne@68: import shared.Shared; jpayne@68: import shared.Timer; jpayne@68: import shared.Tools; jpayne@68: import stream.ConcurrentReadInputStream; jpayne@68: import stream.ConcurrentReadOutputStream; jpayne@68: import stream.FastaReadInputStream; jpayne@68: import stream.Read; jpayne@68: import stream.SamLine; jpayne@68: import stream.SamReadStreamer; jpayne@68: import stream.SamStreamer; jpayne@68: import structures.ByteBuilder; jpayne@68: import structures.ListNum; jpayne@68: import template.Accumulator; jpayne@68: import template.ThreadWaiter; jpayne@68: import var2.SamFilter; jpayne@68: jpayne@68: /** jpayne@68: * Resizes scaffold gaps to represent the best estimate jpayne@68: * based on the insert size distribution of paired reads. jpayne@68: * jpayne@68: * @author Brian Bushnell jpayne@68: * @date September 11, 2019 jpayne@68: * jpayne@68: */ jpayne@68: public class FixScaffoldGaps implements Accumulator { jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** jpayne@68: * Code entrance from the command line. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public static void main(String[] args){ jpayne@68: //Start a timer immediately upon code entrance. jpayne@68: Timer t=new Timer(); jpayne@68: jpayne@68: //Create an instance of this class jpayne@68: FixScaffoldGaps x=new FixScaffoldGaps(args); jpayne@68: jpayne@68: //Run the object jpayne@68: x.process(t); jpayne@68: jpayne@68: //Close the print stream if it was redirected jpayne@68: Shared.closeStream(x.outstream); jpayne@68: } jpayne@68: jpayne@68: /** jpayne@68: * Constructor. jpayne@68: * @param args Command line arguments jpayne@68: */ jpayne@68: public FixScaffoldGaps(String[] args){ jpayne@68: jpayne@68: {//Preparse block for help, config files, and outstream jpayne@68: PreParser pp=new PreParser(args, getClass(), false); jpayne@68: args=pp.args; jpayne@68: outstream=pp.outstream; jpayne@68: } jpayne@68: jpayne@68: //Set shared static variables prior to parsing jpayne@68: ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; jpayne@68: ReadWrite.MAX_ZIP_THREADS=Shared.threads(); jpayne@68: jpayne@68: samFilter.includeUnmapped=false; jpayne@68: samFilter.includeSupplimentary=false; jpayne@68: // samFilter.includeDuplicate=false; jpayne@68: samFilter.includeNonPrimary=false; jpayne@68: samFilter.includeQfail=false; jpayne@68: // samFilter.minMapq=4; jpayne@68: jpayne@68: {//Parse the arguments jpayne@68: final Parser parser=parse(args); jpayne@68: jpayne@68: Parser.processQuality(); jpayne@68: jpayne@68: maxReads=parser.maxReads; jpayne@68: overwrite=ReadStats.overwrite=parser.overwrite; jpayne@68: append=ReadStats.append=parser.append; jpayne@68: jpayne@68: in=parser.in1; jpayne@68: extin=parser.extin; jpayne@68: jpayne@68: out=parser.out1; jpayne@68: extout=parser.extout; jpayne@68: } jpayne@68: jpayne@68: { jpayne@68: // if("auto".equalsIgnoreCase(atomic)){Scaffold.setCA3A(Shared.threads()>8);} jpayne@68: // else{Scaffold.setCA3A(Parse.parseBoolean(atomic));} jpayne@68: samFilter.setSamtoolsFilter(); jpayne@68: jpayne@68: streamerThreads=Tools.max(1, Tools.min(streamerThreads, Shared.threads())); jpayne@68: assert(streamerThreads>0) : streamerThreads; jpayne@68: } jpayne@68: jpayne@68: validateParams(); jpayne@68: fixExtensions(); //Add or remove .gz or .bz2 as needed jpayne@68: checkFileExistence(); //Ensure files can be read and written jpayne@68: checkStatics(); //Adjust file-related static fields as needed for this program jpayne@68: jpayne@68: //Create output FileFormat objects jpayne@68: ffout=FileFormat.testOutput(out, FileFormat.FASTA, extout, true, overwrite, append, ordered); jpayne@68: jpayne@68: //Create input FileFormat objects jpayne@68: ffin=FileFormat.testInput(in, FileFormat.SAM, extin, true, true); jpayne@68: ffref=FileFormat.testInput(ref, FileFormat.FASTA, null, true, true); jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Initialization Helpers ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Parse arguments from the command line */ jpayne@68: private Parser parse(String[] args){ jpayne@68: jpayne@68: //Create a parser object jpayne@68: Parser parser=new Parser(); jpayne@68: jpayne@68: //Set any necessary Parser defaults here jpayne@68: //parser.foo=bar; jpayne@68: jpayne@68: //Parse each argument jpayne@68: for(int i=0; i1 ? split[1] : null; jpayne@68: if(b!=null && b.equalsIgnoreCase("null")){b=null;} jpayne@68: jpayne@68: if(a.equals("verbose")){ jpayne@68: verbose=Parse.parseBoolean(b); jpayne@68: }else if(a.equals("ref") || a.equals("scaffolds")){ jpayne@68: ref=b; jpayne@68: }else if(a.equals("insertlist")){ jpayne@68: insertList=b; jpayne@68: }else if(a.equals("ordered")){ jpayne@68: ordered=Parse.parseBoolean(b); jpayne@68: }else if(a.equalsIgnoreCase("ns") || a.equalsIgnoreCase("n") || a.equalsIgnoreCase("scaffoldbreak") || a.equalsIgnoreCase("gap")){ jpayne@68: scaffoldBreakNs=Integer.parseInt(b); jpayne@68: assert(scaffoldBreakNs>0); jpayne@68: }else if(a.equalsIgnoreCase("mindepth")){ jpayne@68: minDepth=Integer.parseInt(b); jpayne@68: assert(minDepth>0); jpayne@68: }else if(a.equalsIgnoreCase("trim") || a.equalsIgnoreCase("trimFraction") || a.equalsIgnoreCase("border")){ jpayne@68: trimFraction=Float.parseFloat(b); jpayne@68: assert(trimFraction>=0 && trimFraction<=1) : "trimFraction should be between 0 and 1"; jpayne@68: }else if(a.equals("clearfilters") || a.equals("clearfilter")){ jpayne@68: if(Parse.parseBoolean(b)){ jpayne@68: samFilter.clear(); jpayne@68: } jpayne@68: }else if(a.equals("parse_flag_goes_here")){ jpayne@68: long fake_variable=Parse.parseKMG(b); jpayne@68: //Set a variable here jpayne@68: }else if(samFilter.parse(arg, a, b)){ jpayne@68: //do nothing jpayne@68: }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser jpayne@68: //do nothing jpayne@68: }else{ jpayne@68: outstream.println("Unknown parameter "+args[i]); jpayne@68: assert(false) : "Unknown parameter "+args[i]; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: return parser; jpayne@68: } jpayne@68: jpayne@68: /** Add or remove .gz or .bz2 as needed */ jpayne@68: private void fixExtensions(){ jpayne@68: in=Tools.fixExtension(in); jpayne@68: ref=Tools.fixExtension(ref); jpayne@68: } jpayne@68: jpayne@68: /** Ensure files can be read and written */ jpayne@68: private void checkFileExistence(){ jpayne@68: jpayne@68: //Ensure there is an input file jpayne@68: if(in==null){throw new RuntimeException("Error - an input file is required.");} jpayne@68: jpayne@68: //Ensure there is an input file jpayne@68: if(ref==null){throw new RuntimeException("Error - a reference file is required.");} jpayne@68: jpayne@68: //Ensure output files can be written jpayne@68: if(!Tools.testOutputFiles(overwrite, append, false, out)){ jpayne@68: outstream.println((out==null)+", "+out); jpayne@68: throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure input files can be read jpayne@68: if(!Tools.testInputFiles(false, true, in, ref)){ jpayne@68: throw new RuntimeException("\nCan't read some input files.\n"); jpayne@68: } jpayne@68: jpayne@68: //Ensure that no file was specified multiple times jpayne@68: if(!Tools.testForDuplicateFiles(true, in, ref, out)){ jpayne@68: throw new RuntimeException("\nSome file names were specified multiple times.\n"); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: /** Adjust file-related static fields as needed for this program */ jpayne@68: private static void checkStatics(){ jpayne@68: //Adjust the number of threads for input file reading jpayne@68: if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ jpayne@68: ByteFile.FORCE_MODE_BF2=true; jpayne@68: } jpayne@68: jpayne@68: assert(FastaReadInputStream.settingsOK()); jpayne@68: } jpayne@68: jpayne@68: /** Ensure parameter ranges are within bounds and required parameters are set */ jpayne@68: private boolean validateParams(){ jpayne@68: // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo; jpayne@68: return true; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Outer Methods ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Create read streams and process all data */ jpayne@68: void process(Timer t){ jpayne@68: jpayne@68: //Turn off read validation in the input threads to increase speed jpayne@68: final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; jpayne@68: jpayne@68: //Create a read input stream jpayne@68: final SamStreamer ss=makeStreamer(ffin); jpayne@68: jpayne@68: //Load reference jpayne@68: loadReferenceCustom(); jpayne@68: jpayne@68: //Reset counters jpayne@68: readsProcessed=readsOut=0; jpayne@68: basesProcessed=basesOut=0; jpayne@68: jpayne@68: //Process the reads in separate threads jpayne@68: spawnThreads(ss); jpayne@68: jpayne@68: //Optionally create a read output stream jpayne@68: final ConcurrentReadOutputStream ros=makeCros(); jpayne@68: jpayne@68: if(verbose){outstream.println("Fixing reference.");} jpayne@68: jpayne@68: fixScaffolds(ros); jpayne@68: jpayne@68: if(verbose){outstream.println("Finished; closing streams.");} jpayne@68: jpayne@68: //Write anything that was accumulated by ReadStats jpayne@68: errorState|=ReadStats.writeAll(); jpayne@68: //Close the read streams jpayne@68: errorState|=ReadWrite.closeStream(ros); jpayne@68: jpayne@68: //Reset read validation jpayne@68: Read.VALIDATE_IN_CONSTRUCTOR=vic; jpayne@68: jpayne@68: //Report timing and results jpayne@68: t.stop(); jpayne@68: outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); jpayne@68: outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, scaffoldsOut, scaffoldLengthOut, 8, false)); jpayne@68: jpayne@68: outstream.println(); jpayne@68: outstream.println(Tools.number("Average Insert", totalAverageInsert, 2, 8)); jpayne@68: outstream.println(Tools.number("Gaps Unchanged", gapsUnchanged, 8)); jpayne@68: outstream.println(Tools.number("Gaps Widened ", gapsWidened, 8)); jpayne@68: outstream.println(Tools.number("Gaps Narrowed ", gapsNarrowed, 8)); jpayne@68: outstream.println(Tools.number("Ns Total ", nsTotal, 8)); jpayne@68: outstream.println(Tools.number("Ns Added ", nsAdded, 8)); jpayne@68: outstream.println(Tools.number("Ns Removed ", nsRemoved, 8)); jpayne@68: jpayne@68: jpayne@68: //Throw an exception of there was an error in a thread jpayne@68: if(errorState){ jpayne@68: throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private synchronized void loadReferenceCustom(){ jpayne@68: assert(!loadedRef); jpayne@68: ConcurrentReadInputStream cris=makeRefCris(); jpayne@68: for(ListNum ln=cris.nextList(); ln!=null && ln.size()>0; ln=cris.nextList()) { jpayne@68: for(Read r : ln){ jpayne@68: String name=r.id; jpayne@68: String name2=Tools.trimToWhitespace(r.id); jpayne@68: Scaffold scaf=new Scaffold(name, r.bases, r.numericID); jpayne@68: refMap.put(name, scaf); jpayne@68: refMap2.put(name2, scaf); jpayne@68: } jpayne@68: } jpayne@68: loadedRef=true; jpayne@68: } jpayne@68: jpayne@68: private ConcurrentReadInputStream makeRefCris(){ jpayne@68: ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffref, null); jpayne@68: cris.start(); //Start the stream jpayne@68: if(verbose){outstream.println("Started cris");} jpayne@68: boolean paired=cris.paired(); jpayne@68: assert(!paired) : "References should not be paired."; jpayne@68: return cris; jpayne@68: } jpayne@68: jpayne@68: private SamStreamer makeStreamer(FileFormat ff){ jpayne@68: if(ff==null){return null;} jpayne@68: SamStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads); jpayne@68: ss.start(); //Start the stream jpayne@68: if(verbose){outstream.println("Started Streamer");} jpayne@68: return ss; jpayne@68: } jpayne@68: jpayne@68: private ConcurrentReadOutputStream makeCros(){ jpayne@68: if(ffout==null){return null;} jpayne@68: jpayne@68: //Select output buffer size based on whether it needs to be ordered jpayne@68: final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); jpayne@68: jpayne@68: final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false); jpayne@68: ros.start(); //Start the stream jpayne@68: return ros; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Thread Management ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** Spawn process threads */ jpayne@68: private void spawnThreads(final SamStreamer ss){ jpayne@68: jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Determine how many threads may be used jpayne@68: final int threads=Shared.threads(); jpayne@68: jpayne@68: //Fill a list with ProcessThreads jpayne@68: ArrayList alpt=new ArrayList(threads); jpayne@68: for(int i=0; i list=new ArrayList(200); jpayne@68: long num=0; jpayne@68: long lengthSum=0; jpayne@68: for(Entry e : refMap.entrySet()){ jpayne@68: Scaffold scaf=e.getValue(); jpayne@68: Read r=scaf.fixScaffold(bb); jpayne@68: lengthSum+=r.length(); jpayne@68: list.add(r); jpayne@68: scaffoldsOut++; jpayne@68: scaffoldLengthOut+=r.length(); jpayne@68: jpayne@68: if(list.size()>=200 || lengthSum>=100000){ jpayne@68: if(ros!=null){ros.add(list, num);} jpayne@68: list=new ArrayList(200); jpayne@68: num++; jpayne@68: lengthSum=0; jpayne@68: } jpayne@68: } jpayne@68: if(list.size()>0){ jpayne@68: if(ros!=null){ros.add(list, num);} jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private static int calcInsertSize(SamLine sl) { jpayne@68: assert(sl.mapped() && sl.pairedOnSameChrom()); jpayne@68: assert(sl.primary()); jpayne@68: assert(!sl.supplementary()); jpayne@68: assert(sl.leftmost()); jpayne@68: jpayne@68: assert(sl.tlen>0) : sl.tlen+"\n\n"+sl; jpayne@68: return sl.tlen>0 ? sl.tlen : -sl.tlen; jpayne@68: jpayne@68: // final int insertSize; jpayne@68: // String insertTag=null; jpayne@68: // if(sl.optional!=null){ jpayne@68: // for(String s : sl.optional){ jpayne@68: // if(s.startsWith("X8:Z:")){ jpayne@68: // insertTag=s; jpayne@68: // break; jpayne@68: // } jpayne@68: // } jpayne@68: // } jpayne@68: // if(insertTag!=null){ jpayne@68: // insertSize=Integer.parseInt(insertTag.substring(5)); jpayne@68: // }else{ jpayne@68: // insertSize=sl.tlen;//This is unsafe due to indels. jpayne@68: // assert(false) : "Reads need insert size tags."; jpayne@68: // } jpayne@68: // assert(insertSize>0) : sl; jpayne@68: // jpayne@68: // return insertSize; jpayne@68: } jpayne@68: jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: /*---------------- Inner Classes ----------------*/ jpayne@68: /*--------------------------------------------------------------*/ jpayne@68: jpayne@68: /** This class is static to prevent accidental writing to shared variables. jpayne@68: * It is safe to remove the static modifier. */ jpayne@68: class ProcessThread extends Thread { jpayne@68: jpayne@68: //Constructor jpayne@68: ProcessThread(final SamStreamer ss_, final int tid_){ jpayne@68: ss=ss_; jpayne@68: tid=tid_; jpayne@68: } jpayne@68: jpayne@68: //Called by start() jpayne@68: @Override jpayne@68: public void run(){ jpayne@68: //Do anything necessary prior to processing jpayne@68: jpayne@68: //Process the reads jpayne@68: processInner(); jpayne@68: jpayne@68: //Do anything necessary after processing jpayne@68: jpayne@68: //Indicate successful exit status jpayne@68: success=true; jpayne@68: } jpayne@68: jpayne@68: /** Iterate through the reads */ jpayne@68: void processInner(){ jpayne@68: jpayne@68: //Grab and process all lists jpayne@68: for(ListNum ln=ss.nextReads(); ln!=null; ln=ss.nextReads()){ jpayne@68: // if(verbose){outstream.println("Got list of size "+list.size());} //Disabled due to non-static access jpayne@68: jpayne@68: processList(ln); jpayne@68: } jpayne@68: jpayne@68: } jpayne@68: jpayne@68: void processList(ListNum ln){ jpayne@68: jpayne@68: //Grab the actual read list from the ListNum jpayne@68: final ArrayList reads=ln.list; jpayne@68: jpayne@68: //Loop through each read in the list jpayne@68: for(int idx=0; idx=0 && i=scaffoldBreakNs && i-streak>300 && i