annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/icecream/ZMWStreamer.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package icecream;
jpayne@68 2
jpayne@68 3 import java.util.concurrent.ArrayBlockingQueue;
jpayne@68 4
jpayne@68 5 import dna.Data;
jpayne@68 6 import fileIO.FileFormat;
jpayne@68 7 import fileIO.ReadWrite;
jpayne@68 8 import shared.Parse;
jpayne@68 9 import shared.Tools;
jpayne@68 10 import stream.ConcurrentReadInputStream;
jpayne@68 11 import stream.Read;
jpayne@68 12 import stream.SamReadStreamer;
jpayne@68 13 import stream.SamStreamer;
jpayne@68 14 import structures.ListNum;
jpayne@68 15
jpayne@68 16 /**
jpayne@68 17 * Wrapper for a ReadInputStream.
jpayne@68 18 * Produces one ZMW at a time for consumers.
jpayne@68 19 * Allows stopping after X reads or X ZMWs.
jpayne@68 20 * @author Brian Bushnell
jpayne@68 21 * @date June 5, 2020
jpayne@68 22 */
jpayne@68 23 public class ZMWStreamer implements Runnable {
jpayne@68 24
jpayne@68 25 public ZMWStreamer(FileFormat ff, int queuelen_, long maxReads_, long maxZMWs_){
jpayne@68 26 Data.USE_SAMBAMBA=false;//Sambamba changes PacBio headers.
jpayne@68 27 queuelen=Tools.mid(4, queuelen_, 64);
jpayne@68 28 maxReads=maxReads_;//(maxReads_<0 ? Long.MAX_VALUE : maxReads_);
jpayne@68 29 maxZMWs=maxZMWs_;
jpayne@68 30 // assert(false) : maxReads_+", "+maxReads;
jpayne@68 31 queue=new ArrayBlockingQueue<ZMW>(queuelen);
jpayne@68 32 if(ff.samOrBam() && useStreamer){
jpayne@68 33 cris=null;
jpayne@68 34 ss=makeStreamer(ff);
jpayne@68 35 }else{
jpayne@68 36 cris=makeCris(ff);
jpayne@68 37 ss=null;
jpayne@68 38 }
jpayne@68 39 assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist.";
jpayne@68 40 }
jpayne@68 41
jpayne@68 42 public ZMWStreamer(ConcurrentReadInputStream cris_, SamStreamer ss_, int queuelen_){
jpayne@68 43 cris=cris_;
jpayne@68 44 ss=ss_;
jpayne@68 45 queuelen=Tools.mid(4, queuelen_, 64);
jpayne@68 46 maxReads=-1;
jpayne@68 47 maxZMWs=-1;
jpayne@68 48 assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist.";
jpayne@68 49 queue=new ArrayBlockingQueue<ZMW>(queuelen);
jpayne@68 50 }
jpayne@68 51
jpayne@68 52 public Thread runStreamer(boolean makeThread){
jpayne@68 53 if(makeThread){
jpayne@68 54 Thread t=new Thread(this);
jpayne@68 55 t.start();
jpayne@68 56 return t;
jpayne@68 57 }else{
jpayne@68 58 run();
jpayne@68 59 return null;
jpayne@68 60 }
jpayne@68 61 }
jpayne@68 62
jpayne@68 63 @Override
jpayne@68 64 public void run(){
jpayne@68 65 if(cris!=null){
jpayne@68 66 handleCris();
jpayne@68 67 }else{
jpayne@68 68 handleStreamer();
jpayne@68 69 }
jpayne@68 70 }
jpayne@68 71
jpayne@68 72 private ConcurrentReadInputStream makeCris(FileFormat ff){
jpayne@68 73 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null, null, null);
jpayne@68 74 cris.start(); //Start the stream
jpayne@68 75 if(verbose){System.err.println("Started cris");}
jpayne@68 76 return cris;
jpayne@68 77 }
jpayne@68 78
jpayne@68 79 private SamReadStreamer makeStreamer(FileFormat ff){
jpayne@68 80 SamReadStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
jpayne@68 81 ss.start(); //Start the stream
jpayne@68 82 if(verbose){System.err.println("Started sam streamer");}
jpayne@68 83 return ss;
jpayne@68 84 }
jpayne@68 85
jpayne@68 86 /**
jpayne@68 87 * Pull reads from the cris;
jpayne@68 88 * organize them into lists of subreads from the same ZMW;
jpayne@68 89 * put those lists into the shared queue.
jpayne@68 90 */
jpayne@68 91 private void handleCris(){
jpayne@68 92 //Grab the first ListNum of reads
jpayne@68 93 ListNum<Read> ln=cris.nextList();
jpayne@68 94
jpayne@68 95 ZMW buffer=new ZMW();buffer.id=ZMWs;
jpayne@68 96 long prevZmw=-1;
jpayne@68 97
jpayne@68 98 long readsAdded=0;
jpayne@68 99 // long zmwsAdded=0;
jpayne@68 100
jpayne@68 101 //As long as there is a nonempty read list...
jpayne@68 102 while(ln!=null && ln.size()>0){
jpayne@68 103
jpayne@68 104 for(Read r : ln) {
jpayne@68 105 long zmw;
jpayne@68 106 try {
jpayne@68 107 zmw=Parse.parseZmw(r.id);
jpayne@68 108 } catch (Exception e) {
jpayne@68 109 zmw=r.numericID;//For testing only; disable for production
jpayne@68 110 }
jpayne@68 111 if(zmw<0){zmw=r.numericID;}//For testing only; disable for production
jpayne@68 112 if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);}
jpayne@68 113 if(zmw!=prevZmw && !buffer.isEmpty()){
jpayne@68 114 ZMWs++;
jpayne@68 115 addToQueue(buffer);
jpayne@68 116 readsAdded+=buffer.size();
jpayne@68 117 // zmwsAdded++;
jpayne@68 118 buffer=new ZMW();buffer.id=ZMWs;
jpayne@68 119 if(maxZMWs>0 && ZMWs>=maxZMWs){break;}
jpayne@68 120 }
jpayne@68 121 buffer.add(r);
jpayne@68 122 prevZmw=zmw;
jpayne@68 123 }
jpayne@68 124
jpayne@68 125 if(maxZMWs>0 && ZMWs>=maxZMWs){break;}
jpayne@68 126 cris.returnList(ln);
jpayne@68 127
jpayne@68 128 //Fetch a new list
jpayne@68 129 ln=cris.nextList();
jpayne@68 130 }
jpayne@68 131
jpayne@68 132 if(!buffer.isEmpty() && (maxZMWs<1 || ZMWs>=maxZMWs)){
jpayne@68 133 ZMWs++;
jpayne@68 134 readsAdded+=buffer.size();
jpayne@68 135 addToQueue(buffer);
jpayne@68 136 }
jpayne@68 137
jpayne@68 138 //Notify the input stream that the final list was used
jpayne@68 139 if(ln!=null){
jpayne@68 140 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
jpayne@68 141 // cris.returnList(ln.id, true);
jpayne@68 142 }
jpayne@68 143
jpayne@68 144 errorState|=ReadWrite.closeStreams(cris);
jpayne@68 145 addPoison();
jpayne@68 146 }
jpayne@68 147
jpayne@68 148 /**
jpayne@68 149 * Pull reads from the streamer;
jpayne@68 150 * organize them into lists of subreads from the same ZMW;
jpayne@68 151 * put those lists into the shared queue.
jpayne@68 152 */
jpayne@68 153 private void handleStreamer(){
jpayne@68 154 //Grab the first ListNum of reads
jpayne@68 155 ListNum<Read> ln=ss.nextList();
jpayne@68 156
jpayne@68 157 ZMW buffer=new ZMW();buffer.id=ZMWs;
jpayne@68 158 long prevZmw=-1;
jpayne@68 159
jpayne@68 160 long added=0;
jpayne@68 161
jpayne@68 162 //As long as there is a nonempty read list...
jpayne@68 163 while(ln!=null && ln.size()>0){
jpayne@68 164
jpayne@68 165 for(Read r : ln) {
jpayne@68 166 long zmw;
jpayne@68 167 try {
jpayne@68 168 zmw=Parse.parseZmw(r.id);
jpayne@68 169 } catch (Exception e) {
jpayne@68 170 zmw=r.numericID;//For testing only; disable for production
jpayne@68 171 }
jpayne@68 172 if(zmw<0){zmw=r.numericID;}//For testing only; disable for production
jpayne@68 173 if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);}
jpayne@68 174 if(zmw!=prevZmw && !buffer.isEmpty()){
jpayne@68 175 ZMWs++;
jpayne@68 176 addToQueue(buffer);
jpayne@68 177 added+=buffer.size();
jpayne@68 178 buffer=new ZMW();buffer.id=ZMWs;
jpayne@68 179 }
jpayne@68 180 buffer.add(r);
jpayne@68 181 prevZmw=zmw;
jpayne@68 182 }
jpayne@68 183
jpayne@68 184 //Fetch a new list
jpayne@68 185 ln=ss.nextList();
jpayne@68 186 }
jpayne@68 187
jpayne@68 188 if(!buffer.isEmpty()){
jpayne@68 189 ZMWs++;
jpayne@68 190 added+=buffer.size();
jpayne@68 191 addToQueue(buffer);
jpayne@68 192 }
jpayne@68 193
jpayne@68 194 addPoison();
jpayne@68 195 }
jpayne@68 196
jpayne@68 197 private void addPoison(){
jpayne@68 198 // //Notify worker threads that there is no more data
jpayne@68 199 // for(int i=0; i<threads; i++){
jpayne@68 200 // addToQueue(POISON);
jpayne@68 201 // }
jpayne@68 202 addToQueue(POISON);
jpayne@68 203 }
jpayne@68 204
jpayne@68 205 private void addToQueue(ZMW buffer){
jpayne@68 206 if(verbose) {System.err.println("Adding to queue "+(buffer==POISON ? "poison" : buffer.get(0).id));}
jpayne@68 207 while(buffer!=null) {
jpayne@68 208 try {
jpayne@68 209 queue.put(buffer);
jpayne@68 210 buffer=null;
jpayne@68 211 } catch (InterruptedException e) {
jpayne@68 212 // TODO Auto-generated catch block
jpayne@68 213 e.printStackTrace();
jpayne@68 214 }
jpayne@68 215 }
jpayne@68 216 }
jpayne@68 217
jpayne@68 218 public ZMW nextZMW(){
jpayne@68 219 ZMW buffer=null;
jpayne@68 220 while(buffer==null) {
jpayne@68 221 try {
jpayne@68 222 buffer=queue.take();
jpayne@68 223 } catch (InterruptedException e) {
jpayne@68 224 // TODO Auto-generated catch block
jpayne@68 225 e.printStackTrace();
jpayne@68 226 }
jpayne@68 227 }
jpayne@68 228 if(verbose){System.err.println("Pulled from queue "+(buffer==POISON ? "poison" : buffer.get(0).id));}
jpayne@68 229 if(buffer==POISON){
jpayne@68 230 addToQueue(POISON);
jpayne@68 231 return null;
jpayne@68 232 }else{
jpayne@68 233 return buffer;
jpayne@68 234 }
jpayne@68 235 }
jpayne@68 236
jpayne@68 237 private final ConcurrentReadInputStream cris;
jpayne@68 238 private final SamStreamer ss;
jpayne@68 239 private final int queuelen;
jpayne@68 240 public long ZMWs=0;
jpayne@68 241 private final long maxReads;
jpayne@68 242 private final long maxZMWs;
jpayne@68 243 public boolean errorState=false;
jpayne@68 244
jpayne@68 245 private final ArrayBlockingQueue<ZMW> queue;
jpayne@68 246 private static final ZMW POISON=new ZMW(0);
jpayne@68 247 public static boolean verbose=false;
jpayne@68 248
jpayne@68 249 //Streamer seems to give more highly variable timings... sometimes. And it's not really needed.
jpayne@68 250 public static boolean useStreamer=false;
jpayne@68 251 //Only 1 thread for now to force ordered input
jpayne@68 252 public static final int streamerThreads=1;
jpayne@68 253
jpayne@68 254 }