Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteStreamWriter.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteStreamWriter.java Tue Mar 18 16:23:26 2025 -0400 @@ -0,0 +1,703 @@ +package fileIO; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Locale; +import java.util.concurrent.ArrayBlockingQueue; + +import assemble.Contig; +import dna.AminoAcid; +import dna.Data; +import kmer.AbstractKmerTable; +import shared.Shared; +import shared.Timer; +import shared.Tools; +import stream.Read; +import structures.ByteBuilder; +import ukmer.AbstractKmerTableU; + + + +/** + * @author Brian Bushnell + * @date Oct 21, 2014 + * + */ +public class ByteStreamWriter extends Thread { + + /*--------------------------------------------------------------*/ + /*---------------- Initialization ----------------*/ + /*--------------------------------------------------------------*/ + + public static void main(String[] args){ + Timer t=new Timer(); + final int alen=1000; + byte[] array=new byte[alen]; + for(int i=0; i<array.length; i++){ + array[i]=AminoAcid.numberToBase[i&3]; + } + array[array.length-1]='\n'; + long iters=Long.parseLong(args[1]); + String fname=args[0]; + ByteStreamWriter bsw=new ByteStreamWriter(fname, true, false, true); + bsw.start(); + for(long i=0; i<iters; i++){ + bsw.print(array); + } + bsw.poisonAndWait(); + t.stop(); + System.err.println("MB/s: \t"+String.format(Locale.ROOT, "%.2f", ((alen*iters)/(t.elapsed/1000.0)))); + System.err.println("Time: \t"+t); + } + + /** @See primary constructor */ + public ByteStreamWriter(String fname_, boolean overwrite_, boolean append_, boolean allowSubprocess_){ + this(fname_, overwrite_, append_, allowSubprocess_, 0); + } + + /** @See primary constructor */ + public ByteStreamWriter(String fname_, boolean overwrite_, boolean append_, boolean allowSubprocess_, int format){ + this(FileFormat.testOutput(fname_, FileFormat.TEXT, format, 0, allowSubprocess_, overwrite_, append_, false)); + } + + /** + * Create a ByteStreamWriter for this FileFormat. + * @param ff Contains information about the file name, output format, etc. + */ + public ByteStreamWriter(FileFormat ff){ + FASTQ=ff.fastq() || ff.text(); + FASTA=ff.fasta(); + BREAD=ff.bread(); + SAM=ff.samOrBam(); + BAM=ff.bam(); + SITES=ff.sites(); + INFO=ff.attachment(); + OTHER=(!FASTQ && !FASTA && !BREAD && !SAM && !BAM && !SITES && !INFO); + + + fname=ff.name(); + overwrite=ff.overwrite(); + append=ff.append(); + allowSubprocess=ff.allowSubprocess(); + ordered=ff.ordered(); + assert(!(overwrite&append)); + assert(ff.canWrite()) : "File "+fname+" exists "+(new File(ff.name()).canWrite() ? + ("and overwrite="+overwrite+".\nPlease add the flag ow to overwrite the file.\n") : + "and is read-only."); + if(append && !(ff.raw() || ff.gzip())){throw new RuntimeException("Can't append to compressed files.");} + + if(!BAM || !(Data.SAMTOOLS() /*|| Data.SAMBAMBA()*/) /*|| !Data.SH()*/){ + outstream=ReadWrite.getOutputStream(fname, append, true, allowSubprocess); + }else{ + if(Data.SAMTOOLS()){ + outstream=ReadWrite.getOutputStreamFromProcess(fname, "samtools view -S -b -h - ", true, append, true, true); + }else{ + outstream=ReadWrite.getOutputStreamFromProcess(fname, "sambamba view -S -f bam -h ", true, append, true, true); //Sambamba does not support stdin + } + } + + queue=new ArrayBlockingQueue<ByteBuilder>(5); + if(ordered){ + buffer=null; + map=new HashMap<Long, ByteBuilder>(MAX_CAPACITY); + }else{ + buffer=new ByteBuilder(initialLen); + map=null; + } + } + + public static ByteStreamWriter makeBSW(FileFormat ff){ + if(ff==null){return null;} + ByteStreamWriter bsw=new ByteStreamWriter(ff); + bsw.start(); + return bsw; + } + + /*--------------------------------------------------------------*/ + /*---------------- Primary Method ----------------*/ + /*--------------------------------------------------------------*/ + + + @Override + public void run() { + if(verbose){System.err.println("running");} + assert(open) : fname; + + synchronized(this){ + started=true; + this.notify(); + } + + if(verbose){System.err.println("waiting for jobs");} + + processJobs(); + + if(verbose){System.err.println("null/poison job");} +// assert(false); + open=false; + ReadWrite.finishWriting(null, outstream, fname, allowSubprocess); + if(verbose){System.err.println("finish writing");} + synchronized(this){notifyAll();} + if(verbose){System.err.println("done");} + } + + public void processJobs() { + + ByteBuilder job=null; + while(job==null){ + try { + job=queue.take(); +// job.list=queue.take(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + if(verbose){System.err.println("processing jobs");} + while(job!=null && job!=POISON2){ + if(job.length()>0){ + try { + outstream.write(job.array, 0, job.length()); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + job=null; + while(job==null){ + try { + job=queue.take(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + } + + /*--------------------------------------------------------------*/ + /*---------------- Control and Helpers ----------------*/ + /*--------------------------------------------------------------*/ + + + @Override + public synchronized void start(){ + super.start(); + if(verbose){System.err.println(this.getState());} + synchronized(this){ + while(!started){ + try { + this.wait(20); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + } + + + public synchronized void poison(){ + //Don't allow thread to shut down before it has started + while(!started || this.getState()==Thread.State.NEW){ + try { + this.wait(20); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + if(!open){return;} + + if(ordered){ + addOrdered(POISON2, maxJobID+1); + }else{ + if(buffer!=null){addJob(buffer);} + } + buffer=null; +// System.err.println("Poisoned!"); +// assert(false); + +// assert(false) : open+", "+this.getState()+", "+started; + open=false; + addJob(POISON2); + } + + /** + * Wait for this object's thread to terminate. + * Should be poisoned first. + */ + public void waitForFinish(){ + while(this.getState()!=Thread.State.TERMINATED){ + try { + this.join(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Poison the thread, and wait for it to terminate. + * @return true if there was an error, false otherwise + */ + public boolean poisonAndWait(){ + poison(); + waitForFinish(); + return errorState; + } + + //TODO Why is this synchronized? + public synchronized void addJob(ByteBuilder bb){ +// System.err.println("Got job "+(j.list==null ? "null" : j.list.size())); + + assert(started) : "Wait for start() to return before using the writer."; +// while(!started || this.getState()==Thread.State.NEW){ +// try { +// this.wait(20); +// } catch (InterruptedException e) { +// // TODO Auto-generated catch block +// e.printStackTrace(); +// } +// } + + boolean success=false; + while(!success){ + try { + queue.put(bb); + success=true; + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + assert(!queue.contains(bb)); //Hopefully it was not added. + } + } + } + + public final void forceFlushBuffer(){ + flushBuffer(true); + } + + /** Called after every write to the buffer */ + public final void flushBuffer(boolean force){ + final int x=buffer.length(); + if(x>=maxLen || (force && x>0)){ + addJob(buffer); + buffer=new ByteBuilder(initialLen); + } + } + + + /*--------------------------------------------------------------*/ + /*---------------- Ordering ----------------*/ + /*--------------------------------------------------------------*/ + + public synchronized void add(ByteBuilder job, long jobID){ + + if(ordered){ + int size=map.size(); +// System.err.print(size+", "); +// System.err.println("A.Adding job "+jobID+"; next="+nextJobID+"; max="+maxJobID+", map="+map.keySet()); + final boolean flag=(size>=HALF_LIMIT); + if(jobID>nextJobID && size>=ADD_LIMIT){ +// if(printBufferNotification){ +// System.err.println("Output buffer became full; key "+jobID+" waiting on "+nextJobID+"."); +// printBufferNotification=false; +// } + while(jobID>nextJobID && size>=HALF_LIMIT){ + try { + this.wait(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + size=map.size(); + } +// if(printBufferNotification){ +// System.err.println("Output buffer became clear for key "+jobID+"; next="+nextJobID+", size="+size); +// } + } +// System.err.println("B.Adding ordered job "+jobID+"; next="+nextJobID+"; max="+maxJobID); + addOrdered(job, jobID); + assert(jobID!=nextJobID); + if(flag && jobID<nextJobID){this.notifyAll();} + }else{ + addDisordered(job); + } + } + + private synchronized void addOrdered(ByteBuilder job, long jobID){ +// System.err.println("addOrdered "+jobID+"; nextJobID="+nextJobID); +// assert(false); + assert(ordered); + assert(job!=null) : jobID; + assert(jobID>=nextJobID) : jobID+", "+nextJobID; + maxJobID=Tools.max(maxJobID, jobID); + ByteBuilder old=map.put(jobID, job); + assert(old==null); +// System.err.println("C.Adding ordered job "+jobID+"; next="+nextJobID+"; max="+maxJobID+", map="+map.keySet()); + + if(jobID==nextJobID){ + do{ + ByteBuilder value=map.remove(nextJobID); + // System.err.println("Removing and queueing "+nextJobID+": "+value.toString()); + addJob(value); + nextJobID++; + // System.err.println("D.nextJobID="+nextJobID); + }while(map.containsKey(nextJobID)); + + if(map.isEmpty()){notifyAll();} + }else{ + assert(!map.containsKey(nextJobID)); + } + } + + private synchronized void addDisordered(ByteBuilder job){ + assert(!ordered); + assert(buffer==null || buffer.isEmpty()); + addJob(job); + } + + /*--------------------------------------------------------------*/ + /*---------------- Print ----------------*/ + /*--------------------------------------------------------------*/ + + /** + * Skip the buffers and print directly. + * Mainly for headers with ordered streams. + * @param s String to print. + */ + public void forcePrint(String s){ + forcePrint(s.getBytes()); + } + + /** + * Skip the buffers and print directly. + * Mainly for headers with ordered streams. + * @param b Data to print. + */ + public void forcePrint(byte[] b){ + try { + outstream.write(b, 0, b.length); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + + public ByteBuilder getBuffer() { + assert(open); + assert(buffer!=null); + return buffer; + } + + @Deprecated + /** Avoid using this if possible. */ + public ByteStreamWriter print(CharSequence x){ + if(verbose){System.err.println("Added line '"+x+"'");} + assert(open) : x; + buffer.append(x); + flushBuffer(false); + return this; + } + + @Deprecated + /** Avoid using this if possible. */ + public ByteStreamWriter print(StringBuilder x){ + if(verbose){System.err.println("Added line '"+x+"'");} + assert(open) : x; + buffer.append(x); + flushBuffer(false); + return this; + } + + @Deprecated + /** Avoid using this if possible. */ + public ByteStreamWriter print(String x){ + if(verbose){System.err.println("Added line '"+x+"'");} + assert(open) : x; + buffer.append(x); + flushBuffer(false); + return this; + } + + public ByteStreamWriter tab(){return print('\t');} + public ByteStreamWriter nl(){return print('\n');} + + public ByteStreamWriter print(boolean x){ + if(verbose){System.err.println("Added line '"+x+"'");} + assert(open) : x; + buffer.append(x); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(int x){ + if(verbose){System.err.println("Added line '"+(x)+"'");} + assert(open) : x; + buffer.append(x); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(long x){ + if(verbose){System.err.println("Added line '"+(x)+"'");} + assert(open) : x; + buffer.append(x); + flushBuffer(false); + return this; + } + +// public ByteStreamWriter print(float x){ +// if(verbose){System.err.println("Added line '"+(x)+"'");} +// assert(open) : x; +// buffer.appendSlow(x); +// flushBuffer(false); +// return this; +// } +// +// public ByteStreamWriter print(double x){ +// if(verbose){System.err.println("Added line '"+(x)+"'");} +// assert(open) : x; +// buffer.appendSlow(x); +// flushBuffer(false); +// return this; +// } + + public ByteStreamWriter print(float x, int decimals){ + if(verbose){System.err.println("Added line '"+(x)+"'");} + assert(open) : x; + buffer.append(x, decimals); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(double x, int decimals){ + if(verbose){System.err.println("Added line '"+(x)+"'");} + assert(open) : x; + buffer.append(x, decimals); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(byte x){ + if(verbose){System.err.println("Added line '"+((char)x)+"'");} + assert(open) : ((char)x); + buffer.append(x); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(char x){ + if(verbose){System.err.println("Added line '"+(x)+"'");} + assert(open) : (x); + buffer.append(x); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(byte[] x){ + if(verbose){System.err.println("Added line '"+new String(x)+"'");} + assert(open) : new String(x); + buffer.append(x); + flushBuffer(false); + return this; + } + + public ByteStreamWriter println(byte[] x){ + if(verbose){System.err.println("Added line '"+new String(x)+"'");} + assert(open) : new String(x); + buffer.append(x).nl(); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(byte[] x, int len){ + if(verbose){System.err.println("Added line '"+new String(x)+"'");} + assert(open) : new String(x); + buffer.append(x, len); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(char[] x){ + if(verbose){System.err.println("Added line '"+new String(x)+"'");} + assert(open) : new String(x); + buffer.append(x); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(ByteBuilder x){ + if(verbose){System.err.println("Added line '"+x+"'");} + assert(open) : x; + buffer.append(x); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(ByteBuilder x, boolean destroy){ + if(!destroy || buffer.length()>0){print(x);} + else{ + if(verbose){System.err.println("Added line '"+x+"'");} + assert(open) : x; + addJob(x); + } + return this; + } + + public ByteStreamWriter print(Read r){ + assert(!OTHER); + ByteBuilder x=(FASTQ ? r.toFastq(buffer) : FASTA ? r.toFasta(FASTA_WRAP, buffer) : SAM ? r.toSam(buffer) : + SITES ? r.toSites(buffer) : INFO ? r.toInfo(buffer) : r.toText(true, buffer)); + flushBuffer(false); + return this; + } + + public ByteStreamWriter print(Contig c){ + assert(!OTHER); + c.toFasta(FASTA_WRAP, buffer); + flushBuffer(false); + return this; + } + + public ByteStreamWriter printKmer(long kmer, long count, int k){ + AbstractKmerTable.toBytes(kmer, count, k, buffer); + flushBuffer(false); + return this; + } + + public ByteStreamWriter printKmer(long kmer, int[] values, int k){ + AbstractKmerTable.toBytes(kmer, values, k, buffer); + flushBuffer(false); + return this; + } + + public ByteStreamWriter printKmer(long[] array, long count, int k){ + AbstractKmerTableU.toBytes(array, count, k, buffer); + flushBuffer(false); + return this; + } + + public ByteStreamWriter printKmer(long[] array, int[] values, int k){ + AbstractKmerTableU.toBytes(array, values, k, buffer); + flushBuffer(false); + return this; + } + +// public ByteStreamWriter printKmer(long kmer, long[] values, int k){ +// kmer64.AbstractKmerTable64.toBytes(kmer, values, k, buffer); +// flushBuffer(false); +// return this; +// } + + + /*--------------------------------------------------------------*/ + /*---------------- Println ----------------*/ + /*--------------------------------------------------------------*/ + + + public ByteStreamWriter println(){return print('\n');} + public ByteStreamWriter println(CharSequence x){print(x); return print('\n');} + public ByteStreamWriter println(String x){print(x); return print('\n');} + public ByteStreamWriter println(StringBuilder x){print(x); return print('\n');} + public ByteStreamWriter println(int x){print(x); return print('\n');} + public ByteStreamWriter println(long x){print(x); return print('\n');} +// public void println(float x){print(x); print('\n');} +// public void println(double x){print(x); print('\n');} + public ByteStreamWriter println(float x, int d){print(x, d); return print('\n');} + public ByteStreamWriter println(double x, int d){print(x, d); return print('\n');} + public ByteStreamWriter println(byte x){print(x); return print('\n');} + public ByteStreamWriter println(char x){print(x); return print('\n');} +// public ByteStreamWriter println(byte[] x){print(x); return print('\n');} + public ByteStreamWriter println(char[] x){print(x); return print('\n');} + public ByteStreamWriter println(ByteBuilder x){print(x); return print('\n');} + public ByteStreamWriter println(ByteBuilder x, boolean destroy){ + if(destroy){return print(x.append('\n'));}else{print(x); return print('\n');} + } + public ByteStreamWriter printlnKmer(long kmer, int count, int k){printKmer(kmer, count, k); return print('\n');} + public ByteStreamWriter printlnKmer(long kmer, int[] values, int k){printKmer(kmer, values, k); return print('\n');} + public ByteStreamWriter printlnKmer(long[] array, int count, int k){printKmer(array, count, k); return print('\n');} + public ByteStreamWriter printlnKmer(long[] array, int[] values, int k){printKmer(array, values, k); return print('\n');} + public ByteStreamWriter println(Read r){print(r); return print('\n');} + public ByteStreamWriter println(Contig c){print(c); return print('\n');} + + public ByteStreamWriter printlnKmer(long kmer, long count, int k){printKmer(kmer, count, k); return print('\n');} +// public ByteStreamWriter printlnKmer(long kmer, long[] values, int k){printKmer(kmer, values, k); return print('\n');} + public ByteStreamWriter printlnKmer(long[] array, long count, int k){printKmer(array, count, k); return print('\n');} +// public ByteStreamWriter printlnKmer(long[] array, long[] values, int k){printKmer(array, values, k); return print('\n');} + + + + public ByteStreamWriter println(Read r, boolean paired){ + println(r); + if(paired && r.mate!=null){println(r.mate);} + return this; + } + + /*--------------------------------------------------------------*/ + /*---------------- Inherited ----------------*/ + /*--------------------------------------------------------------*/ + + @Override + public String toString(){ + return "BSW for "+fname; + } + + /*--------------------------------------------------------------*/ + /*---------------- Fields ----------------*/ + /*--------------------------------------------------------------*/ + + private ByteBuilder buffer; + + public int initialLen=36000; + public int maxLen=32768; + public final boolean overwrite; + public final boolean append; + public final boolean allowSubprocess; + public final String fname; + public final boolean ordered; + private final OutputStream outstream; + private final ArrayBlockingQueue<ByteBuilder> queue; + + /** For ordered output */ + private final HashMap<Long, ByteBuilder> map; + private long nextJobID=0; + private long maxJobID=-1; + + private boolean open=true; + private volatile boolean started=false; + + /** TODO */ + public boolean errorState=false; + + /*--------------------------------------------------------------*/ + + private final boolean BAM; + private final boolean SAM; + private final boolean FASTQ; + private final boolean FASTA; + private final boolean BREAD; + private final boolean SITES; + private final boolean INFO; + private final boolean OTHER; + + private final int FASTA_WRAP=Shared.FASTA_WRAP; + + /*--------------------------------------------------------------*/ + +// private static final ByteBuilder POISON=new ByteBuilder("POISON_ByteStreamWriter"); + private static final ByteBuilder POISON2=new ByteBuilder(1); + + public static boolean verbose=false; + /** Number of lists held before the stream blocks */ + private final int MAX_CAPACITY=256; + private final int ADD_LIMIT=MAX_CAPACITY/2; + private final int HALF_LIMIT=ADD_LIMIT/4; + +}