Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/AlignmentThreadPool.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/AlignmentThreadPool.java Tue Mar 18 16:23:26 2025 -0400 @@ -0,0 +1,118 @@ +package sketch; + +import java.util.ArrayList; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import shared.Tools; + +public class AlignmentThreadPool { + + public AlignmentThreadPool(int maxThreads_) { + maxThreads=maxThreads_; + assert(maxThreads>0); + tlist=new ArrayList<AlignmentThread>(maxThreads); + } + + public void addJobs(ArrayList<Comparison> list, int maxRecords){ + if(list==null || list.isEmpty() || maxRecords<1){return;} + final int limit=Tools.min(list.size(), maxRecords); + ArrayBlockingQueue<Comparison> dest=new ArrayBlockingQueue<Comparison>(limit); + int added=0; + for(int i=0; i<limit; i++){ + Comparison c=list.get(i); + if(c.needsAlignment()){ + addJob(c, dest); + added++; + } + } + for(int i=0; i<added; i++){ + take(dest); + } + } + + public void addJob(Comparison c, ArrayBlockingQueue<Comparison> dest){ + if(tlist.size()<maxThreads){spawnThread();} + assert(!poisoned); + AlignmentJob job=new AlignmentJob(c, dest); + put(job); + } + + private synchronized void spawnThread(){ + final int size=tlist.size(); + if(size<maxThreads && busy.get()>=size){ +// AlignmentThread alt=new AlignmentThread(source, busy); + AlignmentThread alt=new AlignmentThread(); + tlist.add(alt); + alt.start(); + } + } + + synchronized void poison(){ + assert(!poisoned); + if(poisoned){return;} + put(poison); + poisoned=true; + } + + private void put(AlignmentJob job){ + busy.incrementAndGet(); + boolean success=false; + while(!success){ + try { + source.put(job); + success=true; + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + + private final <X> X take(ArrayBlockingQueue<X> queue){ + X x=null; + while(x==null){ + try { + x=queue.take(); + } catch (InterruptedException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + return x; + } + + private class AlignmentThread extends Thread { + + AlignmentThread(){} + + private final AlignmentJob next(){ + return take(source); + } + + @Override + public void run(){ + AlignmentJob job=null; + for(job=next(); !job.isPoison(); job=next()){ + job.doWork(); + busy.decrementAndGet(); + } + put(poison); + } + +// private final ArrayBlockingQueue<AlignmentJob> source; +// private final AtomicInteger busy; +// +// private static final AlignmentJob poison=new AlignmentJob(null, null); + + } + + final ArrayList<AlignmentThread> tlist; + final int maxThreads; + final AtomicInteger busy=new AtomicInteger(0); + private boolean poisoned=false; + + private static final AlignmentJob poison=new AlignmentJob(null, null); + private static final ArrayBlockingQueue<AlignmentJob> source=new ArrayBlockingQueue<AlignmentJob>(4096); + +}