Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/tax/SplitByTaxa.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/tax/SplitByTaxa.java Tue Mar 18 16:23:26 2025 -0400 @@ -0,0 +1,388 @@ +package tax; + +import java.io.File; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.HashMap; + +import fileIO.ByteFile; +import fileIO.FileFormat; +import fileIO.ReadWrite; +import shared.Parse; +import shared.Parser; +import shared.PreParser; +import shared.ReadStats; +import shared.Shared; +import shared.Timer; +import shared.Tools; +import stream.ConcurrentReadInputStream; +import stream.ConcurrentReadOutputStream; +import stream.FASTQ; +import stream.FastaReadInputStream; +import stream.Read; +import structures.ListNum; + +/** + * Filters sequences according to their taxonomy, + * as determined by the sequence name. Sequences should + * be labeled with a gi number or NCBI taxID. + * + * @author Brian Bushnell + * @date November 23, 2015 + * + */ +public class SplitByTaxa { + + /*--------------------------------------------------------------*/ + /*---------------- Initialization ----------------*/ + /*--------------------------------------------------------------*/ + + /** + * Code entrance from the command line. + * @param args Command line arguments + */ + public static void main(String[] args){ + Timer t=new Timer(); + SplitByTaxa x=new SplitByTaxa(args); + x.process(t); + + //Close the print stream if it was redirected + Shared.closeStream(x.outstream); + } + + /** + * Constructor. + * @param args Command line arguments + */ + public SplitByTaxa(String[] args){ + + {//Preparse block for help, config files, and outstream + PreParser pp=new PreParser(args, getClass(), false); + args=pp.args; + outstream=pp.outstream; + } + + //Set some shared static variables + Shared.capBuffers(4); + ReadWrite.USE_UNPIGZ=true; + ReadWrite.USE_PIGZ=false; + ReadWrite.USE_GZIP=false; + ReadWrite.MAX_ZIP_THREADS=Shared.threads(); + + boolean setInterleaved=false; //Whether interleaved was explicitly set. + String giTableFile=null; + String taxTreeFile=null; + + //Create a parser object + Parser parser=new Parser(); + + //Parse each argument + for(int i=0; i<args.length; i++){ + String arg=args[i]; + + //Break arguments into their constituent parts, in the form of "a=b" + String[] split=arg.split("="); + String a=split[0].toLowerCase(); + String b=split.length>1 ? split[1] : null; + + if(parser.parse(arg, a, b)){//Parse standard flags in the parser + //do nothing + }else if(a.equals("verbose")){ + verbose=Parse.parseBoolean(b); + }else if(a.equals("taxlevel") || a.equals("tl") || a.equals("level") || a.equals("lv")){ + taxLevelE=TaxTree.parseLevelExtended(b); + }else if(a.equals("table") || a.equals("gi") || a.equals("gitable")){ + giTableFile=b; + }else if(a.equals("tree") || a.equals("taxtree")){ + taxTreeFile=b; + }else{ + outstream.println("Unknown parameter "+args[i]); + assert(false) : "Unknown parameter "+args[i]; + // throw new RuntimeException("Unknown parameter "+args[i]); + } + } + + if("auto".equalsIgnoreCase(taxTreeFile)){taxTreeFile=TaxTree.defaultTreeFile();} + if("auto".equalsIgnoreCase(giTableFile)){giTableFile=TaxTree.defaultTableFile();} + + {//Process parser fields + Parser.processQuality(); + + maxReads=parser.maxReads; + + overwrite=ReadStats.overwrite=parser.overwrite; + append=ReadStats.append=parser.append; + setInterleaved=parser.setInterleaved; + + in1=parser.in1; + in2=parser.in2; + + out1=parser.out1; + out2=parser.out2; + + extin=parser.extin; + extout=parser.extout; + } + + //Do input file # replacement + if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ + in2=in1.replace("#", "2"); + in1=in1.replace("#", "1"); + } + + //Do output file # replacement + if(out1!=null && out2==null && out1.indexOf('#')>-1){ + out2=out1.replace("#", "2"); + out1=out1.replace("#", "1"); + } + + assert(out1==null || out1.contains("%")) : "Output filename must contain % symbol."; + assert(out2==null || out2.contains("%")) : "Output filename must contain % symbol."; + assert(taxTreeFile!=null) : "This requires a taxtree file. On NERSC systems, set tree=auto."; + + //Adjust interleaved detection based on the number of input files + if(in2!=null){ + if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} + FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; + } + + assert(FastaReadInputStream.settingsOK()); + + //Ensure there is an input file + if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} + + //Adjust the number of threads for input file reading + if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ + ByteFile.FORCE_MODE_BF2=true; + } + + //Ensure out2 is not set without out1 + if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");} + + //Adjust interleaved settings based on number of output files + if(!setInterleaved){ + assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n"; + if(in2!=null){ //If there are 2 input streams. + FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; + outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); + }else{ //There is one input stream. + if(out2!=null){ + FASTQ.FORCE_INTERLEAVED=true; + FASTQ.TEST_INTERLEAVED=false; + outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); + } + } + } + + //Ensure output files can be written + if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){ + outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2); + throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n"); + } + + //Ensure input files can be read + if(!Tools.testInputFiles(false, true, in1, in2)){ + throw new RuntimeException("\nCan't read some input files.\n"); + } + + //Ensure that no file was specified multiple times + if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){ + throw new RuntimeException("\nSome file names were specified multiple times.\n"); + } + + //Create input FileFormat objects + ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); + ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); + + TaxFilter.loadGiTable(giTableFile); + tree=TaxFilter.loadTree(taxTreeFile); + } + + /*--------------------------------------------------------------*/ + /*---------------- Outer Methods ----------------*/ + /*--------------------------------------------------------------*/ + + /** Create read streams and process all data */ + public void process(Timer t){ + + //Create a read input stream + final ConcurrentReadInputStream cris; + { + cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, null, null); + cris.start(); //Start the stream + if(verbose){outstream.println("Started cris");} + } + boolean paired=cris.paired(); + if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} + + //Reset counters + readsProcessed=0; + basesProcessed=0; + + final HashMap<String, ConcurrentReadOutputStream> map=new HashMap<String, ConcurrentReadOutputStream>(); + + //Process the read stream + processInner(cris, map); + + if(verbose){outstream.println("Finished; closing streams.");} + + //Write anything that was accumulated by ReadStats + errorState|=ReadStats.writeAll(); + //Close the read streams + errorState|=ReadWrite.closeStream(cris); + + for(ConcurrentReadOutputStream ros : map.values()){ + ReadWrite.closeStream(ros); + } + + //Report timing and results + t.stop(); + outstream.println("Reads In: \t"+readsProcessed+" reads \t"+basesProcessed+" bases"); + outstream.println(); + outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); + + //Throw an exception of there was an error in a thread + if(errorState){ + throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); + } + } + + /** Iterate through the reads */ + void processInner(final ConcurrentReadInputStream cris, HashMap<String, ConcurrentReadOutputStream> map){ + + { + //Grab the first ListNum of reads + ListNum<Read> ln=cris.nextList(); + //Grab the actual read list from the ListNum + ArrayList<Read> reads=(ln!=null ? ln.list : null); + + //Check to ensure pairing is as expected + if(reads!=null && !reads.isEmpty()){ + Read r=reads.get(0); + assert((ffin1==null || ffin1.samOrBam()) || (r.mate!=null)==cris.paired()); + } + + //As long as there is a nonempty read list... + while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning + if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} + + //Loop through each read in the list + for(int idx=0; idx<reads.size(); idx++){ + final Read r1=reads.get(idx); + + //Track the initial length for statistics + final int initialLength1=r1.length(); + final int initialLength2=(r1.mateLength()); + + //Increment counters + readsProcessed+=r1.pairCount(); + basesProcessed+=initialLength1+initialLength2; + + TaxNode tn=tree.parseNodeFromHeader(r1.id, true); + if(tn==null){tn=tree.getNodeByName(r1.id);} + if(tn==null){tn=unknown;} + while(tn.levelExtended<taxLevelE && tn.id!=tn.pid){tn=tree.getNode(tn.pid);} + + if(out1!=null){ + ConcurrentReadOutputStream ros=map.get(tn.name); + if(ros==null){ + final int buff=4; + FileFormat ffout1=null, ffout2=null; + ffout1=FileFormat.testOutput(out1.replaceFirst("%", tn.name.replaceAll("\\s+", "_").replaceAll("[/\\\\]", "")), FileFormat.FASTQ, extout, false, overwrite, append, false); + if(out2!=null){ffout2=FileFormat.testOutput(out2.replaceFirst("%", tn.name.replaceAll("\\s+", "_").replaceAll("[/\\\\]", "")), FileFormat.FASTQ, extout, false, overwrite, append, false);} + ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, null, null, buff, null, false); + ros.start(); //Start the stream + map.put(tn.name, ros); + } + ArrayList<Read> temp=new ArrayList<Read>(1); //Kind of inefficient + temp.add(r1); + ros.add(temp, 0); + } + } + + //Notify the input stream that the list was used + cris.returnList(ln); + if(verbose){outstream.println("Returned a list.");} + + //Fetch a new list + ln=cris.nextList(); + reads=(ln!=null ? ln.list : null); + } + + //Notify the input stream that the final list was used + if(ln!=null){ + cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); + } + } + + //Do anything necessary after processing + + } + + /*--------------------------------------------------------------*/ + /*---------------- Inner Methods ----------------*/ + /*--------------------------------------------------------------*/ + + /*--------------------------------------------------------------*/ + /*---------------- Fields ----------------*/ + /*--------------------------------------------------------------*/ + + /** Primary input file path */ + private String in1=null; + /** Secondary input file path */ + private String in2=null; + + /** Primary output file path */ + private String out1=null; + /** Secondary output file path */ + private String out2=null; + + /** Override input file extension */ + private String extin=null; + /** Override output file extension */ + private String extout=null; + + /** The actual filter */ + private int taxLevelE=TaxTree.stringToLevelExtended("phylum"); + + /*--------------------------------------------------------------*/ + + /** Number of reads processed */ + protected long readsProcessed=0; + /** Number of bases processed */ + protected long basesProcessed=0; + + /** Quit after processing this many input reads; -1 means no limit */ + private long maxReads=-1; + + /*--------------------------------------------------------------*/ + /*---------------- Final Fields ----------------*/ + /*--------------------------------------------------------------*/ + + /** Primary input file */ + private final FileFormat ffin1; + /** Secondary input file */ + private final FileFormat ffin2; + + private final TaxTree tree; + + private final TaxNode unknown=new TaxNode(-99, -99, TaxTree.LIFE, TaxTree.LIFE_E, "UNKNOWN"); + + /*--------------------------------------------------------------*/ + /*---------------- Common Fields ----------------*/ + /*--------------------------------------------------------------*/ + + /** Print status messages to this output stream */ + private PrintStream outstream=System.err; + /** Print verbose messages */ + public static boolean verbose=false; + /** True if an error was encountered */ + public boolean errorState=false; + /** Overwrite existing output files */ + private boolean overwrite=false; + /** Append to existing output files */ + private boolean append=false; + /** This flag has no effect on singlethreaded programs */ + private final boolean ordered=false; + +}