Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/sketch/AlignmentThreadPool.java @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 package sketch; | |
2 | |
3 import java.util.ArrayList; | |
4 import java.util.concurrent.ArrayBlockingQueue; | |
5 import java.util.concurrent.atomic.AtomicInteger; | |
6 | |
7 import shared.Tools; | |
8 | |
9 public class AlignmentThreadPool { | |
10 | |
11 public AlignmentThreadPool(int maxThreads_) { | |
12 maxThreads=maxThreads_; | |
13 assert(maxThreads>0); | |
14 tlist=new ArrayList<AlignmentThread>(maxThreads); | |
15 } | |
16 | |
17 public void addJobs(ArrayList<Comparison> list, int maxRecords){ | |
18 if(list==null || list.isEmpty() || maxRecords<1){return;} | |
19 final int limit=Tools.min(list.size(), maxRecords); | |
20 ArrayBlockingQueue<Comparison> dest=new ArrayBlockingQueue<Comparison>(limit); | |
21 int added=0; | |
22 for(int i=0; i<limit; i++){ | |
23 Comparison c=list.get(i); | |
24 if(c.needsAlignment()){ | |
25 addJob(c, dest); | |
26 added++; | |
27 } | |
28 } | |
29 for(int i=0; i<added; i++){ | |
30 take(dest); | |
31 } | |
32 } | |
33 | |
34 public void addJob(Comparison c, ArrayBlockingQueue<Comparison> dest){ | |
35 if(tlist.size()<maxThreads){spawnThread();} | |
36 assert(!poisoned); | |
37 AlignmentJob job=new AlignmentJob(c, dest); | |
38 put(job); | |
39 } | |
40 | |
41 private synchronized void spawnThread(){ | |
42 final int size=tlist.size(); | |
43 if(size<maxThreads && busy.get()>=size){ | |
44 // AlignmentThread alt=new AlignmentThread(source, busy); | |
45 AlignmentThread alt=new AlignmentThread(); | |
46 tlist.add(alt); | |
47 alt.start(); | |
48 } | |
49 } | |
50 | |
51 synchronized void poison(){ | |
52 assert(!poisoned); | |
53 if(poisoned){return;} | |
54 put(poison); | |
55 poisoned=true; | |
56 } | |
57 | |
58 private void put(AlignmentJob job){ | |
59 busy.incrementAndGet(); | |
60 boolean success=false; | |
61 while(!success){ | |
62 try { | |
63 source.put(job); | |
64 success=true; | |
65 } catch (InterruptedException e) { | |
66 // TODO Auto-generated catch block | |
67 e.printStackTrace(); | |
68 } | |
69 } | |
70 } | |
71 | |
72 private final <X> X take(ArrayBlockingQueue<X> queue){ | |
73 X x=null; | |
74 while(x==null){ | |
75 try { | |
76 x=queue.take(); | |
77 } catch (InterruptedException e1) { | |
78 // TODO Auto-generated catch block | |
79 e1.printStackTrace(); | |
80 } | |
81 } | |
82 return x; | |
83 } | |
84 | |
85 private class AlignmentThread extends Thread { | |
86 | |
87 AlignmentThread(){} | |
88 | |
89 private final AlignmentJob next(){ | |
90 return take(source); | |
91 } | |
92 | |
93 @Override | |
94 public void run(){ | |
95 AlignmentJob job=null; | |
96 for(job=next(); !job.isPoison(); job=next()){ | |
97 job.doWork(); | |
98 busy.decrementAndGet(); | |
99 } | |
100 put(poison); | |
101 } | |
102 | |
103 // private final ArrayBlockingQueue<AlignmentJob> source; | |
104 // private final AtomicInteger busy; | |
105 // | |
106 // private static final AlignmentJob poison=new AlignmentJob(null, null); | |
107 | |
108 } | |
109 | |
110 final ArrayList<AlignmentThread> tlist; | |
111 final int maxThreads; | |
112 final AtomicInteger busy=new AtomicInteger(0); | |
113 private boolean poisoned=false; | |
114 | |
115 private static final AlignmentJob poison=new AlignmentJob(null, null); | |
116 private static final ArrayBlockingQueue<AlignmentJob> source=new ArrayBlockingQueue<AlignmentJob>(4096); | |
117 | |
118 } |