package icecream;

import java.util.concurrent.ArrayBlockingQueue;

import dna.Data;
import fileIO.FileFormat;
import fileIO.ReadWrite;
import shared.Parse;
import shared.Tools;
import stream.ConcurrentReadInputStream;
import stream.Read;
import stream.SamReadStreamer;
import stream.SamStreamer;
import structures.ListNum;

/**
 * Wrapper for a ReadInputStream.
 * Produces one ZMW at a time for consumers.
 * Allows stopping after X reads or X ZMWs.
 * 
 * Reads are pulled from exactly one of two sources (a ConcurrentReadInputStream
 * or a SamStreamer), grouped into ZMW lists by the ZMW number parsed from each
 * read name, and placed into a bounded queue.  A single POISON element is
 * enqueued after the last ZMW to mark the end of the data.
 * 
 * @author Brian Bushnell
 * @date June 5, 2020
 */
public class ZMWStreamer implements Runnable {
	
	/*--------------------------------------------------------------*/
	/*----------------         Constructors         ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * Opens and starts an input stream for the given file.
	 * Side effect: sets the global Data.USE_SAMBAMBA to false.
	 * @param ff Input file format.
	 * @param queuelen_ Requested queue capacity; passed through Tools.mid(4, x, 64)
	 * (presumably a clamp to the range 4-64; confirm against Tools).
	 * @param maxReads_ Read limit handed to the underlying stream.
	 * @param maxZMWs_ Stop after this many ZMWs; values below 1 mean no limit.
	 */
	public ZMWStreamer(FileFormat ff, int queuelen_, long maxReads_, long maxZMWs_){
		Data.USE_SAMBAMBA=false;//Sambamba changes PacBio headers.
		queuelen=Tools.mid(4, queuelen_, 64);
		//These must be assigned before makeCris/makeStreamer, which read maxReads.
		maxReads=maxReads_;
		maxZMWs=maxZMWs_;
		queue=new ArrayBlockingQueue<ZMW>(queuelen);
		if(ff.samOrBam() && useStreamer){
			cris=null;
			ss=makeStreamer(ff);
		}else{
			cris=makeCris(ff);
			ss=null;
		}
		assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist.";
	}
	
	/**
	 * Wraps an existing input source.  Exactly one of cris_ and ss_ must be non-null.
	 * No read or ZMW limit is applied (both are set to -1).
	 * @param cris_ A read input stream, or null.
	 * @param ss_ A sam streamer, or null.
	 * @param queuelen_ Requested queue capacity; passed through Tools.mid(4, x, 64).
	 */
	public ZMWStreamer(ConcurrentReadInputStream cris_, SamStreamer ss_, int queuelen_){
		cris=cris_;
		ss=ss_;
		queuelen=Tools.mid(4, queuelen_, 64);
		maxReads=-1;
		maxZMWs=-1;
		assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist.";
		queue=new ArrayBlockingQueue<ZMW>(queuelen);
	}
	
	/*--------------------------------------------------------------*/
	/*----------------         Outer Methods        ----------------*/
	/*--------------------------------------------------------------*/
	
	/**
	 * Runs the producer loop.
	 * @param makeThread If true, run in a newly started thread; otherwise run
	 * in the calling thread and return only when the input is exhausted.
	 * @return The started thread, or null when run synchronously.
	 */
	public Thread runStreamer(boolean makeThread){
		if(!makeThread){
			run();
			return null;
		}
		final Thread t=new Thread(this);
		t.start();
		return t;
	}
	
	/** Dispatches to the handler for whichever input source exists. */
	@Override
	public void run(){
		if(cris!=null){
			handleCris();
		}else{
			handleStreamer();
		}
	}
	
	/**
	 * Fetches the next ZMW from the queue, waiting until one is available.
	 * 
	 * NOTE(review): the original text of this method was lost when the file was
	 * extracted.  It is reconstructed on the assumption that a single POISON
	 * element must terminate every consumer, so POISON is put back for the next
	 * consumer before returning.  Confirm against version control.
	 * 
	 * @return The next ZMW, or null when the input is exhausted.
	 */
	public ZMW nextZMW(){
		boolean interrupted=false;
		ZMW zmw=null;
		try{
			while(zmw==null){
				try{
					zmw=queue.take();
				}catch(InterruptedException e){
					interrupted=true;//Keep waiting; restore the flag on exit
				}
			}
		}finally{
			if(interrupted){Thread.currentThread().interrupt();}
		}
		if(zmw==POISON){
			addToQueue(POISON);//Leave the marker for other consumers
			return null;
		}
		return zmw;
	}
	
	/*--------------------------------------------------------------*/
	/*----------------         Inner Methods        ----------------*/
	/*--------------------------------------------------------------*/
	
	/** Creates and starts a ConcurrentReadInputStream limited to maxReads. */
	private ConcurrentReadInputStream makeCris(FileFormat ff){
		final ConcurrentReadInputStream stream=
				ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null, null, null);
		stream.start(); //Start the stream
		if(verbose){System.err.println("Started cris");}
		return stream;
	}
	
	/** Creates and starts a SamReadStreamer limited to maxReads. */
	private SamReadStreamer makeStreamer(FileFormat ff){
		final SamReadStreamer streamer=new SamReadStreamer(ff, streamerThreads, true, maxReads);
		streamer.start(); //Start the stream
		if(verbose){System.err.println("Started sam streamer");}
		return streamer;
	}
	
	/**
	 * Determines the ZMW number used to group a read.
	 * Falls back to the read's numericID when the name cannot be parsed or
	 * yields a negative number.
	 */
	private static long zmwOf(Read r){
		long zmw;
		try{
			zmw=Parse.parseZmw(r.id);
		}catch(Exception e){
			zmw=r.numericID;//For testing only; disable for production
		}
		if(zmw<0){zmw=r.numericID;}//For testing only; disable for production
		return zmw;
	}
	
	/** @return true if a ZMW limit is set and that many ZMWs have been queued. */
	private boolean zmwLimitReached(){
		return maxZMWs>0 && ZMWs>=maxZMWs;
	}
	
	/**
	 * Pull reads from the cris;
	 * organize them into lists of subreads from the same ZMW;
	 * put those lists into the shared queue.
	 * Stops early once maxZMWs ZMWs have been queued, then closes the
	 * stream and enqueues the end-of-data marker.
	 */
	private void handleCris(){
		//Grab the first ListNum of reads
		ListNum<Read> ln=cris.nextList();
		
		ZMW buffer=new ZMW();
		buffer.id=ZMWs;
		long prevZmw=-1;
		
		//As long as there is a nonempty read list...
		while(ln!=null && ln.size()>0){
			
			for(Read r : ln){
				final long zmw=zmwOf(r);
				if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);}
				if(zmw!=prevZmw && !buffer.isEmpty()){
					//A new ZMW has started; the buffered one is complete.
					ZMWs++;
					addToQueue(buffer);
					buffer=new ZMW();
					buffer.id=ZMWs;
					if(zmwLimitReached()){break;}//buffer is empty here; r is discarded
				}
				buffer.add(r);
				prevZmw=zmw;
			}
			
			if(zmwLimitReached()){break;}
			cris.returnList(ln);
			
			//Fetch a new list
			ln=cris.nextList();
		}
		
		//Flush the last ZMW of the input.
		//The original test was (maxZMWs<1 || ZMWs>=maxZMWs), which dropped the
		//final ZMW whenever a limit was set but the input ended before reaching it.
		if(!buffer.isEmpty() && !zmwLimitReached()){
			ZMWs++;
			addToQueue(buffer);
		}
		
		//Notify the input stream that the final list was used
		if(ln!=null){
			cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
		}
		
		errorState|=ReadWrite.closeStreams(cris);
		addPoison();
	}
	
	/**
	 * Pull reads from the streamer;
	 * organize them into lists of subreads from the same ZMW;
	 * put those lists into the shared queue.
	 * 
	 * NOTE(review): unlike handleCris, this path never consults maxZMWs, so a
	 * ZMW limit has no effect when the streamer is used.  Left unchanged because
	 * the effect of abandoning the streamer early is not visible from this file.
	 */
	private void handleStreamer(){
		//Grab the first ListNum of reads
		ListNum<Read> ln=ss.nextList();
		
		ZMW buffer=new ZMW();
		buffer.id=ZMWs;
		long prevZmw=-1;
		
		//As long as there is a nonempty read list...
		while(ln!=null && ln.size()>0){
			
			for(Read r : ln){
				final long zmw=zmwOf(r);
				if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);}
				if(zmw!=prevZmw && !buffer.isEmpty()){
					ZMWs++;
					addToQueue(buffer);
					buffer=new ZMW();
					buffer.id=ZMWs;
				}
				buffer.add(r);
				prevZmw=zmw;
			}
			
			//Fetch a new list
			ln=ss.nextList();
		}
		
		if(!buffer.isEmpty()){
			ZMWs++;
			addToQueue(buffer);
		}
		
		addPoison();
	}
	
	/**
	 * Marks the end of the data by enqueueing POISON.
	 * 
	 * NOTE(review): the surviving text shows only a commented-out loop that added
	 * one marker per worker thread; the live statement was lost in extraction and
	 * is assumed to be a single enqueue.  Confirm against version control.
	 */
	private void addPoison(){
		addToQueue(POISON);
	}
	
	/**
	 * Places a ZMW into the queue, waiting for space if the queue is full.
	 * An interrupt does not abandon the element; the thread's interrupt flag is
	 * restored once the element has been queued.
	 * 
	 * NOTE(review): reconstructed; the original body was lost in extraction.
	 */
	private void addToQueue(ZMW zmw){
		boolean interrupted=false;
		try{
			while(true){
				try{
					queue.put(zmw);
					break;
				}catch(InterruptedException e){
					interrupted=true;
				}
			}
		}finally{
			if(interrupted){Thread.currentThread().interrupt();}
		}
	}
	
	/*--------------------------------------------------------------*/
	/*----------------            Fields            ----------------*/
	/*--------------------------------------------------------------*/
	
	//NOTE(review): the declarations of cris, ss, queuelen, maxReads, maxZMWs,
	//ZMWs and errorState were lost in extraction.  Names and types follow from
	//their uses in this file; the access modifiers are assumptions.
	
	/** Read source; null when ss is used. */
	private final ConcurrentReadInputStream cris;
	/** Sam/bam read source; null when cris is used. */
	private final SamStreamer ss;
	/** Capacity of the queue. */
	private final int queuelen;
	/** Read limit passed to the input stream. */
	private final long maxReads;
	/** ZMW limit; values below 1 disable the limit. */
	private final long maxZMWs;
	
	/** Number of ZMWs queued so far; also assigned as the id of the next ZMW. */
	public long ZMWs=0;
	/** Set if closing the input stream reported an error. */
	public boolean errorState=false;
	
	/** Hand-off queue between the producer and the consumers. */
	private final ArrayBlockingQueue<ZMW> queue;
	/** End-of-data marker, recognized by identity. */
	private static final ZMW POISON=new ZMW(0);
	public static boolean verbose=false;
	
	//Streamer seems to give more highly variable timings... sometimes.  And it's not really needed.
	public static boolean useStreamer=false;
	//Only 1 thread for now to force ordered input
	public static final int streamerThreads=1;
	
}