annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/AlignmentThreadPool.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package sketch;
jpayne@68 2
jpayne@68 3 import java.util.ArrayList;
jpayne@68 4 import java.util.concurrent.ArrayBlockingQueue;
jpayne@68 5 import java.util.concurrent.atomic.AtomicInteger;
jpayne@68 6
jpayne@68 7 import shared.Tools;
jpayne@68 8
jpayne@68 9 public class AlignmentThreadPool {
jpayne@68 10
jpayne@68 11 public AlignmentThreadPool(int maxThreads_) {
jpayne@68 12 maxThreads=maxThreads_;
jpayne@68 13 assert(maxThreads>0);
jpayne@68 14 tlist=new ArrayList<AlignmentThread>(maxThreads);
jpayne@68 15 }
jpayne@68 16
jpayne@68 17 public void addJobs(ArrayList<Comparison> list, int maxRecords){
jpayne@68 18 if(list==null || list.isEmpty() || maxRecords<1){return;}
jpayne@68 19 final int limit=Tools.min(list.size(), maxRecords);
jpayne@68 20 ArrayBlockingQueue<Comparison> dest=new ArrayBlockingQueue<Comparison>(limit);
jpayne@68 21 int added=0;
jpayne@68 22 for(int i=0; i<limit; i++){
jpayne@68 23 Comparison c=list.get(i);
jpayne@68 24 if(c.needsAlignment()){
jpayne@68 25 addJob(c, dest);
jpayne@68 26 added++;
jpayne@68 27 }
jpayne@68 28 }
jpayne@68 29 for(int i=0; i<added; i++){
jpayne@68 30 take(dest);
jpayne@68 31 }
jpayne@68 32 }
jpayne@68 33
jpayne@68 34 public void addJob(Comparison c, ArrayBlockingQueue<Comparison> dest){
jpayne@68 35 if(tlist.size()<maxThreads){spawnThread();}
jpayne@68 36 assert(!poisoned);
jpayne@68 37 AlignmentJob job=new AlignmentJob(c, dest);
jpayne@68 38 put(job);
jpayne@68 39 }
jpayne@68 40
jpayne@68 41 private synchronized void spawnThread(){
jpayne@68 42 final int size=tlist.size();
jpayne@68 43 if(size<maxThreads && busy.get()>=size){
jpayne@68 44 // AlignmentThread alt=new AlignmentThread(source, busy);
jpayne@68 45 AlignmentThread alt=new AlignmentThread();
jpayne@68 46 tlist.add(alt);
jpayne@68 47 alt.start();
jpayne@68 48 }
jpayne@68 49 }
jpayne@68 50
jpayne@68 51 synchronized void poison(){
jpayne@68 52 assert(!poisoned);
jpayne@68 53 if(poisoned){return;}
jpayne@68 54 put(poison);
jpayne@68 55 poisoned=true;
jpayne@68 56 }
jpayne@68 57
jpayne@68 58 private void put(AlignmentJob job){
jpayne@68 59 busy.incrementAndGet();
jpayne@68 60 boolean success=false;
jpayne@68 61 while(!success){
jpayne@68 62 try {
jpayne@68 63 source.put(job);
jpayne@68 64 success=true;
jpayne@68 65 } catch (InterruptedException e) {
jpayne@68 66 // TODO Auto-generated catch block
jpayne@68 67 e.printStackTrace();
jpayne@68 68 }
jpayne@68 69 }
jpayne@68 70 }
jpayne@68 71
jpayne@68 72 private final <X> X take(ArrayBlockingQueue<X> queue){
jpayne@68 73 X x=null;
jpayne@68 74 while(x==null){
jpayne@68 75 try {
jpayne@68 76 x=queue.take();
jpayne@68 77 } catch (InterruptedException e1) {
jpayne@68 78 // TODO Auto-generated catch block
jpayne@68 79 e1.printStackTrace();
jpayne@68 80 }
jpayne@68 81 }
jpayne@68 82 return x;
jpayne@68 83 }
jpayne@68 84
jpayne@68 85 private class AlignmentThread extends Thread {
jpayne@68 86
jpayne@68 87 AlignmentThread(){}
jpayne@68 88
jpayne@68 89 private final AlignmentJob next(){
jpayne@68 90 return take(source);
jpayne@68 91 }
jpayne@68 92
jpayne@68 93 @Override
jpayne@68 94 public void run(){
jpayne@68 95 AlignmentJob job=null;
jpayne@68 96 for(job=next(); !job.isPoison(); job=next()){
jpayne@68 97 job.doWork();
jpayne@68 98 busy.decrementAndGet();
jpayne@68 99 }
jpayne@68 100 put(poison);
jpayne@68 101 }
jpayne@68 102
jpayne@68 103 // private final ArrayBlockingQueue<AlignmentJob> source;
jpayne@68 104 // private final AtomicInteger busy;
jpayne@68 105 //
jpayne@68 106 // private static final AlignmentJob poison=new AlignmentJob(null, null);
jpayne@68 107
jpayne@68 108 }
jpayne@68 109
jpayne@68 110 final ArrayList<AlignmentThread> tlist;
jpayne@68 111 final int maxThreads;
jpayne@68 112 final AtomicInteger busy=new AtomicInteger(0);
jpayne@68 113 private boolean poisoned=false;
jpayne@68 114
jpayne@68 115 private static final AlignmentJob poison=new AlignmentJob(null, null);
jpayne@68 116 private static final ArrayBlockingQueue<AlignmentJob> source=new ArrayBlockingQueue<AlignmentJob>(4096);
jpayne@68 117
jpayne@68 118 }