jpayne@68
|
1 package icecream;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.util.concurrent.ArrayBlockingQueue;
|
jpayne@68
|
4
|
jpayne@68
|
5 import dna.Data;
|
jpayne@68
|
6 import fileIO.FileFormat;
|
jpayne@68
|
7 import fileIO.ReadWrite;
|
jpayne@68
|
8 import shared.Parse;
|
jpayne@68
|
9 import shared.Tools;
|
jpayne@68
|
10 import stream.ConcurrentReadInputStream;
|
jpayne@68
|
11 import stream.Read;
|
jpayne@68
|
12 import stream.SamReadStreamer;
|
jpayne@68
|
13 import stream.SamStreamer;
|
jpayne@68
|
14 import structures.ListNum;
|
jpayne@68
|
15
|
jpayne@68
|
16 /**
|
jpayne@68
|
17 * Wrapper for a ReadInputStream.
|
jpayne@68
|
18 * Produces one ZMW at a time for consumers.
|
jpayne@68
|
19 * Allows stopping after X reads or X ZMWs.
|
jpayne@68
|
20 * @author Brian Bushnell
|
jpayne@68
|
21 * @date June 5, 2020
|
jpayne@68
|
22 */
|
jpayne@68
|
23 public class ZMWStreamer implements Runnable {
|
jpayne@68
|
24
|
jpayne@68
|
25 public ZMWStreamer(FileFormat ff, int queuelen_, long maxReads_, long maxZMWs_){
|
jpayne@68
|
26 Data.USE_SAMBAMBA=false;//Sambamba changes PacBio headers.
|
jpayne@68
|
27 queuelen=Tools.mid(4, queuelen_, 64);
|
jpayne@68
|
28 maxReads=maxReads_;//(maxReads_<0 ? Long.MAX_VALUE : maxReads_);
|
jpayne@68
|
29 maxZMWs=maxZMWs_;
|
jpayne@68
|
30 // assert(false) : maxReads_+", "+maxReads;
|
jpayne@68
|
31 queue=new ArrayBlockingQueue<ZMW>(queuelen);
|
jpayne@68
|
32 if(ff.samOrBam() && useStreamer){
|
jpayne@68
|
33 cris=null;
|
jpayne@68
|
34 ss=makeStreamer(ff);
|
jpayne@68
|
35 }else{
|
jpayne@68
|
36 cris=makeCris(ff);
|
jpayne@68
|
37 ss=null;
|
jpayne@68
|
38 }
|
jpayne@68
|
39 assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist.";
|
jpayne@68
|
40 }
|
jpayne@68
|
41
|
jpayne@68
|
42 public ZMWStreamer(ConcurrentReadInputStream cris_, SamStreamer ss_, int queuelen_){
|
jpayne@68
|
43 cris=cris_;
|
jpayne@68
|
44 ss=ss_;
|
jpayne@68
|
45 queuelen=Tools.mid(4, queuelen_, 64);
|
jpayne@68
|
46 maxReads=-1;
|
jpayne@68
|
47 maxZMWs=-1;
|
jpayne@68
|
48 assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist.";
|
jpayne@68
|
49 queue=new ArrayBlockingQueue<ZMW>(queuelen);
|
jpayne@68
|
50 }
|
jpayne@68
|
51
|
jpayne@68
|
52 public Thread runStreamer(boolean makeThread){
|
jpayne@68
|
53 if(makeThread){
|
jpayne@68
|
54 Thread t=new Thread(this);
|
jpayne@68
|
55 t.start();
|
jpayne@68
|
56 return t;
|
jpayne@68
|
57 }else{
|
jpayne@68
|
58 run();
|
jpayne@68
|
59 return null;
|
jpayne@68
|
60 }
|
jpayne@68
|
61 }
|
jpayne@68
|
62
|
jpayne@68
|
63 @Override
|
jpayne@68
|
64 public void run(){
|
jpayne@68
|
65 if(cris!=null){
|
jpayne@68
|
66 handleCris();
|
jpayne@68
|
67 }else{
|
jpayne@68
|
68 handleStreamer();
|
jpayne@68
|
69 }
|
jpayne@68
|
70 }
|
jpayne@68
|
71
|
jpayne@68
|
72 private ConcurrentReadInputStream makeCris(FileFormat ff){
|
jpayne@68
|
73 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null, null, null);
|
jpayne@68
|
74 cris.start(); //Start the stream
|
jpayne@68
|
75 if(verbose){System.err.println("Started cris");}
|
jpayne@68
|
76 return cris;
|
jpayne@68
|
77 }
|
jpayne@68
|
78
|
jpayne@68
|
79 private SamReadStreamer makeStreamer(FileFormat ff){
|
jpayne@68
|
80 SamReadStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
|
jpayne@68
|
81 ss.start(); //Start the stream
|
jpayne@68
|
82 if(verbose){System.err.println("Started sam streamer");}
|
jpayne@68
|
83 return ss;
|
jpayne@68
|
84 }
|
jpayne@68
|
85
|
jpayne@68
|
86 /**
|
jpayne@68
|
87 * Pull reads from the cris;
|
jpayne@68
|
88 * organize them into lists of subreads from the same ZMW;
|
jpayne@68
|
89 * put those lists into the shared queue.
|
jpayne@68
|
90 */
|
jpayne@68
|
91 private void handleCris(){
|
jpayne@68
|
92 //Grab the first ListNum of reads
|
jpayne@68
|
93 ListNum<Read> ln=cris.nextList();
|
jpayne@68
|
94
|
jpayne@68
|
95 ZMW buffer=new ZMW();buffer.id=ZMWs;
|
jpayne@68
|
96 long prevZmw=-1;
|
jpayne@68
|
97
|
jpayne@68
|
98 long readsAdded=0;
|
jpayne@68
|
99 // long zmwsAdded=0;
|
jpayne@68
|
100
|
jpayne@68
|
101 //As long as there is a nonempty read list...
|
jpayne@68
|
102 while(ln!=null && ln.size()>0){
|
jpayne@68
|
103
|
jpayne@68
|
104 for(Read r : ln) {
|
jpayne@68
|
105 long zmw;
|
jpayne@68
|
106 try {
|
jpayne@68
|
107 zmw=Parse.parseZmw(r.id);
|
jpayne@68
|
108 } catch (Exception e) {
|
jpayne@68
|
109 zmw=r.numericID;//For testing only; disable for production
|
jpayne@68
|
110 }
|
jpayne@68
|
111 if(zmw<0){zmw=r.numericID;}//For testing only; disable for production
|
jpayne@68
|
112 if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);}
|
jpayne@68
|
113 if(zmw!=prevZmw && !buffer.isEmpty()){
|
jpayne@68
|
114 ZMWs++;
|
jpayne@68
|
115 addToQueue(buffer);
|
jpayne@68
|
116 readsAdded+=buffer.size();
|
jpayne@68
|
117 // zmwsAdded++;
|
jpayne@68
|
118 buffer=new ZMW();buffer.id=ZMWs;
|
jpayne@68
|
119 if(maxZMWs>0 && ZMWs>=maxZMWs){break;}
|
jpayne@68
|
120 }
|
jpayne@68
|
121 buffer.add(r);
|
jpayne@68
|
122 prevZmw=zmw;
|
jpayne@68
|
123 }
|
jpayne@68
|
124
|
jpayne@68
|
125 if(maxZMWs>0 && ZMWs>=maxZMWs){break;}
|
jpayne@68
|
126 cris.returnList(ln);
|
jpayne@68
|
127
|
jpayne@68
|
128 //Fetch a new list
|
jpayne@68
|
129 ln=cris.nextList();
|
jpayne@68
|
130 }
|
jpayne@68
|
131
|
jpayne@68
|
132 if(!buffer.isEmpty() && (maxZMWs<1 || ZMWs>=maxZMWs)){
|
jpayne@68
|
133 ZMWs++;
|
jpayne@68
|
134 readsAdded+=buffer.size();
|
jpayne@68
|
135 addToQueue(buffer);
|
jpayne@68
|
136 }
|
jpayne@68
|
137
|
jpayne@68
|
138 //Notify the input stream that the final list was used
|
jpayne@68
|
139 if(ln!=null){
|
jpayne@68
|
140 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
|
jpayne@68
|
141 // cris.returnList(ln.id, true);
|
jpayne@68
|
142 }
|
jpayne@68
|
143
|
jpayne@68
|
144 errorState|=ReadWrite.closeStreams(cris);
|
jpayne@68
|
145 addPoison();
|
jpayne@68
|
146 }
|
jpayne@68
|
147
|
jpayne@68
|
148 /**
|
jpayne@68
|
149 * Pull reads from the streamer;
|
jpayne@68
|
150 * organize them into lists of subreads from the same ZMW;
|
jpayne@68
|
151 * put those lists into the shared queue.
|
jpayne@68
|
152 */
|
jpayne@68
|
153 private void handleStreamer(){
|
jpayne@68
|
154 //Grab the first ListNum of reads
|
jpayne@68
|
155 ListNum<Read> ln=ss.nextList();
|
jpayne@68
|
156
|
jpayne@68
|
157 ZMW buffer=new ZMW();buffer.id=ZMWs;
|
jpayne@68
|
158 long prevZmw=-1;
|
jpayne@68
|
159
|
jpayne@68
|
160 long added=0;
|
jpayne@68
|
161
|
jpayne@68
|
162 //As long as there is a nonempty read list...
|
jpayne@68
|
163 while(ln!=null && ln.size()>0){
|
jpayne@68
|
164
|
jpayne@68
|
165 for(Read r : ln) {
|
jpayne@68
|
166 long zmw;
|
jpayne@68
|
167 try {
|
jpayne@68
|
168 zmw=Parse.parseZmw(r.id);
|
jpayne@68
|
169 } catch (Exception e) {
|
jpayne@68
|
170 zmw=r.numericID;//For testing only; disable for production
|
jpayne@68
|
171 }
|
jpayne@68
|
172 if(zmw<0){zmw=r.numericID;}//For testing only; disable for production
|
jpayne@68
|
173 if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);}
|
jpayne@68
|
174 if(zmw!=prevZmw && !buffer.isEmpty()){
|
jpayne@68
|
175 ZMWs++;
|
jpayne@68
|
176 addToQueue(buffer);
|
jpayne@68
|
177 added+=buffer.size();
|
jpayne@68
|
178 buffer=new ZMW();buffer.id=ZMWs;
|
jpayne@68
|
179 }
|
jpayne@68
|
180 buffer.add(r);
|
jpayne@68
|
181 prevZmw=zmw;
|
jpayne@68
|
182 }
|
jpayne@68
|
183
|
jpayne@68
|
184 //Fetch a new list
|
jpayne@68
|
185 ln=ss.nextList();
|
jpayne@68
|
186 }
|
jpayne@68
|
187
|
jpayne@68
|
188 if(!buffer.isEmpty()){
|
jpayne@68
|
189 ZMWs++;
|
jpayne@68
|
190 added+=buffer.size();
|
jpayne@68
|
191 addToQueue(buffer);
|
jpayne@68
|
192 }
|
jpayne@68
|
193
|
jpayne@68
|
194 addPoison();
|
jpayne@68
|
195 }
|
jpayne@68
|
196
|
jpayne@68
|
197 private void addPoison(){
|
jpayne@68
|
198 // //Notify worker threads that there is no more data
|
jpayne@68
|
199 // for(int i=0; i<threads; i++){
|
jpayne@68
|
200 // addToQueue(POISON);
|
jpayne@68
|
201 // }
|
jpayne@68
|
202 addToQueue(POISON);
|
jpayne@68
|
203 }
|
jpayne@68
|
204
|
jpayne@68
|
205 private void addToQueue(ZMW buffer){
|
jpayne@68
|
206 if(verbose) {System.err.println("Adding to queue "+(buffer==POISON ? "poison" : buffer.get(0).id));}
|
jpayne@68
|
207 while(buffer!=null) {
|
jpayne@68
|
208 try {
|
jpayne@68
|
209 queue.put(buffer);
|
jpayne@68
|
210 buffer=null;
|
jpayne@68
|
211 } catch (InterruptedException e) {
|
jpayne@68
|
212 // TODO Auto-generated catch block
|
jpayne@68
|
213 e.printStackTrace();
|
jpayne@68
|
214 }
|
jpayne@68
|
215 }
|
jpayne@68
|
216 }
|
jpayne@68
|
217
|
jpayne@68
|
218 public ZMW nextZMW(){
|
jpayne@68
|
219 ZMW buffer=null;
|
jpayne@68
|
220 while(buffer==null) {
|
jpayne@68
|
221 try {
|
jpayne@68
|
222 buffer=queue.take();
|
jpayne@68
|
223 } catch (InterruptedException e) {
|
jpayne@68
|
224 // TODO Auto-generated catch block
|
jpayne@68
|
225 e.printStackTrace();
|
jpayne@68
|
226 }
|
jpayne@68
|
227 }
|
jpayne@68
|
228 if(verbose){System.err.println("Pulled from queue "+(buffer==POISON ? "poison" : buffer.get(0).id));}
|
jpayne@68
|
229 if(buffer==POISON){
|
jpayne@68
|
230 addToQueue(POISON);
|
jpayne@68
|
231 return null;
|
jpayne@68
|
232 }else{
|
jpayne@68
|
233 return buffer;
|
jpayne@68
|
234 }
|
jpayne@68
|
235 }
|
jpayne@68
|
236
|
jpayne@68
|
237 private final ConcurrentReadInputStream cris;
|
jpayne@68
|
238 private final SamStreamer ss;
|
jpayne@68
|
239 private final int queuelen;
|
jpayne@68
|
240 public long ZMWs=0;
|
jpayne@68
|
241 private final long maxReads;
|
jpayne@68
|
242 private final long maxZMWs;
|
jpayne@68
|
243 public boolean errorState=false;
|
jpayne@68
|
244
|
jpayne@68
|
245 private final ArrayBlockingQueue<ZMW> queue;
|
jpayne@68
|
246 private static final ZMW POISON=new ZMW(0);
|
jpayne@68
|
247 public static boolean verbose=false;
|
jpayne@68
|
248
|
jpayne@68
|
249 //Streamer seems to give more highly variable timings... sometimes. And it's not really needed.
|
jpayne@68
|
250 public static boolean useStreamer=false;
|
jpayne@68
|
251 //Only 1 thread for now to force ordered input
|
jpayne@68
|
252 public static final int streamerThreads=1;
|
jpayne@68
|
253
|
jpayne@68
|
254 }
|