Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/fileIO/ByteFile2.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 package fileIO; | |
2 import java.io.InputStream; | |
3 import java.util.Arrays; | |
4 import java.util.concurrent.ArrayBlockingQueue; | |
5 | |
6 import shared.Timer; | |
7 import shared.Tools; | |
8 | |
9 | |
10 /** | |
11 * Runs a ByteFile1 in a separate thread. Can speed up disk reading, particularly of compressed files, at cost of slightly more work done. | |
12 * Drop-in compatible with ByteFile1. | |
13 * @author Brian Bushnell | |
14 * @date Sep 23, 2013 | |
15 * | |
16 */ | |
17 public final class ByteFile2 extends ByteFile { | |
18 | |
19 | |
20 public static void main(String[] args){ | |
21 ByteFile2 tf=new ByteFile2(args.length>0 ? args[0] : "stdin", true); | |
22 long first=0, last=100; | |
23 boolean speedtest=false; | |
24 if(args.length>1){ | |
25 if(args[1].equalsIgnoreCase("speedtest")){ | |
26 speedtest=true; | |
27 first=0; | |
28 last=Long.MAX_VALUE; | |
29 }else{ | |
30 first=Integer.parseInt(args[1]); | |
31 last=first+100; | |
32 } | |
33 } | |
34 if(args.length>2){ | |
35 last=Integer.parseInt(args[2]); | |
36 } | |
37 speedtest(tf, first, last, !speedtest); | |
38 | |
39 tf.close(); | |
40 tf.reset(); | |
41 tf.close(); | |
42 } | |
43 | |
44 private static void speedtest(ByteFile2 tf, long first, long last, boolean reprint){ | |
45 Timer t=new Timer(); | |
46 long lines=0; | |
47 long bytes=0; | |
48 for(long i=0; i<first; i++){tf.nextLine();} | |
49 if(reprint){ | |
50 for(long i=first; i<last; i++){ | |
51 byte[] s=tf.nextLine(); | |
52 if(s==null){break;} | |
53 | |
54 lines++; | |
55 bytes+=s.length; | |
56 System.out.println(new String(s)); | |
57 } | |
58 | |
59 System.err.println("\n"); | |
60 System.err.println("Lines: "+lines); | |
61 System.err.println("Bytes: "+bytes); | |
62 }else{ | |
63 for(long i=first; i<last; i++){ | |
64 byte[] s=tf.nextLine(); | |
65 if(s==null){break;} | |
66 lines++; | |
67 bytes+=s.length; | |
68 } | |
69 } | |
70 t.stop(); | |
71 | |
72 if(!reprint){ | |
73 System.err.println(Tools.timeLinesBytesProcessed(t, lines, bytes, 8)); | |
74 } | |
75 } | |
76 | |
77 // public ByteFile2(String name()){this(name(), false);} | |
78 | |
/**
 * Creates a ByteFile2 from a file name.
 * The name is converted to a FileFormat by {@code FileFormat.testInput}, called with
 * {@code FileFormat.TEXT}, {@code null}, {@code allowSubprocess_} and {@code false};
 * the meaning of those arguments is defined by FileFormat, outside this file.
 *
 * @param fname Name of the input, as understood by FileFormat.testInput.
 * @param allowSubprocess_ Forwarded unchanged to FileFormat.testInput.
 */
public ByteFile2(String fname, boolean allowSubprocess_){
	this(
		FileFormat.testInput(fname, FileFormat.TEXT, null, allowSubprocess_, false)
	);
}
82 | |
83 public ByteFile2(FileFormat ff){ | |
84 super(ff); | |
85 if(verbose){System.err.println("ByteFile2("+ff+")");} | |
86 open(); | |
87 } | |
88 | |
89 @Override | |
90 public final void reset(){ | |
91 close(); | |
92 open(); | |
93 superReset(); | |
94 } | |
95 | |
96 @Override | |
97 public synchronized final boolean close(){ | |
98 if(verbose){System.err.println("ByteFile2("+name()+").close()");} | |
99 if(isOpen()){ | |
100 // errorState|=ReadWrite.killProcess(name()); | |
101 thread.shutdown(); | |
102 while(thread.getState()!=Thread.State.TERMINATED){ | |
103 try { | |
104 thread.join(); | |
105 } catch (InterruptedException e) { | |
106 // TODO Auto-generated catch block | |
107 e.printStackTrace(); | |
108 } | |
109 } | |
110 thread.bf1.close(); | |
111 } | |
112 thread=null; | |
113 currentList=null; | |
114 currentLoc=0; | |
115 // assert(numIn==numOut) : numIn+", "+numOut; | |
116 pushBack=null; | |
117 if(verbose){System.err.println("ByteFile2("+name()+").close() returned "+errorState);} | |
118 return errorState; | |
119 } | |
120 | |
121 @Override | |
122 public final byte[] nextLine(){ | |
123 | |
124 if(pushBack!=null){//Commenting out does not seem to improve speed here. | |
125 byte[] temp=pushBack; | |
126 pushBack=null; | |
127 return temp; | |
128 } | |
129 | |
130 // if(verbose){System.err.println("Reading line.");} | |
131 // byte[] r=null; | |
132 | |
133 byte[][] temp=currentList; | |
134 int tempLoc=currentLoc; | |
135 | |
136 if(temp==null || tempLoc>=temp.length || temp[tempLoc]==null){ | |
137 boolean b=getBuffer(); | |
138 if(!b){ | |
139 if(verbose2){System.err.println("nextLine()->getBuffer() returned false.");} | |
140 return null; | |
141 } | |
142 temp=currentList; | |
143 tempLoc=currentLoc; | |
144 if(temp==null || temp==poison || temp[tempLoc]==null){ | |
145 return null; | |
146 } | |
147 } | |
148 | |
149 //TODO: This is a race condition; currentList can be changed to null. A defensive copy could be created. | |
150 //Note that I read the above warning and added "temp" and "temploc" but I'm not sure if that fixed anything. | |
151 assert(temp!=null && temp!=poison); | |
152 assert(tempLoc<temp.length); | |
153 assert(temp[tempLoc]!=null); | |
154 byte[] r=temp[tempLoc]; | |
155 assert(r!=null); | |
156 currentLoc++; | |
157 // numOut++; | |
158 return r; | |
159 } | |
160 | |
161 private boolean getBuffer(){ | |
162 if(verbose2){System.err.println("Getting new buffer.");} | |
163 currentLoc=0; | |
164 final BF1Thread bft=thread; | |
165 if(bft==null){ | |
166 currentList=null; | |
167 if(verbose2){System.err.println("No buffers available. thread="+thread+", shutdown="+(thread==null ? "X" : ""+thread.shutdown));} | |
168 return false; | |
169 } | |
170 if(currentList==poison){ | |
171 if(verbose2){System.err.println("A: Current list is poison.");} | |
172 return false; | |
173 } | |
174 if(currentList!=null){ | |
175 Arrays.fill(currentList, null); //MUST be done or lines get recycled at end of file. | |
176 while(currentList!=null){ | |
177 try { | |
178 if(verbose2){System.err.println("adding to qEmpty list size "+currentList.length+"\n"+toString(currentList));} | |
179 bft.qEmpty.put(currentList); | |
180 currentList=null; | |
181 } catch (InterruptedException e) { | |
182 // TODO Auto-generated catch block | |
183 e.printStackTrace(); | |
184 } | |
185 } | |
186 } | |
187 assert(currentList==null); | |
188 while(currentList==null){ | |
189 try { | |
190 assert(bft!=null); | |
191 if(verbose2){System.err.println("C: qFull.size()="+bft.qFull.size());} | |
192 currentList=bft.qFull.take(); | |
193 } catch (InterruptedException e) { | |
194 // TODO Auto-generated catch block | |
195 e.printStackTrace(); | |
196 } | |
197 } | |
198 if(verbose2){ | |
199 if(currentList==poison){ | |
200 System.err.println("B: Current list is poison."); | |
201 }else{ | |
202 System.err.println("getBuffer fetched a new buffer of size "+currentList.length); | |
203 } | |
204 } | |
205 return currentList!=poison; | |
206 } | |
207 | |
208 private final synchronized BF1Thread open(){ | |
209 if(verbose2){System.err.println("ByteFile2("+name()+").open()");} | |
210 assert(thread==null); | |
211 currentList=null; | |
212 currentLoc=0; | |
213 // numIn=0; | |
214 // numOut=0; | |
215 thread=new BF1Thread(ff); | |
216 thread.start(); | |
217 return thread; | |
218 } | |
219 | |
220 private class BF1Thread extends Thread{ | |
221 | |
222 // public BF1Thread(String fname){ | |
223 // bf1=new ByteFile1(fname, false, allowSubprocess); | |
224 // qFull=new ArrayBlockingQueue<byte[][]>(buffs+2); | |
225 // qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2); | |
226 // for(int i=0; i<buffs; i++){ | |
227 // try { | |
228 // qEmpty.put(new byte[bufflen][]); | |
229 // } catch (InterruptedException e) { | |
230 // // TODO Auto-generated catch block | |
231 // e.printStackTrace(); | |
232 // } | |
233 // } | |
234 // } | |
235 | |
/**
 * Creates the reader thread state: the wrapped ByteFile1, the full and empty
 * buffer queues, and {@code buffs} empty line buffers of {@code bufflen} slots each.
 * The thread is not started here.
 *
 * @param ff Format descriptor used to construct the wrapped ByteFile1.
 */
public BF1Thread(FileFormat ff){
	bf1=new ByteFile1(ff);
	qFull=new ArrayBlockingQueue<byte[][]>(buffs+2);
	qEmpty=new ArrayBlockingQueue<byte[][]>(buffs+2);
	for(int i=0; i<buffs; i++){
		//Use the non-blocking add() rather than put(): the queue capacity (buffs+2) exceeds
		//the number of buffers, so add() cannot fail, and unlike put() it cannot throw
		//InterruptedException, which previously caused a buffer to be silently dropped.
		qEmpty.add(new byte[bufflen][]);
	}
}
249 | |
250 @Override | |
251 public void run(){ | |
252 if(verbose){System.err.println("ByteFile2("+name()+").run()");} | |
253 byte[] s=null; | |
254 byte[][] list=null; | |
255 while(list==null){ | |
256 try { | |
257 list = qEmpty.take(); | |
258 } catch (InterruptedException e1) { | |
259 // TODO Auto-generated catch block | |
260 e1.printStackTrace(); | |
261 } | |
262 } | |
263 synchronized(this){ | |
264 if(list==poison || shutdown){ | |
265 shutdown(); | |
266 return; | |
267 } | |
268 } | |
269 | |
270 int loc=0; | |
271 long bases=0; | |
272 | |
273 //At this point, list is not null | |
274 for(s=bf1.nextLine(); s!=null; s=bf1.nextLine()){ | |
275 bases+=s.length; | |
276 assert(list!=null) : "Somehow the list became null for "+bf1.name()+" at line "+cntr; | |
277 list[loc]=s; | |
278 loc++; | |
279 // numIn++; | |
280 // if(verbose){System.err.println("Added line "+numIn);} | |
281 if(loc>=bufflen || bases>=buffcapacity){ | |
282 if(verbose2){System.err.println("Capacity exceeded.");} | |
283 while(list!=null){ | |
284 try { | |
285 // synchronized(this){ | |
286 // if(!shutdown){ | |
287 if(verbose2){ | |
288 System.err.println("A: Adding to qFull list of size "+loc); | |
289 System.err.println(ByteFile2.toString(list)); | |
290 } | |
291 cntr+=list.length; | |
292 qFull.put(list); | |
293 if(verbose2){System.err.println("A: qFull.size()="+qFull.size());} | |
294 // } | |
295 // } | |
296 list=null; | |
297 loc=0; | |
298 } catch (InterruptedException e) { | |
299 // TODO Auto-generated catch block | |
300 e.printStackTrace(); | |
301 } | |
302 } | |
303 //At this point, list is null | |
304 if(shutdown){ | |
305 if(verbose2){System.err.println("Break 1");} | |
306 break; | |
307 } | |
308 while(list==null){ | |
309 if(verbose2){System.err.println("Taking empty list.");} | |
310 try { | |
311 list = qEmpty.take(); | |
312 } catch (InterruptedException e1) { | |
313 // TODO Auto-generated catch block | |
314 e1.printStackTrace(); | |
315 } | |
316 } | |
317 //At this point, list is not null | |
318 bases=0; | |
319 if(list==poison){ | |
320 if(verbose2){System.err.println("Break 2");} | |
321 break; | |
322 } | |
323 //At this point, list is not null | |
324 } | |
325 } | |
326 if(verbose2){System.err.println("Run loop exit.");} | |
327 | |
328 while(list!=null && loc>0){ | |
329 try { | |
330 // synchronized(this){ | |
331 // if(!shutdown){ | |
332 if(verbose2){System.err.println("B: Adding list of size "+loc);} | |
333 qFull.put(list); | |
334 if(verbose2){System.err.println("B: qFull.size()="+qFull.size());} | |
335 // } | |
336 // } | |
337 list=null; | |
338 loc=0; | |
339 } catch (InterruptedException e) { | |
340 // TODO Auto-generated catch block | |
341 e.printStackTrace(); | |
342 } | |
343 } | |
344 //At this point, list is null | |
345 shutdown(); | |
346 | |
347 if(verbose){System.err.println("ByteFile2("+name()+").run() finished");} | |
348 } | |
349 | |
350 synchronized void shutdown(){ | |
351 if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown()");} | |
352 if(shutdown){return;} | |
353 shutdown=true; | |
354 if(verbose2){System.err.println("Adding poison.");} | |
355 qFull.add(poison); | |
356 qEmpty.add(poison); | |
357 if(verbose2){System.err.println("D: qFull.size()="+qFull.size());} | |
358 if(verbose || verbose2){System.err.println("ByteFile2("+name()+").shutdown() finished");} | |
359 } | |
360 | |
361 private boolean shutdown=false; | |
362 final ByteFile1 bf1; | |
363 final ArrayBlockingQueue<byte[][]> qFull; | |
364 final ArrayBlockingQueue<byte[][]> qEmpty; | |
365 | |
366 } | |
367 | |
368 @Override | |
369 public boolean isOpen(){ | |
370 final byte[][] list=currentList; | |
371 final int loc=currentLoc; | |
372 if(list!=null && loc<list.length && list[loc]!=null){return true;} | |
373 final BF1Thread bft=thread; | |
374 if(bft==null){ | |
375 return false; | |
376 } | |
377 return true; | |
378 // synchronized(bft){ | |
379 // //NOTE!!! This cannot be used because qFull.size() will not return a correctly synchronized value. Poll() may work. | |
380 // assert(bft.bf1.isOpen() || !bft.qFull.isEmpty()) : bft.bf1.isOpen()+", "+bft.qFull.isEmpty()+", "+bft.qFull.size(); | |
381 // return (bft.bf1.isOpen() || !bft.qFull.isEmpty()); | |
382 // } | |
383 } | |
384 | |
385 @Override | |
386 public final void pushBack(byte[] line){ | |
387 assert(pushBack==null); | |
388 pushBack=line; | |
389 } | |
390 | |
391 // @Override | |
392 // public void pushBack(byte[] line) { | |
393 // if(bstart>line.length){ | |
394 // bstart--; | |
395 // buffer[bstart]='\n'; | |
396 // for(int i=0, j=bstart-line.length; i<line.length; i++, j++){ | |
397 // buffer[j]=line[i]; | |
398 // } | |
399 // bstart=bstart-line.length; | |
400 // return; | |
401 // } | |
402 // | |
403 // int bLen=bstop-bstart; | |
404 // int newLen=bLen+line.length+1; | |
405 // int rShift=line.length+1-bstart; | |
406 // assert(rShift>0) : bstop+", "+bstart+", "+line.length; | |
407 // while(newLen>buffer.length){ | |
408 // //This could get big if pushback is used often, | |
409 // //unless special steps are taken to prevent it, like leaving extra space for pushbacks. | |
410 // buffer=Arrays.copyOf(buffer, buffer.length*2); | |
411 // } | |
412 // | |
413 // Tools.shiftRight(buffer, rShift); | |
414 // | |
415 // for(int i=0; i<line.length; i++){ | |
416 // buffer[i]=line[i]; | |
417 // } | |
418 // buffer[line.length]='\n'; | |
419 // bstart=0; | |
420 // bstop=newLen; | |
421 // } | |
422 | |
423 /** For debugging */ | |
424 private static String toString(byte[][] x){ | |
425 StringBuilder sb=new StringBuilder(); | |
426 for(byte[] z : x){ | |
427 sb.append(z==null ? "null" : new String(z)).append('\n'); | |
428 } | |
429 return sb.toString(); | |
430 } | |
431 | |
432 @Override | |
433 public final InputStream is(){return thread==null ? null : thread.bf1.is();} | |
434 | |
435 @Override | |
436 public final long lineNum(){return thread==null ? -1 : thread.bf1.lineNum();} | |
437 | |
438 long cntr; | |
439 private BF1Thread thread=null; | |
440 private byte[][] currentList=null; | |
441 private int currentLoc=0; | |
442 // private int currentSize=0; | |
443 | |
444 // private long numIn=0, numOut=0; | |
445 | |
446 private byte[] pushBack=null; | |
447 | |
448 static final byte[][] poison=new byte[0][]; | |
449 public static boolean verbose=false; | |
450 private static final boolean verbose2=false; | |
451 private static final int bufflen=1000; | |
452 private static final int buffs=4; | |
453 private static final int buffcapacity=256000; | |
454 | |
455 private boolean errorState=false; | |
456 | |
457 } |