comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/icecream/ZMWStreamer.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package icecream;
2
3 import java.util.concurrent.ArrayBlockingQueue;
4
5 import dna.Data;
6 import fileIO.FileFormat;
7 import fileIO.ReadWrite;
8 import shared.Parse;
9 import shared.Tools;
10 import stream.ConcurrentReadInputStream;
11 import stream.Read;
12 import stream.SamReadStreamer;
13 import stream.SamStreamer;
14 import structures.ListNum;
15
16 /**
17 * Wrapper for a ReadInputStream.
18 * Produces one ZMW at a time for consumers.
19 * Allows stopping after X reads or X ZMWs.
20 * @author Brian Bushnell
21 * @date June 5, 2020
22 */
23 public class ZMWStreamer implements Runnable {
24
25 public ZMWStreamer(FileFormat ff, int queuelen_, long maxReads_, long maxZMWs_){
26 Data.USE_SAMBAMBA=false;//Sambamba changes PacBio headers.
27 queuelen=Tools.mid(4, queuelen_, 64);
28 maxReads=maxReads_;//(maxReads_<0 ? Long.MAX_VALUE : maxReads_);
29 maxZMWs=maxZMWs_;
30 // assert(false) : maxReads_+", "+maxReads;
31 queue=new ArrayBlockingQueue<ZMW>(queuelen);
32 if(ff.samOrBam() && useStreamer){
33 cris=null;
34 ss=makeStreamer(ff);
35 }else{
36 cris=makeCris(ff);
37 ss=null;
38 }
39 assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist.";
40 }
41
42 public ZMWStreamer(ConcurrentReadInputStream cris_, SamStreamer ss_, int queuelen_){
43 cris=cris_;
44 ss=ss_;
45 queuelen=Tools.mid(4, queuelen_, 64);
46 maxReads=-1;
47 maxZMWs=-1;
48 assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist.";
49 queue=new ArrayBlockingQueue<ZMW>(queuelen);
50 }
51
52 public Thread runStreamer(boolean makeThread){
53 if(makeThread){
54 Thread t=new Thread(this);
55 t.start();
56 return t;
57 }else{
58 run();
59 return null;
60 }
61 }
62
63 @Override
64 public void run(){
65 if(cris!=null){
66 handleCris();
67 }else{
68 handleStreamer();
69 }
70 }
71
72 private ConcurrentReadInputStream makeCris(FileFormat ff){
73 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null, null, null);
74 cris.start(); //Start the stream
75 if(verbose){System.err.println("Started cris");}
76 return cris;
77 }
78
79 private SamReadStreamer makeStreamer(FileFormat ff){
80 SamReadStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
81 ss.start(); //Start the stream
82 if(verbose){System.err.println("Started sam streamer");}
83 return ss;
84 }
85
86 /**
87 * Pull reads from the cris;
88 * organize them into lists of subreads from the same ZMW;
89 * put those lists into the shared queue.
90 */
91 private void handleCris(){
92 //Grab the first ListNum of reads
93 ListNum<Read> ln=cris.nextList();
94
95 ZMW buffer=new ZMW();buffer.id=ZMWs;
96 long prevZmw=-1;
97
98 long readsAdded=0;
99 // long zmwsAdded=0;
100
101 //As long as there is a nonempty read list...
102 while(ln!=null && ln.size()>0){
103
104 for(Read r : ln) {
105 long zmw;
106 try {
107 zmw=Parse.parseZmw(r.id);
108 } catch (Exception e) {
109 zmw=r.numericID;//For testing only; disable for production
110 }
111 if(zmw<0){zmw=r.numericID;}//For testing only; disable for production
112 if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);}
113 if(zmw!=prevZmw && !buffer.isEmpty()){
114 ZMWs++;
115 addToQueue(buffer);
116 readsAdded+=buffer.size();
117 // zmwsAdded++;
118 buffer=new ZMW();buffer.id=ZMWs;
119 if(maxZMWs>0 && ZMWs>=maxZMWs){break;}
120 }
121 buffer.add(r);
122 prevZmw=zmw;
123 }
124
125 if(maxZMWs>0 && ZMWs>=maxZMWs){break;}
126 cris.returnList(ln);
127
128 //Fetch a new list
129 ln=cris.nextList();
130 }
131
132 if(!buffer.isEmpty() && (maxZMWs<1 || ZMWs>=maxZMWs)){
133 ZMWs++;
134 readsAdded+=buffer.size();
135 addToQueue(buffer);
136 }
137
138 //Notify the input stream that the final list was used
139 if(ln!=null){
140 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
141 // cris.returnList(ln.id, true);
142 }
143
144 errorState|=ReadWrite.closeStreams(cris);
145 addPoison();
146 }
147
148 /**
149 * Pull reads from the streamer;
150 * organize them into lists of subreads from the same ZMW;
151 * put those lists into the shared queue.
152 */
153 private void handleStreamer(){
154 //Grab the first ListNum of reads
155 ListNum<Read> ln=ss.nextList();
156
157 ZMW buffer=new ZMW();buffer.id=ZMWs;
158 long prevZmw=-1;
159
160 long added=0;
161
162 //As long as there is a nonempty read list...
163 while(ln!=null && ln.size()>0){
164
165 for(Read r : ln) {
166 long zmw;
167 try {
168 zmw=Parse.parseZmw(r.id);
169 } catch (Exception e) {
170 zmw=r.numericID;//For testing only; disable for production
171 }
172 if(zmw<0){zmw=r.numericID;}//For testing only; disable for production
173 if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);}
174 if(zmw!=prevZmw && !buffer.isEmpty()){
175 ZMWs++;
176 addToQueue(buffer);
177 added+=buffer.size();
178 buffer=new ZMW();buffer.id=ZMWs;
179 }
180 buffer.add(r);
181 prevZmw=zmw;
182 }
183
184 //Fetch a new list
185 ln=ss.nextList();
186 }
187
188 if(!buffer.isEmpty()){
189 ZMWs++;
190 added+=buffer.size();
191 addToQueue(buffer);
192 }
193
194 addPoison();
195 }
196
197 private void addPoison(){
198 // //Notify worker threads that there is no more data
199 // for(int i=0; i<threads; i++){
200 // addToQueue(POISON);
201 // }
202 addToQueue(POISON);
203 }
204
205 private void addToQueue(ZMW buffer){
206 if(verbose) {System.err.println("Adding to queue "+(buffer==POISON ? "poison" : buffer.get(0).id));}
207 while(buffer!=null) {
208 try {
209 queue.put(buffer);
210 buffer=null;
211 } catch (InterruptedException e) {
212 // TODO Auto-generated catch block
213 e.printStackTrace();
214 }
215 }
216 }
217
218 public ZMW nextZMW(){
219 ZMW buffer=null;
220 while(buffer==null) {
221 try {
222 buffer=queue.take();
223 } catch (InterruptedException e) {
224 // TODO Auto-generated catch block
225 e.printStackTrace();
226 }
227 }
228 if(verbose){System.err.println("Pulled from queue "+(buffer==POISON ? "poison" : buffer.get(0).id));}
229 if(buffer==POISON){
230 addToQueue(POISON);
231 return null;
232 }else{
233 return buffer;
234 }
235 }
236
237 private final ConcurrentReadInputStream cris;
238 private final SamStreamer ss;
239 private final int queuelen;
240 public long ZMWs=0;
241 private final long maxReads;
242 private final long maxZMWs;
243 public boolean errorState=false;
244
245 private final ArrayBlockingQueue<ZMW> queue;
246 private static final ZMW POISON=new ZMW(0);
247 public static boolean verbose=false;
248
249 //Streamer seems to give more highly variable timings... sometimes. And it's not really needed.
250 public static boolean useStreamer=false;
251 //Only 1 thread for now to force ordered input
252 public static final int streamerThreads=1;
253
254 }