comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/AlignmentThreadPool.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 package sketch;
2
3 import java.util.ArrayList;
4 import java.util.concurrent.ArrayBlockingQueue;
5 import java.util.concurrent.atomic.AtomicInteger;
6
7 import shared.Tools;
8
9 public class AlignmentThreadPool {
10
11 public AlignmentThreadPool(int maxThreads_) {
12 maxThreads=maxThreads_;
13 assert(maxThreads>0);
14 tlist=new ArrayList<AlignmentThread>(maxThreads);
15 }
16
17 public void addJobs(ArrayList<Comparison> list, int maxRecords){
18 if(list==null || list.isEmpty() || maxRecords<1){return;}
19 final int limit=Tools.min(list.size(), maxRecords);
20 ArrayBlockingQueue<Comparison> dest=new ArrayBlockingQueue<Comparison>(limit);
21 int added=0;
22 for(int i=0; i<limit; i++){
23 Comparison c=list.get(i);
24 if(c.needsAlignment()){
25 addJob(c, dest);
26 added++;
27 }
28 }
29 for(int i=0; i<added; i++){
30 take(dest);
31 }
32 }
33
34 public void addJob(Comparison c, ArrayBlockingQueue<Comparison> dest){
35 if(tlist.size()<maxThreads){spawnThread();}
36 assert(!poisoned);
37 AlignmentJob job=new AlignmentJob(c, dest);
38 put(job);
39 }
40
41 private synchronized void spawnThread(){
42 final int size=tlist.size();
43 if(size<maxThreads && busy.get()>=size){
44 // AlignmentThread alt=new AlignmentThread(source, busy);
45 AlignmentThread alt=new AlignmentThread();
46 tlist.add(alt);
47 alt.start();
48 }
49 }
50
51 synchronized void poison(){
52 assert(!poisoned);
53 if(poisoned){return;}
54 put(poison);
55 poisoned=true;
56 }
57
58 private void put(AlignmentJob job){
59 busy.incrementAndGet();
60 boolean success=false;
61 while(!success){
62 try {
63 source.put(job);
64 success=true;
65 } catch (InterruptedException e) {
66 // TODO Auto-generated catch block
67 e.printStackTrace();
68 }
69 }
70 }
71
72 private final <X> X take(ArrayBlockingQueue<X> queue){
73 X x=null;
74 while(x==null){
75 try {
76 x=queue.take();
77 } catch (InterruptedException e1) {
78 // TODO Auto-generated catch block
79 e1.printStackTrace();
80 }
81 }
82 return x;
83 }
84
85 private class AlignmentThread extends Thread {
86
87 AlignmentThread(){}
88
89 private final AlignmentJob next(){
90 return take(source);
91 }
92
93 @Override
94 public void run(){
95 AlignmentJob job=null;
96 for(job=next(); !job.isPoison(); job=next()){
97 job.doWork();
98 busy.decrementAndGet();
99 }
100 put(poison);
101 }
102
103 // private final ArrayBlockingQueue<AlignmentJob> source;
104 // private final AtomicInteger busy;
105 //
106 // private static final AlignmentJob poison=new AlignmentJob(null, null);
107
108 }
109
110 final ArrayList<AlignmentThread> tlist;
111 final int maxThreads;
112 final AtomicInteger busy=new AtomicInteger(0);
113 private boolean poisoned=false;
114
115 private static final AlignmentJob poison=new AlignmentJob(null, null);
116 private static final ArrayBlockingQueue<AlignmentJob> source=new ArrayBlockingQueue<AlignmentJob>(4096);
117
118 }