jpayne@68
|
1 package sketch;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.util.ArrayList;
|
jpayne@68
|
4 import java.util.concurrent.ArrayBlockingQueue;
|
jpayne@68
|
5 import java.util.concurrent.atomic.AtomicInteger;
|
jpayne@68
|
6
|
jpayne@68
|
7 import shared.Tools;
|
jpayne@68
|
8
|
jpayne@68
|
9 public class AlignmentThreadPool {
|
jpayne@68
|
10
|
jpayne@68
|
11 public AlignmentThreadPool(int maxThreads_) {
|
jpayne@68
|
12 maxThreads=maxThreads_;
|
jpayne@68
|
13 assert(maxThreads>0);
|
jpayne@68
|
14 tlist=new ArrayList<AlignmentThread>(maxThreads);
|
jpayne@68
|
15 }
|
jpayne@68
|
16
|
jpayne@68
|
17 public void addJobs(ArrayList<Comparison> list, int maxRecords){
|
jpayne@68
|
18 if(list==null || list.isEmpty() || maxRecords<1){return;}
|
jpayne@68
|
19 final int limit=Tools.min(list.size(), maxRecords);
|
jpayne@68
|
20 ArrayBlockingQueue<Comparison> dest=new ArrayBlockingQueue<Comparison>(limit);
|
jpayne@68
|
21 int added=0;
|
jpayne@68
|
22 for(int i=0; i<limit; i++){
|
jpayne@68
|
23 Comparison c=list.get(i);
|
jpayne@68
|
24 if(c.needsAlignment()){
|
jpayne@68
|
25 addJob(c, dest);
|
jpayne@68
|
26 added++;
|
jpayne@68
|
27 }
|
jpayne@68
|
28 }
|
jpayne@68
|
29 for(int i=0; i<added; i++){
|
jpayne@68
|
30 take(dest);
|
jpayne@68
|
31 }
|
jpayne@68
|
32 }
|
jpayne@68
|
33
|
jpayne@68
|
34 public void addJob(Comparison c, ArrayBlockingQueue<Comparison> dest){
|
jpayne@68
|
35 if(tlist.size()<maxThreads){spawnThread();}
|
jpayne@68
|
36 assert(!poisoned);
|
jpayne@68
|
37 AlignmentJob job=new AlignmentJob(c, dest);
|
jpayne@68
|
38 put(job);
|
jpayne@68
|
39 }
|
jpayne@68
|
40
|
jpayne@68
|
41 private synchronized void spawnThread(){
|
jpayne@68
|
42 final int size=tlist.size();
|
jpayne@68
|
43 if(size<maxThreads && busy.get()>=size){
|
jpayne@68
|
44 // AlignmentThread alt=new AlignmentThread(source, busy);
|
jpayne@68
|
45 AlignmentThread alt=new AlignmentThread();
|
jpayne@68
|
46 tlist.add(alt);
|
jpayne@68
|
47 alt.start();
|
jpayne@68
|
48 }
|
jpayne@68
|
49 }
|
jpayne@68
|
50
|
jpayne@68
|
51 synchronized void poison(){
|
jpayne@68
|
52 assert(!poisoned);
|
jpayne@68
|
53 if(poisoned){return;}
|
jpayne@68
|
54 put(poison);
|
jpayne@68
|
55 poisoned=true;
|
jpayne@68
|
56 }
|
jpayne@68
|
57
|
jpayne@68
|
58 private void put(AlignmentJob job){
|
jpayne@68
|
59 busy.incrementAndGet();
|
jpayne@68
|
60 boolean success=false;
|
jpayne@68
|
61 while(!success){
|
jpayne@68
|
62 try {
|
jpayne@68
|
63 source.put(job);
|
jpayne@68
|
64 success=true;
|
jpayne@68
|
65 } catch (InterruptedException e) {
|
jpayne@68
|
66 // TODO Auto-generated catch block
|
jpayne@68
|
67 e.printStackTrace();
|
jpayne@68
|
68 }
|
jpayne@68
|
69 }
|
jpayne@68
|
70 }
|
jpayne@68
|
71
|
jpayne@68
|
72 private final <X> X take(ArrayBlockingQueue<X> queue){
|
jpayne@68
|
73 X x=null;
|
jpayne@68
|
74 while(x==null){
|
jpayne@68
|
75 try {
|
jpayne@68
|
76 x=queue.take();
|
jpayne@68
|
77 } catch (InterruptedException e1) {
|
jpayne@68
|
78 // TODO Auto-generated catch block
|
jpayne@68
|
79 e1.printStackTrace();
|
jpayne@68
|
80 }
|
jpayne@68
|
81 }
|
jpayne@68
|
82 return x;
|
jpayne@68
|
83 }
|
jpayne@68
|
84
|
jpayne@68
|
85 private class AlignmentThread extends Thread {
|
jpayne@68
|
86
|
jpayne@68
|
87 AlignmentThread(){}
|
jpayne@68
|
88
|
jpayne@68
|
89 private final AlignmentJob next(){
|
jpayne@68
|
90 return take(source);
|
jpayne@68
|
91 }
|
jpayne@68
|
92
|
jpayne@68
|
93 @Override
|
jpayne@68
|
94 public void run(){
|
jpayne@68
|
95 AlignmentJob job=null;
|
jpayne@68
|
96 for(job=next(); !job.isPoison(); job=next()){
|
jpayne@68
|
97 job.doWork();
|
jpayne@68
|
98 busy.decrementAndGet();
|
jpayne@68
|
99 }
|
jpayne@68
|
100 put(poison);
|
jpayne@68
|
101 }
|
jpayne@68
|
102
|
jpayne@68
|
103 // private final ArrayBlockingQueue<AlignmentJob> source;
|
jpayne@68
|
104 // private final AtomicInteger busy;
|
jpayne@68
|
105 //
|
jpayne@68
|
106 // private static final AlignmentJob poison=new AlignmentJob(null, null);
|
jpayne@68
|
107
|
jpayne@68
|
108 }
|
jpayne@68
|
109
|
jpayne@68
|
110 final ArrayList<AlignmentThread> tlist;
|
jpayne@68
|
111 final int maxThreads;
|
jpayne@68
|
112 final AtomicInteger busy=new AtomicInteger(0);
|
jpayne@68
|
113 private boolean poisoned=false;
|
jpayne@68
|
114
|
jpayne@68
|
115 private static final AlignmentJob poison=new AlignmentJob(null, null);
|
jpayne@68
|
116 private static final ArrayBlockingQueue<AlignmentJob> source=new ArrayBlockingQueue<AlignmentJob>(4096);
|
jpayne@68
|
117
|
jpayne@68
|
118 }
|