Mercurial > repos > rliterman > csp2
view CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/icecream/ZMWStreamer.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line source
package icecream; import java.util.concurrent.ArrayBlockingQueue; import dna.Data; import fileIO.FileFormat; import fileIO.ReadWrite; import shared.Parse; import shared.Tools; import stream.ConcurrentReadInputStream; import stream.Read; import stream.SamReadStreamer; import stream.SamStreamer; import structures.ListNum; /** * Wrapper for a ReadInputStream. * Produces one ZMW at a time for consumers. * Allows stopping after X reads or X ZMWs. * @author Brian Bushnell * @date June 5, 2020 */ public class ZMWStreamer implements Runnable { public ZMWStreamer(FileFormat ff, int queuelen_, long maxReads_, long maxZMWs_){ Data.USE_SAMBAMBA=false;//Sambamba changes PacBio headers. queuelen=Tools.mid(4, queuelen_, 64); maxReads=maxReads_;//(maxReads_<0 ? Long.MAX_VALUE : maxReads_); maxZMWs=maxZMWs_; // assert(false) : maxReads_+", "+maxReads; queue=new ArrayBlockingQueue<ZMW>(queuelen); if(ff.samOrBam() && useStreamer){ cris=null; ss=makeStreamer(ff); }else{ cris=makeCris(ff); ss=null; } assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist."; } public ZMWStreamer(ConcurrentReadInputStream cris_, SamStreamer ss_, int queuelen_){ cris=cris_; ss=ss_; queuelen=Tools.mid(4, queuelen_, 64); maxReads=-1; maxZMWs=-1; assert((cris==null) != (ss==null)) : "Exactly one of cris or ss should exist."; queue=new ArrayBlockingQueue<ZMW>(queuelen); } public Thread runStreamer(boolean makeThread){ if(makeThread){ Thread t=new Thread(this); t.start(); return t; }else{ run(); return null; } } @Override public void run(){ if(cris!=null){ handleCris(); }else{ handleStreamer(); } } private ConcurrentReadInputStream makeCris(FileFormat ff){ ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null, null, null); cris.start(); //Start the stream if(verbose){System.err.println("Started cris");} return cris; } private SamReadStreamer makeStreamer(FileFormat ff){ SamReadStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads); ss.start(); //Start the stream if(verbose){System.err.println("Started sam streamer");} return ss; } /** * Pull reads from the cris; * organize them into lists of subreads from the same ZMW; * put those lists into the shared queue. */ private void handleCris(){ //Grab the first ListNum of reads ListNum<Read> ln=cris.nextList(); ZMW buffer=new ZMW();buffer.id=ZMWs; long prevZmw=-1; long readsAdded=0; // long zmwsAdded=0; //As long as there is a nonempty read list... while(ln!=null && ln.size()>0){ for(Read r : ln) { long zmw; try { zmw=Parse.parseZmw(r.id); } catch (Exception e) { zmw=r.numericID;//For testing only; disable for production } if(zmw<0){zmw=r.numericID;}//For testing only; disable for production if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);} if(zmw!=prevZmw && !buffer.isEmpty()){ ZMWs++; addToQueue(buffer); readsAdded+=buffer.size(); // zmwsAdded++; buffer=new ZMW();buffer.id=ZMWs; if(maxZMWs>0 && ZMWs>=maxZMWs){break;} } buffer.add(r); prevZmw=zmw; } if(maxZMWs>0 && ZMWs>=maxZMWs){break;} cris.returnList(ln); //Fetch a new list ln=cris.nextList(); } if(!buffer.isEmpty() && (maxZMWs<1 || ZMWs>=maxZMWs)){ ZMWs++; readsAdded+=buffer.size(); addToQueue(buffer); } //Notify the input stream that the final list was used if(ln!=null){ cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); // cris.returnList(ln.id, true); } errorState|=ReadWrite.closeStreams(cris); addPoison(); } /** * Pull reads from the streamer; * organize them into lists of subreads from the same ZMW; * put those lists into the shared queue. */ private void handleStreamer(){ //Grab the first ListNum of reads ListNum<Read> ln=ss.nextList(); ZMW buffer=new ZMW();buffer.id=ZMWs; long prevZmw=-1; long added=0; //As long as there is a nonempty read list... while(ln!=null && ln.size()>0){ for(Read r : ln) { long zmw; try { zmw=Parse.parseZmw(r.id); } catch (Exception e) { zmw=r.numericID;//For testing only; disable for production } if(zmw<0){zmw=r.numericID;}//For testing only; disable for production if(verbose){System.err.println("Fetched read "+r.id+"; "+(zmw!=prevZmw)+", "+buffer.isEmpty()+", "+zmw+", "+prevZmw);} if(zmw!=prevZmw && !buffer.isEmpty()){ ZMWs++; addToQueue(buffer); added+=buffer.size(); buffer=new ZMW();buffer.id=ZMWs; } buffer.add(r); prevZmw=zmw; } //Fetch a new list ln=ss.nextList(); } if(!buffer.isEmpty()){ ZMWs++; added+=buffer.size(); addToQueue(buffer); } addPoison(); } private void addPoison(){ // //Notify worker threads that there is no more data // for(int i=0; i<threads; i++){ // addToQueue(POISON); // } addToQueue(POISON); } private void addToQueue(ZMW buffer){ if(verbose) {System.err.println("Adding to queue "+(buffer==POISON ? "poison" : buffer.get(0).id));} while(buffer!=null) { try { queue.put(buffer); buffer=null; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public ZMW nextZMW(){ ZMW buffer=null; while(buffer==null) { try { buffer=queue.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(verbose){System.err.println("Pulled from queue "+(buffer==POISON ? "poison" : buffer.get(0).id));} if(buffer==POISON){ addToQueue(POISON); return null; }else{ return buffer; } } private final ConcurrentReadInputStream cris; private final SamStreamer ss; private final int queuelen; public long ZMWs=0; private final long maxReads; private final long maxZMWs; public boolean errorState=false; private final ArrayBlockingQueue<ZMW> queue; private static final ZMW POISON=new ZMW(0); public static boolean verbose=false; //Streamer seems to give more highly variable timings... sometimes. And it's not really needed. public static boolean useStreamer=false; //Only 1 thread for now to force ordered input public static final int streamerThreads=1; }