diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteFile2.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteFile2.java	Tue Mar 18 16:23:26 2025 -0400
@@ -0,0 +1,457 @@
+package fileIO;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import shared.Timer;
+import shared.Tools;
+
+
+/**
+ * Runs a ByteFile1 in a separate thread.  Can speed up disk reading, particularly of compressed files, at cost of slightly more work done.
+ * Drop-in compatible with ByteFile1.
+ * @author Brian Bushnell
+ * @date Sep 23, 2013
+ *
+ */
+public final class ByteFile2 extends ByteFile {
+	
+	
+	/**
+	 * Command-line test harness.  Either prints a range of lines from a file,
+	 * or reads the whole file and reports throughput.
+	 * Usage: ByteFile2 [fname] [firstLine|speedtest] [lastLine]
+	 * @param args args[0] is the input name ("stdin" when absent).
+	 * args[1] is either the literal "speedtest" (any case) or the index of the first line to print.
+	 * args[2], when present, overrides the exclusive index of the last line.
+	 * @throws NumberFormatException if a line index is not a valid long
+	 */
+	public static void main(String[] args){
+		ByteFile2 tf=new ByteFile2(args.length>0 ? args[0] : "stdin", true);
+		long first=0, last=100;
+		boolean speedtest=false;
+		if(args.length>1){
+			if(args[1].equalsIgnoreCase("speedtest")){
+				speedtest=true;
+				first=0;
+				last=Long.MAX_VALUE;
+			}else{
+				//first and last are longs, so parse them as longs;
+				//Integer.parseInt rejected indices above Integer.MAX_VALUE.
+				first=Long.parseLong(args[1]);
+				last=first+100;
+			}
+		}
+		if(args.length>2){
+			last=Long.parseLong(args[2]);
+		}
+		speedtest(tf, first, last, !speedtest);
+		
+		//Close, reopen, and close again; this also runs the reset() path once.
+		tf.close();
+		tf.reset();
+		tf.close();
+	}
+	
+	private static void speedtest(ByteFile2 tf, long first, long last, boolean reprint){
+		Timer t=new Timer();
+		long lines=0;
+		long bytes=0;
+		for(long i=0; i<first; i++){tf.nextLine();}
+		if(reprint){
+			for(long i=first; i<last; i++){
+				byte[] s=tf.nextLine();
+				if(s==null){break;}
+
+				lines++;
+				bytes+=s.length;
+				System.out.println(new String(s));
+			}
+			
+			System.err.println("\n");
+			System.err.println("Lines: "+lines);
+			System.err.println("Bytes: "+bytes);
+		}else{
+			for(long i=first; i<last; i++){
+				byte[] s=tf.nextLine();
+				if(s==null){break;}
+				lines++;
+				bytes+=s.length;
+			}
+		}
+		t.stop();
+		
+		if(!reprint){
+			System.err.println(Tools.timeLinesBytesProcessed(t, lines, bytes, 8));
+		}
+	}
+	
+//	public ByteFile2(String name()){this(name(), false);}
+	
+	/**
+	 * Opens the named input after resolving its format with FileFormat.testInput,
+	 * using FileFormat.TEXT as the format argument.
+	 * @param fname Name of the input to read
+	 * @param allowSubprocess_ Passed through to FileFormat.testInput;
+	 * presumably permits reading via an external process - TODO confirm
+	 */
+	public ByteFile2(String fname, boolean allowSubprocess_){
+		this(
+			FileFormat.testInput(fname, FileFormat.TEXT, null, allowSubprocess_, false)
+		);
+	}
+	
+	/**
+	 * Opens the input described by a FileFormat and immediately starts the reader thread.
+	 * @param ff Format descriptor of the input; passed to the superclass and to the reader thread
+	 */
+	public ByteFile2(FileFormat ff){
+		super(ff);
+		if(verbose){
+			System.err.println("ByteFile2("+ff+")");
+		}
+		open();
+	}
+	
+	@Override
+	public final void reset(){ //Tears down the current reader and starts a new one on the same FileFormat.
+		close(); //Shuts down and joins the reader thread, then clears the buffered state.
+		open(); //Creates and starts a fresh BF1Thread from ff.
+		superReset(); //Defined outside this file; presumably resets state kept by ByteFile - TODO confirm.
+	}
+	
+	/**
+	 * Stops the reader thread, waits for it to terminate, closes the underlying ByteFile1,
+	 * and discards any buffered or pushed-back lines.  Safe to call when already closed.
+	 * @return The accumulated error state, including any error reported by the underlying close
+	 */
+	@Override
+	public synchronized final boolean close(){
+		if(verbose){System.err.println("ByteFile2("+name()+").close()");}
+		//Work on a local reference so a null thread can never be dereferenced;
+		//isOpen() may report true from buffered data alone.
+		final BF1Thread bft=thread;
+		if(bft!=null){
+			bft.shutdown();
+			while(bft.getState()!=Thread.State.TERMINATED){
+				try {
+					bft.join();
+				} catch (InterruptedException e) {
+					//Keep waiting; the reader must be finished before its file is closed.
+					e.printStackTrace();
+				}
+			}
+			//Previously the result of this close was discarded, so errors were never reported.
+			errorState|=bft.bf1.close();
+		}
+		thread=null;
+		currentList=null;
+		currentLoc=0;
+		pushBack=null;
+		if(verbose){System.err.println("ByteFile2("+name()+").close() returned "+errorState);}
+		return errorState;
+	}
+	
+	/**
+	 * Returns the next line of the file.  A pushed-back line, if any, is returned first;
+	 * otherwise the next entry of the current buffer is returned, fetching a new buffer
+	 * from the reader thread when the current one is exhausted.
+	 * @return The next line, or null when no more lines are available
+	 */
+	@Override
+	public final byte[] nextLine(){
+		
+		final byte[] pushed=pushBack;
+		if(pushed!=null){
+			pushBack=null;
+			return pushed;
+		}
+		
+		//Work on local snapshots of the shared fields.  The original author notes a possible
+		//race in which currentList becomes null, and was unsure whether snapshots resolve it.
+		byte[][] list=currentList;
+		int loc=currentLoc;
+		
+		final boolean exhausted=(list==null || loc>=list.length || list[loc]==null);
+		if(exhausted){
+			if(!getBuffer()){
+				if(verbose2){System.err.println("nextLine()->getBuffer() returned false.");}
+				return null;
+			}
+			list=currentList;
+			loc=currentLoc;
+			if(list==null || list==poison || list[loc]==null){
+				return null;
+			}
+		}
+		
+		assert(list!=null && list!=poison);
+		assert(loc<list.length);
+		assert(list[loc]!=null);
+		final byte[] line=list[loc];
+		assert(line!=null);
+		currentLoc++;
+		return line;
+	}
+	
+	/**
+	 * Returns the consumed buffer to the reader thread and blocks until a filled one arrives.
+	 * Resets currentLoc to 0 and replaces currentList.
+	 * @return true if a buffer of lines was fetched; false if there is no reader thread
+	 * or the poison sentinel was encountered
+	 */
+	private boolean getBuffer(){
+		if(verbose2){System.err.println("Getting new buffer.");}
+		currentLoc=0;
+		final BF1Thread bft=thread;
+		if(bft==null){
+			currentList=null;
+			if(verbose2){System.err.println("No buffers available.  thread="+thread+", shutdown="+(thread==null ? "X" : ""+thread.shutdown));}
+			return false;
+		}
+		
+		final byte[][] spent=currentList;
+		if(spent==poison){
+			if(verbose2){System.err.println("A: Current list is poison.");}
+			return false;
+		}
+		
+		if(spent!=null){
+			//Clearing is mandatory; otherwise stale lines are replayed at end of file.
+			Arrays.fill(spent, null);
+			boolean recycled=false;
+			while(!recycled){
+				try {
+					if(verbose2){System.err.println("adding to qEmpty list size "+spent.length+"\n"+toString(spent));}
+					bft.qEmpty.put(spent);
+					currentList=null;
+					recycled=true;
+				} catch (InterruptedException e) {
+					//Retry until the buffer has been handed back.
+					e.printStackTrace();
+				}
+			}
+		}
+		assert(currentList==null);
+		
+		byte[][] fresh=null;
+		while(fresh==null){
+			try {
+				if(verbose2){System.err.println("C: qFull.size()="+bft.qFull.size());}
+				fresh=bft.qFull.take();
+			} catch (InterruptedException e) {
+				//Retry until a buffer (or the poison sentinel) arrives.
+				e.printStackTrace();
+			}
+		}
+		currentList=fresh;
+		
+		final boolean gotData=(fresh!=poison);
+		if(verbose2){
+			if(gotData){
+				System.err.println("getBuffer fetched a new buffer of size "+fresh.length);
+			}else{
+				System.err.println("B: Current list is poison.");
+			}
+		}
+		return gotData;
+	}
+	
+	/**
+	 * Clears the consumer-side buffer state, then creates and starts a new reader thread.
+	 * Must only be called when no reader thread exists.
+	 * @return The newly started reader thread
+	 */
+	private final synchronized BF1Thread open(){
+		if(verbose2){System.err.println("ByteFile2("+name()+").open()");}
+		assert(thread==null);
+		currentLoc=0;
+		currentList=null;
+		final BF1Thread bft=new BF1Thread(ff);
+		thread=bft;
+		bft.start();
+		return bft;
+	}
+	
+	/**
+	 * Producer thread.  Reads lines from a ByteFile1 into buffers taken from qEmpty and
+	 * hands filled buffers to the consumer through qFull.  At shutdown the poison sentinel
+	 * is placed on both queues so that neither side stays blocked.
+	 */
+	private class BF1Thread extends Thread{
+		
+		/**
+		 * Creates the underlying ByteFile1 and preloads qEmpty with {@code buffs} empty buffers.
+		 * Each queue has capacity buffs+2, which exceeds the buffs buffers plus the single
+		 * poison element that can be queued, so add() in shutdown() cannot overflow.
+		 * @param ff Format descriptor of the input to read
+		 */
+		public BF1Thread(FileFormat ff){
+			bf1=new ByteFile1(ff);
+			qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
+			qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
+			for(int i=0; i<buffs; i++){
+				try {
+					qEmpty.put(new byte[bufflen][]);
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+		
+		/**
+		 * Fills buffers with lines until end of file, shutdown, or poison,
+		 * then queues any partially filled buffer and signals shutdown.
+		 */
+		@Override
+		public void run(){
+			if(verbose){System.err.println("ByteFile2("+name()+").run()");}
+			byte[][] list=takeEmpty();
+			synchronized(this){
+				if(list==poison || shutdown){
+					shutdown();
+					return;
+				}
+			}
+			
+			int loc=0;
+			long bases=0;
+			
+			//At this point, list is not null
+			for(byte[] s=bf1.nextLine(); s!=null; s=bf1.nextLine()){
+				bases+=s.length;
+				assert(list!=null) : "Somehow the list became null for "+bf1.name()+" at line "+cntr;
+				list[loc]=s;
+				loc++;
+				if(loc>=bufflen || bases>=buffcapacity){
+					if(verbose2){System.err.println("Capacity exceeded.");}
+					if(verbose2){
+						System.err.println("A: Adding to qFull list of size "+loc);
+						System.err.println(ByteFile2.toString(list));
+					}
+					//Count the lines actually stored, once per buffer.  The old code added
+					//list.length, which is always bufflen, and repeated it on interrupted retries.
+					cntr+=loc;
+					putFull(list);
+					if(verbose2){System.err.println("A: qFull.size()="+qFull.size());}
+					list=null;
+					loc=0;
+					
+					if(shutdown){
+						if(verbose2){System.err.println("Break 1");}
+						break;
+					}
+					if(verbose2){System.err.println("Taking empty list.");}
+					list=takeEmpty();
+					bases=0;
+					if(list==poison){
+						if(verbose2){System.err.println("Break 2");}
+						break;
+					}
+				}
+			}
+			if(verbose2){System.err.println("Run loop exit.");}
+			
+			//Hand over a partially filled final buffer.  After a poison break loc is 0, so poison is never queued here.
+			if(list!=null && loc>0){
+				if(verbose2){System.err.println("B: Adding list of size "+loc);}
+				putFull(list);
+				if(verbose2){System.err.println("B: qFull.size()="+qFull.size());}
+				list=null;
+				loc=0;
+			}
+			shutdown();
+			
+			if(verbose){System.err.println("ByteFile2("+name()+").run() finished");}
+		}
+		
+		/** Blocks until an element can be taken from qEmpty, retrying after interrupts. */
+		private byte[][] takeEmpty(){
+			byte[][] list=null;
+			while(list==null){
+				try {
+					list=qEmpty.take();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+			return list;
+		}
+		
+		/** Blocks until the buffer has been placed on qFull, retrying after interrupts. */
+		private void putFull(byte[][] list){
+			boolean queued=false;
+			while(!queued){
+				try {
+					qFull.put(list);
+					queued=true;
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+		
+		/**
+		 * Marks this thread as shut down and places the poison sentinel on both queues.
+		 * Only the first call has any effect.
+		 */
+		synchronized void shutdown(){
+			if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown()");}
+			if(shutdown){return;}
+			shutdown=true;
+			if(verbose2){System.err.println("Adding poison.");}
+			qFull.add(poison);
+			qEmpty.add(poison);
+			if(verbose2){System.err.println("D: qFull.size()="+qFull.size());}
+			if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown() finished");}
+		}
+		
+		/** Volatile because run() reads it without holding the monitor while another thread may set it via shutdown(). */
+		private volatile boolean shutdown=false;
+		/** Underlying single-threaded reader. */
+		final ByteFile1 bf1;
+		/** Filled buffers travelling from this thread to the consumer. */
+		final ArrayBlockingQueue<byte[][]> qFull;
+		/** Emptied buffers travelling from the consumer back to this thread. */
+		final ArrayBlockingQueue<byte[][]> qEmpty;
+		
+	}
+	
+	/**
+	 * Reports whether this file still has unread buffered data or an attached reader thread.
+	 * @return true if the current buffer holds an unread line or a reader thread exists
+	 */
+	@Override
+	public boolean isOpen(){
+		final byte[][] buffer=currentList;
+		final int index=currentLoc;
+		final boolean hasUnreadLine=(buffer!=null && index<buffer.length && buffer[index]!=null);
+		//The queues are deliberately not consulted.  The original author's note says
+		//qFull.size() does not return a correctly synchronized value; poll() might work.
+		return hasUnreadLine || thread!=null;
+	}
+	
+	/**
+	 * Stores a single line so that the next call to nextLine() returns it.
+	 * Only one line may be pending at a time; this is checked by assertion only.
+	 * @param line The line to return from the next nextLine() call
+	 */
+	@Override
+	public final void pushBack(byte[] line){
+		assert(this.pushBack==null);
+		this.pushBack=line;
+	}
+
+//	@Override
+//	public void pushBack(byte[] line) {
+//		if(bstart>line.length){
+//			bstart--;
+//			buffer[bstart]='\n';
+//			for(int i=0, j=bstart-line.length; i<line.length; i++, j++){
+//				buffer[j]=line[i];
+//			}
+//			bstart=bstart-line.length;
+//			return;
+//		}
+//		
+//		int bLen=bstop-bstart;
+//		int newLen=bLen+line.length+1;
+//		int rShift=line.length+1-bstart;
+//		assert(rShift>0) : bstop+", "+bstart+", "+line.length;
+//		while(newLen>buffer.length){
+//			//This could get big if pushback is used often,
+//			//unless special steps are taken to prevent it, like leaving extra space for pushbacks.
+//			buffer=Arrays.copyOf(buffer, buffer.length*2);
+//		}
+//		
+//		Tools.shiftRight(buffer, rShift);
+//		
+//		for(int i=0; i<line.length; i++){
+//			buffer[i]=line[i];
+//		}
+//		buffer[line.length]='\n';
+//		bstart=0;
+//		bstop=newLen;
+//	}
+	
+	/** For debugging */
+	private static String toString(byte[][] x){
+		StringBuilder sb=new StringBuilder();
+		for(byte[] z : x){
+			sb.append(z==null ? "null" : new String(z)).append('\n');
+		}
+		return sb.toString();
+	}
+	
+	@Override
+	public final InputStream is(){return thread==null ? null : thread.bf1.is();}
+	
+	@Override
+	public final long lineNum(){return thread==null ? -1 : thread.bf1.lineNum();}
+
+	long cntr; //Running count updated by BF1Thread.run() as full buffers are queued; in this file it is read only in an assertion message.
+	private BF1Thread thread=null; //Current reader thread; assigned in open(), cleared in close().
+	private byte[][] currentList=null; //Buffer of lines currently being consumed by nextLine(); may be the poison sentinel.
+	private int currentLoc=0; //Index of the next unread line in currentList.
+//	private int currentSize=0;
+	
+//	private long numIn=0, numOut=0;
+	
+	private byte[] pushBack=null; //Single line stored by pushBack() and returned by the next nextLine() call.
+	
+	static final byte[][] poison=new byte[0][]; //Zero-length sentinel placed on both queues by BF1Thread.shutdown().
+	public static boolean verbose=false; //Enables lifecycle messages on stderr.
+	private static final boolean verbose2=false; //Enables detailed per-buffer tracing on stderr.
+	private static final int bufflen=1000; //Number of line slots in each buffer.
+	private static final int buffs=4; //Number of buffers created for each BF1Thread.
+	private static final int buffcapacity=256000; //Byte total at which a buffer is handed to the consumer even if not full.
+	
+	private boolean errorState=false; //Value returned by close().
+	
+}