Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteFile2.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteFile2.java Tue Mar 18 16:23:26 2025 -0400 @@ -0,0 +1,457 @@ +package fileIO; +import java.io.InputStream; +import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; + +import shared.Timer; +import shared.Tools; + + +/** + * Runs a ByteFile1 in a separate thread. Can speed up disk reading, particularly of compressed files, at cost of slightly more work done. + * Drop-in compatible with ByteFile1. + * @author Brian Bushnell + * @date Sep 23, 2013 + * + */ +public final class ByteFile2 extends ByteFile { + + + public static void main(String[] args){ + ByteFile2 tf=new ByteFile2(args.length>0 ? args[0] : "stdin", true); + long first=0, last=100; + boolean speedtest=false; + if(args.length>1){ + if(args[1].equalsIgnoreCase("speedtest")){ + speedtest=true; + first=0; + last=Long.MAX_VALUE; + }else{ + first=Integer.parseInt(args[1]); + last=first+100; + } + } + if(args.length>2){ + last=Integer.parseInt(args[2]); + } + speedtest(tf, first, last, !speedtest); + + tf.close(); + tf.reset(); + tf.close(); + } + + private static void speedtest(ByteFile2 tf, long first, long last, boolean reprint){ + Timer t=new Timer(); + long lines=0; + long bytes=0; + for(long i=0; i<first; i++){tf.nextLine();} + if(reprint){ + for(long i=first; i<last; i++){ + byte[] s=tf.nextLine(); + if(s==null){break;} + + lines++; + bytes+=s.length; + System.out.println(new String(s)); + } + + System.err.println("\n"); + System.err.println("Lines: "+lines); + System.err.println("Bytes: "+bytes); + }else{ + for(long i=first; i<last; i++){ + byte[] s=tf.nextLine(); + if(s==null){break;} + lines++; + bytes+=s.length; + } + } + t.stop(); + + if(!reprint){ + System.err.println(Tools.timeLinesBytesProcessed(t, lines, bytes, 8)); + } + } + +// public ByteFile2(String name()){this(name(), false);} + + public ByteFile2(String fname, boolean allowSubprocess_){ + this(FileFormat.testInput(fname, FileFormat.TEXT, null, allowSubprocess_, false)); + } + + public ByteFile2(FileFormat ff){ + super(ff); + if(verbose){System.err.println("ByteFile2("+ff+")");} + open(); + } + + @Override + public final void reset(){ + close(); + open(); + superReset(); + } + + @Override + public synchronized final boolean close(){ + if(verbose){System.err.println("ByteFile2("+name()+").close()");} + if(isOpen()){ +// errorState|=ReadWrite.killProcess(name()); + thread.shutdown(); + while(thread.getState()!=Thread.State.TERMINATED){ + try { + thread.join(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + thread.bf1.close(); + } + thread=null; + currentList=null; + currentLoc=0; +// assert(numIn==numOut) : numIn+", "+numOut; + pushBack=null; + if(verbose){System.err.println("ByteFile2("+name()+").close() returned "+errorState);} + return errorState; + } + + @Override + public final byte[] nextLine(){ + + if(pushBack!=null){//Commenting out does not seem to improve speed here. + byte[] temp=pushBack; + pushBack=null; + return temp; + } + +// if(verbose){System.err.println("Reading line.");} +// byte[] r=null; + + byte[][] temp=currentList; + int tempLoc=currentLoc; + + if(temp==null || tempLoc>=temp.length || temp[tempLoc]==null){ + boolean b=getBuffer(); + if(!b){ + if(verbose2){System.err.println("nextLine()->getBuffer() returned false.");} + return null; + } + temp=currentList; + tempLoc=currentLoc; + if(temp==null || temp==poison || temp[tempLoc]==null){ + return null; + } + } + + //TODO: This is a race condition; currentList can be changed to null. A defensive copy could be created. + //Note that I read the above warning and added "temp" and "temploc" but I'm not sure if that fixed anything. + assert(temp!=null && temp!=poison); + assert(tempLoc<temp.length); + assert(temp[tempLoc]!=null); + byte[] r=temp[tempLoc]; + assert(r!=null); + currentLoc++; +// numOut++; + return r; + } + + private boolean getBuffer(){ + if(verbose2){System.err.println("Getting new buffer.");} + currentLoc=0; + final BF1Thread bft=thread; + if(bft==null){ + currentList=null; + if(verbose2){System.err.println("No buffers available. thread="+thread+", shutdown="+(thread==null ? "X" : ""+thread.shutdown));} + return false; + } + if(currentList==poison){ + if(verbose2){System.err.println("A: Current list is poison.");} + return false; + } + if(currentList!=null){ + Arrays.fill(currentList, null); //MUST be done or lines get recycled at end of file. + while(currentList!=null){ + try { + if(verbose2){System.err.println("adding to qEmpty list size "+currentList.length+"\n"+toString(currentList));} + bft.qEmpty.put(currentList); + currentList=null; + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + assert(currentList==null); + while(currentList==null){ + try { + assert(bft!=null); + if(verbose2){System.err.println("C: qFull.size()="+bft.qFull.size());} + currentList=bft.qFull.take(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + if(verbose2){ + if(currentList==poison){ + System.err.println("B: Current list is poison."); + }else{ + System.err.println("getBuffer fetched a new buffer of size "+currentList.length); + } + } + return currentList!=poison; + } + + private final synchronized BF1Thread open(){ + if(verbose2){System.err.println("ByteFile2("+name()+").open()");} + assert(thread==null); + currentList=null; + currentLoc=0; +// numIn=0; +// numOut=0; + thread=new BF1Thread(ff); + thread.start(); + return thread; + } + + private class BF1Thread extends Thread{ + +// public BF1Thread(String fname){ +// bf1=new ByteFile1(fname, false, allowSubprocess); +// qFull=new ArrayBlockingQueue<byte[][]>(buffs+2); +// qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2); +// for(int i=0; i<buffs; i++){ +// try { +// qEmpty.put(new byte[bufflen][]); +// } catch (InterruptedException e) { +// // TODO Auto-generated catch block +// e.printStackTrace(); +// } +// } +// } + + public BF1Thread(FileFormat ff){ + bf1=new ByteFile1(ff); + qFull=new ArrayBlockingQueue<byte[][]>(buffs+2); + qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2); + for(int i=0; i<buffs; i++){ + try { + qEmpty.put(new byte[bufflen][]); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + + @Override + public void run(){ + if(verbose){System.err.println("ByteFile2("+name()+").run()");} + byte[] s=null; + byte[][] list=null; + while(list==null){ + try { + list = qEmpty.take(); + } catch (InterruptedException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + synchronized(this){ + if(list==poison || shutdown){ + shutdown(); + return; + } + } + + int loc=0; + long bases=0; + + //At this point, list is not null + for(s=bf1.nextLine(); s!=null; s=bf1.nextLine()){ + bases+=s.length; + assert(list!=null) : "Somehow the list became null for "+bf1.name()+" at line "+cntr; + list[loc]=s; + loc++; +// numIn++; +// if(verbose){System.err.println("Added line "+numIn);} + if(loc>=bufflen || bases>=buffcapacity){ + if(verbose2){System.err.println("Capacity exceeded.");} + while(list!=null){ + try { +// synchronized(this){ +// if(!shutdown){ + if(verbose2){ + System.err.println("A: Adding to qFull list of size "+loc); + System.err.println(ByteFile2.toString(list)); + } + cntr+=list.length; + qFull.put(list); + if(verbose2){System.err.println("A: qFull.size()="+qFull.size());} +// } +// } + list=null; + loc=0; + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + //At this point, list is null + if(shutdown){ + if(verbose2){System.err.println("Break 1");} + break; + } + while(list==null){ + if(verbose2){System.err.println("Taking empty list.");} + try { + list = qEmpty.take(); + } catch (InterruptedException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + //At this point, list is not null + bases=0; + if(list==poison){ + if(verbose2){System.err.println("Break 2");} + break; + } + //At this point, list is not null + } + } + if(verbose2){System.err.println("Run loop exit.");} + + while(list!=null && loc>0){ + try { +// synchronized(this){ +// if(!shutdown){ + if(verbose2){System.err.println("B: Adding list of size "+loc);} + qFull.put(list); + if(verbose2){System.err.println("B: qFull.size()="+qFull.size());} +// } +// } + list=null; + loc=0; + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + //At this point, list is null + shutdown(); + + if(verbose){System.err.println("ByteFile2("+name()+").run() finished");} + } + + synchronized void shutdown(){ + if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown()");} + if(shutdown){return;} + shutdown=true; + if(verbose2){System.err.println("Adding poison.");} + qFull.add(poison); + qEmpty.add(poison); + if(verbose2){System.err.println("D: qFull.size()="+qFull.size());} + if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown() finished");} + } + + private boolean shutdown=false; + final ByteFile1 bf1; + final ArrayBlockingQueue<byte[][]> qFull; + final ArrayBlockingQueue<byte[][]> qEmpty; + + } + + @Override + public boolean isOpen(){ + final byte[][] list=currentList; + final int loc=currentLoc; + if(list!=null && loc<list.length && list[loc]!=null){return true;} + final BF1Thread bft=thread; + if(bft==null){ + return false; + } + return true; +// synchronized(bft){ +// //NOTE!!! This cannot be used because qFull.size() will not return a correctly synchronized value. Poll() may work. +// assert(bft.bf1.isOpen() || !bft.qFull.isEmpty()) : bft.bf1.isOpen()+", "+bft.qFull.isEmpty()+", "+bft.qFull.size(); +// return (bft.bf1.isOpen() || !bft.qFull.isEmpty()); +// } + } + + @Override + public final void pushBack(byte[] line){ + assert(pushBack==null); + pushBack=line; + } + +// @Override +// public void pushBack(byte[] line) { +// if(bstart>line.length){ +// bstart--; +// buffer[bstart]='\n'; +// for(int i=0, j=bstart-line.length; i<line.length; i++, j++){ +// buffer[j]=line[i]; +// } +// bstart=bstart-line.length; +// return; +// } +// +// int bLen=bstop-bstart; +// int newLen=bLen+line.length+1; +// int rShift=line.length+1-bstart; +// assert(rShift>0) : bstop+", "+bstart+", "+line.length; +// while(newLen>buffer.length){ +// //This could get big if pushback is used often, +// //unless special steps are taken to prevent it, like leaving extra space for pushbacks. +// buffer=Arrays.copyOf(buffer, buffer.length*2); +// } +// +// Tools.shiftRight(buffer, rShift); +// +// for(int i=0; i<line.length; i++){ +// buffer[i]=line[i]; +// } +// buffer[line.length]='\n'; +// bstart=0; +// bstop=newLen; +// } + + /** For debugging */ + private static String toString(byte[][] x){ + StringBuilder sb=new StringBuilder(); + for(byte[] z : x){ + sb.append(z==null ? "null" : new String(z)).append('\n'); + } + return sb.toString(); + } + + @Override + public final InputStream is(){return thread==null ? null : thread.bf1.is();} + + @Override + public final long lineNum(){return thread==null ? -1 : thread.bf1.lineNum();} + + long cntr; + private BF1Thread thread=null; + private byte[][] currentList=null; + private int currentLoc=0; +// private int currentSize=0; + +// private long numIn=0, numOut=0; + + private byte[] pushBack=null; + + static final byte[][] poison=new byte[0][]; + public static boolean verbose=false; + private static final boolean verbose2=false; + private static final int bufflen=1000; + private static final int buffs=4; + private static final int buffcapacity=256000; + + private boolean errorState=false; + +}