annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/opt/bbmap-39.01-1/current/bloom/KCountArray8MT.java @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 package bloom;
jpayne@68 2
jpayne@68 3 import java.util.Arrays;
jpayne@68 4 import java.util.Random;
jpayne@68 5 import java.util.concurrent.ArrayBlockingQueue;
jpayne@68 6
jpayne@68 7 import shared.Primes;
jpayne@68 8 import shared.Shared;
jpayne@68 9 import shared.Timer;
jpayne@68 10 import shared.Tools;
jpayne@68 11 import structures.ByteBuilder;
jpayne@68 12
jpayne@68 13
jpayne@68 14 /**
jpayne@68 15 *
jpayne@68 16 * Uses prime numbers for array lengths.
jpayne@68 17 * Supports a prefilter that is checked before looking at the main filter.
jpayne@68 18 *
jpayne@68 19 * @author Brian Bushnell
jpayne@68 20 * @date Aug 17, 2012
jpayne@68 21 *
jpayne@68 22 */
jpayne@68 23 public class KCountArray8MT extends KCountArray {
jpayne@68 24
jpayne@68 25 /**
jpayne@68 26 *
jpayne@68 27 */
jpayne@68 28 private static final long serialVersionUID = -3146298383509476887L;
jpayne@68 29
jpayne@68 30 public static void main(String[] args){
jpayne@68 31 long cells=Long.parseLong(args[0]);
jpayne@68 32 int bits=Integer.parseInt(args[1]);
jpayne@68 33 int hashes=Integer.parseInt(args[3]);
jpayne@68 34
jpayne@68 35 verbose=false;
jpayne@68 36
jpayne@68 37 KCountArray8MT kca=new KCountArray8MT(cells, bits, hashes, null);
jpayne@68 38
jpayne@68 39 System.out.println(kca.read(0));
jpayne@68 40 kca.increment(0);
jpayne@68 41 System.out.println(kca.read(0));
jpayne@68 42 kca.increment(0);
jpayne@68 43 System.out.println(kca.read(0));
jpayne@68 44 System.out.println();
jpayne@68 45
jpayne@68 46 System.out.println(kca.read(1));
jpayne@68 47 kca.increment(1);
jpayne@68 48 System.out.println(kca.read(1));
jpayne@68 49 kca.increment(1);
jpayne@68 50 System.out.println(kca.read(1));
jpayne@68 51 System.out.println();
jpayne@68 52
jpayne@68 53 System.out.println(kca.read(100));
jpayne@68 54 kca.increment(100);
jpayne@68 55 System.out.println(kca.read(100));
jpayne@68 56 kca.increment(100);
jpayne@68 57 System.out.println(kca.read(100));
jpayne@68 58 kca.increment(100);
jpayne@68 59 System.out.println(kca.read(100));
jpayne@68 60 System.out.println();
jpayne@68 61
jpayne@68 62
jpayne@68 63 System.out.println(kca.read(150));
jpayne@68 64 kca.increment(150);
jpayne@68 65 System.out.println(kca.read(150));
jpayne@68 66 System.out.println();
jpayne@68 67
jpayne@68 68 }
jpayne@68 69
jpayne@68 70 public KCountArray8MT(long cells_, int bits_, int hashes_, KCountArray prefilter_){
jpayne@68 71 super(getPrimeCells(cells_, bits_), bits_, getDesiredArrays(cells_, bits_));
jpayne@68 72 // verbose=false;
jpayne@68 73 // assert(false);
jpayne@68 74 cellsPerArray=cells/numArrays;
jpayne@68 75 wordsPerArray=(int)((cellsPerArray%cellsPerWord)==0 ? (cellsPerArray/cellsPerWord) : (cellsPerArray/cellsPerWord+1));
jpayne@68 76 cellMod=cellsPerArray;
jpayne@68 77 hashes=hashes_;
jpayne@68 78 // System.out.println("cells="+cells+", words="+words+", wordsPerArray="+wordsPerArray+", numArrays="+numArrays+", hashes="+hashes);
jpayne@68 79 // assert(false);
jpayne@68 80 matrix=new int[numArrays][];
jpayne@68 81 prefilter=prefilter_;
jpayne@68 82 assert(prefilter!=null);
jpayne@68 83 assert(hashes>0 && hashes<=hashMasks.length);
jpayne@68 84 }
jpayne@68 85
jpayne@68 86 private static int getDesiredArrays(long desiredCells, int bits){
jpayne@68 87
jpayne@68 88 long words=Tools.max((desiredCells*bits+31)/32, minArrays);
jpayne@68 89 int arrays=minArrays;
jpayne@68 90 while(words/arrays>=Integer.MAX_VALUE){
jpayne@68 91 arrays*=2;
jpayne@68 92 }
jpayne@68 93 return arrays;
jpayne@68 94 }
jpayne@68 95
jpayne@68 96 private static long getPrimeCells(long desiredCells, int bits){
jpayne@68 97
jpayne@68 98 int arrays=getDesiredArrays(desiredCells, bits);
jpayne@68 99
jpayne@68 100 long x=(desiredCells+arrays-1)/arrays;
jpayne@68 101 long x2=Primes.primeAtMost(x);
jpayne@68 102 return x2*arrays;
jpayne@68 103 }
jpayne@68 104
jpayne@68 105 @Override
jpayne@68 106 public int read(final long rawKey){
jpayne@68 107 assert(finished);
jpayne@68 108 if(verbose){System.err.println("Reading raw key "+rawKey);}
jpayne@68 109 int pre=0;
jpayne@68 110 if(prefilter!=null){
jpayne@68 111 pre=prefilter.read(rawKey);
jpayne@68 112 if(pre<prefilter.maxValue){return pre;}
jpayne@68 113 }
jpayne@68 114 long key2=hash(rawKey, 0);
jpayne@68 115 int min=readHashed(key2);
jpayne@68 116 for(int i=1; i<hashes && min>0; i++){
jpayne@68 117 if(verbose){System.err.println("Reading. i="+i+", key2="+key2);}
jpayne@68 118 key2=Long.rotateRight(key2, hashBits);
jpayne@68 119 key2=hash(key2, i);
jpayne@68 120 if(verbose){System.err.println("Rot/hash. i="+i+", key2="+key2);}
jpayne@68 121 min=min(min, readHashed(key2));
jpayne@68 122 }
jpayne@68 123 return min;
jpayne@68 124 }
jpayne@68 125
jpayne@68 126 private int readHashed(long key){
jpayne@68 127 if(verbose){System.err.print("Reading hashed key "+key);}
jpayne@68 128 // System.out.println("key="+key);
jpayne@68 129 int arrayNum=(int)(key&arrayMask);
jpayne@68 130 key=(key>>>arrayBits)%(cellMod);
jpayne@68 131 // key=(key>>>(arrayBits+1))%(cellMod);
jpayne@68 132 // System.out.println("array="+arrayNum);
jpayne@68 133 // System.out.println("key2="+key);
jpayne@68 134 int[] array=matrix[arrayNum];
jpayne@68 135 int index=(int)(key>>>indexShift);
jpayne@68 136 // assert(false) : indexShift;
jpayne@68 137 // System.out.println("index="+index);
jpayne@68 138 int word=array[index];
jpayne@68 139 // System.out.println("word="+Integer.toHexString(word));
jpayne@68 140 assert(word>>>(cellBits*key) == word>>>(cellBits*(key&cellMask)));
jpayne@68 141 // int cellShift=(int)(cellBits*(key&cellMask));
jpayne@68 142 int cellShift=(int)(cellBits*key);
jpayne@68 143 if(verbose){System.err.println(", array="+arrayNum+", index="+index+", cellShift="+(cellShift%32)+", value="+((int)((word>>>cellShift)&valueMask)));}
jpayne@68 144 // System.out.println("cellShift="+cellShift);
jpayne@68 145 return (int)((word>>>cellShift)&valueMask);
jpayne@68 146 }
jpayne@68 147
jpayne@68 148 @Override
jpayne@68 149 public void write(final long key, int value){
jpayne@68 150 throw new RuntimeException("Not allowed for this class.");
jpayne@68 151 }
jpayne@68 152
jpayne@68 153 @Override
jpayne@68 154 /** This should increase speed by doing the first hash outside the critical section, but it does not seem to help. */
jpayne@68 155 public void increment(long[] keys){
jpayne@68 156 if(prefilter==null){
jpayne@68 157 for(int i=0; i<keys.length; i++){
jpayne@68 158 keys[i]=hash(keys[i], 0);
jpayne@68 159 }
jpayne@68 160 synchronized(buffers){
jpayne@68 161 for(long key : keys){
jpayne@68 162 incrementPartiallyHashed(key);
jpayne@68 163 }
jpayne@68 164 }
jpayne@68 165 }else{
jpayne@68 166 int j=0;
jpayne@68 167 for(int i=0; i<keys.length; i++){
jpayne@68 168 long key=keys[i];
jpayne@68 169 int x=prefilter.read(key);
jpayne@68 170 if(x==prefilter.maxValue){
jpayne@68 171 keys[j]=hash(key, 0);
jpayne@68 172 j++;
jpayne@68 173 }
jpayne@68 174 }
jpayne@68 175 synchronized(buffers){
jpayne@68 176 for(int i=0; i<j; i++){
jpayne@68 177 incrementPartiallyHashed(keys[i]);
jpayne@68 178 }
jpayne@68 179 }
jpayne@68 180 }
jpayne@68 181 }
jpayne@68 182
jpayne@68 183 //Slow
jpayne@68 184 @Override
jpayne@68 185 public void increment(final long rawKey, int amt){
jpayne@68 186 for(int i=0; i<amt; i++){increment0(rawKey);}
jpayne@68 187 }
jpayne@68 188
jpayne@68 189 public void increment0(final long rawKey){
jpayne@68 190 if(verbose){System.err.println("\n*** Incrementing raw key "+rawKey+" ***");}
jpayne@68 191 if(prefilter!=null){
jpayne@68 192 int pre=prefilter.read(rawKey);
jpayne@68 193 if(pre<prefilter.maxValue){return;}
jpayne@68 194 }
jpayne@68 195
jpayne@68 196 long key2=rawKey;
jpayne@68 197 for(int i=0; i<hashes; i++){
jpayne@68 198 key2=hash(key2, i);
jpayne@68 199 if(verbose){System.err.println("key2="+key2+", value="+readHashed(key2));}
jpayne@68 200 // assert(readHashed(key2)==0);
jpayne@68 201
jpayne@68 202 int bnum=(int)(key2&arrayMask);
jpayne@68 203 long[] array=buffers[bnum];
jpayne@68 204 int loc=bufferlen[bnum];
jpayne@68 205 array[loc]=key2;
jpayne@68 206 bufferlen[bnum]++;
jpayne@68 207 if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
jpayne@68 208 if(bufferlen[bnum]>=array.length){
jpayne@68 209
jpayne@68 210 if(verbose){System.err.println("Moving array.");}
jpayne@68 211 bufferlen[bnum]=0;
jpayne@68 212 buffers[bnum]=new long[array.length];
jpayne@68 213
jpayne@68 214 writers[bnum].add(array);
jpayne@68 215 if(verbose){System.err.println("Moved.");}
jpayne@68 216 }
jpayne@68 217 // assert(read(rawKey)<=min+incr) : "i="+i+", original="+min+", new should be <="+(min+incr)+", new="+read(rawKey)+", max="+maxValue+", key="+rawKey;
jpayne@68 218 // assert(readHashed(key2)>=min+incr) : "i="+i+", original="+min+", new should be <="+(min+incr)+", new="+read(rawKey)+", max="+maxValue+", key="+rawKey;
jpayne@68 219 key2=Long.rotateRight(key2, hashBits);
jpayne@68 220 }
jpayne@68 221 }
jpayne@68 222
jpayne@68 223 private void incrementPartiallyHashed(final long pKey){
jpayne@68 224 if(verbose){System.err.println("\n*** Incrementing key "+pKey+" ***");}
jpayne@68 225
jpayne@68 226 long key2=pKey;
jpayne@68 227
jpayne@68 228 {
jpayne@68 229 int bnum=(int)(key2&arrayMask);
jpayne@68 230 long[] array=buffers[bnum];
jpayne@68 231 int loc=bufferlen[bnum];
jpayne@68 232 array[loc]=key2;
jpayne@68 233 bufferlen[bnum]++;
jpayne@68 234 if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
jpayne@68 235 if(bufferlen[bnum]>=array.length){
jpayne@68 236
jpayne@68 237 if(verbose){System.err.println("Moving array.");}
jpayne@68 238 bufferlen[bnum]=0;
jpayne@68 239 buffers[bnum]=new long[array.length];
jpayne@68 240
jpayne@68 241 writers[bnum].add(array);
jpayne@68 242 if(verbose){System.err.println("Moved.");}
jpayne@68 243 }
jpayne@68 244 }
jpayne@68 245
jpayne@68 246 for(int i=1; i<hashes; i++){
jpayne@68 247 key2=Long.rotateRight(key2, hashBits);
jpayne@68 248 key2=hash(key2, i);
jpayne@68 249 if(verbose){System.err.println("key2="+key2+", value="+readHashed(key2));}
jpayne@68 250 // assert(readHashed(key2)==0);
jpayne@68 251
jpayne@68 252 int bnum=(int)(key2&arrayMask);
jpayne@68 253 long[] array=buffers[bnum];
jpayne@68 254 int loc=bufferlen[bnum];
jpayne@68 255 array[loc]=key2;
jpayne@68 256 bufferlen[bnum]++;
jpayne@68 257 if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
jpayne@68 258 if(bufferlen[bnum]>=array.length){
jpayne@68 259
jpayne@68 260 if(verbose){System.err.println("Moving array.");}
jpayne@68 261 bufferlen[bnum]=0;
jpayne@68 262 buffers[bnum]=new long[array.length];
jpayne@68 263
jpayne@68 264 writers[bnum].add(array);
jpayne@68 265 if(verbose){System.err.println("Moved.");}
jpayne@68 266 }
jpayne@68 267 }
jpayne@68 268 }
jpayne@68 269
jpayne@68 270 /** Returns unincremented value */
jpayne@68 271 @Override
jpayne@68 272 public int incrementAndReturnUnincremented(long key, int incr){
jpayne@68 273 throw new RuntimeException("Operation not supported.");
jpayne@68 274 }
jpayne@68 275
jpayne@68 276 @Override
jpayne@68 277 public long[] transformToFrequency(){
jpayne@68 278 return transformToFrequency(matrix);
jpayne@68 279 }
jpayne@68 280
jpayne@68 281 @Override
jpayne@68 282 public ByteBuilder toContentsString(){
jpayne@68 283 ByteBuilder sb=new ByteBuilder();
jpayne@68 284 sb.append('[');
jpayne@68 285 String comma="";
jpayne@68 286 for(int[] array : matrix){
jpayne@68 287 for(int i=0; i<array.length; i++){
jpayne@68 288 int word=array[i];
jpayne@68 289 for(int j=0; j<cellsPerWord; j++){
jpayne@68 290 int x=word&valueMask;
jpayne@68 291 sb.append(comma);
jpayne@68 292 sb.append(x);
jpayne@68 293 word>>>=cellBits;
jpayne@68 294 comma=", ";
jpayne@68 295 }
jpayne@68 296 }
jpayne@68 297 }
jpayne@68 298 sb.append(']');
jpayne@68 299 return sb;
jpayne@68 300 }
jpayne@68 301
jpayne@68 302 @Override
jpayne@68 303 public double usedFraction(){return cellsUsed/(double)cells;}
jpayne@68 304
jpayne@68 305 @Override
jpayne@68 306 public double usedFraction(int mindepth){return cellsUsed(mindepth)/(double)cells;}
jpayne@68 307
jpayne@68 308 @Override
jpayne@68 309 public long cellsUsed(int mindepth){
jpayne@68 310 long count=0;
jpayne@68 311 // System.out.println("A: "+cellBits+", "+Integer.toBinaryString(valueMask));
jpayne@68 312 for(int[] array : matrix){
jpayne@68 313 // System.out.println("B");
jpayne@68 314 if(array!=null){
jpayne@68 315 // System.out.println("C");
jpayne@68 316 for(int word : array){
jpayne@68 317 // System.out.println("D: "+Integer.toBinaryString(word));
jpayne@68 318 while(word>0){
jpayne@68 319 int x=word&valueMask;
jpayne@68 320 // System.out.println("E: "+x+", "+mindepth);
jpayne@68 321 if(x>=mindepth){count++;}
jpayne@68 322 word>>>=cellBits;
jpayne@68 323 }
jpayne@68 324 }
jpayne@68 325 }
jpayne@68 326 }
jpayne@68 327 return count;
jpayne@68 328 }
jpayne@68 329
jpayne@68 330 @Override
jpayne@68 331 final long hash(long key, int row){
jpayne@68 332 int cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));
jpayne@68 333 // int cell=(int)(hashCellMask&(key));
jpayne@68 334
jpayne@68 335 if(row==0){//Doublehash only first time
jpayne@68 336 key=key^hashMasks[(row+4)%hashMasks.length][cell];
jpayne@68 337 cell=(int)(hashCellMask&(key>>5));
jpayne@68 338 // cell=(int)(hashCellMask&(key>>hashBits));
jpayne@68 339 // cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));
jpayne@68 340 }
jpayne@68 341
jpayne@68 342 return key^hashMasks[row][cell];
jpayne@68 343 }
jpayne@68 344
jpayne@68 345 /**
jpayne@68 346 * @param i
jpayne@68 347 * @param j
jpayne@68 348 * @return
jpayne@68 349 */
jpayne@68 350 private static long[][] makeMasks(int rows, int cols) {
jpayne@68 351
jpayne@68 352 long seed;
jpayne@68 353 synchronized(KCountArray8MT.class){
jpayne@68 354 seed=counter;
jpayne@68 355 counter++;
jpayne@68 356 }
jpayne@68 357
jpayne@68 358 Timer t=new Timer();
jpayne@68 359 long[][] r=new long[rows][cols];
jpayne@68 360 Random randy=Shared.threadLocalRandom(seed);
jpayne@68 361 for(int i=0; i<r.length; i++){
jpayne@68 362 fillMasks(r[i], randy);
jpayne@68 363 }
jpayne@68 364 t.stop();
jpayne@68 365 if(t.elapsed>200000000L){System.out.println("Mask-creation time: "+t);}
jpayne@68 366 return r;
jpayne@68 367 }
jpayne@68 368
jpayne@68 369 private static void fillMasks(long[] r, Random randy) {
jpayne@68 370 // for(int i=0; i<r.length; i++){
jpayne@68 371 // long x=0;
jpayne@68 372 // while(Long.bitCount(x&0xFFFFFFFF)!=16){
jpayne@68 373 // x=randy.nextLong();
jpayne@68 374 // }
jpayne@68 375 // r[i]=(x&Long.MAX_VALUE);
jpayne@68 376 // }
jpayne@68 377
jpayne@68 378 final int hlen=(1<<hashBits);
jpayne@68 379 assert(r.length==hlen);
jpayne@68 380 int[] count1=new int[hlen];
jpayne@68 381 int[] count2=new int[hlen];
jpayne@68 382 final long mask=hlen-1;
jpayne@68 383
jpayne@68 384 for(int i=0; i<r.length; i++){
jpayne@68 385 long x=0;
jpayne@68 386 int y=0;
jpayne@68 387 int z=0;
jpayne@68 388 while(Long.bitCount(x&0xFFFFFFFFL)!=16){
jpayne@68 389 x=randy.nextLong();
jpayne@68 390 while(Long.bitCount(x&0xFFFFFFFFL)<16){
jpayne@68 391 x|=(1L<<randy.nextInt(32));
jpayne@68 392 }
jpayne@68 393 while(Long.bitCount(x&0xFFFFFFFFL)>16){
jpayne@68 394 x&=(~(1L<<randy.nextInt(32)));
jpayne@68 395 }
jpayne@68 396 while(Long.bitCount(x&0xFFFFFFFF00000000L)<16){
jpayne@68 397 x|=(1L<<(randy.nextInt(32)+32));
jpayne@68 398 }
jpayne@68 399 while(Long.bitCount(x&0xFFFFFFFF00000000L)>16){
jpayne@68 400 x&=(~(1L<<(randy.nextInt(32)+32)));
jpayne@68 401 }
jpayne@68 402
jpayne@68 403 // System.out.print(".");
jpayne@68 404 // y=(((int)(x&mask))^i);
jpayne@68 405 y=(((int)(x&mask)));
jpayne@68 406 z=(int)((x>>hashBits)&mask);
jpayne@68 407 if(count1[y]>0 || count2[z]>0){
jpayne@68 408 x=0;
jpayne@68 409 }
jpayne@68 410 }
jpayne@68 411 // System.out.println(Long.toBinaryString(x));
jpayne@68 412 r[i]=(x&Long.MAX_VALUE);
jpayne@68 413 count1[y]++;
jpayne@68 414 count2[z]++;
jpayne@68 415 }
jpayne@68 416
jpayne@68 417 }
jpayne@68 418
jpayne@68 419
jpayne@68 420 @Override
jpayne@68 421 public void initialize(){
jpayne@68 422 for(int i=0; i<writers.length; i++){
jpayne@68 423 writers[i]=new WriteThread(i);
jpayne@68 424 writers[i].start();
jpayne@68 425
jpayne@68 426 // while(!writers[i].isAlive()){
jpayne@68 427 // System.out.print(".");
jpayne@68 428 // }
jpayne@68 429 }
jpayne@68 430 }
jpayne@68 431
jpayne@68 432 @Override
jpayne@68 433 public void shutdown(){
jpayne@68 434 if(finished){return;}
jpayne@68 435 synchronized(this){
jpayne@68 436 if(finished){return;}
jpayne@68 437
jpayne@68 438 //Clear buffers
jpayne@68 439 for(int i=0; i<numArrays; i++){
jpayne@68 440 long[] array=buffers[i];
jpayne@68 441 int len=bufferlen[i];
jpayne@68 442 buffers[i]=null;
jpayne@68 443 bufferlen[i]=0;
jpayne@68 444
jpayne@68 445 if(len<array.length){
jpayne@68 446 array=Arrays.copyOf(array, len);
jpayne@68 447 }
jpayne@68 448
jpayne@68 449 if(array.length>0){
jpayne@68 450 writers[i].add(array);
jpayne@68 451 }
jpayne@68 452 }
jpayne@68 453
jpayne@68 454 //Add poison
jpayne@68 455 for(WriteThread wt : writers){
jpayne@68 456 wt.add(poison);
jpayne@68 457 }
jpayne@68 458
jpayne@68 459 //Wait for termination
jpayne@68 460 for(WriteThread wt : writers){
jpayne@68 461 // System.out.println("wt"+wt.num+" is alive: "+wt.isAlive());
jpayne@68 462 while(wt.isAlive()){
jpayne@68 463 // System.out.println("wt"+wt.num+" is alive: "+wt.isAlive());
jpayne@68 464 try {
jpayne@68 465 wt.join(10000);
jpayne@68 466 } catch (InterruptedException e) {
jpayne@68 467 // TODO Auto-generated catch block
jpayne@68 468 e.printStackTrace();
jpayne@68 469 }
jpayne@68 470 if(wt.isAlive()){System.err.println(wt.getClass().getCanonicalName()+" is taking a long time to die.");}
jpayne@68 471 }
jpayne@68 472 cellsUsed+=wt.cellsUsedPersonal;
jpayne@68 473 // System.out.println("cellsUsed="+cellsUsed);
jpayne@68 474 }
jpayne@68 475
jpayne@68 476 assert(!finished);
jpayne@68 477 finished=true;
jpayne@68 478 }
jpayne@68 479 }
jpayne@68 480
jpayne@68 481 private class WriteThread extends Thread{
jpayne@68 482
jpayne@68 483 public WriteThread(int tnum){
jpayne@68 484 num=tnum;
jpayne@68 485 }
jpayne@68 486
jpayne@68 487 @Override
jpayne@68 488 public void run(){
jpayne@68 489 assert(matrix[num]==null);
jpayne@68 490 array=new int[wordsPerArray]; //Makes NUMA systems use local memory.
jpayne@68 491
jpayne@68 492 matrix[num]=array;
jpayne@68 493
jpayne@68 494 long[] keys=null;
jpayne@68 495 while(!shutdown){
jpayne@68 496
jpayne@68 497 if(verbose){System.err.println(" - Reading keys for wt"+num+".");}
jpayne@68 498 while(keys==null){
jpayne@68 499 try {
jpayne@68 500 keys=writeQueue.take();
jpayne@68 501 } catch (InterruptedException e) {
jpayne@68 502 // TODO Auto-generated catch block
jpayne@68 503 e.printStackTrace();
jpayne@68 504 }
jpayne@68 505 }
jpayne@68 506 if(keys==poison){
jpayne@68 507 // assert(false);
jpayne@68 508 shutdown=true;
jpayne@68 509 }else{
jpayne@68 510 for(long key : keys){
jpayne@68 511 incrementHashedLocal(key);
jpayne@68 512 }
jpayne@68 513 }
jpayne@68 514 // System.out.println(" -- Read keys for wt"+num+". poison="+(keys==poison)+", len="+keys.length);
jpayne@68 515 if(verbose){System.err.println(" -- Read keys for wt"+num+". (success)");}
jpayne@68 516 keys=null;
jpayne@68 517 if(verbose){System.err.println("shutdown="+shutdown);}
jpayne@68 518 }
jpayne@68 519
jpayne@68 520 // System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+".");
jpayne@68 521 // assert(false) : ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+".";
jpayne@68 522
jpayne@68 523 array=null;
jpayne@68 524 }
jpayne@68 525
jpayne@68 526 void add(long[] keys){
jpayne@68 527 // assert(isAlive());
jpayne@68 528 assert(!shutdown);
jpayne@68 529 if(shutdown){return;}
jpayne@68 530 // assert(keys!=poison);
jpayne@68 531 if(verbose){System.err.println(" + Adding keys to wt"+num+".");}
jpayne@68 532 boolean success=false;
jpayne@68 533 while(!success){
jpayne@68 534 try {
jpayne@68 535 writeQueue.put(keys);
jpayne@68 536 success=true;
jpayne@68 537 } catch (InterruptedException e) {
jpayne@68 538 // TODO Auto-generated catch block
jpayne@68 539 e.printStackTrace();
jpayne@68 540 }
jpayne@68 541 }
jpayne@68 542 if(verbose){System.err.println(" ++ Added keys to wt"+num+". (success)");}
jpayne@68 543 }
jpayne@68 544
jpayne@68 545 private int incrementHashedLocal(long key){
jpayne@68 546 assert((key&arrayMask)==num);
jpayne@68 547 key=(key>>>arrayBits)%(cellMod);
jpayne@68 548 // key=(key>>>(arrayBits+1))%(cellMod);
jpayne@68 549 int index=(int)(key>>>indexShift);
jpayne@68 550 int word=array[index];
jpayne@68 551 int cellShift=(int)(cellBits*key);
jpayne@68 552 int value=((word>>>cellShift)&valueMask);
jpayne@68 553 if(value==0){cellsUsedPersonal++;}
jpayne@68 554 value=min(value+1, maxValue);
jpayne@68 555 word=(value<<cellShift)|(word&~((valueMask)<<cellShift));
jpayne@68 556 array[index]=word;
jpayne@68 557 return value;
jpayne@68 558 }
jpayne@68 559
jpayne@68 560 private int[] array;
jpayne@68 561 private final int num;
jpayne@68 562 public long cellsUsedPersonal=0;
jpayne@68 563
jpayne@68 564 public ArrayBlockingQueue<long[]> writeQueue=new ArrayBlockingQueue<long[]>(16);
jpayne@68 565 public boolean shutdown=false;
jpayne@68 566
jpayne@68 567 }
jpayne@68 568
jpayne@68 569
jpayne@68 570 public long cellsUsed(){return cellsUsed;}
jpayne@68 571
jpayne@68 572 private boolean finished=false;
jpayne@68 573
jpayne@68 574 private long cellsUsed;
jpayne@68 575 final int[][] matrix;
jpayne@68 576 private final WriteThread[] writers=new WriteThread[numArrays];
jpayne@68 577 private final int hashes;
jpayne@68 578 final int wordsPerArray;
jpayne@68 579 private final long cellsPerArray;
jpayne@68 580 final long cellMod;
jpayne@68 581 private final long[][] hashMasks=makeMasks(8, hashArrayLength);
jpayne@68 582
jpayne@68 583 private final long[][] buffers=new long[numArrays][500];
jpayne@68 584 private final int[] bufferlen=new int[numArrays];
jpayne@68 585
jpayne@68 586 public final KCountArray prefilter;
jpayne@68 587
jpayne@68 588 private static final int hashBits=6;
jpayne@68 589 private static final int hashArrayLength=1<<hashBits;
jpayne@68 590 private static final int hashCellMask=hashArrayLength-1;
jpayne@68 591 static final long[] poison=new long[0];
jpayne@68 592
jpayne@68 593 private static long counter=0;
jpayne@68 594
jpayne@68 595 }