Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/CompareSSU.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/CompareSSU.java Tue Mar 18 16:23:26 2025 -0400 @@ -0,0 +1,514 @@ +package sketch; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicInteger; + +import fileIO.ByteFile; +import fileIO.ByteStreamWriter; +import fileIO.FileFormat; +import fileIO.ReadWrite; +import shared.Parse; +import shared.Parser; +import shared.PreParser; +import shared.ReadStats; +import shared.Shared; +import shared.Timer; +import shared.Tools; +import stream.FASTQ; +import stream.FastaReadInputStream; +import stream.Read; +import structures.ByteBuilder; +import structures.FloatList; +import tax.TaxNode; +import tax.TaxTree; +import template.Accumulator; +import template.ThreadWaiter; + +/** + * Compares SSUs, all-to-all or fractional matrix. + * + * @author Brian Bushnell + * @date December 2, 2019 + * + */ +public class CompareSSU implements Accumulator<CompareSSU.ProcessThread> { + + /*--------------------------------------------------------------*/ + /*---------------- Initialization ----------------*/ + /*--------------------------------------------------------------*/ + + /** + * Code entrance from the command line. + * @param args Command line arguments + */ + public static void main(String[] args){ + //Start a timer immediately upon code entrance. + Timer t=new Timer(); + + //Create an instance of this class + CompareSSU x=new CompareSSU(args); + + //Run the object + x.process(t); + + //Close the print stream if it was redirected + Shared.closeStream(x.outstream); + } + + /** + * Constructor. + * @param args Command line arguments + */ + public CompareSSU(String[] args){ + + {//Preparse block for help, config files, and outstream + PreParser pp=new PreParser(args, getClass(), false); + args=pp.args; + outstream=pp.outstream; + } + + //Set shared static variables prior to parsing + ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; + ReadWrite.MAX_ZIP_THREADS=Shared.threads(); + + {//Parse the arguments + final Parser parser=parse(args); + Parser.processQuality(); + + maxReads=parser.maxReads; + overwrite=ReadStats.overwrite=parser.overwrite; + append=ReadStats.append=parser.append; + + in1=parser.in1; + + out1=parser.out1; + } + + validateParams(); + if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} + FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; + checkFileExistence(); //Ensure files can be read and written + checkStatics(); //Adjust file-related static fields as needed for this program + + //Create output FileFormat objects + ffout1=FileFormat.testOutput(out1, FileFormat.TXT, null, true, overwrite, append, ordered); + + tree=(treeFile==null) ? null : TaxTree.loadTaxTree(treeFile, outstream, true, false); + + SSUMap.r16SFile=in1; + if(SSUMap.r16SFile!=null){ + SSUMap.load(outstream); + HashMap<Integer, byte[]> ssuMap=SSUMap.r16SMap; + ssuList=new ArrayList<Read>(ssuMap.size()); + for(Entry<Integer, byte[]> e : ssuMap.entrySet()){ + int id=e.getKey(); + byte[] value=e.getValue(); + if(value.length>=minlen && value.length<=maxlen){ + Read r=new Read(value, null, id);//Sets numeric ID to TaxID. + if(maxns<0 || r.countNocalls()<=maxns){ + ssuList.add(r); + } + } + } + Collections.shuffle(ssuList); + } + for(int i=0; i<idLists.length; i++){ + idLists[i]=new FloatList(); + } + } + + /*--------------------------------------------------------------*/ + /*---------------- Initialization Helpers ----------------*/ + /*--------------------------------------------------------------*/ + + /** Parse arguments from the command line */ + private Parser parse(String[] args){ + + //Create a parser object + Parser parser=new Parser(); + + //Set any necessary Parser defaults here + //parser.foo=bar; + + //Parse each argument + for(int i=0; i<args.length; i++){ + String arg=args[i]; + + //Break arguments into their constituent parts, in the form of "a=b" + String[] split=arg.split("="); + String a=split[0].toLowerCase(); + String b=split.length>1 ? split[1] : null; + if(b!=null && b.equalsIgnoreCase("null")){b=null;} + + if(a.equals("verbose")){ + verbose=Parse.parseBoolean(b); + }else if(a.equals("tree")){ + treeFile=b; + }else if(a.equals("ordered")){ + ordered=Parse.parseBoolean(b); + }else if(a.equals("ata") || a.equals("alltoall")){ + allToAll=Parse.parseBoolean(b); + }else if(a.equals("store") || a.equals("storeresults")){ + storeResults=Parse.parseBoolean(b); + }else if(a.equals("minlen") || a.equals("maxlength")){ + minlen=Parse.parseIntKMG(b); + }else if(a.equals("maxlen") || a.equals("maxlength")){ + maxlen=Parse.parseIntKMG(b); + }else if(a.equalsIgnoreCase("maxns")){ + maxns=Parse.parseIntKMG(b); + }else if(a.equals("parse_flag_goes_here")){ + long fake_variable=Parse.parseKMG(b); + //Set a variable here + }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser + //do nothing + }else{ + outstream.println("Unknown parameter "+args[i]); + assert(false) : "Unknown parameter "+args[i]; + } + } + + return parser; + } + + /** Ensure files can be read and written */ + private void checkFileExistence(){ + //Ensure output files can be written + if(!Tools.testOutputFiles(overwrite, append, false, out1)){ + outstream.println((out1==null)+", "+out1); + throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n"); + } + + //Ensure input files can be read + if(!Tools.testInputFiles(false, true, in1)){ + throw new RuntimeException("\nCan't read some input files.\n"); + } + + //Ensure that no file was specified multiple times + if(!Tools.testForDuplicateFiles(true, in1, out1)){ + throw new RuntimeException("\nSome file names were specified multiple times.\n"); + } + } + + /** Adjust file-related static fields as needed for this program */ + private static void checkStatics(){ + //Adjust the number of threads for input file reading + if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ + ByteFile.FORCE_MODE_BF2=true; + } + + assert(FastaReadInputStream.settingsOK()); + } + + /** Ensure parameter ranges are within bounds and required parameters are set */ + private boolean validateParams(){ + return true; + } + + /*--------------------------------------------------------------*/ + /*---------------- Outer Methods ----------------*/ + /*--------------------------------------------------------------*/ + + /** Create read streams and process all data */ + void process(Timer t){ + + ByteStreamWriter bsw=makeBSW(ffout1); + if(bsw!=null){ + bsw.forcePrint("#Level\tIdentity\tQueryID\tRefID\n"); + } + + //Reset counters + queriesProcessed=0; + comparisons=0; + + //Process the reads in separate threads + spawnThreads(bsw); + + if(verbose){outstream.println("Finished; closing streams.");} + + //Close the read streams + if(bsw!=null){errorState|=bsw.poisonAndWait();} + + { + ByteBuilder bb=new ByteBuilder(); + bb.append("\nLevel \tCount\tMean"+(storeResults ? "\tMedian\t90%ile\t10%ile\tSTDev" : "")+"\n"); + outstream.print(bb); + final int minlen="superkingdom".length(); + for(int level=0; level<taxLevels; level++){ + if(counts[level]>0){ + bb.clear(); + bb.append(TaxTree.levelToStringExtended(level)); + while(bb.length()<minlen){bb.space();} + bb.tab().append(counts[level]).tab(); + bb.append(sums[level]/counts[level]*100, 3); + if(storeResults){ + FloatList list=idLists[level]; + list.sort(); + double stdev=list.stdev(); + double median=list.median(); +// double mode=list.mode(); + double percent90=list.percentile(0.9f); + double percent10=list.percentile(0.1f); + bb.tab().append(median*100, 3).tab().append(percent90*100, 3).tab().append(percent10*100, 3).tab().append(stdev*100, 3); + } + bb.nl(); + outstream.print(bb); + } + } + } + + //Report timing and results + t.stop(); + outstream.println(); + outstream.println(Tools.timeQueriesComparisonsProcessed(t, queriesProcessed, comparisons, 8)); + + //Throw an exception of there was an error in a thread + if(errorState){ + throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); + } + } + + /*--------------------------------------------------------------*/ + /*---------------- Thread Management ----------------*/ + /*--------------------------------------------------------------*/ + + /** Spawn process threads */ + private void spawnThreads(ByteStreamWriter bsw){ + + //Do anything necessary prior to processing + + //Determine how many threads may be used + final int threads=Shared.threads(); + + //Fill a list with ProcessThreads + ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); + for(int i=0; i<threads; i++){ + alpt.add(new ProcessThread(bsw, i, threads)); + } + + //Start the threads and wait for them to finish + boolean success=ThreadWaiter.startAndWait(alpt, this); + errorState&=!success; + + //Do anything necessary after processing + + } + + @Override + public final void accumulate(ProcessThread pt){ + queriesProcessed+=pt.querysProcessedT; + comparisons+=pt.comparisonsT; + errorState|=(!pt.success); + + for(int i=0; i<taxLevels; i++){ + idLists[i].addAll(pt.idListsT[i]); + counts[i]+=pt.countsT[i]; + sums[i]+=pt.sumsT[i]; + } + } + + @Override + public final boolean success(){return !errorState;} + + /*--------------------------------------------------------------*/ + /*---------------- Inner Methods ----------------*/ + /*--------------------------------------------------------------*/ + + private static ByteStreamWriter makeBSW(FileFormat ff){ + if(ff==null){return null;} + ByteStreamWriter bsw=new ByteStreamWriter(ff); + bsw.start(); + return bsw; + } + + /*--------------------------------------------------------------*/ + /*---------------- Inner Classes ----------------*/ + /*--------------------------------------------------------------*/ + + /** This class is static to prevent accidental writing to shared variables. + * It is safe to remove the static modifier. */ + class ProcessThread extends Thread { + + //Constructor + ProcessThread(ByteStreamWriter bsw_, final int tid_, final int threads_){ + bsw=bsw_; + threadID=tid_; + threads=threads_; + listCopy=new ArrayList<Read>(ssuList.size()); + listCopy.addAll(ssuList); + for(int i=0; i<idListsT.length; i++){ + idListsT[i]=new FloatList(); + } + } + + //Called by start() + @Override + public void run(){ + //Do anything necessary prior to processing + + //Process the reads + processInner(); + + //Do anything necessary after processing + + //Indicate successful exit status + success=true; + } + + /** Iterate through the reads */ + void processInner(){ + final long limit=Tools.min(ssuList.size(), (maxReads>0 ? maxReads : Integer.MAX_VALUE)); + + for(int num=next.getAndIncrement(); num<limit; num=next.getAndIncrement()){ + Read r=ssuList.get(num); + processRead(r); + } + } + + void processRead(Read query){ + if(query.numericID<1){return;}//invalid TID + final int qid=(int)query.numericID; + if(tree.getNode(qid)==null) {return;} + if(querysProcessedT%5==0) { + Collections.shuffle(listCopy); + } + querysProcessedT++; + + ByteBuilder bb=new ByteBuilder(); + + long seen=0; + for(Read ref :listCopy){ + int rid=(int)ref.numericID; + if(rid!=qid && rid>0 && tree.getNode(rid)!=null){ + int aid=tree.commonAncestor(qid, rid); + if(aid>0){ + TaxNode tn=tree.getNode(aid); + if(tn.isRanked()){ + int level=tn.levelExtended; + long mask=1L<<level; + if(allToAll || ((mask&printLevels)!=0 && (mask&seen)==0)){ + seen|=mask; + float identity=compare(query, ref, level); + bb.append(TaxTree.levelToStringExtended(level)).tab().append(identity, 6); + bb.tab().append(qid).tab().append(rid).nl(); + } + } + } + } + } + if(bsw!=null){ + bsw.addJob(bb); + } + } + + float compare(Read query, Read ref, int level){ + comparisonsT++; + float identity=SketchObject.align(query.bases, ref.bases); + if(storeResults){idListsT[level].add(identity);} + countsT[level]++; + sumsT[level]+=identity; + return identity; + } + + /** Number of reads processed by this thread */ + protected long querysProcessedT=0; + /** Number of bases processed by this thread */ + protected long comparisonsT=0; + + /** True only if this thread has completed successfully */ + boolean success=false; + + /** Shared output stream */ + private final ByteStreamWriter bsw; + /** Thread ID */ + final int threadID; + + final int threads; + + ArrayList<Read> listCopy; + + final FloatList[] idListsT=new FloatList[taxLevels]; + long[] countsT=new long[taxLevels]; + double[] sumsT=new double[taxLevels]; + } + + /*--------------------------------------------------------------*/ + /*---------------- Fields ----------------*/ + /*--------------------------------------------------------------*/ + + /** Primary input file path */ + private String in1=null; + + private String treeFile="auto"; + + /** Primary output file path */ + private String out1=null; + + public static ArrayList<Read> ssuList=null; + + final static int taxLevels=TaxTree.numTaxLevelNamesExtended; + static final String[] printLevelsArray=new String[] {"strain", "species", "genus", "family", "order", "class", "phylum", "superkingdom", "life"}; + static final long printLevels=makePrintLevels(printLevelsArray); + + private final TaxTree tree; + + private static final long makePrintLevels(String[] names){ + long mask=0; + for(String s : names){ + int level=TaxTree.stringToLevelExtended(s); + mask|=(1L<<level); + } + return mask; + } + + private FloatList[] idLists=new FloatList[taxLevels]; + private long[] counts=new long[taxLevels]; + private double[] sums=new double[taxLevels]; + + private int minlen=0; + private int maxlen=Integer.MAX_VALUE; + private int maxns=-1; + + /*--------------------------------------------------------------*/ + + /** Number of reads processed */ + protected long queriesProcessed=0; + /** Number of bases processed */ + protected long comparisons=0; + + /** Quit after processing this many input reads; -1 means no limit */ + private long maxReads=-1; + + private AtomicInteger next=new AtomicInteger(0); + + private boolean allToAll=false; + private boolean storeResults=false; + + /*--------------------------------------------------------------*/ + /*---------------- Final Fields ----------------*/ + /*--------------------------------------------------------------*/ + + /** Primary output file */ + private final FileFormat ffout1; + + /*--------------------------------------------------------------*/ + /*---------------- Common Fields ----------------*/ + /*--------------------------------------------------------------*/ + + /** Print status messages to this output stream */ + private PrintStream outstream=System.err; + /** Print verbose messages */ + public static boolean verbose=false; + /** True if an error was encountered */ + public boolean errorState=false; + /** Overwrite existing output files */ + private boolean overwrite=false; + /** Append to existing output files */ + private boolean append=false; + /** Reads are output in input order */ + private boolean ordered=false; + +}