comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteStreamWriter.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package fileIO;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.OutputStream;
6 import java.util.HashMap;
7 import java.util.Locale;
8 import java.util.concurrent.ArrayBlockingQueue;
9
10 import assemble.Contig;
11 import dna.AminoAcid;
12 import dna.Data;
13 import kmer.AbstractKmerTable;
14 import shared.Shared;
15 import shared.Timer;
16 import shared.Tools;
17 import stream.Read;
18 import structures.ByteBuilder;
19 import ukmer.AbstractKmerTableU;
20
21
22
23 /**
24 * @author Brian Bushnell
25 * @date Oct 21, 2014
26 *
27 */
28 public class ByteStreamWriter extends Thread {
29
30 /*--------------------------------------------------------------*/
31 /*---------------- Initialization ----------------*/
32 /*--------------------------------------------------------------*/
33
34 public static void main(String[] args){
35 Timer t=new Timer();
36 final int alen=1000;
37 byte[] array=new byte[alen];
38 for(int i=0; i<array.length; i++){
39 array[i]=AminoAcid.numberToBase[i&3];
40 }
41 array[array.length-1]='\n';
42 long iters=Long.parseLong(args[1]);
43 String fname=args[0];
44 ByteStreamWriter bsw=new ByteStreamWriter(fname, true, false, true);
45 bsw.start();
46 for(long i=0; i<iters; i++){
47 bsw.print(array);
48 }
49 bsw.poisonAndWait();
50 t.stop();
51 System.err.println("MB/s: \t"+String.format(Locale.ROOT, "%.2f", ((alen*iters)/(t.elapsed/1000.0))));
52 System.err.println("Time: \t"+t);
53 }
54
55 /** @See primary constructor */
56 public ByteStreamWriter(String fname_, boolean overwrite_, boolean append_, boolean allowSubprocess_){
57 this(fname_, overwrite_, append_, allowSubprocess_, 0);
58 }
59
60 /** @See primary constructor */
61 public ByteStreamWriter(String fname_, boolean overwrite_, boolean append_, boolean allowSubprocess_, int format){
62 this(FileFormat.testOutput(fname_, FileFormat.TEXT, format, 0, allowSubprocess_, overwrite_, append_, false));
63 }
64
65 /**
66 * Create a ByteStreamWriter for this FileFormat.
67 * @param ff Contains information about the file name, output format, etc.
68 */
69 public ByteStreamWriter(FileFormat ff){
70 FASTQ=ff.fastq() || ff.text();
71 FASTA=ff.fasta();
72 BREAD=ff.bread();
73 SAM=ff.samOrBam();
74 BAM=ff.bam();
75 SITES=ff.sites();
76 INFO=ff.attachment();
77 OTHER=(!FASTQ && !FASTA && !BREAD && !SAM && !BAM && !SITES && !INFO);
78
79
80 fname=ff.name();
81 overwrite=ff.overwrite();
82 append=ff.append();
83 allowSubprocess=ff.allowSubprocess();
84 ordered=ff.ordered();
85 assert(!(overwrite&append));
86 assert(ff.canWrite()) : "File "+fname+" exists "+(new File(ff.name()).canWrite() ?
87 ("and overwrite="+overwrite+".\nPlease add the flag ow to overwrite the file.\n") :
88 "and is read-only.");
89 if(append && !(ff.raw() || ff.gzip())){throw new RuntimeException("Can't append to compressed files.");}
90
91 if(!BAM || !(Data.SAMTOOLS() /*|| Data.SAMBAMBA()*/) /*|| !Data.SH()*/){
92 outstream=ReadWrite.getOutputStream(fname, append, true, allowSubprocess);
93 }else{
94 if(Data.SAMTOOLS()){
95 outstream=ReadWrite.getOutputStreamFromProcess(fname, "samtools view -S -b -h - ", true, append, true, true);
96 }else{
97 outstream=ReadWrite.getOutputStreamFromProcess(fname, "sambamba view -S -f bam -h ", true, append, true, true); //Sambamba does not support stdin
98 }
99 }
100
101 queue=new ArrayBlockingQueue<ByteBuilder>(5);
102 if(ordered){
103 buffer=null;
104 map=new HashMap<Long, ByteBuilder>(MAX_CAPACITY);
105 }else{
106 buffer=new ByteBuilder(initialLen);
107 map=null;
108 }
109 }
110
111 public static ByteStreamWriter makeBSW(FileFormat ff){
112 if(ff==null){return null;}
113 ByteStreamWriter bsw=new ByteStreamWriter(ff);
114 bsw.start();
115 return bsw;
116 }
117
118 /*--------------------------------------------------------------*/
119 /*---------------- Primary Method ----------------*/
120 /*--------------------------------------------------------------*/
121
122
123 @Override
124 public void run() {
125 if(verbose){System.err.println("running");}
126 assert(open) : fname;
127
128 synchronized(this){
129 started=true;
130 this.notify();
131 }
132
133 if(verbose){System.err.println("waiting for jobs");}
134
135 processJobs();
136
137 if(verbose){System.err.println("null/poison job");}
138 // assert(false);
139 open=false;
140 ReadWrite.finishWriting(null, outstream, fname, allowSubprocess);
141 if(verbose){System.err.println("finish writing");}
142 synchronized(this){notifyAll();}
143 if(verbose){System.err.println("done");}
144 }
145
146 public void processJobs() {
147
148 ByteBuilder job=null;
149 while(job==null){
150 try {
151 job=queue.take();
152 // job.list=queue.take();
153 } catch (InterruptedException e) {
154 // TODO Auto-generated catch block
155 e.printStackTrace();
156 }
157 }
158
159 if(verbose){System.err.println("processing jobs");}
160 while(job!=null && job!=POISON2){
161 if(job.length()>0){
162 try {
163 outstream.write(job.array, 0, job.length());
164 } catch (IOException e) {
165 // TODO Auto-generated catch block
166 e.printStackTrace();
167 }
168 }
169
170 job=null;
171 while(job==null){
172 try {
173 job=queue.take();
174 } catch (InterruptedException e) {
175 // TODO Auto-generated catch block
176 e.printStackTrace();
177 }
178 }
179 }
180 }
181
182 /*--------------------------------------------------------------*/
183 /*---------------- Control and Helpers ----------------*/
184 /*--------------------------------------------------------------*/
185
186
187 @Override
188 public synchronized void start(){
189 super.start();
190 if(verbose){System.err.println(this.getState());}
191 synchronized(this){
192 while(!started){
193 try {
194 this.wait(20);
195 } catch (InterruptedException e) {
196 // TODO Auto-generated catch block
197 e.printStackTrace();
198 }
199 }
200 }
201 }
202
203
204 public synchronized void poison(){
205 //Don't allow thread to shut down before it has started
206 while(!started || this.getState()==Thread.State.NEW){
207 try {
208 this.wait(20);
209 } catch (InterruptedException e) {
210 // TODO Auto-generated catch block
211 e.printStackTrace();
212 }
213 }
214
215 if(!open){return;}
216
217 if(ordered){
218 addOrdered(POISON2, maxJobID+1);
219 }else{
220 if(buffer!=null){addJob(buffer);}
221 }
222 buffer=null;
223 // System.err.println("Poisoned!");
224 // assert(false);
225
226 // assert(false) : open+", "+this.getState()+", "+started;
227 open=false;
228 addJob(POISON2);
229 }
230
231 /**
232 * Wait for this object's thread to terminate.
233 * Should be poisoned first.
234 */
235 public void waitForFinish(){
236 while(this.getState()!=Thread.State.TERMINATED){
237 try {
238 this.join(1000);
239 } catch (InterruptedException e) {
240 e.printStackTrace();
241 }
242 }
243 }
244
245 /**
246 * Poison the thread, and wait for it to terminate.
247 * @return true if there was an error, false otherwise
248 */
249 public boolean poisonAndWait(){
250 poison();
251 waitForFinish();
252 return errorState;
253 }
254
255 //TODO Why is this synchronized?
256 public synchronized void addJob(ByteBuilder bb){
257 // System.err.println("Got job "+(j.list==null ? "null" : j.list.size()));
258
259 assert(started) : "Wait for start() to return before using the writer.";
260 // while(!started || this.getState()==Thread.State.NEW){
261 // try {
262 // this.wait(20);
263 // } catch (InterruptedException e) {
264 // // TODO Auto-generated catch block
265 // e.printStackTrace();
266 // }
267 // }
268
269 boolean success=false;
270 while(!success){
271 try {
272 queue.put(bb);
273 success=true;
274 } catch (InterruptedException e) {
275 // TODO Auto-generated catch block
276 e.printStackTrace();
277 assert(!queue.contains(bb)); //Hopefully it was not added.
278 }
279 }
280 }
281
282 public final void forceFlushBuffer(){
283 flushBuffer(true);
284 }
285
286 /** Called after every write to the buffer */
287 public final void flushBuffer(boolean force){
288 final int x=buffer.length();
289 if(x>=maxLen || (force && x>0)){
290 addJob(buffer);
291 buffer=new ByteBuilder(initialLen);
292 }
293 }
294
295
296 /*--------------------------------------------------------------*/
297 /*---------------- Ordering ----------------*/
298 /*--------------------------------------------------------------*/
299
300 public synchronized void add(ByteBuilder job, long jobID){
301
302 if(ordered){
303 int size=map.size();
304 // System.err.print(size+", ");
305 // System.err.println("A.Adding job "+jobID+"; next="+nextJobID+"; max="+maxJobID+", map="+map.keySet());
306 final boolean flag=(size>=HALF_LIMIT);
307 if(jobID>nextJobID && size>=ADD_LIMIT){
308 // if(printBufferNotification){
309 // System.err.println("Output buffer became full; key "+jobID+" waiting on "+nextJobID+".");
310 // printBufferNotification=false;
311 // }
312 while(jobID>nextJobID && size>=HALF_LIMIT){
313 try {
314 this.wait(2000);
315 } catch (InterruptedException e) {
316 e.printStackTrace();
317 }
318 size=map.size();
319 }
320 // if(printBufferNotification){
321 // System.err.println("Output buffer became clear for key "+jobID+"; next="+nextJobID+", size="+size);
322 // }
323 }
324 // System.err.println("B.Adding ordered job "+jobID+"; next="+nextJobID+"; max="+maxJobID);
325 addOrdered(job, jobID);
326 assert(jobID!=nextJobID);
327 if(flag && jobID<nextJobID){this.notifyAll();}
328 }else{
329 addDisordered(job);
330 }
331 }
332
333 private synchronized void addOrdered(ByteBuilder job, long jobID){
334 // System.err.println("addOrdered "+jobID+"; nextJobID="+nextJobID);
335 // assert(false);
336 assert(ordered);
337 assert(job!=null) : jobID;
338 assert(jobID>=nextJobID) : jobID+", "+nextJobID;
339 maxJobID=Tools.max(maxJobID, jobID);
340 ByteBuilder old=map.put(jobID, job);
341 assert(old==null);
342 // System.err.println("C.Adding ordered job "+jobID+"; next="+nextJobID+"; max="+maxJobID+", map="+map.keySet());
343
344 if(jobID==nextJobID){
345 do{
346 ByteBuilder value=map.remove(nextJobID);
347 // System.err.println("Removing and queueing "+nextJobID+": "+value.toString());
348 addJob(value);
349 nextJobID++;
350 // System.err.println("D.nextJobID="+nextJobID);
351 }while(map.containsKey(nextJobID));
352
353 if(map.isEmpty()){notifyAll();}
354 }else{
355 assert(!map.containsKey(nextJobID));
356 }
357 }
358
359 private synchronized void addDisordered(ByteBuilder job){
360 assert(!ordered);
361 assert(buffer==null || buffer.isEmpty());
362 addJob(job);
363 }
364
365 /*--------------------------------------------------------------*/
366 /*---------------- Print ----------------*/
367 /*--------------------------------------------------------------*/
368
369 /**
370 * Skip the buffers and print directly.
371 * Mainly for headers with ordered streams.
372 * @param s String to print.
373 */
374 public void forcePrint(String s){
375 forcePrint(s.getBytes());
376 }
377
378 /**
379 * Skip the buffers and print directly.
380 * Mainly for headers with ordered streams.
381 * @param b Data to print.
382 */
383 public void forcePrint(byte[] b){
384 try {
385 outstream.write(b, 0, b.length);
386 } catch (IOException e) {
387 // TODO Auto-generated catch block
388 e.printStackTrace();
389 }
390 }
391
392
393 public ByteBuilder getBuffer() {
394 assert(open);
395 assert(buffer!=null);
396 return buffer;
397 }
398
399 @Deprecated
400 /** Avoid using this if possible. */
401 public ByteStreamWriter print(CharSequence x){
402 if(verbose){System.err.println("Added line '"+x+"'");}
403 assert(open) : x;
404 buffer.append(x);
405 flushBuffer(false);
406 return this;
407 }
408
409 @Deprecated
410 /** Avoid using this if possible. */
411 public ByteStreamWriter print(StringBuilder x){
412 if(verbose){System.err.println("Added line '"+x+"'");}
413 assert(open) : x;
414 buffer.append(x);
415 flushBuffer(false);
416 return this;
417 }
418
419 @Deprecated
420 /** Avoid using this if possible. */
421 public ByteStreamWriter print(String x){
422 if(verbose){System.err.println("Added line '"+x+"'");}
423 assert(open) : x;
424 buffer.append(x);
425 flushBuffer(false);
426 return this;
427 }
428
429 public ByteStreamWriter tab(){return print('\t');}
430 public ByteStreamWriter nl(){return print('\n');}
431
432 public ByteStreamWriter print(boolean x){
433 if(verbose){System.err.println("Added line '"+x+"'");}
434 assert(open) : x;
435 buffer.append(x);
436 flushBuffer(false);
437 return this;
438 }
439
440 public ByteStreamWriter print(int x){
441 if(verbose){System.err.println("Added line '"+(x)+"'");}
442 assert(open) : x;
443 buffer.append(x);
444 flushBuffer(false);
445 return this;
446 }
447
448 public ByteStreamWriter print(long x){
449 if(verbose){System.err.println("Added line '"+(x)+"'");}
450 assert(open) : x;
451 buffer.append(x);
452 flushBuffer(false);
453 return this;
454 }
455
456 // public ByteStreamWriter print(float x){
457 // if(verbose){System.err.println("Added line '"+(x)+"'");}
458 // assert(open) : x;
459 // buffer.appendSlow(x);
460 // flushBuffer(false);
461 // return this;
462 // }
463 //
464 // public ByteStreamWriter print(double x){
465 // if(verbose){System.err.println("Added line '"+(x)+"'");}
466 // assert(open) : x;
467 // buffer.appendSlow(x);
468 // flushBuffer(false);
469 // return this;
470 // }
471
472 public ByteStreamWriter print(float x, int decimals){
473 if(verbose){System.err.println("Added line '"+(x)+"'");}
474 assert(open) : x;
475 buffer.append(x, decimals);
476 flushBuffer(false);
477 return this;
478 }
479
480 public ByteStreamWriter print(double x, int decimals){
481 if(verbose){System.err.println("Added line '"+(x)+"'");}
482 assert(open) : x;
483 buffer.append(x, decimals);
484 flushBuffer(false);
485 return this;
486 }
487
488 public ByteStreamWriter print(byte x){
489 if(verbose){System.err.println("Added line '"+((char)x)+"'");}
490 assert(open) : ((char)x);
491 buffer.append(x);
492 flushBuffer(false);
493 return this;
494 }
495
496 public ByteStreamWriter print(char x){
497 if(verbose){System.err.println("Added line '"+(x)+"'");}
498 assert(open) : (x);
499 buffer.append(x);
500 flushBuffer(false);
501 return this;
502 }
503
504 public ByteStreamWriter print(byte[] x){
505 if(verbose){System.err.println("Added line '"+new String(x)+"'");}
506 assert(open) : new String(x);
507 buffer.append(x);
508 flushBuffer(false);
509 return this;
510 }
511
512 public ByteStreamWriter println(byte[] x){
513 if(verbose){System.err.println("Added line '"+new String(x)+"'");}
514 assert(open) : new String(x);
515 buffer.append(x).nl();
516 flushBuffer(false);
517 return this;
518 }
519
520 public ByteStreamWriter print(byte[] x, int len){
521 if(verbose){System.err.println("Added line '"+new String(x)+"'");}
522 assert(open) : new String(x);
523 buffer.append(x, len);
524 flushBuffer(false);
525 return this;
526 }
527
528 public ByteStreamWriter print(char[] x){
529 if(verbose){System.err.println("Added line '"+new String(x)+"'");}
530 assert(open) : new String(x);
531 buffer.append(x);
532 flushBuffer(false);
533 return this;
534 }
535
536 public ByteStreamWriter print(ByteBuilder x){
537 if(verbose){System.err.println("Added line '"+x+"'");}
538 assert(open) : x;
539 buffer.append(x);
540 flushBuffer(false);
541 return this;
542 }
543
544 public ByteStreamWriter print(ByteBuilder x, boolean destroy){
545 if(!destroy || buffer.length()>0){print(x);}
546 else{
547 if(verbose){System.err.println("Added line '"+x+"'");}
548 assert(open) : x;
549 addJob(x);
550 }
551 return this;
552 }
553
554 public ByteStreamWriter print(Read r){
555 assert(!OTHER);
556 ByteBuilder x=(FASTQ ? r.toFastq(buffer) : FASTA ? r.toFasta(FASTA_WRAP, buffer) : SAM ? r.toSam(buffer) :
557 SITES ? r.toSites(buffer) : INFO ? r.toInfo(buffer) : r.toText(true, buffer));
558 flushBuffer(false);
559 return this;
560 }
561
562 public ByteStreamWriter print(Contig c){
563 assert(!OTHER);
564 c.toFasta(FASTA_WRAP, buffer);
565 flushBuffer(false);
566 return this;
567 }
568
569 public ByteStreamWriter printKmer(long kmer, long count, int k){
570 AbstractKmerTable.toBytes(kmer, count, k, buffer);
571 flushBuffer(false);
572 return this;
573 }
574
575 public ByteStreamWriter printKmer(long kmer, int[] values, int k){
576 AbstractKmerTable.toBytes(kmer, values, k, buffer);
577 flushBuffer(false);
578 return this;
579 }
580
581 public ByteStreamWriter printKmer(long[] array, long count, int k){
582 AbstractKmerTableU.toBytes(array, count, k, buffer);
583 flushBuffer(false);
584 return this;
585 }
586
587 public ByteStreamWriter printKmer(long[] array, int[] values, int k){
588 AbstractKmerTableU.toBytes(array, values, k, buffer);
589 flushBuffer(false);
590 return this;
591 }
592
593 // public ByteStreamWriter printKmer(long kmer, long[] values, int k){
594 // kmer64.AbstractKmerTable64.toBytes(kmer, values, k, buffer);
595 // flushBuffer(false);
596 // return this;
597 // }
598
599
600 /*--------------------------------------------------------------*/
601 /*---------------- Println ----------------*/
602 /*--------------------------------------------------------------*/
603
604
605 public ByteStreamWriter println(){return print('\n');}
606 public ByteStreamWriter println(CharSequence x){print(x); return print('\n');}
607 public ByteStreamWriter println(String x){print(x); return print('\n');}
608 public ByteStreamWriter println(StringBuilder x){print(x); return print('\n');}
609 public ByteStreamWriter println(int x){print(x); return print('\n');}
610 public ByteStreamWriter println(long x){print(x); return print('\n');}
611 // public void println(float x){print(x); print('\n');}
612 // public void println(double x){print(x); print('\n');}
613 public ByteStreamWriter println(float x, int d){print(x, d); return print('\n');}
614 public ByteStreamWriter println(double x, int d){print(x, d); return print('\n');}
615 public ByteStreamWriter println(byte x){print(x); return print('\n');}
616 public ByteStreamWriter println(char x){print(x); return print('\n');}
617 // public ByteStreamWriter println(byte[] x){print(x); return print('\n');}
618 public ByteStreamWriter println(char[] x){print(x); return print('\n');}
619 public ByteStreamWriter println(ByteBuilder x){print(x); return print('\n');}
620 public ByteStreamWriter println(ByteBuilder x, boolean destroy){
621 if(destroy){return print(x.append('\n'));}else{print(x); return print('\n');}
622 }
623 public ByteStreamWriter printlnKmer(long kmer, int count, int k){printKmer(kmer, count, k); return print('\n');}
624 public ByteStreamWriter printlnKmer(long kmer, int[] values, int k){printKmer(kmer, values, k); return print('\n');}
625 public ByteStreamWriter printlnKmer(long[] array, int count, int k){printKmer(array, count, k); return print('\n');}
626 public ByteStreamWriter printlnKmer(long[] array, int[] values, int k){printKmer(array, values, k); return print('\n');}
627 public ByteStreamWriter println(Read r){print(r); return print('\n');}
628 public ByteStreamWriter println(Contig c){print(c); return print('\n');}
629
630 public ByteStreamWriter printlnKmer(long kmer, long count, int k){printKmer(kmer, count, k); return print('\n');}
631 // public ByteStreamWriter printlnKmer(long kmer, long[] values, int k){printKmer(kmer, values, k); return print('\n');}
632 public ByteStreamWriter printlnKmer(long[] array, long count, int k){printKmer(array, count, k); return print('\n');}
633 // public ByteStreamWriter printlnKmer(long[] array, long[] values, int k){printKmer(array, values, k); return print('\n');}
634
635
636
637 public ByteStreamWriter println(Read r, boolean paired){
638 println(r);
639 if(paired && r.mate!=null){println(r.mate);}
640 return this;
641 }
642
643 /*--------------------------------------------------------------*/
644 /*---------------- Inherited ----------------*/
645 /*--------------------------------------------------------------*/
646
647 @Override
648 public String toString(){
649 return "BSW for "+fname;
650 }
651
652 /*--------------------------------------------------------------*/
653 /*---------------- Fields ----------------*/
654 /*--------------------------------------------------------------*/
655
656 private ByteBuilder buffer;
657
658 public int initialLen=36000;
659 public int maxLen=32768;
660 public final boolean overwrite;
661 public final boolean append;
662 public final boolean allowSubprocess;
663 public final String fname;
664 public final boolean ordered;
665 private final OutputStream outstream;
666 private final ArrayBlockingQueue<ByteBuilder> queue;
667
668 /** For ordered output */
669 private final HashMap<Long, ByteBuilder> map;
670 private long nextJobID=0;
671 private long maxJobID=-1;
672
673 private boolean open=true;
674 private volatile boolean started=false;
675
676 /** TODO */
677 public boolean errorState=false;
678
679 /*--------------------------------------------------------------*/
680
681 private final boolean BAM;
682 private final boolean SAM;
683 private final boolean FASTQ;
684 private final boolean FASTA;
685 private final boolean BREAD;
686 private final boolean SITES;
687 private final boolean INFO;
688 private final boolean OTHER;
689
690 private final int FASTA_WRAP=Shared.FASTA_WRAP;
691
692 /*--------------------------------------------------------------*/
693
694 // private static final ByteBuilder POISON=new ByteBuilder("POISON_ByteStreamWriter");
695 private static final ByteBuilder POISON2=new ByteBuilder(1);
696
697 public static boolean verbose=false;
698 /** Number of lists held before the stream blocks */
699 private final int MAX_CAPACITY=256;
700 private final int ADD_LIMIT=MAX_CAPACITY/2;
701 private final int HALF_LIMIT=ADD_LIMIT/4;
702
703 }