Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/icecream/ZMWStreamer.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 package icecream; | |
2 | |
3 import java.util.concurrent.ArrayBlockingQueue; | |
4 | |
5 import dna.Data; | |
6 import fileIO.FileFormat; | |
7 import fileIO.ReadWrite; | |
8 import shared.Parse; | |
9 import shared.Tools; | |
10 import stream.ConcurrentReadInputStream; | |
11 import stream.Read; | |
12 import stream.SamReadStreamer; | |
13 import stream.SamStreamer; | |
14 import structures.ListNum; | |
15 | |
16 /** | |
17 * Wrapper for a ReadInputStream. | |
18 * Produces one ZMW at a time for consumers. | |
19 * Allows stopping after X reads or X ZMWs. | |
20 * @author Brian Bushnell | |
21 * @date June 5, 2020 | |
22 */ | |
23 public class ZMWStreamer implements Runnable { | |
24 | |
25 public ZMWStreamer(FileFormat ff, int queuelen_, long maxReads_, long maxZMWs_){ | |
26 Data.USE_SAMBAMBA=false;//Sambamba changes PacBio headers. | |
27 queuelen=Tools.mid(4, queuelen_, 64); | |
28 maxReads=maxReads_;//(maxReads_<0 ? Long.MAX_VALUE : maxReads_); | |
29 maxZMWs=maxZMWs_; | |
30 // assert(false) : maxReads_+", "+maxReads; | |
31 queue=new ArrayBlockingQueue<ZMW>(queuelen); | |
32 if(ff.samOrBam() && useStreamer){ | |
33 cris=null; | |
34 ss=makeStreamer(ff); | |
35 }else{ | |
36 cris=makeCris(ff); | |
37 ss=null; | |
38 } | |
39 assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist."; | |
40 } | |
41 | |
42 public ZMWStreamer(ConcurrentReadInputStream cris_, SamStreamer ss_, int queuelen_){ | |
43 cris=cris_; | |
44 ss=ss_; | |
45 queuelen=Tools.mid(4, queuelen_, 64); | |
46 maxReads=-1; | |
47 maxZMWs=-1; | |
48 assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist."; | |
49 queue=new ArrayBlockingQueue<ZMW>(queuelen); | |
50 } | |
51 | |
52 public Thread runStreamer(boolean makeThread){ | |
53 if(makeThread){ | |
54 Thread t=new Thread(this); | |
55 t.start(); | |
56 return t; | |
57 }else{ | |
58 run(); | |
59 return null; | |
60 } | |
61 } | |
62 | |
63 @Override | |
64 public void run(){ | |
65 if(cris!=null){ | |
66 handleCris(); | |
67 }else{ | |
68 handleStreamer(); | |
69 } | |
70 } | |
71 | |
72 private ConcurrentReadInputStream makeCris(FileFormat ff){ | |
73 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null, null, null); | |
74 cris.start(); //Start the stream | |
75 if(verbose){System.err.println("Started cris");} | |
76 return cris; | |
77 } | |
78 | |
79 private SamReadStreamer makeStreamer(FileFormat ff){ | |
80 SamReadStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads); | |
81 ss.start(); //Start the stream | |
82 if(verbose){System.err.println("Started sam streamer");} | |
83 return ss; | |
84 } | |
85 | |
86 /** | |
87 * Pull reads from the cris; | |
88 * organize them into lists of subreads from the same ZMW; | |
89 * put those lists into the shared queue. | |
90 */ | |
91 private void handleCris(){ | |
92 //Grab the first ListNum of reads | |
93 ListNum<Read> ln=cris.nextList(); | |
94 | |
95 ZMW buffer=new ZMW();buffer.id=ZMWs; | |
96 long prevZmw=-1; | |
97 | |
98 long readsAdded=0; | |
99 // long zmwsAdded=0; | |
100 | |
101 //As long as there is a nonempty read list... | |
102 while(ln!=null && ln.size()>0){ | |
103 | |
104 for(Read r : ln) { | |
105 long zmw; | |
106 try { | |
107 zmw=Parse.parseZmw(r.id); | |
108 } catch (Exception e) { | |
109 zmw=r.numericID;//For testing only; disable for production | |
110 } | |
111 if(zmw<0){zmw=r.numericID;}//For testing only; disable for production | |
112 if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);} | |
113 if(zmw!=prevZmw && !buffer.isEmpty()){ | |
114 ZMWs++; | |
115 addToQueue(buffer); | |
116 readsAdded+=buffer.size(); | |
117 // zmwsAdded++; | |
118 buffer=new ZMW();buffer.id=ZMWs; | |
119 if(maxZMWs>0 && ZMWs>=maxZMWs){break;} | |
120 } | |
121 buffer.add(r); | |
122 prevZmw=zmw; | |
123 } | |
124 | |
125 if(maxZMWs>0 && ZMWs>=maxZMWs){break;} | |
126 cris.returnList(ln); | |
127 | |
128 //Fetch a new list | |
129 ln=cris.nextList(); | |
130 } | |
131 | |
132 if(!buffer.isEmpty() && (maxZMWs<1 || ZMWs>=maxZMWs)){ | |
133 ZMWs++; | |
134 readsAdded+=buffer.size(); | |
135 addToQueue(buffer); | |
136 } | |
137 | |
138 //Notify the input stream that the final list was used | |
139 if(ln!=null){ | |
140 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); | |
141 // cris.returnList(ln.id, true); | |
142 } | |
143 | |
144 errorState|=ReadWrite.closeStreams(cris); | |
145 addPoison(); | |
146 } | |
147 | |
148 /** | |
149 * Pull reads from the streamer; | |
150 * organize them into lists of subreads from the same ZMW; | |
151 * put those lists into the shared queue. | |
152 */ | |
153 private void handleStreamer(){ | |
154 //Grab the first ListNum of reads | |
155 ListNum<Read> ln=ss.nextList(); | |
156 | |
157 ZMW buffer=new ZMW();buffer.id=ZMWs; | |
158 long prevZmw=-1; | |
159 | |
160 long added=0; | |
161 | |
162 //As long as there is a nonempty read list... | |
163 while(ln!=null && ln.size()>0){ | |
164 | |
165 for(Read r : ln) { | |
166 long zmw; | |
167 try { | |
168 zmw=Parse.parseZmw(r.id); | |
169 } catch (Exception e) { | |
170 zmw=r.numericID;//For testing only; disable for production | |
171 } | |
172 if(zmw<0){zmw=r.numericID;}//For testing only; disable for production | |
173 if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);} | |
174 if(zmw!=prevZmw && !buffer.isEmpty()){ | |
175 ZMWs++; | |
176 addToQueue(buffer); | |
177 added+=buffer.size(); | |
178 buffer=new ZMW();buffer.id=ZMWs; | |
179 } | |
180 buffer.add(r); | |
181 prevZmw=zmw; | |
182 } | |
183 | |
184 //Fetch a new list | |
185 ln=ss.nextList(); | |
186 } | |
187 | |
188 if(!buffer.isEmpty()){ | |
189 ZMWs++; | |
190 added+=buffer.size(); | |
191 addToQueue(buffer); | |
192 } | |
193 | |
194 addPoison(); | |
195 } | |
196 | |
197 private void addPoison(){ | |
198 // //Notify worker threads that there is no more data | |
199 // for(int i=0; i<threads; i++){ | |
200 // addToQueue(POISON); | |
201 // } | |
202 addToQueue(POISON); | |
203 } | |
204 | |
205 private void addToQueue(ZMW buffer){ | |
206 if(verbose) {System.err.println("Adding to queue "+(buffer==POISON ? "poison" : buffer.get(0).id));} | |
207 while(buffer!=null) { | |
208 try { | |
209 queue.put(buffer); | |
210 buffer=null; | |
211 } catch (InterruptedException e) { | |
212 // TODO Auto-generated catch block | |
213 e.printStackTrace(); | |
214 } | |
215 } | |
216 } | |
217 | |
218 public ZMW nextZMW(){ | |
219 ZMW buffer=null; | |
220 while(buffer==null) { | |
221 try { | |
222 buffer=queue.take(); | |
223 } catch (InterruptedException e) { | |
224 // TODO Auto-generated catch block | |
225 e.printStackTrace(); | |
226 } | |
227 } | |
228 if(verbose){System.err.println("Pulled from queue "+(buffer==POISON ? "poison" : buffer.get(0).id));} | |
229 if(buffer==POISON){ | |
230 addToQueue(POISON); | |
231 return null; | |
232 }else{ | |
233 return buffer; | |
234 } | |
235 } | |
236 | |
237 private final ConcurrentReadInputStream cris; | |
238 private final SamStreamer ss; | |
239 private final int queuelen; | |
240 public long ZMWs=0; | |
241 private final long maxReads; | |
242 private final long maxZMWs; | |
243 public boolean errorState=false; | |
244 | |
245 private final ArrayBlockingQueue<ZMW> queue; | |
246 private static final ZMW POISON=new ZMW(0); | |
247 public static boolean verbose=false; | |
248 | |
249 //Streamer seems to give more highly variable timings... sometimes. And it's not really needed. | |
250 public static boolean useStreamer=false; | |
251 //Only 1 thread for now to force ordered input | |
252 public static final int streamerThreads=1; | |
253 | |
254 } |