Mercurial > repos > rliterman > csp2
view CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/AlignmentThreadPool.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line source
package sketch;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import shared.Tools;

/**
 * Pool of worker threads that run {@link AlignmentJob}s pulled from a bounded queue.
 * Workers are started on demand, up to {@code maxThreads}, as jobs are submitted.
 * After {@link #poison()} has been called the workers exit once they reach the
 * poison marker in the queue, and no further jobs may be added.
 *
 * Each pool owns its own job queue, so poisoning one pool does not stop the
 * workers of another pool, and a worker never consumes another pool's jobs.
 */
public class AlignmentThreadPool {

	/*--------------------------------------------------------------*/
	/*----------------         Constructor          ----------------*/
	/*--------------------------------------------------------------*/

	/**
	 * Creates an empty pool; no threads are started until a job is added.
	 * @param maxThreads_ Upper bound on the number of worker threads; must be positive.
	 */
	public AlignmentThreadPool(int maxThreads_) {
		maxThreads=maxThreads_;
		assert(maxThreads>0);
		tlist=new ArrayList<AlignmentThread>(maxThreads);
	}

	/*--------------------------------------------------------------*/
	/*----------------        Public Methods        ----------------*/
	/*--------------------------------------------------------------*/

	/**
	 * Submits a job for every comparison among the first
	 * {@code min(list.size(), maxRecords)} entries that reports
	 * {@code needsAlignment()}, then blocks until one result per submitted
	 * job has been taken from a private result queue.
	 * Does nothing when the list is null or empty, or maxRecords is below 1.
	 * @param list Comparisons to examine, in order.
	 * @param maxRecords Maximum number of leading entries to examine.
	 */
	public void addJobs(ArrayList<Comparison> list, int maxRecords){
		if(list==null || list.isEmpty() || maxRecords<1){return;}
		final int limit=Tools.min(list.size(), maxRecords);

		//Sized to 'limit' so that one slot exists for every job that could be submitted.
		final ArrayBlockingQueue<Comparison> dest=new ArrayBlockingQueue<Comparison>(limit);

		int added=0;
		for(int i=0; i<limit; i++){
			final Comparison c=list.get(i);
			if(c.needsAlignment()){
				addJob(c, dest);
				added++;
			}
		}

		//Wait for completion: one take per submitted job.
		//Presumably AlignmentJob.doWork() places its Comparison onto dest when finished; confirm there.
		for(int i=0; i<added; i++){
			take(dest);
		}
	}

	/**
	 * Wraps a comparison in an {@link AlignmentJob} and queues it for the workers,
	 * first attempting to start another worker if the pool is not yet at capacity.
	 * Must not be called after {@link #poison()}.
	 * @param c Comparison to process.
	 * @param dest Queue handed to the job along with the comparison.
	 */
	public void addJob(Comparison c, ArrayBlockingQueue<Comparison> dest){
		//Unsynchronized pre-check only; spawnThread() re-checks the size under the lock.
		if(tlist.size()<maxThreads){spawnThread();}
		assert(!poisoned);
		final AlignmentJob job=new AlignmentJob(c, dest);
		put(job);
	}

	/**
	 * Tells the workers to shut down by queueing the poison marker.
	 * Jobs already queued ahead of the marker are still taken first, since the queue is FIFO.
	 * A second call trips the assertion, or returns immediately when assertions are disabled.
	 */
	synchronized void poison(){
		assert(!poisoned);
		if(poisoned){return;}
		put(poison);
		poisoned=true;
	}

	/*--------------------------------------------------------------*/
	/*----------------        Private Methods       ----------------*/
	/*--------------------------------------------------------------*/

	/**
	 * Starts one more worker, provided the pool is below capacity and the number
	 * of outstanding jobs is at least the number of workers already created.
	 */
	private synchronized void spawnThread(){
		final int size=tlist.size();
		if(size<maxThreads && busy.get()>=size){
			final AlignmentThread alt=new AlignmentThread();
			tlist.add(alt);
			alt.start();
		}
	}

	/**
	 * Adds a job to this pool's queue, blocking while the queue is full.
	 * The busy counter is incremented before the job is queued; note that this
	 * also happens for the poison marker.
	 * @param job Job (or poison marker) to queue.
	 */
	private void put(AlignmentJob job){
		busy.incrementAndGet();
		boolean success=false;
		while(!success){
			try {
				source.put(job);
				success=true;
			} catch (InterruptedException e) {
				//A job must never be dropped, so report the interrupt and retry.
				//NOTE(review): the thread's interrupt status is not restored afterwards.
				e.printStackTrace();
			}
		}
	}

	/**
	 * Removes and returns the head of a queue, blocking while it is empty.
	 * Interrupts are reported and the take is retried, so the result is never null.
	 * @param queue Queue to take from.
	 * @return The element taken.
	 */
	private final <X> X take(ArrayBlockingQueue<X> queue){
		X x=null;
		while(x==null){
			try {
				x=queue.take();
			} catch (InterruptedException e1) {
				//Report the interrupt and keep waiting.
				e1.printStackTrace();
			}
		}
		return x;
	}

	/*--------------------------------------------------------------*/
	/*----------------        Nested Classes        ----------------*/
	/*--------------------------------------------------------------*/

	/** Worker thread that processes jobs from the enclosing pool's queue until it sees the poison marker. */
	private class AlignmentThread extends Thread {

		AlignmentThread(){}

		/** @return The next job from the pool's queue, blocking until one is available. */
		private final AlignmentJob next(){
			return take(source);
		}

		@Override
		public void run(){
			AlignmentJob job=null;
			for(job=next(); !job.isPoison(); job=next()){
				job.doWork();
				busy.decrementAndGet();
			}
			//Pass the poison marker on so that the remaining workers also terminate.
			put(poison);
		}

	}

	/*--------------------------------------------------------------*/
	/*----------------            Fields            ----------------*/
	/*--------------------------------------------------------------*/

	/** Every worker created so far; only modified inside spawnThread(). */
	final ArrayList<AlignmentThread> tlist;
	/** Maximum number of workers this pool will create. */
	final int maxThreads;
	/** Incremented on every put(), decremented after each completed job. */
	final AtomicInteger busy=new AtomicInteger(0);
	/** True once poison() has queued the shutdown marker. */
	private boolean poisoned=false;

	/**
	 * Pending jobs for this pool's workers.  This is per-instance: a queue shared
	 * between pools would let one pool's poison marker terminate (and permanently
	 * block) every other pool, and would let workers adjust the wrong busy counter.
	 */
	private final ArrayBlockingQueue<AlignmentJob> source=new ArrayBlockingQueue<AlignmentJob>(4096);

	/*--------------------------------------------------------------*/
	/*----------------        Static Fields         ----------------*/
	/*--------------------------------------------------------------*/

	/** Shutdown marker; safe to share between pools because it carries no state. */
	private static final AlignmentJob poison=new AlignmentJob(null, null);

}