Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/CompareSSU.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 package sketch; | |
2 | |
3 import java.io.PrintStream; | |
4 import java.util.ArrayList; | |
5 import java.util.Collections; | |
6 import java.util.HashMap; | |
7 import java.util.Map.Entry; | |
8 import java.util.concurrent.atomic.AtomicInteger; | |
9 | |
10 import fileIO.ByteFile; | |
11 import fileIO.ByteStreamWriter; | |
12 import fileIO.FileFormat; | |
13 import fileIO.ReadWrite; | |
14 import shared.Parse; | |
15 import shared.Parser; | |
16 import shared.PreParser; | |
17 import shared.ReadStats; | |
18 import shared.Shared; | |
19 import shared.Timer; | |
20 import shared.Tools; | |
21 import stream.FASTQ; | |
22 import stream.FastaReadInputStream; | |
23 import stream.Read; | |
24 import structures.ByteBuilder; | |
25 import structures.FloatList; | |
26 import tax.TaxNode; | |
27 import tax.TaxTree; | |
28 import template.Accumulator; | |
29 import template.ThreadWaiter; | |
30 | |
31 /** | |
32 * Compares SSUs, all-to-all or fractional matrix. | |
33 * | |
34 * @author Brian Bushnell | |
35 * @date December 2, 2019 | |
36 * | |
37 */ | |
38 public class CompareSSU implements Accumulator<CompareSSU.ProcessThread> { | |
39 | |
40 /*--------------------------------------------------------------*/ | |
41 /*---------------- Initialization ----------------*/ | |
42 /*--------------------------------------------------------------*/ | |
43 | |
44 /** | |
45 * Code entrance from the command line. | |
46 * @param args Command line arguments | |
47 */ | |
48 public static void main(String[] args){ | |
49 //Start a timer immediately upon code entrance. | |
50 Timer t=new Timer(); | |
51 | |
52 //Create an instance of this class | |
53 CompareSSU x=new CompareSSU(args); | |
54 | |
55 //Run the object | |
56 x.process(t); | |
57 | |
58 //Close the print stream if it was redirected | |
59 Shared.closeStream(x.outstream); | |
60 } | |
61 | |
62 /** | |
63 * Constructor. | |
64 * @param args Command line arguments | |
65 */ | |
66 public CompareSSU(String[] args){ | |
67 | |
68 {//Preparse block for help, config files, and outstream | |
69 PreParser pp=new PreParser(args, getClass(), false); | |
70 args=pp.args; | |
71 outstream=pp.outstream; | |
72 } | |
73 | |
74 //Set shared static variables prior to parsing | |
75 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; | |
76 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); | |
77 | |
78 {//Parse the arguments | |
79 final Parser parser=parse(args); | |
80 Parser.processQuality(); | |
81 | |
82 maxReads=parser.maxReads; | |
83 overwrite=ReadStats.overwrite=parser.overwrite; | |
84 append=ReadStats.append=parser.append; | |
85 | |
86 in1=parser.in1; | |
87 | |
88 out1=parser.out1; | |
89 } | |
90 | |
91 validateParams(); | |
92 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} | |
93 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; | |
94 checkFileExistence(); //Ensure files can be read and written | |
95 checkStatics(); //Adjust file-related static fields as needed for this program | |
96 | |
97 //Create output FileFormat objects | |
98 ffout1=FileFormat.testOutput(out1, FileFormat.TXT, null, true, overwrite, append, ordered); | |
99 | |
100 tree=(treeFile==null) ? null : TaxTree.loadTaxTree(treeFile, outstream, true, false); | |
101 | |
102 SSUMap.r16SFile=in1; | |
103 if(SSUMap.r16SFile!=null){ | |
104 SSUMap.load(outstream); | |
105 HashMap<Integer, byte[]> ssuMap=SSUMap.r16SMap; | |
106 ssuList=new ArrayList<Read>(ssuMap.size()); | |
107 for(Entry<Integer, byte[]> e : ssuMap.entrySet()){ | |
108 int id=e.getKey(); | |
109 byte[] value=e.getValue(); | |
110 if(value.length>=minlen && value.length<=maxlen){ | |
111 Read r=new Read(value, null, id);//Sets numeric ID to TaxID. | |
112 if(maxns<0 || r.countNocalls()<=maxns){ | |
113 ssuList.add(r); | |
114 } | |
115 } | |
116 } | |
117 Collections.shuffle(ssuList); | |
118 } | |
119 for(int i=0; i<idLists.length; i++){ | |
120 idLists[i]=new FloatList(); | |
121 } | |
122 } | |
123 | |
124 /*--------------------------------------------------------------*/ | |
125 /*---------------- Initialization Helpers ----------------*/ | |
126 /*--------------------------------------------------------------*/ | |
127 | |
128 /** Parse arguments from the command line */ | |
129 private Parser parse(String[] args){ | |
130 | |
131 //Create a parser object | |
132 Parser parser=new Parser(); | |
133 | |
134 //Set any necessary Parser defaults here | |
135 //parser.foo=bar; | |
136 | |
137 //Parse each argument | |
138 for(int i=0; i<args.length; i++){ | |
139 String arg=args[i]; | |
140 | |
141 //Break arguments into their constituent parts, in the form of "a=b" | |
142 String[] split=arg.split("="); | |
143 String a=split[0].toLowerCase(); | |
144 String b=split.length>1 ? split[1] : null; | |
145 if(b!=null && b.equalsIgnoreCase("null")){b=null;} | |
146 | |
147 if(a.equals("verbose")){ | |
148 verbose=Parse.parseBoolean(b); | |
149 }else if(a.equals("tree")){ | |
150 treeFile=b; | |
151 }else if(a.equals("ordered")){ | |
152 ordered=Parse.parseBoolean(b); | |
153 }else if(a.equals("ata") || a.equals("alltoall")){ | |
154 allToAll=Parse.parseBoolean(b); | |
155 }else if(a.equals("store") || a.equals("storeresults")){ | |
156 storeResults=Parse.parseBoolean(b); | |
157 }else if(a.equals("minlen") || a.equals("maxlength")){ | |
158 minlen=Parse.parseIntKMG(b); | |
159 }else if(a.equals("maxlen") || a.equals("maxlength")){ | |
160 maxlen=Parse.parseIntKMG(b); | |
161 }else if(a.equalsIgnoreCase("maxns")){ | |
162 maxns=Parse.parseIntKMG(b); | |
163 }else if(a.equals("parse_flag_goes_here")){ | |
164 long fake_variable=Parse.parseKMG(b); | |
165 //Set a variable here | |
166 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser | |
167 //do nothing | |
168 }else{ | |
169 outstream.println("Unknown parameter "+args[i]); | |
170 assert(false) : "Unknown parameter "+args[i]; | |
171 } | |
172 } | |
173 | |
174 return parser; | |
175 } | |
176 | |
177 /** Ensure files can be read and written */ | |
178 private void checkFileExistence(){ | |
179 //Ensure output files can be written | |
180 if(!Tools.testOutputFiles(overwrite, append, false, out1)){ | |
181 outstream.println((out1==null)+", "+out1); | |
182 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n"); | |
183 } | |
184 | |
185 //Ensure input files can be read | |
186 if(!Tools.testInputFiles(false, true, in1)){ | |
187 throw new RuntimeException("\nCan't read some input files.\n"); | |
188 } | |
189 | |
190 //Ensure that no file was specified multiple times | |
191 if(!Tools.testForDuplicateFiles(true, in1, out1)){ | |
192 throw new RuntimeException("\nSome file names were specified multiple times.\n"); | |
193 } | |
194 } | |
195 | |
196 /** Adjust file-related static fields as needed for this program */ | |
197 private static void checkStatics(){ | |
198 //Adjust the number of threads for input file reading | |
199 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ | |
200 ByteFile.FORCE_MODE_BF2=true; | |
201 } | |
202 | |
203 assert(FastaReadInputStream.settingsOK()); | |
204 } | |
205 | |
206 /** Ensure parameter ranges are within bounds and required parameters are set */ | |
207 private boolean validateParams(){ | |
208 return true; | |
209 } | |
210 | |
211 /*--------------------------------------------------------------*/ | |
212 /*---------------- Outer Methods ----------------*/ | |
213 /*--------------------------------------------------------------*/ | |
214 | |
215 /** Create read streams and process all data */ | |
216 void process(Timer t){ | |
217 | |
218 ByteStreamWriter bsw=makeBSW(ffout1); | |
219 if(bsw!=null){ | |
220 bsw.forcePrint("#Level\tIdentity\tQueryID\tRefID\n"); | |
221 } | |
222 | |
223 //Reset counters | |
224 queriesProcessed=0; | |
225 comparisons=0; | |
226 | |
227 //Process the reads in separate threads | |
228 spawnThreads(bsw); | |
229 | |
230 if(verbose){outstream.println("Finished; closing streams.");} | |
231 | |
232 //Close the read streams | |
233 if(bsw!=null){errorState|=bsw.poisonAndWait();} | |
234 | |
235 { | |
236 ByteBuilder bb=new ByteBuilder(); | |
237 bb.append("\nLevel \tCount\tMean"+(storeResults ? "\tMedian\t90%ile\t10%ile\tSTDev" : "")+"\n"); | |
238 outstream.print(bb); | |
239 final int minlen="superkingdom".length(); | |
240 for(int level=0; level<taxLevels; level++){ | |
241 if(counts[level]>0){ | |
242 bb.clear(); | |
243 bb.append(TaxTree.levelToStringExtended(level)); | |
244 while(bb.length()<minlen){bb.space();} | |
245 bb.tab().append(counts[level]).tab(); | |
246 bb.append(sums[level]/counts[level]*100, 3); | |
247 if(storeResults){ | |
248 FloatList list=idLists[level]; | |
249 list.sort(); | |
250 double stdev=list.stdev(); | |
251 double median=list.median(); | |
252 // double mode=list.mode(); | |
253 double percent90=list.percentile(0.9f); | |
254 double percent10=list.percentile(0.1f); | |
255 bb.tab().append(median*100, 3).tab().append(percent90*100, 3).tab().append(percent10*100, 3).tab().append(stdev*100, 3); | |
256 } | |
257 bb.nl(); | |
258 outstream.print(bb); | |
259 } | |
260 } | |
261 } | |
262 | |
263 //Report timing and results | |
264 t.stop(); | |
265 outstream.println(); | |
266 outstream.println(Tools.timeQueriesComparisonsProcessed(t, queriesProcessed, comparisons, 8)); | |
267 | |
268 //Throw an exception of there was an error in a thread | |
269 if(errorState){ | |
270 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); | |
271 } | |
272 } | |
273 | |
274 /*--------------------------------------------------------------*/ | |
275 /*---------------- Thread Management ----------------*/ | |
276 /*--------------------------------------------------------------*/ | |
277 | |
278 /** Spawn process threads */ | |
279 private void spawnThreads(ByteStreamWriter bsw){ | |
280 | |
281 //Do anything necessary prior to processing | |
282 | |
283 //Determine how many threads may be used | |
284 final int threads=Shared.threads(); | |
285 | |
286 //Fill a list with ProcessThreads | |
287 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); | |
288 for(int i=0; i<threads; i++){ | |
289 alpt.add(new ProcessThread(bsw, i, threads)); | |
290 } | |
291 | |
292 //Start the threads and wait for them to finish | |
293 boolean success=ThreadWaiter.startAndWait(alpt, this); | |
294 errorState&=!success; | |
295 | |
296 //Do anything necessary after processing | |
297 | |
298 } | |
299 | |
300 @Override | |
301 public final void accumulate(ProcessThread pt){ | |
302 queriesProcessed+=pt.querysProcessedT; | |
303 comparisons+=pt.comparisonsT; | |
304 errorState|=(!pt.success); | |
305 | |
306 for(int i=0; i<taxLevels; i++){ | |
307 idLists[i].addAll(pt.idListsT[i]); | |
308 counts[i]+=pt.countsT[i]; | |
309 sums[i]+=pt.sumsT[i]; | |
310 } | |
311 } | |
312 | |
313 @Override | |
314 public final boolean success(){return !errorState;} | |
315 | |
316 /*--------------------------------------------------------------*/ | |
317 /*---------------- Inner Methods ----------------*/ | |
318 /*--------------------------------------------------------------*/ | |
319 | |
320 private static ByteStreamWriter makeBSW(FileFormat ff){ | |
321 if(ff==null){return null;} | |
322 ByteStreamWriter bsw=new ByteStreamWriter(ff); | |
323 bsw.start(); | |
324 return bsw; | |
325 } | |
326 | |
327 /*--------------------------------------------------------------*/ | |
328 /*---------------- Inner Classes ----------------*/ | |
329 /*--------------------------------------------------------------*/ | |
330 | |
331 /** This class is static to prevent accidental writing to shared variables. | |
332 * It is safe to remove the static modifier. */ | |
333 class ProcessThread extends Thread { | |
334 | |
335 //Constructor | |
336 ProcessThread(ByteStreamWriter bsw_, final int tid_, final int threads_){ | |
337 bsw=bsw_; | |
338 threadID=tid_; | |
339 threads=threads_; | |
340 listCopy=new ArrayList<Read>(ssuList.size()); | |
341 listCopy.addAll(ssuList); | |
342 for(int i=0; i<idListsT.length; i++){ | |
343 idListsT[i]=new FloatList(); | |
344 } | |
345 } | |
346 | |
347 //Called by start() | |
348 @Override | |
349 public void run(){ | |
350 //Do anything necessary prior to processing | |
351 | |
352 //Process the reads | |
353 processInner(); | |
354 | |
355 //Do anything necessary after processing | |
356 | |
357 //Indicate successful exit status | |
358 success=true; | |
359 } | |
360 | |
361 /** Iterate through the reads */ | |
362 void processInner(){ | |
363 final long limit=Tools.min(ssuList.size(), (maxReads>0 ? maxReads : Integer.MAX_VALUE)); | |
364 | |
365 for(int num=next.getAndIncrement(); num<limit; num=next.getAndIncrement()){ | |
366 Read r=ssuList.get(num); | |
367 processRead(r); | |
368 } | |
369 } | |
370 | |
371 void processRead(Read query){ | |
372 if(query.numericID<1){return;}//invalid TID | |
373 final int qid=(int)query.numericID; | |
374 if(tree.getNode(qid)==null) {return;} | |
375 if(querysProcessedT%5==0) { | |
376 Collections.shuffle(listCopy); | |
377 } | |
378 querysProcessedT++; | |
379 | |
380 ByteBuilder bb=new ByteBuilder(); | |
381 | |
382 long seen=0; | |
383 for(Read ref :listCopy){ | |
384 int rid=(int)ref.numericID; | |
385 if(rid!=qid && rid>0 && tree.getNode(rid)!=null){ | |
386 int aid=tree.commonAncestor(qid, rid); | |
387 if(aid>0){ | |
388 TaxNode tn=tree.getNode(aid); | |
389 if(tn.isRanked()){ | |
390 int level=tn.levelExtended; | |
391 long mask=1L<<level; | |
392 if(allToAll || ((mask&printLevels)!=0 && (mask&seen)==0)){ | |
393 seen|=mask; | |
394 float identity=compare(query, ref, level); | |
395 bb.append(TaxTree.levelToStringExtended(level)).tab().append(identity, 6); | |
396 bb.tab().append(qid).tab().append(rid).nl(); | |
397 } | |
398 } | |
399 } | |
400 } | |
401 } | |
402 if(bsw!=null){ | |
403 bsw.addJob(bb); | |
404 } | |
405 } | |
406 | |
407 float compare(Read query, Read ref, int level){ | |
408 comparisonsT++; | |
409 float identity=SketchObject.align(query.bases, ref.bases); | |
410 if(storeResults){idListsT[level].add(identity);} | |
411 countsT[level]++; | |
412 sumsT[level]+=identity; | |
413 return identity; | |
414 } | |
415 | |
416 /** Number of reads processed by this thread */ | |
417 protected long querysProcessedT=0; | |
418 /** Number of bases processed by this thread */ | |
419 protected long comparisonsT=0; | |
420 | |
421 /** True only if this thread has completed successfully */ | |
422 boolean success=false; | |
423 | |
424 /** Shared output stream */ | |
425 private final ByteStreamWriter bsw; | |
426 /** Thread ID */ | |
427 final int threadID; | |
428 | |
429 final int threads; | |
430 | |
431 ArrayList<Read> listCopy; | |
432 | |
433 final FloatList[] idListsT=new FloatList[taxLevels]; | |
434 long[] countsT=new long[taxLevels]; | |
435 double[] sumsT=new double[taxLevels]; | |
436 } | |
437 | |
438 /*--------------------------------------------------------------*/ | |
439 /*---------------- Fields ----------------*/ | |
440 /*--------------------------------------------------------------*/ | |
441 | |
442 /** Primary input file path */ | |
443 private String in1=null; | |
444 | |
445 private String treeFile="auto"; | |
446 | |
447 /** Primary output file path */ | |
448 private String out1=null; | |
449 | |
450 public static ArrayList<Read> ssuList=null; | |
451 | |
452 final static int taxLevels=TaxTree.numTaxLevelNamesExtended; | |
453 static final String[] printLevelsArray=new String[] {"strain", "species", "genus", "family", "order", "class", "phylum", "superkingdom", "life"}; | |
454 static final long printLevels=makePrintLevels(printLevelsArray); | |
455 | |
456 private final TaxTree tree; | |
457 | |
458 private static final long makePrintLevels(String[] names){ | |
459 long mask=0; | |
460 for(String s : names){ | |
461 int level=TaxTree.stringToLevelExtended(s); | |
462 mask|=(1L<<level); | |
463 } | |
464 return mask; | |
465 } | |
466 | |
467 private FloatList[] idLists=new FloatList[taxLevels]; | |
468 private long[] counts=new long[taxLevels]; | |
469 private double[] sums=new double[taxLevels]; | |
470 | |
471 private int minlen=0; | |
472 private int maxlen=Integer.MAX_VALUE; | |
473 private int maxns=-1; | |
474 | |
475 /*--------------------------------------------------------------*/ | |
476 | |
477 /** Number of reads processed */ | |
478 protected long queriesProcessed=0; | |
479 /** Number of bases processed */ | |
480 protected long comparisons=0; | |
481 | |
482 /** Quit after processing this many input reads; -1 means no limit */ | |
483 private long maxReads=-1; | |
484 | |
485 private AtomicInteger next=new AtomicInteger(0); | |
486 | |
487 private boolean allToAll=false; | |
488 private boolean storeResults=false; | |
489 | |
490 /*--------------------------------------------------------------*/ | |
491 /*---------------- Final Fields ----------------*/ | |
492 /*--------------------------------------------------------------*/ | |
493 | |
494 /** Primary output file */ | |
495 private final FileFormat ffout1; | |
496 | |
497 /*--------------------------------------------------------------*/ | |
498 /*---------------- Common Fields ----------------*/ | |
499 /*--------------------------------------------------------------*/ | |
500 | |
501 /** Print status messages to this output stream */ | |
502 private PrintStream outstream=System.err; | |
503 /** Print verbose messages */ | |
504 public static boolean verbose=false; | |
505 /** True if an error was encountered */ | |
506 public boolean errorState=false; | |
507 /** Overwrite existing output files */ | |
508 private boolean overwrite=false; | |
509 /** Append to existing output files */ | |
510 private boolean append=false; | |
511 /** Reads are output in input order */ | |
512 private boolean ordered=false; | |
513 | |
514 } |