comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteFile2.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package fileIO;
2 import java.io.InputStream;
3 import java.util.Arrays;
4 import java.util.concurrent.ArrayBlockingQueue;
5
6 import shared.Timer;
7 import shared.Tools;
8
9
10 /**
11 * Runs a ByteFile1 in a separate thread. Can speed up disk reading, particularly of compressed files, at cost of slightly more work done.
12 * Drop-in compatible with ByteFile1.
13 * @author Brian Bushnell
14 * @date Sep 23, 2013
15 *
16 */
17 public final class ByteFile2 extends ByteFile {
18
19
20 public static void main(String[] args){
21 ByteFile2 tf=new ByteFile2(args.length>0 ? args[0] : "stdin", true);
22 long first=0, last=100;
23 boolean speedtest=false;
24 if(args.length>1){
25 if(args[1].equalsIgnoreCase("speedtest")){
26 speedtest=true;
27 first=0;
28 last=Long.MAX_VALUE;
29 }else{
30 first=Integer.parseInt(args[1]);
31 last=first+100;
32 }
33 }
34 if(args.length>2){
35 last=Integer.parseInt(args[2]);
36 }
37 speedtest(tf, first, last, !speedtest);
38
39 tf.close();
40 tf.reset();
41 tf.close();
42 }
43
44 private static void speedtest(ByteFile2 tf, long first, long last, boolean reprint){
45 Timer t=new Timer();
46 long lines=0;
47 long bytes=0;
48 for(long i=0; i<first; i++){tf.nextLine();}
49 if(reprint){
50 for(long i=first; i<last; i++){
51 byte[] s=tf.nextLine();
52 if(s==null){break;}
53
54 lines++;
55 bytes+=s.length;
56 System.out.println(new String(s));
57 }
58
59 System.err.println("\n");
60 System.err.println("Lines: "+lines);
61 System.err.println("Bytes: "+bytes);
62 }else{
63 for(long i=first; i<last; i++){
64 byte[] s=tf.nextLine();
65 if(s==null){break;}
66 lines++;
67 bytes+=s.length;
68 }
69 }
70 t.stop();
71
72 if(!reprint){
73 System.err.println(Tools.timeLinesBytesProcessed(t, lines, bytes, 8));
74 }
75 }
76
77 // public ByteFile2(String name()){this(name(), false);}
78
/**
 * Creates a ByteFile2 from a file name by building a FileFormat with
 * {@code FileFormat.testInput} and delegating to {@link #ByteFile2(FileFormat)}.
 * @param fname Name of the input file.
 * @param allowSubprocess_ Forwarded to {@code FileFormat.testInput}.
 */
public ByteFile2(String fname, boolean allowSubprocess_){
    this(
        FileFormat.testInput(
            fname,
            FileFormat.TEXT,
            null,
            allowSubprocess_,
            false
        )
    );
}
82
/**
 * Creates a ByteFile2 for the given format and immediately starts the
 * background reader thread via {@code open()}.
 * @param ff Format descriptor of the input; passed to the superclass and to the reader thread.
 */
public ByteFile2(FileFormat ff){
    super(ff);
    if(verbose){
        System.err.println("ByteFile2("+ff+")");
    }
    open();
}
88
89 @Override
90 public final void reset(){
91 close();
92 open();
93 superReset();
94 }
95
96 @Override
97 public synchronized final boolean close(){
98 if(verbose){System.err.println("ByteFile2("+name()+").close()");}
99 if(isOpen()){
100 // errorState|=ReadWrite.killProcess(name());
101 thread.shutdown();
102 while(thread.getState()!=Thread.State.TERMINATED){
103 try {
104 thread.join();
105 } catch (InterruptedException e) {
106 // TODO Auto-generated catch block
107 e.printStackTrace();
108 }
109 }
110 thread.bf1.close();
111 }
112 thread=null;
113 currentList=null;
114 currentLoc=0;
115 // assert(numIn==numOut) : numIn+", "+numOut;
116 pushBack=null;
117 if(verbose){System.err.println("ByteFile2("+name()+").close() returned "+errorState);}
118 return errorState;
119 }
120
121 @Override
122 public final byte[] nextLine(){
123
124 if(pushBack!=null){//Commenting out does not seem to improve speed here.
125 byte[] temp=pushBack;
126 pushBack=null;
127 return temp;
128 }
129
130 // if(verbose){System.err.println("Reading line.");}
131 // byte[] r=null;
132
133 byte[][] temp=currentList;
134 int tempLoc=currentLoc;
135
136 if(temp==null || tempLoc>=temp.length || temp[tempLoc]==null){
137 boolean b=getBuffer();
138 if(!b){
139 if(verbose2){System.err.println("nextLine()->getBuffer() returned false.");}
140 return null;
141 }
142 temp=currentList;
143 tempLoc=currentLoc;
144 if(temp==null || temp==poison || temp[tempLoc]==null){
145 return null;
146 }
147 }
148
149 //TODO: This is a race condition; currentList can be changed to null. A defensive copy could be created.
150 //Note that I read the above warning and added "temp" and "temploc" but I'm not sure if that fixed anything.
151 assert(temp!=null && temp!=poison);
152 assert(tempLoc<temp.length);
153 assert(temp[tempLoc]!=null);
154 byte[] r=temp[tempLoc];
155 assert(r!=null);
156 currentLoc++;
157 // numOut++;
158 return r;
159 }
160
161 private boolean getBuffer(){
162 if(verbose2){System.err.println("Getting new buffer.");}
163 currentLoc=0;
164 final BF1Thread bft=thread;
165 if(bft==null){
166 currentList=null;
167 if(verbose2){System.err.println("No buffers available. thread="+thread+", shutdown="+(thread==null ? "X" : ""+thread.shutdown));}
168 return false;
169 }
170 if(currentList==poison){
171 if(verbose2){System.err.println("A: Current list is poison.");}
172 return false;
173 }
174 if(currentList!=null){
175 Arrays.fill(currentList, null); //MUST be done or lines get recycled at end of file.
176 while(currentList!=null){
177 try {
178 if(verbose2){System.err.println("adding to qEmpty list size "+currentList.length+"\n"+toString(currentList));}
179 bft.qEmpty.put(currentList);
180 currentList=null;
181 } catch (InterruptedException e) {
182 // TODO Auto-generated catch block
183 e.printStackTrace();
184 }
185 }
186 }
187 assert(currentList==null);
188 while(currentList==null){
189 try {
190 assert(bft!=null);
191 if(verbose2){System.err.println("C: qFull.size()="+bft.qFull.size());}
192 currentList=bft.qFull.take();
193 } catch (InterruptedException e) {
194 // TODO Auto-generated catch block
195 e.printStackTrace();
196 }
197 }
198 if(verbose2){
199 if(currentList==poison){
200 System.err.println("B: Current list is poison.");
201 }else{
202 System.err.println("getBuffer fetched a new buffer of size "+currentList.length);
203 }
204 }
205 return currentList!=poison;
206 }
207
208 private final synchronized BF1Thread open(){
209 if(verbose2){System.err.println("ByteFile2("+name()+").open()");}
210 assert(thread==null);
211 currentList=null;
212 currentLoc=0;
213 // numIn=0;
214 // numOut=0;
215 thread=new BF1Thread(ff);
216 thread.start();
217 return thread;
218 }
219
220 private class BF1Thread extends Thread{
221
222 // public BF1Thread(String fname){
223 // bf1=new ByteFile1(fname, false, allowSubprocess);
224 // qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
225 // qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
226 // for(int i=0; i<buffs; i++){
227 // try {
228 // qEmpty.put(new byte[bufflen][]);
229 // } catch (InterruptedException e) {
230 // // TODO Auto-generated catch block
231 // e.printStackTrace();
232 // }
233 // }
234 // }
235
236 public BF1Thread(FileFormat ff){
237 bf1=new ByteFile1(ff);
238 qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
239 qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
240 for(int i=0; i<buffs; i++){
241 try {
242 qEmpty.put(new byte[bufflen][]);
243 } catch (InterruptedException e) {
244 // TODO Auto-generated catch block
245 e.printStackTrace();
246 }
247 }
248 }
249
250 @Override
251 public void run(){
252 if(verbose){System.err.println("ByteFile2("+name()+").run()");}
253 byte[] s=null;
254 byte[][] list=null;
255 while(list==null){
256 try {
257 list = qEmpty.take();
258 } catch (InterruptedException e1) {
259 // TODO Auto-generated catch block
260 e1.printStackTrace();
261 }
262 }
263 synchronized(this){
264 if(list==poison || shutdown){
265 shutdown();
266 return;
267 }
268 }
269
270 int loc=0;
271 long bases=0;
272
273 //At this point, list is not null
274 for(s=bf1.nextLine(); s!=null; s=bf1.nextLine()){
275 bases+=s.length;
276 assert(list!=null) : "Somehow the list became null for "+bf1.name()+" at line "+cntr;
277 list[loc]=s;
278 loc++;
279 // numIn++;
280 // if(verbose){System.err.println("Added line "+numIn);}
281 if(loc>=bufflen || bases>=buffcapacity){
282 if(verbose2){System.err.println("Capacity exceeded.");}
283 while(list!=null){
284 try {
285 // synchronized(this){
286 // if(!shutdown){
287 if(verbose2){
288 System.err.println("A: Adding to qFull list of size "+loc);
289 System.err.println(ByteFile2.toString(list));
290 }
291 cntr+=list.length;
292 qFull.put(list);
293 if(verbose2){System.err.println("A: qFull.size()="+qFull.size());}
294 // }
295 // }
296 list=null;
297 loc=0;
298 } catch (InterruptedException e) {
299 // TODO Auto-generated catch block
300 e.printStackTrace();
301 }
302 }
303 //At this point, list is null
304 if(shutdown){
305 if(verbose2){System.err.println("Break 1");}
306 break;
307 }
308 while(list==null){
309 if(verbose2){System.err.println("Taking empty list.");}
310 try {
311 list = qEmpty.take();
312 } catch (InterruptedException e1) {
313 // TODO Auto-generated catch block
314 e1.printStackTrace();
315 }
316 }
317 //At this point, list is not null
318 bases=0;
319 if(list==poison){
320 if(verbose2){System.err.println("Break 2");}
321 break;
322 }
323 //At this point, list is not null
324 }
325 }
326 if(verbose2){System.err.println("Run loop exit.");}
327
328 while(list!=null && loc>0){
329 try {
330 // synchronized(this){
331 // if(!shutdown){
332 if(verbose2){System.err.println("B: Adding list of size "+loc);}
333 qFull.put(list);
334 if(verbose2){System.err.println("B: qFull.size()="+qFull.size());}
335 // }
336 // }
337 list=null;
338 loc=0;
339 } catch (InterruptedException e) {
340 // TODO Auto-generated catch block
341 e.printStackTrace();
342 }
343 }
344 //At this point, list is null
345 shutdown();
346
347 if(verbose){System.err.println("ByteFile2("+name()+").run() finished");}
348 }
349
350 synchronized void shutdown(){
351 if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown()");}
352 if(shutdown){return;}
353 shutdown=true;
354 if(verbose2){System.err.println("Adding poison.");}
355 qFull.add(poison);
356 qEmpty.add(poison);
357 if(verbose2){System.err.println("D: qFull.size()="+qFull.size());}
358 if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown() finished");}
359 }
360
361 private boolean shutdown=false;
362 final ByteFile1 bf1;
363 final ArrayBlockingQueue<byte[][]> qFull;
364 final ArrayBlockingQueue<byte[][]> qEmpty;
365
366 }
367
368 @Override
369 public boolean isOpen(){
370 final byte[][] list=currentList;
371 final int loc=currentLoc;
372 if(list!=null && loc<list.length && list[loc]!=null){return true;}
373 final BF1Thread bft=thread;
374 if(bft==null){
375 return false;
376 }
377 return true;
378 // synchronized(bft){
379 // //NOTE!!! This cannot be used because qFull.size() will not return a correctly synchronized value. Poll() may work.
380 // assert(bft.bf1.isOpen() || !bft.qFull.isEmpty()) : bft.bf1.isOpen()+", "+bft.qFull.isEmpty()+", "+bft.qFull.size();
381 // return (bft.bf1.isOpen() || !bft.qFull.isEmpty());
382 // }
383 }
384
385 @Override
386 public final void pushBack(byte[] line){
387 assert(pushBack==null);
388 pushBack=line;
389 }
390
391 // @Override
392 // public void pushBack(byte[] line) {
393 // if(bstart>line.length){
394 // bstart--;
395 // buffer[bstart]='\n';
396 // for(int i=0, j=bstart-line.length; i<line.length; i++, j++){
397 // buffer[j]=line[i];
398 // }
399 // bstart=bstart-line.length;
400 // return;
401 // }
402 //
403 // int bLen=bstop-bstart;
404 // int newLen=bLen+line.length+1;
405 // int rShift=line.length+1-bstart;
406 // assert(rShift>0) : bstop+", "+bstart+", "+line.length;
407 // while(newLen>buffer.length){
408 // //This could get big if pushback is used often,
409 // //unless special steps are taken to prevent it, like leaving extra space for pushbacks.
410 // buffer=Arrays.copyOf(buffer, buffer.length*2);
411 // }
412 //
413 // Tools.shiftRight(buffer, rShift);
414 //
415 // for(int i=0; i<line.length; i++){
416 // buffer[i]=line[i];
417 // }
418 // buffer[line.length]='\n';
419 // bstart=0;
420 // bstop=newLen;
421 // }
422
423 /** For debugging */
424 private static String toString(byte[][] x){
425 StringBuilder sb=new StringBuilder();
426 for(byte[] z : x){
427 sb.append(z==null ? "null" : new String(z)).append('\n');
428 }
429 return sb.toString();
430 }
431
432 @Override
433 public final InputStream is(){return thread==null ? null : thread.bf1.is();}
434
435 @Override
436 public final long lineNum(){return thread==null ? -1 : thread.bf1.lineNum();}
437
438 long cntr;
439 private BF1Thread thread=null;
440 private byte[][] currentList=null;
441 private int currentLoc=0;
442 // private int currentSize=0;
443
444 // private long numIn=0, numOut=0;
445
446 private byte[] pushBack=null;
447
448 static final byte[][] poison=new byte[0][];
449 public static boolean verbose=false;
450 private static final boolean verbose2=false;
451 private static final int bufflen=1000;
452 private static final int buffs=4;
453 private static final int buffcapacity=256000;
454
455 private boolean errorState=false;
456
457 }