jpayne@68: package bloom; jpayne@68: jpayne@68: import java.util.Arrays; jpayne@68: import java.util.Random; jpayne@68: import java.util.concurrent.ArrayBlockingQueue; jpayne@68: jpayne@68: import shared.Primes; jpayne@68: import shared.Shared; jpayne@68: import shared.Timer; jpayne@68: import shared.Tools; jpayne@68: import structures.ByteBuilder; jpayne@68: jpayne@68: jpayne@68: /** jpayne@68: * jpayne@68: * Uses prime numbers for array lengths. jpayne@68: * jpayne@68: * @author Brian Bushnell jpayne@68: * @date Aug 17, 2012 jpayne@68: * jpayne@68: */ jpayne@68: public class KCountArray7MT extends KCountArray { jpayne@68: jpayne@68: /** jpayne@68: * jpayne@68: */ jpayne@68: private static final long serialVersionUID = -8767643111803866913L; jpayne@68: jpayne@68: public static void main(String[] args){ jpayne@68: long cells=Long.parseLong(args[0]); jpayne@68: int bits=Integer.parseInt(args[1]); jpayne@68: int hashes=Integer.parseInt(args[2]); jpayne@68: jpayne@68: verbose=false; jpayne@68: jpayne@68: KCountArray7MT kca=new KCountArray7MT(cells, bits, hashes); jpayne@68: jpayne@68: System.out.println(kca.read(0)); jpayne@68: kca.increment(0); jpayne@68: System.out.println(kca.read(0)); jpayne@68: kca.increment(0); jpayne@68: System.out.println(kca.read(0)); jpayne@68: System.out.println(); jpayne@68: jpayne@68: System.out.println(kca.read(1)); jpayne@68: kca.increment(1); jpayne@68: System.out.println(kca.read(1)); jpayne@68: kca.increment(1); jpayne@68: System.out.println(kca.read(1)); jpayne@68: System.out.println(); jpayne@68: jpayne@68: System.out.println(kca.read(100)); jpayne@68: kca.increment(100); jpayne@68: System.out.println(kca.read(100)); jpayne@68: kca.increment(100); jpayne@68: System.out.println(kca.read(100)); jpayne@68: kca.increment(100); jpayne@68: System.out.println(kca.read(100)); jpayne@68: System.out.println(); jpayne@68: jpayne@68: jpayne@68: System.out.println(kca.read(150)); jpayne@68: kca.increment(150); jpayne@68: System.out.println(kca.read(150)); jpayne@68: System.out.println(); jpayne@68: jpayne@68: } jpayne@68: jpayne@68: public KCountArray7MT(long cells_, int bits_, int hashes_){ jpayne@68: super(getPrimeCells(cells_, bits_), bits_, getDesiredArrays(cells_, bits_)); jpayne@68: // verbose=false; jpayne@68: // assert(false); jpayne@68: // System.out.println(cells); jpayne@68: cellsPerArray=cells/numArrays; jpayne@68: wordsPerArray=(int)((cellsPerArray%cellsPerWord)==0 ? (cellsPerArray/cellsPerWord) : (cellsPerArray/cellsPerWord+1)); jpayne@68: cellMod=cellsPerArray; jpayne@68: hashes=hashes_; jpayne@68: // System.out.println("cells="+cells+", words="+words+", wordsPerArray="+wordsPerArray+", numArrays="+numArrays+", hashes="+hashes); jpayne@68: // assert(false); jpayne@68: matrix=new int[numArrays][]; jpayne@68: assert(hashes>0 && hashes<=hashMasks.length); jpayne@68: } jpayne@68: jpayne@68: private static int getDesiredArrays(long desiredCells, int bits){ jpayne@68: jpayne@68: long words=Tools.max((desiredCells*bits+31)/32, minArrays); jpayne@68: int arrays=minArrays; jpayne@68: while(words/arrays>=Integer.MAX_VALUE){ jpayne@68: arrays*=2; jpayne@68: } jpayne@68: // assert(false) : arrays; jpayne@68: return arrays; jpayne@68: } jpayne@68: jpayne@68: private static long getPrimeCells(long desiredCells, int bits){ jpayne@68: jpayne@68: int arrays=getDesiredArrays(desiredCells, bits); jpayne@68: jpayne@68: long x=(desiredCells+arrays-1)/arrays; jpayne@68: long x2=Primes.primeAtMost(x); jpayne@68: return x2*arrays; jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: public int read(final long rawKey){ jpayne@68: assert(finished); jpayne@68: if(verbose){System.err.println("Reading raw key "+rawKey);} jpayne@68: long key2=hash(rawKey, 0); jpayne@68: int min=readHashed(key2); jpayne@68: for(int i=1; i0; i++){ jpayne@68: if(verbose){System.err.println("Reading. i="+i+", key2="+key2);} jpayne@68: key2=Long.rotateRight(key2, hashBits); jpayne@68: key2=hash(key2, i); jpayne@68: if(verbose){System.err.println("Rot/hash. i="+i+", key2="+key2);} jpayne@68: min=min(min, readHashed(key2)); jpayne@68: } jpayne@68: return min; jpayne@68: } jpayne@68: jpayne@68: private int readHashed(long key){ jpayne@68: if(verbose){System.err.print("Reading hashed key "+key);} jpayne@68: // System.out.println("key="+key); jpayne@68: int arrayNum=(int)(key&arrayMask); jpayne@68: key=(key>>>arrayBits)%(cellMod); jpayne@68: // key=(key>>>(arrayBits+1))%(cellMod); jpayne@68: // System.out.println("array="+arrayNum); jpayne@68: // System.out.println("key2="+key); jpayne@68: int[] array=matrix[arrayNum]; jpayne@68: int index=(int)(key>>>indexShift); jpayne@68: // assert(false) : indexShift; jpayne@68: // System.out.println("index="+index); jpayne@68: int word=array[index]; jpayne@68: // System.out.println("word="+Integer.toHexString(word)); jpayne@68: assert(word>>>(cellBits*key) == word>>>(cellBits*(key&cellMask))); jpayne@68: // int cellShift=(int)(cellBits*(key&cellMask)); jpayne@68: int cellShift=(int)(cellBits*key); jpayne@68: if(verbose){System.err.println(", array="+arrayNum+", index="+index+", cellShift="+(cellShift%32)+", value="+((int)((word>>>cellShift)&valueMask)));} jpayne@68: // System.out.println("cellShift="+cellShift); jpayne@68: return (int)((word>>>cellShift)&valueMask); jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: public void write(final long key, int value){ jpayne@68: throw new RuntimeException("Not allowed for this class."); jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: /** This should increase speed by doing the first hash outside the critical section, but it does not seem to help. */ jpayne@68: public void increment(long[] keys){ jpayne@68: for(int i=0; i>>=cellBits; jpayne@68: comma=", "; jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: sb.append("]"); jpayne@68: return sb; jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: public double usedFraction(){return cellsUsed/(double)cells;} jpayne@68: jpayne@68: @Override jpayne@68: public double usedFraction(int mindepth){return cellsUsed(mindepth)/(double)cells;} jpayne@68: jpayne@68: @Override jpayne@68: public long cellsUsed(int mindepth){ jpayne@68: long count=0; jpayne@68: // System.out.println("A"); jpayne@68: for(int[] array : matrix){ jpayne@68: // System.out.println("B"); jpayne@68: if(array!=null){ jpayne@68: // System.out.println("C"); jpayne@68: for(int word : array){ jpayne@68: // System.out.println("D: "+Integer.toHexString(word)); jpayne@68: while(word>0){ jpayne@68: int x=word&valueMask; jpayne@68: // System.out.println("E: "+x+", "+mindepth); jpayne@68: if(x>=mindepth){count++;} jpayne@68: word>>>=cellBits; jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: return count; jpayne@68: } jpayne@68: jpayne@68: jpayne@68: @Override jpayne@68: final long hash(long key, int row){ jpayne@68: int cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1)); jpayne@68: // int cell=(int)(hashCellMask&(key)); jpayne@68: jpayne@68: if(row==0){//Doublehash only first time jpayne@68: key=key^hashMasks[(row+4)%hashMasks.length][cell]; jpayne@68: cell=(int)(hashCellMask&(key>>5)); jpayne@68: // cell=(int)(hashCellMask&(key>>hashBits)); jpayne@68: // cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1)); jpayne@68: } jpayne@68: jpayne@68: return key^hashMasks[row][cell]; jpayne@68: } jpayne@68: jpayne@68: /** jpayne@68: * @param i jpayne@68: * @param j jpayne@68: * @return jpayne@68: */ jpayne@68: private static long[][] makeMasks(int rows, int cols) { jpayne@68: jpayne@68: long seed; jpayne@68: synchronized(KCountArray7MT.class){ jpayne@68: seed=counter; jpayne@68: counter++; jpayne@68: } jpayne@68: jpayne@68: Timer t=new Timer(); jpayne@68: long[][] r=new long[rows][cols]; jpayne@68: Random randy=Shared.threadLocalRandom(seed); jpayne@68: for(int i=0; i200000000L){System.out.println("Mask-creation time: "+t);} jpayne@68: return r; jpayne@68: } jpayne@68: jpayne@68: private static void fillMasks(long[] r, Random randy) { jpayne@68: // for(int i=0; i16){ jpayne@68: x&=(~(1L<16){ jpayne@68: x&=(~(1L<<(randy.nextInt(32)+32))); jpayne@68: } jpayne@68: jpayne@68: // System.out.print("."); jpayne@68: // y=(((int)(x&mask))^i); jpayne@68: y=(((int)(x&mask))); jpayne@68: z=(int)((x>>hashBits)&mask); jpayne@68: if(count1[y]>0 || count2[z]>0){ jpayne@68: x=0; jpayne@68: } jpayne@68: } jpayne@68: // System.out.println(Long.toBinaryString(x)); jpayne@68: r[i]=(x&Long.MAX_VALUE); jpayne@68: count1[y]++; jpayne@68: count2[z]++; jpayne@68: } jpayne@68: jpayne@68: } jpayne@68: jpayne@68: jpayne@68: @Override jpayne@68: public void initialize(){ jpayne@68: for(int i=0; i0){ jpayne@68: writers[i].add(array); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: //Add poison jpayne@68: for(WriteThread wt : writers){ jpayne@68: wt.add(poison); jpayne@68: } jpayne@68: jpayne@68: //Wait for termination jpayne@68: for(WriteThread wt : writers){ jpayne@68: // System.out.println("wt"+wt.num+" is alive: "+wt.isAlive()); jpayne@68: while(wt.isAlive()){ jpayne@68: // System.out.println("wt"+wt.num+" is alive: "+wt.isAlive()); jpayne@68: try { jpayne@68: wt.join(10000); jpayne@68: } catch (InterruptedException e) { jpayne@68: // TODO Auto-generated catch block jpayne@68: e.printStackTrace(); jpayne@68: } jpayne@68: if(wt.isAlive()){System.err.println(wt.getClass().getCanonicalName()+" is taking a long time to die.");} jpayne@68: } jpayne@68: cellsUsed+=wt.cellsUsedPersonal; jpayne@68: // System.out.println("cellsUsed="+cellsUsed); jpayne@68: // System.err.println("wt.cellsUsedPersonal="+wt.cellsUsedPersonal); jpayne@68: } jpayne@68: jpayne@68: assert(!finished); jpayne@68: finished=true; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private class WriteThread extends Thread{ jpayne@68: jpayne@68: public WriteThread(int tnum){ jpayne@68: num=tnum; jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: public void run(){ jpayne@68: assert(matrix[num]==null); jpayne@68: array=new int[wordsPerArray]; //Makes NUMA systems use local memory. jpayne@68: jpayne@68: matrix[num]=array; jpayne@68: jpayne@68: long[] keys=null; jpayne@68: while(!shutdown){ jpayne@68: jpayne@68: if(verbose){System.err.println(" - Reading keys for wt"+num+".");} jpayne@68: while(keys==null){ jpayne@68: try { jpayne@68: keys=writeQueue.take(); jpayne@68: } catch (InterruptedException e) { jpayne@68: // TODO Auto-generated catch block jpayne@68: e.printStackTrace(); jpayne@68: } jpayne@68: } jpayne@68: if(keys==poison){ jpayne@68: // assert(false); jpayne@68: shutdown=true; jpayne@68: }else{ jpayne@68: for(long key : keys){ jpayne@68: incrementHashedLocal(key); jpayne@68: } jpayne@68: } jpayne@68: // System.out.println(" -- Read keys for wt"+num+". poison="+(keys==poison)+", len="+keys.length); jpayne@68: if(verbose){System.err.println(" -- Read keys for wt"+num+". (success)");} jpayne@68: keys=null; jpayne@68: if(verbose){System.err.println("shutdown="+shutdown);} jpayne@68: } jpayne@68: jpayne@68: // System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+"."); jpayne@68: // assert(false) : ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+"."; jpayne@68: jpayne@68: array=null; jpayne@68: } jpayne@68: jpayne@68: void add(long[] keys){ jpayne@68: // assert(isAlive()); jpayne@68: assert(!shutdown); jpayne@68: if(shutdown){return;} jpayne@68: // assert(keys!=poison); jpayne@68: if(verbose){System.err.println(" + Adding keys to wt"+num+".");} jpayne@68: boolean success=false; jpayne@68: while(!success){ jpayne@68: try { jpayne@68: writeQueue.put(keys); jpayne@68: success=true; jpayne@68: } catch (InterruptedException e) { jpayne@68: // TODO Auto-generated catch block jpayne@68: e.printStackTrace(); jpayne@68: } jpayne@68: } jpayne@68: if(verbose){System.err.println(" ++ Added keys to wt"+num+". (success)");} jpayne@68: } jpayne@68: jpayne@68: private int incrementHashedLocal(long key){ jpayne@68: assert((key&arrayMask)==num); jpayne@68: key=(key>>>arrayBits)%(cellMod); jpayne@68: // key=(key>>>(arrayBits+1))%(cellMod); jpayne@68: int index=(int)(key>>>indexShift); jpayne@68: int word=array[index]; jpayne@68: int cellShift=(int)(cellBits*key); jpayne@68: int value=((word>>>cellShift)&valueMask); jpayne@68: if(value==0){cellsUsedPersonal++;} jpayne@68: value=min(value+1, maxValue); jpayne@68: word=(value< writeQueue=new ArrayBlockingQueue(16); jpayne@68: public boolean shutdown=false; jpayne@68: jpayne@68: } jpayne@68: jpayne@68: jpayne@68: public long cellsUsed(){return cellsUsed;} jpayne@68: jpayne@68: private boolean finished=false; jpayne@68: jpayne@68: private long cellsUsed; jpayne@68: final int[][] matrix; jpayne@68: private final WriteThread[] writers=new WriteThread[numArrays]; jpayne@68: private final int hashes; jpayne@68: final int wordsPerArray; jpayne@68: private final long cellsPerArray; jpayne@68: final long cellMod; jpayne@68: private final long[][] hashMasks=makeMasks(8, hashArrayLength); jpayne@68: jpayne@68: private final long[][] buffers=new long[numArrays][500]; jpayne@68: private final int[] bufferlen=new int[numArrays]; jpayne@68: jpayne@68: private static final int hashBits=6; jpayne@68: private static final int hashArrayLength=1<