comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/CompareSSU.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package sketch;
2
3 import java.io.PrintStream;
4 import java.util.ArrayList;
5 import java.util.Collections;
6 import java.util.HashMap;
7 import java.util.Map.Entry;
8 import java.util.concurrent.atomic.AtomicInteger;
9
10 import fileIO.ByteFile;
11 import fileIO.ByteStreamWriter;
12 import fileIO.FileFormat;
13 import fileIO.ReadWrite;
14 import shared.Parse;
15 import shared.Parser;
16 import shared.PreParser;
17 import shared.ReadStats;
18 import shared.Shared;
19 import shared.Timer;
20 import shared.Tools;
21 import stream.FASTQ;
22 import stream.FastaReadInputStream;
23 import stream.Read;
24 import structures.ByteBuilder;
25 import structures.FloatList;
26 import tax.TaxNode;
27 import tax.TaxTree;
28 import template.Accumulator;
29 import template.ThreadWaiter;
30
31 /**
32 * Compares SSUs, all-to-all or fractional matrix.
33 *
34 * @author Brian Bushnell
35 * @date December 2, 2019
36 *
37 */
38 public class CompareSSU implements Accumulator<CompareSSU.ProcessThread> {
39
40 /*--------------------------------------------------------------*/
41 /*---------------- Initialization ----------------*/
42 /*--------------------------------------------------------------*/
43
44 /**
45 * Code entrance from the command line.
46 * @param args Command line arguments
47 */
48 public static void main(String[] args){
49 //Start a timer immediately upon code entrance.
50 Timer t=new Timer();
51
52 //Create an instance of this class
53 CompareSSU x=new CompareSSU(args);
54
55 //Run the object
56 x.process(t);
57
58 //Close the print stream if it was redirected
59 Shared.closeStream(x.outstream);
60 }
61
62 /**
63 * Constructor.
64 * @param args Command line arguments
65 */
66 public CompareSSU(String[] args){
67
68 {//Preparse block for help, config files, and outstream
69 PreParser pp=new PreParser(args, getClass(), false);
70 args=pp.args;
71 outstream=pp.outstream;
72 }
73
74 //Set shared static variables prior to parsing
75 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
76 ReadWrite.MAX_ZIP_THREADS=Shared.threads();
77
78 {//Parse the arguments
79 final Parser parser=parse(args);
80 Parser.processQuality();
81
82 maxReads=parser.maxReads;
83 overwrite=ReadStats.overwrite=parser.overwrite;
84 append=ReadStats.append=parser.append;
85
86 in1=parser.in1;
87
88 out1=parser.out1;
89 }
90
91 validateParams();
92 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
93 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
94 checkFileExistence(); //Ensure files can be read and written
95 checkStatics(); //Adjust file-related static fields as needed for this program
96
97 //Create output FileFormat objects
98 ffout1=FileFormat.testOutput(out1, FileFormat.TXT, null, true, overwrite, append, ordered);
99
100 tree=(treeFile==null) ? null : TaxTree.loadTaxTree(treeFile, outstream, true, false);
101
102 SSUMap.r16SFile=in1;
103 if(SSUMap.r16SFile!=null){
104 SSUMap.load(outstream);
105 HashMap<Integer, byte[]> ssuMap=SSUMap.r16SMap;
106 ssuList=new ArrayList<Read>(ssuMap.size());
107 for(Entry<Integer, byte[]> e : ssuMap.entrySet()){
108 int id=e.getKey();
109 byte[] value=e.getValue();
110 if(value.length>=minlen && value.length<=maxlen){
111 Read r=new Read(value, null, id);//Sets numeric ID to TaxID.
112 if(maxns<0 || r.countNocalls()<=maxns){
113 ssuList.add(r);
114 }
115 }
116 }
117 Collections.shuffle(ssuList);
118 }
119 for(int i=0; i<idLists.length; i++){
120 idLists[i]=new FloatList();
121 }
122 }
123
124 /*--------------------------------------------------------------*/
125 /*---------------- Initialization Helpers ----------------*/
126 /*--------------------------------------------------------------*/
127
128 /** Parse arguments from the command line */
129 private Parser parse(String[] args){
130
131 //Create a parser object
132 Parser parser=new Parser();
133
134 //Set any necessary Parser defaults here
135 //parser.foo=bar;
136
137 //Parse each argument
138 for(int i=0; i<args.length; i++){
139 String arg=args[i];
140
141 //Break arguments into their constituent parts, in the form of "a=b"
142 String[] split=arg.split("=");
143 String a=split[0].toLowerCase();
144 String b=split.length>1 ? split[1] : null;
145 if(b!=null && b.equalsIgnoreCase("null")){b=null;}
146
147 if(a.equals("verbose")){
148 verbose=Parse.parseBoolean(b);
149 }else if(a.equals("tree")){
150 treeFile=b;
151 }else if(a.equals("ordered")){
152 ordered=Parse.parseBoolean(b);
153 }else if(a.equals("ata") || a.equals("alltoall")){
154 allToAll=Parse.parseBoolean(b);
155 }else if(a.equals("store") || a.equals("storeresults")){
156 storeResults=Parse.parseBoolean(b);
157 }else if(a.equals("minlen") || a.equals("maxlength")){
158 minlen=Parse.parseIntKMG(b);
159 }else if(a.equals("maxlen") || a.equals("maxlength")){
160 maxlen=Parse.parseIntKMG(b);
161 }else if(a.equalsIgnoreCase("maxns")){
162 maxns=Parse.parseIntKMG(b);
163 }else if(a.equals("parse_flag_goes_here")){
164 long fake_variable=Parse.parseKMG(b);
165 //Set a variable here
166 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
167 //do nothing
168 }else{
169 outstream.println("Unknown parameter "+args[i]);
170 assert(false) : "Unknown parameter "+args[i];
171 }
172 }
173
174 return parser;
175 }
176
177 /** Ensure files can be read and written */
178 private void checkFileExistence(){
179 //Ensure output files can be written
180 if(!Tools.testOutputFiles(overwrite, append, false, out1)){
181 outstream.println((out1==null)+", "+out1);
182 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out1+"\n");
183 }
184
185 //Ensure input files can be read
186 if(!Tools.testInputFiles(false, true, in1)){
187 throw new RuntimeException("\nCan't read some input files.\n");
188 }
189
190 //Ensure that no file was specified multiple times
191 if(!Tools.testForDuplicateFiles(true, in1, out1)){
192 throw new RuntimeException("\nSome file names were specified multiple times.\n");
193 }
194 }
195
196 /** Adjust file-related static fields as needed for this program */
197 private static void checkStatics(){
198 //Adjust the number of threads for input file reading
199 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
200 ByteFile.FORCE_MODE_BF2=true;
201 }
202
203 assert(FastaReadInputStream.settingsOK());
204 }
205
206 /** Ensure parameter ranges are within bounds and required parameters are set */
207 private boolean validateParams(){
208 return true;
209 }
210
211 /*--------------------------------------------------------------*/
212 /*---------------- Outer Methods ----------------*/
213 /*--------------------------------------------------------------*/
214
215 /** Create read streams and process all data */
216 void process(Timer t){
217
218 ByteStreamWriter bsw=makeBSW(ffout1);
219 if(bsw!=null){
220 bsw.forcePrint("#Level\tIdentity\tQueryID\tRefID\n");
221 }
222
223 //Reset counters
224 queriesProcessed=0;
225 comparisons=0;
226
227 //Process the reads in separate threads
228 spawnThreads(bsw);
229
230 if(verbose){outstream.println("Finished; closing streams.");}
231
232 //Close the read streams
233 if(bsw!=null){errorState|=bsw.poisonAndWait();}
234
235 {
236 ByteBuilder bb=new ByteBuilder();
237 bb.append("\nLevel \tCount\tMean"+(storeResults ? "\tMedian\t90%ile\t10%ile\tSTDev" : "")+"\n");
238 outstream.print(bb);
239 final int minlen="superkingdom".length();
240 for(int level=0; level<taxLevels; level++){
241 if(counts[level]>0){
242 bb.clear();
243 bb.append(TaxTree.levelToStringExtended(level));
244 while(bb.length()<minlen){bb.space();}
245 bb.tab().append(counts[level]).tab();
246 bb.append(sums[level]/counts[level]*100, 3);
247 if(storeResults){
248 FloatList list=idLists[level];
249 list.sort();
250 double stdev=list.stdev();
251 double median=list.median();
252 // double mode=list.mode();
253 double percent90=list.percentile(0.9f);
254 double percent10=list.percentile(0.1f);
255 bb.tab().append(median*100, 3).tab().append(percent90*100, 3).tab().append(percent10*100, 3).tab().append(stdev*100, 3);
256 }
257 bb.nl();
258 outstream.print(bb);
259 }
260 }
261 }
262
263 //Report timing and results
264 t.stop();
265 outstream.println();
266 outstream.println(Tools.timeQueriesComparisonsProcessed(t, queriesProcessed, comparisons, 8));
267
268 //Throw an exception of there was an error in a thread
269 if(errorState){
270 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
271 }
272 }
273
274 /*--------------------------------------------------------------*/
275 /*---------------- Thread Management ----------------*/
276 /*--------------------------------------------------------------*/
277
278 /** Spawn process threads */
279 private void spawnThreads(ByteStreamWriter bsw){
280
281 //Do anything necessary prior to processing
282
283 //Determine how many threads may be used
284 final int threads=Shared.threads();
285
286 //Fill a list with ProcessThreads
287 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
288 for(int i=0; i<threads; i++){
289 alpt.add(new ProcessThread(bsw, i, threads));
290 }
291
292 //Start the threads and wait for them to finish
293 boolean success=ThreadWaiter.startAndWait(alpt, this);
294 errorState&=!success;
295
296 //Do anything necessary after processing
297
298 }
299
300 @Override
301 public final void accumulate(ProcessThread pt){
302 queriesProcessed+=pt.querysProcessedT;
303 comparisons+=pt.comparisonsT;
304 errorState|=(!pt.success);
305
306 for(int i=0; i<taxLevels; i++){
307 idLists[i].addAll(pt.idListsT[i]);
308 counts[i]+=pt.countsT[i];
309 sums[i]+=pt.sumsT[i];
310 }
311 }
312
313 @Override
314 public final boolean success(){return !errorState;}
315
316 /*--------------------------------------------------------------*/
317 /*---------------- Inner Methods ----------------*/
318 /*--------------------------------------------------------------*/
319
320 private static ByteStreamWriter makeBSW(FileFormat ff){
321 if(ff==null){return null;}
322 ByteStreamWriter bsw=new ByteStreamWriter(ff);
323 bsw.start();
324 return bsw;
325 }
326
327 /*--------------------------------------------------------------*/
328 /*---------------- Inner Classes ----------------*/
329 /*--------------------------------------------------------------*/
330
331 /** This class is static to prevent accidental writing to shared variables.
332 * It is safe to remove the static modifier. */
333 class ProcessThread extends Thread {
334
335 //Constructor
336 ProcessThread(ByteStreamWriter bsw_, final int tid_, final int threads_){
337 bsw=bsw_;
338 threadID=tid_;
339 threads=threads_;
340 listCopy=new ArrayList<Read>(ssuList.size());
341 listCopy.addAll(ssuList);
342 for(int i=0; i<idListsT.length; i++){
343 idListsT[i]=new FloatList();
344 }
345 }
346
347 //Called by start()
348 @Override
349 public void run(){
350 //Do anything necessary prior to processing
351
352 //Process the reads
353 processInner();
354
355 //Do anything necessary after processing
356
357 //Indicate successful exit status
358 success=true;
359 }
360
361 /** Iterate through the reads */
362 void processInner(){
363 final long limit=Tools.min(ssuList.size(), (maxReads>0 ? maxReads : Integer.MAX_VALUE));
364
365 for(int num=next.getAndIncrement(); num<limit; num=next.getAndIncrement()){
366 Read r=ssuList.get(num);
367 processRead(r);
368 }
369 }
370
371 void processRead(Read query){
372 if(query.numericID<1){return;}//invalid TID
373 final int qid=(int)query.numericID;
374 if(tree.getNode(qid)==null) {return;}
375 if(querysProcessedT%5==0) {
376 Collections.shuffle(listCopy);
377 }
378 querysProcessedT++;
379
380 ByteBuilder bb=new ByteBuilder();
381
382 long seen=0;
383 for(Read ref :listCopy){
384 int rid=(int)ref.numericID;
385 if(rid!=qid && rid>0 && tree.getNode(rid)!=null){
386 int aid=tree.commonAncestor(qid, rid);
387 if(aid>0){
388 TaxNode tn=tree.getNode(aid);
389 if(tn.isRanked()){
390 int level=tn.levelExtended;
391 long mask=1L<<level;
392 if(allToAll || ((mask&printLevels)!=0 && (mask&seen)==0)){
393 seen|=mask;
394 float identity=compare(query, ref, level);
395 bb.append(TaxTree.levelToStringExtended(level)).tab().append(identity, 6);
396 bb.tab().append(qid).tab().append(rid).nl();
397 }
398 }
399 }
400 }
401 }
402 if(bsw!=null){
403 bsw.addJob(bb);
404 }
405 }
406
407 float compare(Read query, Read ref, int level){
408 comparisonsT++;
409 float identity=SketchObject.align(query.bases, ref.bases);
410 if(storeResults){idListsT[level].add(identity);}
411 countsT[level]++;
412 sumsT[level]+=identity;
413 return identity;
414 }
415
416 /** Number of reads processed by this thread */
417 protected long querysProcessedT=0;
418 /** Number of bases processed by this thread */
419 protected long comparisonsT=0;
420
421 /** True only if this thread has completed successfully */
422 boolean success=false;
423
424 /** Shared output stream */
425 private final ByteStreamWriter bsw;
426 /** Thread ID */
427 final int threadID;
428
429 final int threads;
430
431 ArrayList<Read> listCopy;
432
433 final FloatList[] idListsT=new FloatList[taxLevels];
434 long[] countsT=new long[taxLevels];
435 double[] sumsT=new double[taxLevels];
436 }
437
438 /*--------------------------------------------------------------*/
439 /*---------------- Fields ----------------*/
440 /*--------------------------------------------------------------*/
441
442 /** Primary input file path */
443 private String in1=null;
444
445 private String treeFile="auto";
446
447 /** Primary output file path */
448 private String out1=null;
449
450 public static ArrayList<Read> ssuList=null;
451
452 final static int taxLevels=TaxTree.numTaxLevelNamesExtended;
453 static final String[] printLevelsArray=new String[] {"strain", "species", "genus", "family", "order", "class", "phylum", "superkingdom", "life"};
454 static final long printLevels=makePrintLevels(printLevelsArray);
455
456 private final TaxTree tree;
457
458 private static final long makePrintLevels(String[] names){
459 long mask=0;
460 for(String s : names){
461 int level=TaxTree.stringToLevelExtended(s);
462 mask|=(1L<<level);
463 }
464 return mask;
465 }
466
467 private FloatList[] idLists=new FloatList[taxLevels];
468 private long[] counts=new long[taxLevels];
469 private double[] sums=new double[taxLevels];
470
471 private int minlen=0;
472 private int maxlen=Integer.MAX_VALUE;
473 private int maxns=-1;
474
475 /*--------------------------------------------------------------*/
476
477 /** Number of reads processed */
478 protected long queriesProcessed=0;
479 /** Number of bases processed */
480 protected long comparisons=0;
481
482 /** Quit after processing this many input reads; -1 means no limit */
483 private long maxReads=-1;
484
485 private AtomicInteger next=new AtomicInteger(0);
486
487 private boolean allToAll=false;
488 private boolean storeResults=false;
489
490 /*--------------------------------------------------------------*/
491 /*---------------- Final Fields ----------------*/
492 /*--------------------------------------------------------------*/
493
494 /** Primary output file */
495 private final FileFormat ffout1;
496
497 /*--------------------------------------------------------------*/
498 /*---------------- Common Fields ----------------*/
499 /*--------------------------------------------------------------*/
500
501 /** Print status messages to this output stream */
502 private PrintStream outstream=System.err;
503 /** Print verbose messages */
504 public static boolean verbose=false;
505 /** True if an error was encountered */
506 public boolean errorState=false;
507 /** Overwrite existing output files */
508 private boolean overwrite=false;
509 /** Append to existing output files */
510 private boolean append=false;
511 /** Reads are output in input order */
512 private boolean ordered=false;
513
514 }