Mercurial > repos > rliterman > csp2
view CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteFile2.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line source
package fileIO; import java.io.InputStream; import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import shared.Timer; import shared.Tools; /** * Runs a ByteFile1 in a separate thread. Can speed up disk reading, particularly of compressed files, at cost of slightly more work done. * Drop-in compatible with ByteFile1. * @author Brian Bushnell * @date Sep 23, 2013 * */ public final class ByteFile2 extends ByteFile { public static void main(String[] args){ ByteFile2 tf=new ByteFile2(args.length>0 ? args[0] : "stdin", true); long first=0, last=100; boolean speedtest=false; if(args.length>1){ if(args[1].equalsIgnoreCase("speedtest")){ speedtest=true; first=0; last=Long.MAX_VALUE; }else{ first=Integer.parseInt(args[1]); last=first+100; } } if(args.length>2){ last=Integer.parseInt(args[2]); } speedtest(tf, first, last, !speedtest); tf.close(); tf.reset(); tf.close(); } private static void speedtest(ByteFile2 tf, long first, long last, boolean reprint){ Timer t=new Timer(); long lines=0; long bytes=0; for(long i=0; i<first; i++){tf.nextLine();} if(reprint){ for(long i=first; i<last; i++){ byte[] s=tf.nextLine(); if(s==null){break;} lines++; bytes+=s.length; System.out.println(new String(s)); } System.err.println("\n"); System.err.println("Lines: "+lines); System.err.println("Bytes: "+bytes); }else{ for(long i=first; i<last; i++){ byte[] s=tf.nextLine(); if(s==null){break;} lines++; bytes+=s.length; } } t.stop(); if(!reprint){ System.err.println(Tools.timeLinesBytesProcessed(t, lines, bytes, 8)); } } // public ByteFile2(String name()){this(name(), false);} public ByteFile2(String fname, boolean allowSubprocess_){ this(FileFormat.testInput(fname, FileFormat.TEXT, null, allowSubprocess_, false)); } public ByteFile2(FileFormat ff){ super(ff); if(verbose){System.err.println("ByteFile2("+ff+")");} open(); } @Override public final void reset(){ close(); open(); superReset(); } @Override public synchronized final boolean close(){ if(verbose){System.err.println("ByteFile2("+name()+").close()");} if(isOpen()){ // errorState|=ReadWrite.killProcess(name()); thread.shutdown(); while(thread.getState()!=Thread.State.TERMINATED){ try { thread.join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } thread.bf1.close(); } thread=null; currentList=null; currentLoc=0; // assert(numIn==numOut) : numIn+", "+numOut; pushBack=null; if(verbose){System.err.println("ByteFile2("+name()+").close() returned "+errorState);} return errorState; } @Override public final byte[] nextLine(){ if(pushBack!=null){//Commenting out does not seem to improve speed here. byte[] temp=pushBack; pushBack=null; return temp; } // if(verbose){System.err.println("Reading line.");} // byte[] r=null; byte[][] temp=currentList; int tempLoc=currentLoc; if(temp==null || tempLoc>=temp.length || temp[tempLoc]==null){ boolean b=getBuffer(); if(!b){ if(verbose2){System.err.println("nextLine()->getBuffer() returned false.");} return null; } temp=currentList; tempLoc=currentLoc; if(temp==null || temp==poison || temp[tempLoc]==null){ return null; } } //TODO: This is a race condition; currentList can be changed to null. A defensive copy could be created. //Note that I read the above warning and added "temp" and "temploc" but I'm not sure if that fixed anything. assert(temp!=null && temp!=poison); assert(tempLoc<temp.length); assert(temp[tempLoc]!=null); byte[] r=temp[tempLoc]; assert(r!=null); currentLoc++; // numOut++; return r; } private boolean getBuffer(){ if(verbose2){System.err.println("Getting new buffer.");} currentLoc=0; final BF1Thread bft=thread; if(bft==null){ currentList=null; if(verbose2){System.err.println("No buffers available. thread="+thread+", shutdown="+(thread==null ? "X" : ""+thread.shutdown));} return false; } if(currentList==poison){ if(verbose2){System.err.println("A: Current list is poison.");} return false; } if(currentList!=null){ Arrays.fill(currentList, null); //MUST be done or lines get recycled at end of file. while(currentList!=null){ try { if(verbose2){System.err.println("adding to qEmpty list size "+currentList.length+"\n"+toString(currentList));} bft.qEmpty.put(currentList); currentList=null; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } assert(currentList==null); while(currentList==null){ try { assert(bft!=null); if(verbose2){System.err.println("C: qFull.size()="+bft.qFull.size());} currentList=bft.qFull.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(verbose2){ if(currentList==poison){ System.err.println("B: Current list is poison."); }else{ System.err.println("getBuffer fetched a new buffer of size "+currentList.length); } } return currentList!=poison; } private final synchronized BF1Thread open(){ if(verbose2){System.err.println("ByteFile2("+name()+").open()");} assert(thread==null); currentList=null; currentLoc=0; // numIn=0; // numOut=0; thread=new BF1Thread(ff); thread.start(); return thread; } private class BF1Thread extends Thread{ // public BF1Thread(String fname){ // bf1=new ByteFile1(fname, false, allowSubprocess); // qFull=new ArrayBlockingQueue<byte[][]>(buffs+2); // qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2); // for(int i=0; i<buffs; i++){ // try { // qEmpty.put(new byte[bufflen][]); // } catch (InterruptedException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } // } // } public BF1Thread(FileFormat ff){ bf1=new ByteFile1(ff); qFull=new ArrayBlockingQueue<byte[][]>(buffs+2); qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2); for(int i=0; i<buffs; i++){ try { qEmpty.put(new byte[bufflen][]); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } @Override public void run(){ if(verbose){System.err.println("ByteFile2("+name()+").run()");} byte[] s=null; byte[][] list=null; while(list==null){ try { list = qEmpty.take(); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } synchronized(this){ if(list==poison || shutdown){ shutdown(); return; } } int loc=0; long bases=0; //At this point, list is not null for(s=bf1.nextLine(); s!=null; s=bf1.nextLine()){ bases+=s.length; assert(list!=null) : "Somehow the list became null for "+bf1.name()+" at line "+cntr; list[loc]=s; loc++; // numIn++; // if(verbose){System.err.println("Added line "+numIn);} if(loc>=bufflen || bases>=buffcapacity){ if(verbose2){System.err.println("Capacity exceeded.");} while(list!=null){ try { // synchronized(this){ // if(!shutdown){ if(verbose2){ System.err.println("A: Adding to qFull list of size "+loc); System.err.println(ByteFile2.toString(list)); } cntr+=list.length; qFull.put(list); if(verbose2){System.err.println("A: qFull.size()="+qFull.size());} // } // } list=null; loc=0; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //At this point, list is null if(shutdown){ if(verbose2){System.err.println("Break 1");} break; } while(list==null){ if(verbose2){System.err.println("Taking empty list.");} try { list = qEmpty.take(); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } //At this point, list is not null bases=0; if(list==poison){ if(verbose2){System.err.println("Break 2");} break; } //At this point, list is not null } } if(verbose2){System.err.println("Run loop exit.");} while(list!=null && loc>0){ try { // synchronized(this){ // if(!shutdown){ if(verbose2){System.err.println("B: Adding list of size "+loc);} qFull.put(list); if(verbose2){System.err.println("B: qFull.size()="+qFull.size());} // } // } list=null; loc=0; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //At this point, list is null shutdown(); if(verbose){System.err.println("ByteFile2("+name()+").run() finished");} } synchronized void shutdown(){ if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown()");} if(shutdown){return;} shutdown=true; if(verbose2){System.err.println("Adding poison.");} qFull.add(poison); qEmpty.add(poison); if(verbose2){System.err.println("D: qFull.size()="+qFull.size());} if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown() finished");} } private boolean shutdown=false; final ByteFile1 bf1; final ArrayBlockingQueue<byte[][]> qFull; final ArrayBlockingQueue<byte[][]> qEmpty; } @Override public boolean isOpen(){ final byte[][] list=currentList; final int loc=currentLoc; if(list!=null && loc<list.length && list[loc]!=null){return true;} final BF1Thread bft=thread; if(bft==null){ return false; } return true; // synchronized(bft){ // //NOTE!!! This cannot be used because qFull.size() will not return a correctly synchronized value. Poll() may work. // assert(bft.bf1.isOpen() || !bft.qFull.isEmpty()) : bft.bf1.isOpen()+", "+bft.qFull.isEmpty()+", "+bft.qFull.size(); // return (bft.bf1.isOpen() || !bft.qFull.isEmpty()); // } } @Override public final void pushBack(byte[] line){ assert(pushBack==null); pushBack=line; } // @Override // public void pushBack(byte[] line) { // if(bstart>line.length){ // bstart--; // buffer[bstart]='\n'; // for(int i=0, j=bstart-line.length; i<line.length; i++, j++){ // buffer[j]=line[i]; // } // bstart=bstart-line.length; // return; // } // // int bLen=bstop-bstart; // int newLen=bLen+line.length+1; // int rShift=line.length+1-bstart; // assert(rShift>0) : bstop+", "+bstart+", "+line.length; // while(newLen>buffer.length){ // //This could get big if pushback is used often, // //unless special steps are taken to prevent it, like leaving extra space for pushbacks. // buffer=Arrays.copyOf(buffer, buffer.length*2); // } // // Tools.shiftRight(buffer, rShift); // // for(int i=0; i<line.length; i++){ // buffer[i]=line[i]; // } // buffer[line.length]='\n'; // bstart=0; // bstop=newLen; // } /** For debugging */ private static String toString(byte[][] x){ StringBuilder sb=new StringBuilder(); for(byte[] z : x){ sb.append(z==null ? "null" : new String(z)).append('\n'); } return sb.toString(); } @Override public final InputStream is(){return thread==null ? null : thread.bf1.is();} @Override public final long lineNum(){return thread==null ? -1 : thread.bf1.lineNum();} long cntr; private BF1Thread thread=null; private byte[][] currentList=null; private int currentLoc=0; // private int currentSize=0; // private long numIn=0, numOut=0; private byte[] pushBack=null; static final byte[][] poison=new byte[0][]; public static boolean verbose=false; private static final boolean verbose2=false; private static final int bufflen=1000; private static final int buffs=4; private static final int buffcapacity=256000; private boolean errorState=false; }