jpayne@68
|
1 package bloom;
|
jpayne@68
|
2
|
jpayne@68
|
3 import java.util.Arrays;
|
jpayne@68
|
4 import java.util.Random;
|
jpayne@68
|
5 import java.util.concurrent.ArrayBlockingQueue;
|
jpayne@68
|
6
|
jpayne@68
|
7 import shared.Primes;
|
jpayne@68
|
8 import shared.Shared;
|
jpayne@68
|
9 import shared.Timer;
|
jpayne@68
|
10 import shared.Tools;
|
jpayne@68
|
11 import structures.ByteBuilder;
|
jpayne@68
|
12
|
jpayne@68
|
13
|
jpayne@68
|
14 /**
|
jpayne@68
|
15 *
|
jpayne@68
|
16 * Uses prime numbers for array lengths.
|
jpayne@68
|
17 * Supports a prefilter that is checked before looking at the main filter.
|
jpayne@68
|
18 *
|
jpayne@68
|
19 * @author Brian Bushnell
|
jpayne@68
|
20 * @date Aug 17, 2012
|
jpayne@68
|
21 *
|
jpayne@68
|
22 */
|
jpayne@68
|
23 public class KCountArray8MT extends KCountArray {
|
jpayne@68
|
24
|
jpayne@68
|
/** Version identifier used by Java serialization. */
|
jpayne@68
|
28 private static final long serialVersionUID = -3146298383509476887L;
|
jpayne@68
|
29
|
jpayne@68
|
30 public static void main(String[] args){
|
jpayne@68
|
31 long cells=Long.parseLong(args[0]);
|
jpayne@68
|
32 int bits=Integer.parseInt(args[1]);
|
jpayne@68
|
33 int hashes=Integer.parseInt(args[3]);
|
jpayne@68
|
34
|
jpayne@68
|
35 verbose=false;
|
jpayne@68
|
36
|
jpayne@68
|
37 KCountArray8MT kca=new KCountArray8MT(cells, bits, hashes, null);
|
jpayne@68
|
38
|
jpayne@68
|
39 System.out.println(kca.read(0));
|
jpayne@68
|
40 kca.increment(0);
|
jpayne@68
|
41 System.out.println(kca.read(0));
|
jpayne@68
|
42 kca.increment(0);
|
jpayne@68
|
43 System.out.println(kca.read(0));
|
jpayne@68
|
44 System.out.println();
|
jpayne@68
|
45
|
jpayne@68
|
46 System.out.println(kca.read(1));
|
jpayne@68
|
47 kca.increment(1);
|
jpayne@68
|
48 System.out.println(kca.read(1));
|
jpayne@68
|
49 kca.increment(1);
|
jpayne@68
|
50 System.out.println(kca.read(1));
|
jpayne@68
|
51 System.out.println();
|
jpayne@68
|
52
|
jpayne@68
|
53 System.out.println(kca.read(100));
|
jpayne@68
|
54 kca.increment(100);
|
jpayne@68
|
55 System.out.println(kca.read(100));
|
jpayne@68
|
56 kca.increment(100);
|
jpayne@68
|
57 System.out.println(kca.read(100));
|
jpayne@68
|
58 kca.increment(100);
|
jpayne@68
|
59 System.out.println(kca.read(100));
|
jpayne@68
|
60 System.out.println();
|
jpayne@68
|
61
|
jpayne@68
|
62
|
jpayne@68
|
63 System.out.println(kca.read(150));
|
jpayne@68
|
64 kca.increment(150);
|
jpayne@68
|
65 System.out.println(kca.read(150));
|
jpayne@68
|
66 System.out.println();
|
jpayne@68
|
67
|
jpayne@68
|
68 }
|
jpayne@68
|
69
|
jpayne@68
|
70 public KCountArray8MT(long cells_, int bits_, int hashes_, KCountArray prefilter_){
|
jpayne@68
|
71 super(getPrimeCells(cells_, bits_), bits_, getDesiredArrays(cells_, bits_));
|
jpayne@68
|
72 // verbose=false;
|
jpayne@68
|
73 // assert(false);
|
jpayne@68
|
74 cellsPerArray=cells/numArrays;
|
jpayne@68
|
75 wordsPerArray=(int)((cellsPerArray%cellsPerWord)==0 ? (cellsPerArray/cellsPerWord) : (cellsPerArray/cellsPerWord+1));
|
jpayne@68
|
76 cellMod=cellsPerArray;
|
jpayne@68
|
77 hashes=hashes_;
|
jpayne@68
|
78 // System.out.println("cells="+cells+", words="+words+", wordsPerArray="+wordsPerArray+", numArrays="+numArrays+", hashes="+hashes);
|
jpayne@68
|
79 // assert(false);
|
jpayne@68
|
80 matrix=new int[numArrays][];
|
jpayne@68
|
81 prefilter=prefilter_;
|
jpayne@68
|
82 assert(prefilter!=null);
|
jpayne@68
|
83 assert(hashes>0 && hashes<=hashMasks.length);
|
jpayne@68
|
84 }
|
jpayne@68
|
85
|
jpayne@68
|
86 private static int getDesiredArrays(long desiredCells, int bits){
|
jpayne@68
|
87
|
jpayne@68
|
88 long words=Tools.max((desiredCells*bits+31)/32, minArrays);
|
jpayne@68
|
89 int arrays=minArrays;
|
jpayne@68
|
90 while(words/arrays>=Integer.MAX_VALUE){
|
jpayne@68
|
91 arrays*=2;
|
jpayne@68
|
92 }
|
jpayne@68
|
93 return arrays;
|
jpayne@68
|
94 }
|
jpayne@68
|
95
|
jpayne@68
|
96 private static long getPrimeCells(long desiredCells, int bits){
|
jpayne@68
|
97
|
jpayne@68
|
98 int arrays=getDesiredArrays(desiredCells, bits);
|
jpayne@68
|
99
|
jpayne@68
|
100 long x=(desiredCells+arrays-1)/arrays;
|
jpayne@68
|
101 long x2=Primes.primeAtMost(x);
|
jpayne@68
|
102 return x2*arrays;
|
jpayne@68
|
103 }
|
jpayne@68
|
104
|
jpayne@68
|
105 @Override
|
jpayne@68
|
106 public int read(final long rawKey){
|
jpayne@68
|
107 assert(finished);
|
jpayne@68
|
108 if(verbose){System.err.println("Reading raw key "+rawKey);}
|
jpayne@68
|
109 int pre=0;
|
jpayne@68
|
110 if(prefilter!=null){
|
jpayne@68
|
111 pre=prefilter.read(rawKey);
|
jpayne@68
|
112 if(pre<prefilter.maxValue){return pre;}
|
jpayne@68
|
113 }
|
jpayne@68
|
114 long key2=hash(rawKey, 0);
|
jpayne@68
|
115 int min=readHashed(key2);
|
jpayne@68
|
116 for(int i=1; i<hashes && min>0; i++){
|
jpayne@68
|
117 if(verbose){System.err.println("Reading. i="+i+", key2="+key2);}
|
jpayne@68
|
118 key2=Long.rotateRight(key2, hashBits);
|
jpayne@68
|
119 key2=hash(key2, i);
|
jpayne@68
|
120 if(verbose){System.err.println("Rot/hash. i="+i+", key2="+key2);}
|
jpayne@68
|
121 min=min(min, readHashed(key2));
|
jpayne@68
|
122 }
|
jpayne@68
|
123 return min;
|
jpayne@68
|
124 }
|
jpayne@68
|
125
|
jpayne@68
|
126 private int readHashed(long key){
|
jpayne@68
|
127 if(verbose){System.err.print("Reading hashed key "+key);}
|
jpayne@68
|
128 // System.out.println("key="+key);
|
jpayne@68
|
129 int arrayNum=(int)(key&arrayMask);
|
jpayne@68
|
130 key=(key>>>arrayBits)%(cellMod);
|
jpayne@68
|
131 // key=(key>>>(arrayBits+1))%(cellMod);
|
jpayne@68
|
132 // System.out.println("array="+arrayNum);
|
jpayne@68
|
133 // System.out.println("key2="+key);
|
jpayne@68
|
134 int[] array=matrix[arrayNum];
|
jpayne@68
|
135 int index=(int)(key>>>indexShift);
|
jpayne@68
|
136 // assert(false) : indexShift;
|
jpayne@68
|
137 // System.out.println("index="+index);
|
jpayne@68
|
138 int word=array[index];
|
jpayne@68
|
139 // System.out.println("word="+Integer.toHexString(word));
|
jpayne@68
|
140 assert(word>>>(cellBits*key) == word>>>(cellBits*(key&cellMask)));
|
jpayne@68
|
141 // int cellShift=(int)(cellBits*(key&cellMask));
|
jpayne@68
|
142 int cellShift=(int)(cellBits*key);
|
jpayne@68
|
143 if(verbose){System.err.println(", array="+arrayNum+", index="+index+", cellShift="+(cellShift%32)+", value="+((int)((word>>>cellShift)&valueMask)));}
|
jpayne@68
|
144 // System.out.println("cellShift="+cellShift);
|
jpayne@68
|
145 return (int)((word>>>cellShift)&valueMask);
|
jpayne@68
|
146 }
|
jpayne@68
|
147
|
jpayne@68
|
148 @Override
|
jpayne@68
|
149 public void write(final long key, int value){
|
jpayne@68
|
150 throw new RuntimeException("Not allowed for this class.");
|
jpayne@68
|
151 }
|
jpayne@68
|
152
|
jpayne@68
|
153 @Override
|
jpayne@68
|
154 /** This should increase speed by doing the first hash outside the critical section, but it does not seem to help. */
|
jpayne@68
|
155 public void increment(long[] keys){
|
jpayne@68
|
156 if(prefilter==null){
|
jpayne@68
|
157 for(int i=0; i<keys.length; i++){
|
jpayne@68
|
158 keys[i]=hash(keys[i], 0);
|
jpayne@68
|
159 }
|
jpayne@68
|
160 synchronized(buffers){
|
jpayne@68
|
161 for(long key : keys){
|
jpayne@68
|
162 incrementPartiallyHashed(key);
|
jpayne@68
|
163 }
|
jpayne@68
|
164 }
|
jpayne@68
|
165 }else{
|
jpayne@68
|
166 int j=0;
|
jpayne@68
|
167 for(int i=0; i<keys.length; i++){
|
jpayne@68
|
168 long key=keys[i];
|
jpayne@68
|
169 int x=prefilter.read(key);
|
jpayne@68
|
170 if(x==prefilter.maxValue){
|
jpayne@68
|
171 keys[j]=hash(key, 0);
|
jpayne@68
|
172 j++;
|
jpayne@68
|
173 }
|
jpayne@68
|
174 }
|
jpayne@68
|
175 synchronized(buffers){
|
jpayne@68
|
176 for(int i=0; i<j; i++){
|
jpayne@68
|
177 incrementPartiallyHashed(keys[i]);
|
jpayne@68
|
178 }
|
jpayne@68
|
179 }
|
jpayne@68
|
180 }
|
jpayne@68
|
181 }
|
jpayne@68
|
182
|
jpayne@68
|
183 //Slow
|
jpayne@68
|
184 @Override
|
jpayne@68
|
185 public void increment(final long rawKey, int amt){
|
jpayne@68
|
186 for(int i=0; i<amt; i++){increment0(rawKey);}
|
jpayne@68
|
187 }
|
jpayne@68
|
188
|
jpayne@68
|
189 public void increment0(final long rawKey){
|
jpayne@68
|
190 if(verbose){System.err.println("\n*** Incrementing raw key "+rawKey+" ***");}
|
jpayne@68
|
191 if(prefilter!=null){
|
jpayne@68
|
192 int pre=prefilter.read(rawKey);
|
jpayne@68
|
193 if(pre<prefilter.maxValue){return;}
|
jpayne@68
|
194 }
|
jpayne@68
|
195
|
jpayne@68
|
196 long key2=rawKey;
|
jpayne@68
|
197 for(int i=0; i<hashes; i++){
|
jpayne@68
|
198 key2=hash(key2, i);
|
jpayne@68
|
199 if(verbose){System.err.println("key2="+key2+", value="+readHashed(key2));}
|
jpayne@68
|
200 // assert(readHashed(key2)==0);
|
jpayne@68
|
201
|
jpayne@68
|
202 int bnum=(int)(key2&arrayMask);
|
jpayne@68
|
203 long[] array=buffers[bnum];
|
jpayne@68
|
204 int loc=bufferlen[bnum];
|
jpayne@68
|
205 array[loc]=key2;
|
jpayne@68
|
206 bufferlen[bnum]++;
|
jpayne@68
|
207 if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
|
jpayne@68
|
208 if(bufferlen[bnum]>=array.length){
|
jpayne@68
|
209
|
jpayne@68
|
210 if(verbose){System.err.println("Moving array.");}
|
jpayne@68
|
211 bufferlen[bnum]=0;
|
jpayne@68
|
212 buffers[bnum]=new long[array.length];
|
jpayne@68
|
213
|
jpayne@68
|
214 writers[bnum].add(array);
|
jpayne@68
|
215 if(verbose){System.err.println("Moved.");}
|
jpayne@68
|
216 }
|
jpayne@68
|
217 // assert(read(rawKey)<=min+incr) : "i="+i+", original="+min+", new should be <="+(min+incr)+", new="+read(rawKey)+", max="+maxValue+", key="+rawKey;
|
jpayne@68
|
218 // assert(readHashed(key2)>=min+incr) : "i="+i+", original="+min+", new should be <="+(min+incr)+", new="+read(rawKey)+", max="+maxValue+", key="+rawKey;
|
jpayne@68
|
219 key2=Long.rotateRight(key2, hashBits);
|
jpayne@68
|
220 }
|
jpayne@68
|
221 }
|
jpayne@68
|
222
|
jpayne@68
|
223 private void incrementPartiallyHashed(final long pKey){
|
jpayne@68
|
224 if(verbose){System.err.println("\n*** Incrementing key "+pKey+" ***");}
|
jpayne@68
|
225
|
jpayne@68
|
226 long key2=pKey;
|
jpayne@68
|
227
|
jpayne@68
|
228 {
|
jpayne@68
|
229 int bnum=(int)(key2&arrayMask);
|
jpayne@68
|
230 long[] array=buffers[bnum];
|
jpayne@68
|
231 int loc=bufferlen[bnum];
|
jpayne@68
|
232 array[loc]=key2;
|
jpayne@68
|
233 bufferlen[bnum]++;
|
jpayne@68
|
234 if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
|
jpayne@68
|
235 if(bufferlen[bnum]>=array.length){
|
jpayne@68
|
236
|
jpayne@68
|
237 if(verbose){System.err.println("Moving array.");}
|
jpayne@68
|
238 bufferlen[bnum]=0;
|
jpayne@68
|
239 buffers[bnum]=new long[array.length];
|
jpayne@68
|
240
|
jpayne@68
|
241 writers[bnum].add(array);
|
jpayne@68
|
242 if(verbose){System.err.println("Moved.");}
|
jpayne@68
|
243 }
|
jpayne@68
|
244 }
|
jpayne@68
|
245
|
jpayne@68
|
246 for(int i=1; i<hashes; i++){
|
jpayne@68
|
247 key2=Long.rotateRight(key2, hashBits);
|
jpayne@68
|
248 key2=hash(key2, i);
|
jpayne@68
|
249 if(verbose){System.err.println("key2="+key2+", value="+readHashed(key2));}
|
jpayne@68
|
250 // assert(readHashed(key2)==0);
|
jpayne@68
|
251
|
jpayne@68
|
252 int bnum=(int)(key2&arrayMask);
|
jpayne@68
|
253 long[] array=buffers[bnum];
|
jpayne@68
|
254 int loc=bufferlen[bnum];
|
jpayne@68
|
255 array[loc]=key2;
|
jpayne@68
|
256 bufferlen[bnum]++;
|
jpayne@68
|
257 if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
|
jpayne@68
|
258 if(bufferlen[bnum]>=array.length){
|
jpayne@68
|
259
|
jpayne@68
|
260 if(verbose){System.err.println("Moving array.");}
|
jpayne@68
|
261 bufferlen[bnum]=0;
|
jpayne@68
|
262 buffers[bnum]=new long[array.length];
|
jpayne@68
|
263
|
jpayne@68
|
264 writers[bnum].add(array);
|
jpayne@68
|
265 if(verbose){System.err.println("Moved.");}
|
jpayne@68
|
266 }
|
jpayne@68
|
267 }
|
jpayne@68
|
268 }
|
jpayne@68
|
269
|
jpayne@68
|
270 /** Returns unincremented value */
|
jpayne@68
|
271 @Override
|
jpayne@68
|
272 public int incrementAndReturnUnincremented(long key, int incr){
|
jpayne@68
|
273 throw new RuntimeException("Operation not supported.");
|
jpayne@68
|
274 }
|
jpayne@68
|
275
|
jpayne@68
|
276 @Override
|
jpayne@68
|
277 public long[] transformToFrequency(){
|
jpayne@68
|
278 return transformToFrequency(matrix);
|
jpayne@68
|
279 }
|
jpayne@68
|
280
|
jpayne@68
|
281 @Override
|
jpayne@68
|
282 public ByteBuilder toContentsString(){
|
jpayne@68
|
283 ByteBuilder sb=new ByteBuilder();
|
jpayne@68
|
284 sb.append('[');
|
jpayne@68
|
285 String comma="";
|
jpayne@68
|
286 for(int[] array : matrix){
|
jpayne@68
|
287 for(int i=0; i<array.length; i++){
|
jpayne@68
|
288 int word=array[i];
|
jpayne@68
|
289 for(int j=0; j<cellsPerWord; j++){
|
jpayne@68
|
290 int x=word&valueMask;
|
jpayne@68
|
291 sb.append(comma);
|
jpayne@68
|
292 sb.append(x);
|
jpayne@68
|
293 word>>>=cellBits;
|
jpayne@68
|
294 comma=", ";
|
jpayne@68
|
295 }
|
jpayne@68
|
296 }
|
jpayne@68
|
297 }
|
jpayne@68
|
298 sb.append(']');
|
jpayne@68
|
299 return sb;
|
jpayne@68
|
300 }
|
jpayne@68
|
301
|
jpayne@68
|
302 @Override
|
jpayne@68
|
303 public double usedFraction(){return cellsUsed/(double)cells;}
|
jpayne@68
|
304
|
jpayne@68
|
305 @Override
|
jpayne@68
|
306 public double usedFraction(int mindepth){return cellsUsed(mindepth)/(double)cells;}
|
jpayne@68
|
307
|
jpayne@68
|
308 @Override
|
jpayne@68
|
309 public long cellsUsed(int mindepth){
|
jpayne@68
|
310 long count=0;
|
jpayne@68
|
311 // System.out.println("A: "+cellBits+", "+Integer.toBinaryString(valueMask));
|
jpayne@68
|
312 for(int[] array : matrix){
|
jpayne@68
|
313 // System.out.println("B");
|
jpayne@68
|
314 if(array!=null){
|
jpayne@68
|
315 // System.out.println("C");
|
jpayne@68
|
316 for(int word : array){
|
jpayne@68
|
317 // System.out.println("D: "+Integer.toBinaryString(word));
|
jpayne@68
|
318 while(word>0){
|
jpayne@68
|
319 int x=word&valueMask;
|
jpayne@68
|
320 // System.out.println("E: "+x+", "+mindepth);
|
jpayne@68
|
321 if(x>=mindepth){count++;}
|
jpayne@68
|
322 word>>>=cellBits;
|
jpayne@68
|
323 }
|
jpayne@68
|
324 }
|
jpayne@68
|
325 }
|
jpayne@68
|
326 }
|
jpayne@68
|
327 return count;
|
jpayne@68
|
328 }
|
jpayne@68
|
329
|
jpayne@68
|
330 @Override
|
jpayne@68
|
331 final long hash(long key, int row){
|
jpayne@68
|
332 int cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));
|
jpayne@68
|
333 // int cell=(int)(hashCellMask&(key));
|
jpayne@68
|
334
|
jpayne@68
|
335 if(row==0){//Doublehash only first time
|
jpayne@68
|
336 key=key^hashMasks[(row+4)%hashMasks.length][cell];
|
jpayne@68
|
337 cell=(int)(hashCellMask&(key>>5));
|
jpayne@68
|
338 // cell=(int)(hashCellMask&(key>>hashBits));
|
jpayne@68
|
339 // cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));
|
jpayne@68
|
340 }
|
jpayne@68
|
341
|
jpayne@68
|
342 return key^hashMasks[row][cell];
|
jpayne@68
|
343 }
|
jpayne@68
|
344
|
jpayne@68
|
345 /**
|
jpayne@68
|
346 * @param i
|
jpayne@68
|
347 * @param j
|
jpayne@68
|
348 * @return
|
jpayne@68
|
349 */
|
jpayne@68
|
350 private static long[][] makeMasks(int rows, int cols) {
|
jpayne@68
|
351
|
jpayne@68
|
352 long seed;
|
jpayne@68
|
353 synchronized(KCountArray8MT.class){
|
jpayne@68
|
354 seed=counter;
|
jpayne@68
|
355 counter++;
|
jpayne@68
|
356 }
|
jpayne@68
|
357
|
jpayne@68
|
358 Timer t=new Timer();
|
jpayne@68
|
359 long[][] r=new long[rows][cols];
|
jpayne@68
|
360 Random randy=Shared.threadLocalRandom(seed);
|
jpayne@68
|
361 for(int i=0; i<r.length; i++){
|
jpayne@68
|
362 fillMasks(r[i], randy);
|
jpayne@68
|
363 }
|
jpayne@68
|
364 t.stop();
|
jpayne@68
|
365 if(t.elapsed>200000000L){System.out.println("Mask-creation time: "+t);}
|
jpayne@68
|
366 return r;
|
jpayne@68
|
367 }
|
jpayne@68
|
368
|
jpayne@68
|
369 private static void fillMasks(long[] r, Random randy) {
|
jpayne@68
|
370 // for(int i=0; i<r.length; i++){
|
jpayne@68
|
371 // long x=0;
|
jpayne@68
|
372 // while(Long.bitCount(x&0xFFFFFFFF)!=16){
|
jpayne@68
|
373 // x=randy.nextLong();
|
jpayne@68
|
374 // }
|
jpayne@68
|
375 // r[i]=(x&Long.MAX_VALUE);
|
jpayne@68
|
376 // }
|
jpayne@68
|
377
|
jpayne@68
|
378 final int hlen=(1<<hashBits);
|
jpayne@68
|
379 assert(r.length==hlen);
|
jpayne@68
|
380 int[] count1=new int[hlen];
|
jpayne@68
|
381 int[] count2=new int[hlen];
|
jpayne@68
|
382 final long mask=hlen-1;
|
jpayne@68
|
383
|
jpayne@68
|
384 for(int i=0; i<r.length; i++){
|
jpayne@68
|
385 long x=0;
|
jpayne@68
|
386 int y=0;
|
jpayne@68
|
387 int z=0;
|
jpayne@68
|
388 while(Long.bitCount(x&0xFFFFFFFFL)!=16){
|
jpayne@68
|
389 x=randy.nextLong();
|
jpayne@68
|
390 while(Long.bitCount(x&0xFFFFFFFFL)<16){
|
jpayne@68
|
391 x|=(1L<<randy.nextInt(32));
|
jpayne@68
|
392 }
|
jpayne@68
|
393 while(Long.bitCount(x&0xFFFFFFFFL)>16){
|
jpayne@68
|
394 x&=(~(1L<<randy.nextInt(32)));
|
jpayne@68
|
395 }
|
jpayne@68
|
396 while(Long.bitCount(x&0xFFFFFFFF00000000L)<16){
|
jpayne@68
|
397 x|=(1L<<(randy.nextInt(32)+32));
|
jpayne@68
|
398 }
|
jpayne@68
|
399 while(Long.bitCount(x&0xFFFFFFFF00000000L)>16){
|
jpayne@68
|
400 x&=(~(1L<<(randy.nextInt(32)+32)));
|
jpayne@68
|
401 }
|
jpayne@68
|
402
|
jpayne@68
|
403 // System.out.print(".");
|
jpayne@68
|
404 // y=(((int)(x&mask))^i);
|
jpayne@68
|
405 y=(((int)(x&mask)));
|
jpayne@68
|
406 z=(int)((x>>hashBits)&mask);
|
jpayne@68
|
407 if(count1[y]>0 || count2[z]>0){
|
jpayne@68
|
408 x=0;
|
jpayne@68
|
409 }
|
jpayne@68
|
410 }
|
jpayne@68
|
411 // System.out.println(Long.toBinaryString(x));
|
jpayne@68
|
412 r[i]=(x&Long.MAX_VALUE);
|
jpayne@68
|
413 count1[y]++;
|
jpayne@68
|
414 count2[z]++;
|
jpayne@68
|
415 }
|
jpayne@68
|
416
|
jpayne@68
|
417 }
|
jpayne@68
|
418
|
jpayne@68
|
419
|
jpayne@68
|
420 @Override
|
jpayne@68
|
421 public void initialize(){
|
jpayne@68
|
422 for(int i=0; i<writers.length; i++){
|
jpayne@68
|
423 writers[i]=new WriteThread(i);
|
jpayne@68
|
424 writers[i].start();
|
jpayne@68
|
425
|
jpayne@68
|
426 // while(!writers[i].isAlive()){
|
jpayne@68
|
427 // System.out.print(".");
|
jpayne@68
|
428 // }
|
jpayne@68
|
429 }
|
jpayne@68
|
430 }
|
jpayne@68
|
431
|
jpayne@68
|
432 @Override
|
jpayne@68
|
433 public void shutdown(){
|
jpayne@68
|
434 if(finished){return;}
|
jpayne@68
|
435 synchronized(this){
|
jpayne@68
|
436 if(finished){return;}
|
jpayne@68
|
437
|
jpayne@68
|
438 //Clear buffers
|
jpayne@68
|
439 for(int i=0; i<numArrays; i++){
|
jpayne@68
|
440 long[] array=buffers[i];
|
jpayne@68
|
441 int len=bufferlen[i];
|
jpayne@68
|
442 buffers[i]=null;
|
jpayne@68
|
443 bufferlen[i]=0;
|
jpayne@68
|
444
|
jpayne@68
|
445 if(len<array.length){
|
jpayne@68
|
446 array=Arrays.copyOf(array, len);
|
jpayne@68
|
447 }
|
jpayne@68
|
448
|
jpayne@68
|
449 if(array.length>0){
|
jpayne@68
|
450 writers[i].add(array);
|
jpayne@68
|
451 }
|
jpayne@68
|
452 }
|
jpayne@68
|
453
|
jpayne@68
|
454 //Add poison
|
jpayne@68
|
455 for(WriteThread wt : writers){
|
jpayne@68
|
456 wt.add(poison);
|
jpayne@68
|
457 }
|
jpayne@68
|
458
|
jpayne@68
|
459 //Wait for termination
|
jpayne@68
|
460 for(WriteThread wt : writers){
|
jpayne@68
|
461 // System.out.println("wt"+wt.num+" is alive: "+wt.isAlive());
|
jpayne@68
|
462 while(wt.isAlive()){
|
jpayne@68
|
463 // System.out.println("wt"+wt.num+" is alive: "+wt.isAlive());
|
jpayne@68
|
464 try {
|
jpayne@68
|
465 wt.join(10000);
|
jpayne@68
|
466 } catch (InterruptedException e) {
|
jpayne@68
|
467 // TODO Auto-generated catch block
|
jpayne@68
|
468 e.printStackTrace();
|
jpayne@68
|
469 }
|
jpayne@68
|
470 if(wt.isAlive()){System.err.println(wt.getClass().getCanonicalName()+" is taking a long time to die.");}
|
jpayne@68
|
471 }
|
jpayne@68
|
472 cellsUsed+=wt.cellsUsedPersonal;
|
jpayne@68
|
473 // System.out.println("cellsUsed="+cellsUsed);
|
jpayne@68
|
474 }
|
jpayne@68
|
475
|
jpayne@68
|
476 assert(!finished);
|
jpayne@68
|
477 finished=true;
|
jpayne@68
|
478 }
|
jpayne@68
|
479 }
|
jpayne@68
|
480
|
jpayne@68
|
481 private class WriteThread extends Thread{
|
jpayne@68
|
482
|
jpayne@68
|
483 public WriteThread(int tnum){
|
jpayne@68
|
484 num=tnum;
|
jpayne@68
|
485 }
|
jpayne@68
|
486
|
jpayne@68
|
487 @Override
|
jpayne@68
|
488 public void run(){
|
jpayne@68
|
489 assert(matrix[num]==null);
|
jpayne@68
|
490 array=new int[wordsPerArray]; //Makes NUMA systems use local memory.
|
jpayne@68
|
491
|
jpayne@68
|
492 matrix[num]=array;
|
jpayne@68
|
493
|
jpayne@68
|
494 long[] keys=null;
|
jpayne@68
|
495 while(!shutdown){
|
jpayne@68
|
496
|
jpayne@68
|
497 if(verbose){System.err.println(" - Reading keys for wt"+num+".");}
|
jpayne@68
|
498 while(keys==null){
|
jpayne@68
|
499 try {
|
jpayne@68
|
500 keys=writeQueue.take();
|
jpayne@68
|
501 } catch (InterruptedException e) {
|
jpayne@68
|
502 // TODO Auto-generated catch block
|
jpayne@68
|
503 e.printStackTrace();
|
jpayne@68
|
504 }
|
jpayne@68
|
505 }
|
jpayne@68
|
506 if(keys==poison){
|
jpayne@68
|
507 // assert(false);
|
jpayne@68
|
508 shutdown=true;
|
jpayne@68
|
509 }else{
|
jpayne@68
|
510 for(long key : keys){
|
jpayne@68
|
511 incrementHashedLocal(key);
|
jpayne@68
|
512 }
|
jpayne@68
|
513 }
|
jpayne@68
|
514 // System.out.println(" -- Read keys for wt"+num+". poison="+(keys==poison)+", len="+keys.length);
|
jpayne@68
|
515 if(verbose){System.err.println(" -- Read keys for wt"+num+". (success)");}
|
jpayne@68
|
516 keys=null;
|
jpayne@68
|
517 if(verbose){System.err.println("shutdown="+shutdown);}
|
jpayne@68
|
518 }
|
jpayne@68
|
519
|
jpayne@68
|
520 // System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+".");
|
jpayne@68
|
521 // assert(false) : ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+".";
|
jpayne@68
|
522
|
jpayne@68
|
523 array=null;
|
jpayne@68
|
524 }
|
jpayne@68
|
525
|
jpayne@68
|
526 void add(long[] keys){
|
jpayne@68
|
527 // assert(isAlive());
|
jpayne@68
|
528 assert(!shutdown);
|
jpayne@68
|
529 if(shutdown){return;}
|
jpayne@68
|
530 // assert(keys!=poison);
|
jpayne@68
|
531 if(verbose){System.err.println(" + Adding keys to wt"+num+".");}
|
jpayne@68
|
532 boolean success=false;
|
jpayne@68
|
533 while(!success){
|
jpayne@68
|
534 try {
|
jpayne@68
|
535 writeQueue.put(keys);
|
jpayne@68
|
536 success=true;
|
jpayne@68
|
537 } catch (InterruptedException e) {
|
jpayne@68
|
538 // TODO Auto-generated catch block
|
jpayne@68
|
539 e.printStackTrace();
|
jpayne@68
|
540 }
|
jpayne@68
|
541 }
|
jpayne@68
|
542 if(verbose){System.err.println(" ++ Added keys to wt"+num+". (success)");}
|
jpayne@68
|
543 }
|
jpayne@68
|
544
|
jpayne@68
|
545 private int incrementHashedLocal(long key){
|
jpayne@68
|
546 assert((key&arrayMask)==num);
|
jpayne@68
|
547 key=(key>>>arrayBits)%(cellMod);
|
jpayne@68
|
548 // key=(key>>>(arrayBits+1))%(cellMod);
|
jpayne@68
|
549 int index=(int)(key>>>indexShift);
|
jpayne@68
|
550 int word=array[index];
|
jpayne@68
|
551 int cellShift=(int)(cellBits*key);
|
jpayne@68
|
552 int value=((word>>>cellShift)&valueMask);
|
jpayne@68
|
553 if(value==0){cellsUsedPersonal++;}
|
jpayne@68
|
554 value=min(value+1, maxValue);
|
jpayne@68
|
555 word=(value<<cellShift)|(word&~((valueMask)<<cellShift));
|
jpayne@68
|
556 array[index]=word;
|
jpayne@68
|
557 return value;
|
jpayne@68
|
558 }
|
jpayne@68
|
559
|
jpayne@68
|
560 private int[] array;
|
jpayne@68
|
561 private final int num;
|
jpayne@68
|
562 public long cellsUsedPersonal=0;
|
jpayne@68
|
563
|
jpayne@68
|
564 public ArrayBlockingQueue<long[]> writeQueue=new ArrayBlockingQueue<long[]>(16);
|
jpayne@68
|
565 public boolean shutdown=false;
|
jpayne@68
|
566
|
jpayne@68
|
567 }
|
jpayne@68
|
568
|
jpayne@68
|
569
|
jpayne@68
|
570 public long cellsUsed(){return cellsUsed;}
|
jpayne@68
|
571
|
jpayne@68
|
572 private boolean finished=false;
|
jpayne@68
|
573
|
jpayne@68
|
574 private long cellsUsed;
|
jpayne@68
|
575 final int[][] matrix;
|
jpayne@68
|
576 private final WriteThread[] writers=new WriteThread[numArrays];
|
jpayne@68
|
577 private final int hashes;
|
jpayne@68
|
578 final int wordsPerArray;
|
jpayne@68
|
579 private final long cellsPerArray;
|
jpayne@68
|
580 final long cellMod;
|
jpayne@68
|
581 private final long[][] hashMasks=makeMasks(8, hashArrayLength);
|
jpayne@68
|
582
|
jpayne@68
|
583 private final long[][] buffers=new long[numArrays][500];
|
jpayne@68
|
584 private final int[] bufferlen=new int[numArrays];
|
jpayne@68
|
585
|
jpayne@68
|
586 public final KCountArray prefilter;
|
jpayne@68
|
587
|
jpayne@68
|
588 private static final int hashBits=6;
|
jpayne@68
|
589 private static final int hashArrayLength=1<<hashBits;
|
jpayne@68
|
590 private static final int hashCellMask=hashArrayLength-1;
|
jpayne@68
|
591 static final long[] poison=new long[0];
|
jpayne@68
|
592
|
jpayne@68
|
593 private static long counter=0;
|
jpayne@68
|
594
|
jpayne@68
|
595 }
|