annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteFile2.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package fileIO;
jpayne@68 2 import java.io.InputStream;
jpayne@68 3 import java.util.Arrays;
jpayne@68 4 import java.util.concurrent.ArrayBlockingQueue;
jpayne@68 5
jpayne@68 6 import shared.Timer;
jpayne@68 7 import shared.Tools;
jpayne@68 8
jpayne@68 9
jpayne@68 10 /**
jpayne@68 11 * Runs a ByteFile1 in a separate thread. Can speed up disk reading, particularly of compressed files, at cost of slightly more work done.
jpayne@68 12 * Drop-in compatible with ByteFile1.
jpayne@68 13 * @author Brian Bushnell
jpayne@68 14 * @date Sep 23, 2013
jpayne@68 15 *
jpayne@68 16 */
jpayne@68 17 public final class ByteFile2 extends ByteFile {
jpayne@68 18
jpayne@68 19
jpayne@68 20 public static void main(String[] args){
jpayne@68 21 ByteFile2 tf=new ByteFile2(args.length>0 ? args[0] : "stdin", true);
jpayne@68 22 long first=0, last=100;
jpayne@68 23 boolean speedtest=false;
jpayne@68 24 if(args.length>1){
jpayne@68 25 if(args[1].equalsIgnoreCase("speedtest")){
jpayne@68 26 speedtest=true;
jpayne@68 27 first=0;
jpayne@68 28 last=Long.MAX_VALUE;
jpayne@68 29 }else{
jpayne@68 30 first=Integer.parseInt(args[1]);
jpayne@68 31 last=first+100;
jpayne@68 32 }
jpayne@68 33 }
jpayne@68 34 if(args.length>2){
jpayne@68 35 last=Integer.parseInt(args[2]);
jpayne@68 36 }
jpayne@68 37 speedtest(tf, first, last, !speedtest);
jpayne@68 38
jpayne@68 39 tf.close();
jpayne@68 40 tf.reset();
jpayne@68 41 tf.close();
jpayne@68 42 }
jpayne@68 43
jpayne@68 44 private static void speedtest(ByteFile2 tf, long first, long last, boolean reprint){
jpayne@68 45 Timer t=new Timer();
jpayne@68 46 long lines=0;
jpayne@68 47 long bytes=0;
jpayne@68 48 for(long i=0; i<first; i++){tf.nextLine();}
jpayne@68 49 if(reprint){
jpayne@68 50 for(long i=first; i<last; i++){
jpayne@68 51 byte[] s=tf.nextLine();
jpayne@68 52 if(s==null){break;}
jpayne@68 53
jpayne@68 54 lines++;
jpayne@68 55 bytes+=s.length;
jpayne@68 56 System.out.println(new String(s));
jpayne@68 57 }
jpayne@68 58
jpayne@68 59 System.err.println("\n");
jpayne@68 60 System.err.println("Lines: "+lines);
jpayne@68 61 System.err.println("Bytes: "+bytes);
jpayne@68 62 }else{
jpayne@68 63 for(long i=first; i<last; i++){
jpayne@68 64 byte[] s=tf.nextLine();
jpayne@68 65 if(s==null){break;}
jpayne@68 66 lines++;
jpayne@68 67 bytes+=s.length;
jpayne@68 68 }
jpayne@68 69 }
jpayne@68 70 t.stop();
jpayne@68 71
jpayne@68 72 if(!reprint){
jpayne@68 73 System.err.println(Tools.timeLinesBytesProcessed(t, lines, bytes, 8));
jpayne@68 74 }
jpayne@68 75 }
jpayne@68 76
jpayne@68 77 // public ByteFile2(String name()){this(name(), false);}
jpayne@68 78
/**
 * Builds a ByteFile2 from a file name by resolving it through
 * {@code FileFormat.testInput} (requesting {@code FileFormat.TEXT}) and delegating
 * to the {@link #ByteFile2(FileFormat)} constructor.
 *
 * @param fname            Name of the input to read.
 * @param allowSubprocess_ Forwarded unchanged to {@code FileFormat.testInput}.
 */
public ByteFile2(String fname, boolean allowSubprocess_){
    this(FileFormat.testInput(
            fname,
            FileFormat.TEXT,
            null,
            allowSubprocess_,
            false));
}
jpayne@68 82
/**
 * Builds a ByteFile2 for the given format and immediately starts the background reader.
 *
 * @param ff Description of the input; passed to the superclass and to the reader thread.
 */
public ByteFile2(FileFormat ff){
    super(ff);
    if(verbose){
        System.err.println("ByteFile2("+ff+")");
    }
    open();
}
jpayne@68 88
jpayne@68 89 @Override
jpayne@68 90 public final void reset(){
jpayne@68 91 close();
jpayne@68 92 open();
jpayne@68 93 superReset();
jpayne@68 94 }
jpayne@68 95
jpayne@68 96 @Override
jpayne@68 97 public synchronized final boolean close(){
jpayne@68 98 if(verbose){System.err.println("ByteFile2("+name()+").close()");}
jpayne@68 99 if(isOpen()){
jpayne@68 100 // errorState|=ReadWrite.killProcess(name());
jpayne@68 101 thread.shutdown();
jpayne@68 102 while(thread.getState()!=Thread.State.TERMINATED){
jpayne@68 103 try {
jpayne@68 104 thread.join();
jpayne@68 105 } catch (InterruptedException e) {
jpayne@68 106 // TODO Auto-generated catch block
jpayne@68 107 e.printStackTrace();
jpayne@68 108 }
jpayne@68 109 }
jpayne@68 110 thread.bf1.close();
jpayne@68 111 }
jpayne@68 112 thread=null;
jpayne@68 113 currentList=null;
jpayne@68 114 currentLoc=0;
jpayne@68 115 // assert(numIn==numOut) : numIn+", "+numOut;
jpayne@68 116 pushBack=null;
jpayne@68 117 if(verbose){System.err.println("ByteFile2("+name()+").close() returned "+errorState);}
jpayne@68 118 return errorState;
jpayne@68 119 }
jpayne@68 120
jpayne@68 121 @Override
jpayne@68 122 public final byte[] nextLine(){
jpayne@68 123
jpayne@68 124 if(pushBack!=null){//Commenting out does not seem to improve speed here.
jpayne@68 125 byte[] temp=pushBack;
jpayne@68 126 pushBack=null;
jpayne@68 127 return temp;
jpayne@68 128 }
jpayne@68 129
jpayne@68 130 // if(verbose){System.err.println("Reading line.");}
jpayne@68 131 // byte[] r=null;
jpayne@68 132
jpayne@68 133 byte[][] temp=currentList;
jpayne@68 134 int tempLoc=currentLoc;
jpayne@68 135
jpayne@68 136 if(temp==null || tempLoc>=temp.length || temp[tempLoc]==null){
jpayne@68 137 boolean b=getBuffer();
jpayne@68 138 if(!b){
jpayne@68 139 if(verbose2){System.err.println("nextLine()->getBuffer() returned false.");}
jpayne@68 140 return null;
jpayne@68 141 }
jpayne@68 142 temp=currentList;
jpayne@68 143 tempLoc=currentLoc;
jpayne@68 144 if(temp==null || temp==poison || temp[tempLoc]==null){
jpayne@68 145 return null;
jpayne@68 146 }
jpayne@68 147 }
jpayne@68 148
jpayne@68 149 //TODO: This is a race condition; currentList can be changed to null. A defensive copy could be created.
jpayne@68 150 //Note that I read the above warning and added "temp" and "temploc" but I'm not sure if that fixed anything.
jpayne@68 151 assert(temp!=null && temp!=poison);
jpayne@68 152 assert(tempLoc<temp.length);
jpayne@68 153 assert(temp[tempLoc]!=null);
jpayne@68 154 byte[] r=temp[tempLoc];
jpayne@68 155 assert(r!=null);
jpayne@68 156 currentLoc++;
jpayne@68 157 // numOut++;
jpayne@68 158 return r;
jpayne@68 159 }
jpayne@68 160
jpayne@68 161 private boolean getBuffer(){
jpayne@68 162 if(verbose2){System.err.println("Getting new buffer.");}
jpayne@68 163 currentLoc=0;
jpayne@68 164 final BF1Thread bft=thread;
jpayne@68 165 if(bft==null){
jpayne@68 166 currentList=null;
jpayne@68 167 if(verbose2){System.err.println("No buffers available. thread="+thread+", shutdown="+(thread==null ? "X" : ""+thread.shutdown));}
jpayne@68 168 return false;
jpayne@68 169 }
jpayne@68 170 if(currentList==poison){
jpayne@68 171 if(verbose2){System.err.println("A: Current list is poison.");}
jpayne@68 172 return false;
jpayne@68 173 }
jpayne@68 174 if(currentList!=null){
jpayne@68 175 Arrays.fill(currentList, null); //MUST be done or lines get recycled at end of file.
jpayne@68 176 while(currentList!=null){
jpayne@68 177 try {
jpayne@68 178 if(verbose2){System.err.println("adding to qEmpty list size "+currentList.length+"\n"+toString(currentList));}
jpayne@68 179 bft.qEmpty.put(currentList);
jpayne@68 180 currentList=null;
jpayne@68 181 } catch (InterruptedException e) {
jpayne@68 182 // TODO Auto-generated catch block
jpayne@68 183 e.printStackTrace();
jpayne@68 184 }
jpayne@68 185 }
jpayne@68 186 }
jpayne@68 187 assert(currentList==null);
jpayne@68 188 while(currentList==null){
jpayne@68 189 try {
jpayne@68 190 assert(bft!=null);
jpayne@68 191 if(verbose2){System.err.println("C: qFull.size()="+bft.qFull.size());}
jpayne@68 192 currentList=bft.qFull.take();
jpayne@68 193 } catch (InterruptedException e) {
jpayne@68 194 // TODO Auto-generated catch block
jpayne@68 195 e.printStackTrace();
jpayne@68 196 }
jpayne@68 197 }
jpayne@68 198 if(verbose2){
jpayne@68 199 if(currentList==poison){
jpayne@68 200 System.err.println("B: Current list is poison.");
jpayne@68 201 }else{
jpayne@68 202 System.err.println("getBuffer fetched a new buffer of size "+currentList.length);
jpayne@68 203 }
jpayne@68 204 }
jpayne@68 205 return currentList!=poison;
jpayne@68 206 }
jpayne@68 207
jpayne@68 208 private final synchronized BF1Thread open(){
jpayne@68 209 if(verbose2){System.err.println("ByteFile2("+name()+").open()");}
jpayne@68 210 assert(thread==null);
jpayne@68 211 currentList=null;
jpayne@68 212 currentLoc=0;
jpayne@68 213 // numIn=0;
jpayne@68 214 // numOut=0;
jpayne@68 215 thread=new BF1Thread(ff);
jpayne@68 216 thread.start();
jpayne@68 217 return thread;
jpayne@68 218 }
jpayne@68 219
jpayne@68 220 private class BF1Thread extends Thread{
jpayne@68 221
jpayne@68 222 // public BF1Thread(String fname){
jpayne@68 223 // bf1=new ByteFile1(fname, false, allowSubprocess);
jpayne@68 224 // qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
jpayne@68 225 // qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
jpayne@68 226 // for(int i=0; i<buffs; i++){
jpayne@68 227 // try {
jpayne@68 228 // qEmpty.put(new byte[bufflen][]);
jpayne@68 229 // } catch (InterruptedException e) {
jpayne@68 230 // // TODO Auto-generated catch block
jpayne@68 231 // e.printStackTrace();
jpayne@68 232 // }
jpayne@68 233 // }
jpayne@68 234 // }
jpayne@68 235
jpayne@68 236 public BF1Thread(FileFormat ff){
jpayne@68 237 bf1=new ByteFile1(ff);
jpayne@68 238 qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
jpayne@68 239 qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
jpayne@68 240 for(int i=0; i<buffs; i++){
jpayne@68 241 try {
jpayne@68 242 qEmpty.put(new byte[bufflen][]);
jpayne@68 243 } catch (InterruptedException e) {
jpayne@68 244 // TODO Auto-generated catch block
jpayne@68 245 e.printStackTrace();
jpayne@68 246 }
jpayne@68 247 }
jpayne@68 248 }
jpayne@68 249
jpayne@68 250 @Override
jpayne@68 251 public void run(){
jpayne@68 252 if(verbose){System.err.println("ByteFile2("+name()+").run()");}
jpayne@68 253 byte[] s=null;
jpayne@68 254 byte[][] list=null;
jpayne@68 255 while(list==null){
jpayne@68 256 try {
jpayne@68 257 list = qEmpty.take();
jpayne@68 258 } catch (InterruptedException e1) {
jpayne@68 259 // TODO Auto-generated catch block
jpayne@68 260 e1.printStackTrace();
jpayne@68 261 }
jpayne@68 262 }
jpayne@68 263 synchronized(this){
jpayne@68 264 if(list==poison || shutdown){
jpayne@68 265 shutdown();
jpayne@68 266 return;
jpayne@68 267 }
jpayne@68 268 }
jpayne@68 269
jpayne@68 270 int loc=0;
jpayne@68 271 long bases=0;
jpayne@68 272
jpayne@68 273 //At this point, list is not null
jpayne@68 274 for(s=bf1.nextLine(); s!=null; s=bf1.nextLine()){
jpayne@68 275 bases+=s.length;
jpayne@68 276 assert(list!=null) : "Somehow the list became null for "+bf1.name()+" at line "+cntr;
jpayne@68 277 list[loc]=s;
jpayne@68 278 loc++;
jpayne@68 279 // numIn++;
jpayne@68 280 // if(verbose){System.err.println("Added line "+numIn);}
jpayne@68 281 if(loc>=bufflen || bases>=buffcapacity){
jpayne@68 282 if(verbose2){System.err.println("Capacity exceeded.");}
jpayne@68 283 while(list!=null){
jpayne@68 284 try {
jpayne@68 285 // synchronized(this){
jpayne@68 286 // if(!shutdown){
jpayne@68 287 if(verbose2){
jpayne@68 288 System.err.println("A: Adding to qFull list of size "+loc);
jpayne@68 289 System.err.println(ByteFile2.toString(list));
jpayne@68 290 }
jpayne@68 291 cntr+=list.length;
jpayne@68 292 qFull.put(list);
jpayne@68 293 if(verbose2){System.err.println("A: qFull.size()="+qFull.size());}
jpayne@68 294 // }
jpayne@68 295 // }
jpayne@68 296 list=null;
jpayne@68 297 loc=0;
jpayne@68 298 } catch (InterruptedException e) {
jpayne@68 299 // TODO Auto-generated catch block
jpayne@68 300 e.printStackTrace();
jpayne@68 301 }
jpayne@68 302 }
jpayne@68 303 //At this point, list is null
jpayne@68 304 if(shutdown){
jpayne@68 305 if(verbose2){System.err.println("Break 1");}
jpayne@68 306 break;
jpayne@68 307 }
jpayne@68 308 while(list==null){
jpayne@68 309 if(verbose2){System.err.println("Taking empty list.");}
jpayne@68 310 try {
jpayne@68 311 list = qEmpty.take();
jpayne@68 312 } catch (InterruptedException e1) {
jpayne@68 313 // TODO Auto-generated catch block
jpayne@68 314 e1.printStackTrace();
jpayne@68 315 }
jpayne@68 316 }
jpayne@68 317 //At this point, list is not null
jpayne@68 318 bases=0;
jpayne@68 319 if(list==poison){
jpayne@68 320 if(verbose2){System.err.println("Break 2");}
jpayne@68 321 break;
jpayne@68 322 }
jpayne@68 323 //At this point, list is not null
jpayne@68 324 }
jpayne@68 325 }
jpayne@68 326 if(verbose2){System.err.println("Run loop exit.");}
jpayne@68 327
jpayne@68 328 while(list!=null && loc>0){
jpayne@68 329 try {
jpayne@68 330 // synchronized(this){
jpayne@68 331 // if(!shutdown){
jpayne@68 332 if(verbose2){System.err.println("B: Adding list of size "+loc);}
jpayne@68 333 qFull.put(list);
jpayne@68 334 if(verbose2){System.err.println("B: qFull.size()="+qFull.size());}
jpayne@68 335 // }
jpayne@68 336 // }
jpayne@68 337 list=null;
jpayne@68 338 loc=0;
jpayne@68 339 } catch (InterruptedException e) {
jpayne@68 340 // TODO Auto-generated catch block
jpayne@68 341 e.printStackTrace();
jpayne@68 342 }
jpayne@68 343 }
jpayne@68 344 //At this point, list is null
jpayne@68 345 shutdown();
jpayne@68 346
jpayne@68 347 if(verbose){System.err.println("ByteFile2("+name()+").run() finished");}
jpayne@68 348 }
jpayne@68 349
jpayne@68 350 synchronized void shutdown(){
jpayne@68 351 if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown()");}
jpayne@68 352 if(shutdown){return;}
jpayne@68 353 shutdown=true;
jpayne@68 354 if(verbose2){System.err.println("Adding poison.");}
jpayne@68 355 qFull.add(poison);
jpayne@68 356 qEmpty.add(poison);
jpayne@68 357 if(verbose2){System.err.println("D: qFull.size()="+qFull.size());}
jpayne@68 358 if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown() finished");}
jpayne@68 359 }
jpayne@68 360
jpayne@68 361 private boolean shutdown=false;
jpayne@68 362 final ByteFile1 bf1;
jpayne@68 363 final ArrayBlockingQueue<byte[][]> qFull;
jpayne@68 364 final ArrayBlockingQueue<byte[][]> qEmpty;
jpayne@68 365
jpayne@68 366 }
jpayne@68 367
jpayne@68 368 @Override
jpayne@68 369 public boolean isOpen(){
jpayne@68 370 final byte[][] list=currentList;
jpayne@68 371 final int loc=currentLoc;
jpayne@68 372 if(list!=null && loc<list.length && list[loc]!=null){return true;}
jpayne@68 373 final BF1Thread bft=thread;
jpayne@68 374 if(bft==null){
jpayne@68 375 return false;
jpayne@68 376 }
jpayne@68 377 return true;
jpayne@68 378 // synchronized(bft){
jpayne@68 379 // //NOTE!!! This cannot be used because qFull.size() will not return a correctly synchronized value. Poll() may work.
jpayne@68 380 // assert(bft.bf1.isOpen() || !bft.qFull.isEmpty()) : bft.bf1.isOpen()+", "+bft.qFull.isEmpty()+", "+bft.qFull.size();
jpayne@68 381 // return (bft.bf1.isOpen() || !bft.qFull.isEmpty());
jpayne@68 382 // }
jpayne@68 383 }
jpayne@68 384
jpayne@68 385 @Override
jpayne@68 386 public final void pushBack(byte[] line){
jpayne@68 387 assert(pushBack==null);
jpayne@68 388 pushBack=line;
jpayne@68 389 }
jpayne@68 390
jpayne@68 391 // @Override
jpayne@68 392 // public void pushBack(byte[] line) {
jpayne@68 393 // if(bstart>line.length){
jpayne@68 394 // bstart--;
jpayne@68 395 // buffer[bstart]='\n';
jpayne@68 396 // for(int i=0, j=bstart-line.length; i<line.length; i++, j++){
jpayne@68 397 // buffer[j]=line[i];
jpayne@68 398 // }
jpayne@68 399 // bstart=bstart-line.length;
jpayne@68 400 // return;
jpayne@68 401 // }
jpayne@68 402 //
jpayne@68 403 // int bLen=bstop-bstart;
jpayne@68 404 // int newLen=bLen+line.length+1;
jpayne@68 405 // int rShift=line.length+1-bstart;
jpayne@68 406 // assert(rShift>0) : bstop+", "+bstart+", "+line.length;
jpayne@68 407 // while(newLen>buffer.length){
jpayne@68 408 // //This could get big if pushback is used often,
jpayne@68 409 // //unless special steps are taken to prevent it, like leaving extra space for pushbacks.
jpayne@68 410 // buffer=Arrays.copyOf(buffer, buffer.length*2);
jpayne@68 411 // }
jpayne@68 412 //
jpayne@68 413 // Tools.shiftRight(buffer, rShift);
jpayne@68 414 //
jpayne@68 415 // for(int i=0; i<line.length; i++){
jpayne@68 416 // buffer[i]=line[i];
jpayne@68 417 // }
jpayne@68 418 // buffer[line.length]='\n';
jpayne@68 419 // bstart=0;
jpayne@68 420 // bstop=newLen;
jpayne@68 421 // }
jpayne@68 422
jpayne@68 423 /** For debugging */
jpayne@68 424 private static String toString(byte[][] x){
jpayne@68 425 StringBuilder sb=new StringBuilder();
jpayne@68 426 for(byte[] z : x){
jpayne@68 427 sb.append(z==null ? "null" : new String(z)).append('\n');
jpayne@68 428 }
jpayne@68 429 return sb.toString();
jpayne@68 430 }
jpayne@68 431
jpayne@68 432 @Override
jpayne@68 433 public final InputStream is(){return thread==null ? null : thread.bf1.is();}
jpayne@68 434
jpayne@68 435 @Override
jpayne@68 436 public final long lineNum(){return thread==null ? -1 : thread.bf1.lineNum();}
jpayne@68 437
jpayne@68 438 long cntr;
jpayne@68 439 private BF1Thread thread=null;
jpayne@68 440 private byte[][] currentList=null;
jpayne@68 441 private int currentLoc=0;
jpayne@68 442 // private int currentSize=0;
jpayne@68 443
jpayne@68 444 // private long numIn=0, numOut=0;
jpayne@68 445
jpayne@68 446 private byte[] pushBack=null;
jpayne@68 447
jpayne@68 448 static final byte[][] poison=new byte[0][];
jpayne@68 449 public static boolean verbose=false;
jpayne@68 450 private static final boolean verbose2=false;
jpayne@68 451 private static final int bufflen=1000;
jpayne@68 452 private static final int buffs=4;
jpayne@68 453 private static final int buffcapacity=256000;
jpayne@68 454
jpayne@68 455 private boolean errorState=false;
jpayne@68 456
jpayne@68 457 }