Mercurial > repos > rliterman > csp2
view CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteStreamWriter.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line source
package fileIO; import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.util.HashMap; import java.util.Locale; import java.util.concurrent.ArrayBlockingQueue; import assemble.Contig; import dna.AminoAcid; import dna.Data; import kmer.AbstractKmerTable; import shared.Shared; import shared.Timer; import shared.Tools; import stream.Read; import structures.ByteBuilder; import ukmer.AbstractKmerTableU; /** * @author Brian Bushnell * @date Oct 21, 2014 * */ public class ByteStreamWriter extends Thread { /*--------------------------------------------------------------*/ /*---------------- Initialization ----------------*/ /*--------------------------------------------------------------*/ public static void main(String[] args){ Timer t=new Timer(); final int alen=1000; byte[] array=new byte[alen]; for(int i=0; i<array.length; i++){ array[i]=AminoAcid.numberToBase[i&3]; } array[array.length-1]='\n'; long iters=Long.parseLong(args[1]); String fname=args[0]; ByteStreamWriter bsw=new ByteStreamWriter(fname, true, false, true); bsw.start(); for(long i=0; i<iters; i++){ bsw.print(array); } bsw.poisonAndWait(); t.stop(); System.err.println("MB/s: \t"+String.format(Locale.ROOT, "%.2f", ((alen*iters)/(t.elapsed/1000.0)))); System.err.println("Time: \t"+t); } /** @See primary constructor */ public ByteStreamWriter(String fname_, boolean overwrite_, boolean append_, boolean allowSubprocess_){ this(fname_, overwrite_, append_, allowSubprocess_, 0); } /** @See primary constructor */ public ByteStreamWriter(String fname_, boolean overwrite_, boolean append_, boolean allowSubprocess_, int format){ this(FileFormat.testOutput(fname_, FileFormat.TEXT, format, 0, allowSubprocess_, overwrite_, append_, false)); } /** * Create a ByteStreamWriter for this FileFormat. * @param ff Contains information about the file name, output format, etc. */ public ByteStreamWriter(FileFormat ff){ FASTQ=ff.fastq() || ff.text(); FASTA=ff.fasta(); BREAD=ff.bread(); SAM=ff.samOrBam(); BAM=ff.bam(); SITES=ff.sites(); INFO=ff.attachment(); OTHER=(!FASTQ && !FASTA && !BREAD && !SAM && !BAM && !SITES && !INFO); fname=ff.name(); overwrite=ff.overwrite(); append=ff.append(); allowSubprocess=ff.allowSubprocess(); ordered=ff.ordered(); assert(!(overwrite&append)); assert(ff.canWrite()) : "File "+fname+" exists "+(new File(ff.name()).canWrite() ? ("and overwrite="+overwrite+".\nPlease add the flag ow to overwrite the file.\n") : "and is read-only."); if(append && !(ff.raw() || ff.gzip())){throw new RuntimeException("Can't append to compressed files.");} if(!BAM || !(Data.SAMTOOLS() /*|| Data.SAMBAMBA()*/) /*|| !Data.SH()*/){ outstream=ReadWrite.getOutputStream(fname, append, true, allowSubprocess); }else{ if(Data.SAMTOOLS()){ outstream=ReadWrite.getOutputStreamFromProcess(fname, "samtools view -S -b -h - ", true, append, true, true); }else{ outstream=ReadWrite.getOutputStreamFromProcess(fname, "sambamba view -S -f bam -h ", true, append, true, true); //Sambamba does not support stdin } } queue=new ArrayBlockingQueue<ByteBuilder>(5); if(ordered){ buffer=null; map=new HashMap<Long, ByteBuilder>(MAX_CAPACITY); }else{ buffer=new ByteBuilder(initialLen); map=null; } } public static ByteStreamWriter makeBSW(FileFormat ff){ if(ff==null){return null;} ByteStreamWriter bsw=new ByteStreamWriter(ff); bsw.start(); return bsw; } /*--------------------------------------------------------------*/ /*---------------- Primary Method ----------------*/ /*--------------------------------------------------------------*/ @Override public void run() { if(verbose){System.err.println("running");} assert(open) : fname; synchronized(this){ started=true; this.notify(); } if(verbose){System.err.println("waiting for jobs");} processJobs(); if(verbose){System.err.println("null/poison job");} // assert(false); open=false; ReadWrite.finishWriting(null, outstream, fname, allowSubprocess); if(verbose){System.err.println("finish writing");} synchronized(this){notifyAll();} if(verbose){System.err.println("done");} } public void processJobs() { ByteBuilder job=null; while(job==null){ try { job=queue.take(); // job.list=queue.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(verbose){System.err.println("processing jobs");} while(job!=null && job!=POISON2){ if(job.length()>0){ try { outstream.write(job.array, 0, job.length()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } job=null; while(job==null){ try { job=queue.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /*--------------------------------------------------------------*/ /*---------------- Control and Helpers ----------------*/ /*--------------------------------------------------------------*/ @Override public synchronized void start(){ super.start(); if(verbose){System.err.println(this.getState());} synchronized(this){ while(!started){ try { this.wait(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public synchronized void poison(){ //Don't allow thread to shut down before it has started while(!started || this.getState()==Thread.State.NEW){ try { this.wait(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(!open){return;} if(ordered){ addOrdered(POISON2, maxJobID+1); }else{ if(buffer!=null){addJob(buffer);} } buffer=null; // System.err.println("Poisoned!"); // assert(false); // assert(false) : open+", "+this.getState()+", "+started; open=false; addJob(POISON2); } /** * Wait for this object's thread to terminate. * Should be poisoned first. */ public void waitForFinish(){ while(this.getState()!=Thread.State.TERMINATED){ try { this.join(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * Poison the thread, and wait for it to terminate. * @return true if there was an error, false otherwise */ public boolean poisonAndWait(){ poison(); waitForFinish(); return errorState; } //TODO Why is this synchronized? public synchronized void addJob(ByteBuilder bb){ // System.err.println("Got job "+(j.list==null ? "null" : j.list.size())); assert(started) : "Wait for start() to return before using the writer."; // while(!started || this.getState()==Thread.State.NEW){ // try { // this.wait(20); // } catch (InterruptedException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } // } boolean success=false; while(!success){ try { queue.put(bb); success=true; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); assert(!queue.contains(bb)); //Hopefully it was not added. } } } public final void forceFlushBuffer(){ flushBuffer(true); } /** Called after every write to the buffer */ public final void flushBuffer(boolean force){ final int x=buffer.length(); if(x>=maxLen || (force && x>0)){ addJob(buffer); buffer=new ByteBuilder(initialLen); } } /*--------------------------------------------------------------*/ /*---------------- Ordering ----------------*/ /*--------------------------------------------------------------*/ public synchronized void add(ByteBuilder job, long jobID){ if(ordered){ int size=map.size(); // System.err.print(size+", "); // System.err.println("A.Adding job "+jobID+"; next="+nextJobID+"; max="+maxJobID+", map="+map.keySet()); final boolean flag=(size>=HALF_LIMIT); if(jobID>nextJobID && size>=ADD_LIMIT){ // if(printBufferNotification){ // System.err.println("Output buffer became full; key "+jobID+" waiting on "+nextJobID+"."); // printBufferNotification=false; // } while(jobID>nextJobID && size>=HALF_LIMIT){ try { this.wait(2000); } catch (InterruptedException e) { e.printStackTrace(); } size=map.size(); } // if(printBufferNotification){ // System.err.println("Output buffer became clear for key "+jobID+"; next="+nextJobID+", size="+size); // } } // System.err.println("B.Adding ordered job "+jobID+"; next="+nextJobID+"; max="+maxJobID); addOrdered(job, jobID); assert(jobID!=nextJobID); if(flag && jobID<nextJobID){this.notifyAll();} }else{ addDisordered(job); } } private synchronized void addOrdered(ByteBuilder job, long jobID){ // System.err.println("addOrdered "+jobID+"; nextJobID="+nextJobID); // assert(false); assert(ordered); assert(job!=null) : jobID; assert(jobID>=nextJobID) : jobID+", "+nextJobID; maxJobID=Tools.max(maxJobID, jobID); ByteBuilder old=map.put(jobID, job); assert(old==null); // System.err.println("C.Adding ordered job "+jobID+"; next="+nextJobID+"; max="+maxJobID+", map="+map.keySet()); if(jobID==nextJobID){ do{ ByteBuilder value=map.remove(nextJobID); // System.err.println("Removing and queueing "+nextJobID+": "+value.toString()); addJob(value); nextJobID++; // System.err.println("D.nextJobID="+nextJobID); }while(map.containsKey(nextJobID)); if(map.isEmpty()){notifyAll();} }else{ assert(!map.containsKey(nextJobID)); } } private synchronized void addDisordered(ByteBuilder job){ assert(!ordered); assert(buffer==null || buffer.isEmpty()); addJob(job); } /*--------------------------------------------------------------*/ /*---------------- Print ----------------*/ /*--------------------------------------------------------------*/ /** * Skip the buffers and print directly. * Mainly for headers with ordered streams. * @param s String to print. */ public void forcePrint(String s){ forcePrint(s.getBytes()); } /** * Skip the buffers and print directly. * Mainly for headers with ordered streams. * @param b Data to print. */ public void forcePrint(byte[] b){ try { outstream.write(b, 0, b.length); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public ByteBuilder getBuffer() { assert(open); assert(buffer!=null); return buffer; } @Deprecated /** Avoid using this if possible. */ public ByteStreamWriter print(CharSequence x){ if(verbose){System.err.println("Added line '"+x+"'");} assert(open) : x; buffer.append(x); flushBuffer(false); return this; } @Deprecated /** Avoid using this if possible. */ public ByteStreamWriter print(StringBuilder x){ if(verbose){System.err.println("Added line '"+x+"'");} assert(open) : x; buffer.append(x); flushBuffer(false); return this; } @Deprecated /** Avoid using this if possible. */ public ByteStreamWriter print(String x){ if(verbose){System.err.println("Added line '"+x+"'");} assert(open) : x; buffer.append(x); flushBuffer(false); return this; } public ByteStreamWriter tab(){return print('\t');} public ByteStreamWriter nl(){return print('\n');} public ByteStreamWriter print(boolean x){ if(verbose){System.err.println("Added line '"+x+"'");} assert(open) : x; buffer.append(x); flushBuffer(false); return this; } public ByteStreamWriter print(int x){ if(verbose){System.err.println("Added line '"+(x)+"'");} assert(open) : x; buffer.append(x); flushBuffer(false); return this; } public ByteStreamWriter print(long x){ if(verbose){System.err.println("Added line '"+(x)+"'");} assert(open) : x; buffer.append(x); flushBuffer(false); return this; } // public ByteStreamWriter print(float x){ // if(verbose){System.err.println("Added line '"+(x)+"'");} // assert(open) : x; // buffer.appendSlow(x); // flushBuffer(false); // return this; // } // // public ByteStreamWriter print(double x){ // if(verbose){System.err.println("Added line '"+(x)+"'");} // assert(open) : x; // buffer.appendSlow(x); // flushBuffer(false); // return this; // } public ByteStreamWriter print(float x, int decimals){ if(verbose){System.err.println("Added line '"+(x)+"'");} assert(open) : x; buffer.append(x, decimals); flushBuffer(false); return this; } public ByteStreamWriter print(double x, int decimals){ if(verbose){System.err.println("Added line '"+(x)+"'");} assert(open) : x; buffer.append(x, decimals); flushBuffer(false); return this; } public ByteStreamWriter print(byte x){ if(verbose){System.err.println("Added line '"+((char)x)+"'");} assert(open) : ((char)x); buffer.append(x); flushBuffer(false); return this; } public ByteStreamWriter print(char x){ if(verbose){System.err.println("Added line '"+(x)+"'");} assert(open) : (x); buffer.append(x); flushBuffer(false); return this; } public ByteStreamWriter print(byte[] x){ if(verbose){System.err.println("Added line '"+new String(x)+"'");} assert(open) : new String(x); buffer.append(x); flushBuffer(false); return this; } public ByteStreamWriter println(byte[] x){ if(verbose){System.err.println("Added line '"+new String(x)+"'");} assert(open) : new String(x); buffer.append(x).nl(); flushBuffer(false); return this; } public ByteStreamWriter print(byte[] x, int len){ if(verbose){System.err.println("Added line '"+new String(x)+"'");} assert(open) : new String(x); buffer.append(x, len); flushBuffer(false); return this; } public ByteStreamWriter print(char[] x){ if(verbose){System.err.println("Added line '"+new String(x)+"'");} assert(open) : new String(x); buffer.append(x); flushBuffer(false); return this; } public ByteStreamWriter print(ByteBuilder x){ if(verbose){System.err.println("Added line '"+x+"'");} assert(open) : x; buffer.append(x); flushBuffer(false); return this; } public ByteStreamWriter print(ByteBuilder x, boolean destroy){ if(!destroy || buffer.length()>0){print(x);} else{ if(verbose){System.err.println("Added line '"+x+"'");} assert(open) : x; addJob(x); } return this; } public ByteStreamWriter print(Read r){ assert(!OTHER); ByteBuilder x=(FASTQ ? r.toFastq(buffer) : FASTA ? r.toFasta(FASTA_WRAP, buffer) : SAM ? r.toSam(buffer) : SITES ? r.toSites(buffer) : INFO ? r.toInfo(buffer) : r.toText(true, buffer)); flushBuffer(false); return this; } public ByteStreamWriter print(Contig c){ assert(!OTHER); c.toFasta(FASTA_WRAP, buffer); flushBuffer(false); return this; } public ByteStreamWriter printKmer(long kmer, long count, int k){ AbstractKmerTable.toBytes(kmer, count, k, buffer); flushBuffer(false); return this; } public ByteStreamWriter printKmer(long kmer, int[] values, int k){ AbstractKmerTable.toBytes(kmer, values, k, buffer); flushBuffer(false); return this; } public ByteStreamWriter printKmer(long[] array, long count, int k){ AbstractKmerTableU.toBytes(array, count, k, buffer); flushBuffer(false); return this; } public ByteStreamWriter printKmer(long[] array, int[] values, int k){ AbstractKmerTableU.toBytes(array, values, k, buffer); flushBuffer(false); return this; } // public ByteStreamWriter printKmer(long kmer, long[] values, int k){ // kmer64.AbstractKmerTable64.toBytes(kmer, values, k, buffer); // flushBuffer(false); // return this; // } /*--------------------------------------------------------------*/ /*---------------- Println ----------------*/ /*--------------------------------------------------------------*/ public ByteStreamWriter println(){return print('\n');} public ByteStreamWriter println(CharSequence x){print(x); return print('\n');} public ByteStreamWriter println(String x){print(x); return print('\n');} public ByteStreamWriter println(StringBuilder x){print(x); return print('\n');} public ByteStreamWriter println(int x){print(x); return print('\n');} public ByteStreamWriter println(long x){print(x); return print('\n');} // public void println(float x){print(x); print('\n');} // public void println(double x){print(x); print('\n');} public ByteStreamWriter println(float x, int d){print(x, d); return print('\n');} public ByteStreamWriter println(double x, int d){print(x, d); return print('\n');} public ByteStreamWriter println(byte x){print(x); return print('\n');} public ByteStreamWriter println(char x){print(x); return print('\n');} // public ByteStreamWriter println(byte[] x){print(x); return print('\n');} public ByteStreamWriter println(char[] x){print(x); return print('\n');} public ByteStreamWriter println(ByteBuilder x){print(x); return print('\n');} public ByteStreamWriter println(ByteBuilder x, boolean destroy){ if(destroy){return print(x.append('\n'));}else{print(x); return print('\n');} } public ByteStreamWriter printlnKmer(long kmer, int count, int k){printKmer(kmer, count, k); return print('\n');} public ByteStreamWriter printlnKmer(long kmer, int[] values, int k){printKmer(kmer, values, k); return print('\n');} public ByteStreamWriter printlnKmer(long[] array, int count, int k){printKmer(array, count, k); return print('\n');} public ByteStreamWriter printlnKmer(long[] array, int[] values, int k){printKmer(array, values, k); return print('\n');} public ByteStreamWriter println(Read r){print(r); return print('\n');} public ByteStreamWriter println(Contig c){print(c); return print('\n');} public ByteStreamWriter printlnKmer(long kmer, long count, int k){printKmer(kmer, count, k); return print('\n');} // public ByteStreamWriter printlnKmer(long kmer, long[] values, int k){printKmer(kmer, values, k); return print('\n');} public ByteStreamWriter printlnKmer(long[] array, long count, int k){printKmer(array, count, k); return print('\n');} // public ByteStreamWriter printlnKmer(long[] array, long[] values, int k){printKmer(array, values, k); return print('\n');} public ByteStreamWriter println(Read r, boolean paired){ println(r); if(paired && r.mate!=null){println(r.mate);} return this; } /*--------------------------------------------------------------*/ /*---------------- Inherited ----------------*/ /*--------------------------------------------------------------*/ @Override public String toString(){ return "BSW for "+fname; } /*--------------------------------------------------------------*/ /*---------------- Fields ----------------*/ /*--------------------------------------------------------------*/ private ByteBuilder buffer; public int initialLen=36000; public int maxLen=32768; public final boolean overwrite; public final boolean append; public final boolean allowSubprocess; public final String fname; public final boolean ordered; private final OutputStream outstream; private final ArrayBlockingQueue<ByteBuilder> queue; /** For ordered output */ private final HashMap<Long, ByteBuilder> map; private long nextJobID=0; private long maxJobID=-1; private boolean open=true; private volatile boolean started=false; /** TODO */ public boolean errorState=false; /*--------------------------------------------------------------*/ private final boolean BAM; private final boolean SAM; private final boolean FASTQ; private final boolean FASTA; private final boolean BREAD; private final boolean SITES; private final boolean INFO; private final boolean OTHER; private final int FASTA_WRAP=Shared.FASTA_WRAP; /*--------------------------------------------------------------*/ // private static final ByteBuilder POISON=new ByteBuilder("POISON_ByteStreamWriter"); private static final ByteBuilder POISON2=new ByteBuilder(1); public static boolean verbose=false; /** Number of lists held before the stream blocks */ private final int MAX_CAPACITY=256; private final int ADD_LIMIT=MAX_CAPACITY/2; private final int HALF_LIMIT=ADD_LIMIT/4; }