// NOTE(review): this span was recovered from a line-collapsed VCS-annotate dump.
// The per-line blame prefixes were dropped and line breaks restored; code tokens are
// otherwise exactly as found. Two places lost text during extraction (everything
// between a '<' and a later '>'); they are marked below and are NOT valid Java as-is.
// Restore them from the pristine file before compiling.
package bloom;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

import shared.Shared;
import shared.Timer;
import structures.ByteBuilder;


/**
 *
 * Uses hashing rather than direct-mapping to support longer kmers.
 * <p>
 * Counts are packed as fixed-width cells inside {@code int} words, held in
 * {@code matrix} (one {@code int[]} per array number). A lookup hashes the raw key
 * to pick an array, then probes cells in that array and reports the minimum value seen.
 *
 * @author Brian Bushnell
 * @date Aug 17, 2012
 *
 */
public class KCountArray6MT extends KCountArray {

    /**
     * Serialization version identifier.
     */
    private static final long serialVersionUID = -1524266549200637631L;

    /**
     * Small manual smoke test: builds an array from four command-line values and
     * prints the result of interleaved read/increment calls for keys 0, 1, 100 and 150.
     *
     * @param args {@code args[0]}=cells (long), {@code args[1]}=bits per cell,
     *             {@code args[2]}=gap, {@code args[3]}=number of hashes; all four are
     *             parsed unconditionally, so fewer than four arguments throws.
     */
    public static void main(String[] args){
        long cells=Long.parseLong(args[0]);
        int bits=Integer.parseInt(args[1]);
        int gap=Integer.parseInt(args[2]);
        int hashes=Integer.parseInt(args[3]);

        verbose=false;

        KCountArray6MT kca=new KCountArray6MT(cells, bits, gap, hashes);

        // NOTE(review): read() asserts `finished`; nothing in this method visibly sets it,
        // so with assertions enabled the first read presumably fails - confirm.
        System.out.println(kca.read(0));
        kca.increment(0);
        System.out.println(kca.read(0));
        kca.increment(0);
        System.out.println(kca.read(0));
        System.out.println();

        System.out.println(kca.read(1));
        kca.increment(1);
        System.out.println(kca.read(1));
        kca.increment(1);
        System.out.println(kca.read(1));
        System.out.println();

        System.out.println(kca.read(100));
        kca.increment(100);
        System.out.println(kca.read(100));
        kca.increment(100);
        System.out.println(kca.read(100));
        kca.increment(100);
        System.out.println(kca.read(100));
        System.out.println();


        System.out.println(kca.read(150));
        kca.increment(150);
        System.out.println(kca.read(150));
        System.out.println();

    }

    /**
     * Sizes the per-array geometry and allocates the outer {@code matrix}; the inner
     * arrays are left {@code null} here ({@code new int[numArrays][]}).
     *
     * @param cells_  total number of cells, forwarded to the superclass constructor
     * @param bits_   bits per cell, forwarded to the superclass constructor
     * @param gap_    not referenced anywhere in this constructor body
     * @param hashes_ number of hash probes per key; asserted to be in
     *                {@code (0, hashMasks.length]}
     */
    public KCountArray6MT(long cells_, int bits_, int gap_, int hashes_){
        super(cells_, bits_);
//      verbose=false;
        long words=cells/cellsPerWord;
        // Each array is indexed with an int, so its word count must fit in one.
        assert(words/numArrays<=Integer.MAX_VALUE);
        wordsPerArray=(int)(words/numArrays);
        cellsPerArray=cells/numArrays;
        // Used as the modulus in readHashed, so hashed keys land in [0, cellsPerArray-1).
        cellMod=cellsPerArray-1;
        hashes=hashes_;
//      System.out.println("cells="+cells+", words="+words+", wordsPerArray="+wordsPerArray+", numArrays="+numArrays+", hashes="+hashes);
//      assert(false);
        matrix=new int[numArrays][];
        assert(hashes>0 && hashes<=hashMasks.length);
    }

    /**
     * Returns the minimum cell value found over the probes made for {@code rawKey}.
     * The array is chosen from {@code hash(rawKey, 3)} masked with {@code arrayMask};
     * the first probe uses {@code hash(rawKey, 0)}, and each later probe rotates the
     * previous probe key right by {@code hashBits} and re-hashes it with row {@code i}.
     *
     * @param rawKey unhashed key
     * @return smallest value returned by {@code readHashed} across the probes performed
     */
    @Override
    public int read(final long rawKey){
        assert(finished);
        if(verbose){System.err.println("Reading raw key "+rawKey);}

        long key1=hash(rawKey, 3);
        int arrayNum=(int)(key1&arrayMask);
        long key2=hash(rawKey, 0);

        int min=readHashed(key2, arrayNum);
        // NOTE(review): EXTRACTION DAMAGE - the loop condition below lost the text between
        // '<' and '>' (what remains is "i0"). Presumably it bounded i by the hash count and
        // stopped once min reached 0; restore from the pristine source.
        for(int i=1; i0; i++){
            if(verbose){System.err.println("Reading. i="+i+", key2="+key2);}
            key2=Long.rotateRight(key2, hashBits);
            key2=hash(key2, i);
            if(verbose){System.err.println("Rot/hash. i="+i+", key2="+key2);}
            min=min(min, readHashed(key2, arrayNum));
        }
        return min;
    }

    /**
     * Reads one cell from {@code matrix[arrayNum]} for an already-hashed key.
     *
     * @param key      hashed key; its sign bit is cleared and it is reduced modulo
     *                 {@code cellMod} before use
     * @param arrayNum index into {@code matrix}
     * @return the cell's value, i.e. the selected word shifted right and masked with
     *         {@code valueMask}
     */
    int readHashed(long key, int arrayNum){
        if(verbose){System.err.print("Reading hashed key "+key);}
//      System.out.println("key="+key);
//      int arrayNum=(int)(key&arrayMask);
        key=(key&Long.MAX_VALUE)%(cellMod);
//      key=(key>>>(arrayBits+1))%(cellMod);
//      System.out.println("array="+arrayNum);
//      System.out.println("key2="+key);
        int[] array=matrix[arrayNum];
        // Word holding the cell.
        int index=(int)(key>>>indexShift);
//      assert(false) : indexShift;
//      System.out.println("index="+index);
        int word=array[index];
//      System.out.println("word="+Integer.toHexString(word));
        // Checks that shifting by the unmasked product selects the same cell as shifting by
        // the masked in-word offset; an int shift only uses the low five bits of its distance.
        assert(word>>>(cellBits*key) == word>>>(cellBits*(key&cellMask)));
//      int cellShift=(int)(cellBits*(key&cellMask));
        int cellShift=(int)(cellBits*key);
        if(verbose){System.err.println(", array="+arrayNum+", index="+index+", cellShift="+(cellShift%32)+", value="+((int)((word>>>cellShift)&valueMask)));}
//      System.out.println("cellShift="+cellShift);
        return (int)((word>>>cellShift)&valueMask);
    }

    /**
     * Direct writes are unsupported by this class.
     *
     * @throws RuntimeException always
     */
    @Override
    public void write(final long key, int value){
        throw new RuntimeException("Not allowed for this class.");
    }

    // NOTE(review): EXTRACTION DAMAGE - everything between the '<' in the loop header of
    // increment(long,int) and a later '>' was lost. What follows the header fragment is the
    // tail of a different method (one that appends to `sb` and returns it); the methods in
    // between are missing entirely and braces do not balance. Restore from the pristine source.
    //Slow
    @Override
    public void increment(final long rawKey, int amt){
        for(int i=0; i>>=cellBits;
        comma=", ";
        }
        }
        }
        sb.append(']');
        return sb;
    }

    /** @return {@code cellsUsed} divided by the total cell count, as a double. */
    @Override
    public double usedFraction(){return cellsUsed/(double)cells;}

    /**
     * @param mindepth threshold passed through to {@code cellsUsed(int)}
     * @return {@code cellsUsed(mindepth)} divided by the total cell count, as a double
     */
    @Override
    public double usedFraction(int mindepth){return cellsUsed(mindepth)/(double)cells;}
jpayne@68: @Override jpayne@68: public long cellsUsed(int mindepth){ jpayne@68: long count=0; jpayne@68: for(int[] array : matrix){ jpayne@68: if(array!=null){ jpayne@68: for(int word : array){ jpayne@68: while(word>0){ jpayne@68: int x=word&valueMask; jpayne@68: if(x>=mindepth){count++;} jpayne@68: word>>>=cellBits; jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: } jpayne@68: return count; jpayne@68: } jpayne@68: jpayne@68: jpayne@68: @Override jpayne@68: final long hash(long key, int row){ jpayne@68: int cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1)); jpayne@68: // int cell=(int)(hashCellMask&(key)); jpayne@68: jpayne@68: if(row==0){//Doublehash only first time jpayne@68: key=key^hashMasks[(row+4)%hashMasks.length][cell]; jpayne@68: cell=(int)(hashCellMask&(key>>4)); jpayne@68: // cell=(int)(hashCellMask&(key>>hashBits)); jpayne@68: // cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1)); jpayne@68: } jpayne@68: jpayne@68: return key^hashMasks[row][cell]; jpayne@68: } jpayne@68: jpayne@68: /** jpayne@68: * @param i jpayne@68: * @param j jpayne@68: * @return jpayne@68: */ jpayne@68: private static long[][] makeMasks(int rows, int cols) { jpayne@68: jpayne@68: long seed; jpayne@68: synchronized(KCountArray6MT.class){ jpayne@68: seed=counter; jpayne@68: counter++; jpayne@68: } jpayne@68: jpayne@68: Timer t=new Timer(); jpayne@68: long[][] r=new long[rows][cols]; jpayne@68: Random randy=Shared.threadLocalRandom(seed); jpayne@68: for(int i=0; i200000000L){System.out.println("Mask-creation time: "+t);} jpayne@68: return r; jpayne@68: } jpayne@68: jpayne@68: private static void fillMasks(long[] r, Random randy) { jpayne@68: // for(int i=0; i16){ jpayne@68: x&=(~(1L<16){ jpayne@68: x&=(~(1L<<(randy.nextInt(32)+32))); jpayne@68: } jpayne@68: jpayne@68: // System.out.print("."); jpayne@68: // y=(((int)(x&mask))^i); jpayne@68: y=(((int)(x&mask))); jpayne@68: z=(int)((x>>hashBits)&mask); jpayne@68: if(count1[y]>0 || count2[z]>0){ jpayne@68: x=0; jpayne@68: } 
jpayne@68: } jpayne@68: // System.out.println(Long.toBinaryString(x)); jpayne@68: r[i]=(x&Long.MAX_VALUE); jpayne@68: count1[y]++; jpayne@68: count2[z]++; jpayne@68: } jpayne@68: jpayne@68: } jpayne@68: jpayne@68: jpayne@68: @Override jpayne@68: public void initialize(){ jpayne@68: for(int i=0; i0){ jpayne@68: writers[i].add(array); jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: //Add poison jpayne@68: for(WriteThread wt : writers){ jpayne@68: wt.add(poison); jpayne@68: } jpayne@68: jpayne@68: //Wait for termination jpayne@68: for(WriteThread wt : writers){ jpayne@68: // System.out.println("wt"+wt.num+" is alive: "+wt.isAlive()); jpayne@68: while(wt.isAlive()){ jpayne@68: // System.out.println("wt"+wt.num+" is alive: "+wt.isAlive()); jpayne@68: try { jpayne@68: wt.join(10000); jpayne@68: } catch (InterruptedException e) { jpayne@68: // TODO Auto-generated catch block jpayne@68: e.printStackTrace(); jpayne@68: } jpayne@68: if(wt.isAlive()){System.err.println(wt.getClass().getCanonicalName()+" is taking a long time to die.");} jpayne@68: } jpayne@68: cellsUsed+=wt.cellsUsedPersonal; jpayne@68: // System.out.println("cellsUsed="+cellsUsed); jpayne@68: } jpayne@68: jpayne@68: assert(!finished); jpayne@68: finished=true; jpayne@68: } jpayne@68: } jpayne@68: jpayne@68: private class WriteThread extends Thread{ jpayne@68: jpayne@68: public WriteThread(int tnum){ jpayne@68: num=tnum; jpayne@68: } jpayne@68: jpayne@68: @Override jpayne@68: public void run(){ jpayne@68: assert(matrix[num]==null); jpayne@68: array=new int[wordsPerArray]; //Makes NUMA systems use local memory. 
jpayne@68: jpayne@68: matrix[num]=array; jpayne@68: jpayne@68: long[] keys=null; jpayne@68: while(!shutdown){ jpayne@68: jpayne@68: if(verbose){System.err.println(" - Reading keys for wt"+num+".");} jpayne@68: while(keys==null){ jpayne@68: try { jpayne@68: keys=writeQueue.take(); jpayne@68: } catch (InterruptedException e) { jpayne@68: // TODO Auto-generated catch block jpayne@68: e.printStackTrace(); jpayne@68: } jpayne@68: } jpayne@68: if(keys==poison){ jpayne@68: // assert(false); jpayne@68: shutdown=true; jpayne@68: }else{ jpayne@68: for(long key : keys){ jpayne@68: incrementRawLocal(key); jpayne@68: } jpayne@68: } jpayne@68: // System.out.println(" -- Read keys for wt"+num+". poison="+(keys==poison)+", len="+keys.length); jpayne@68: if(verbose){System.err.println(" -- Read keys for wt"+num+". (success)");} jpayne@68: keys=null; jpayne@68: if(verbose){System.err.println("shutdown="+shutdown);} jpayne@68: } jpayne@68: jpayne@68: // System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+"."); jpayne@68: // assert(false) : ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+"."; jpayne@68: jpayne@68: array=null; jpayne@68: } jpayne@68: jpayne@68: void add(long[] keys){ jpayne@68: // assert(isAlive()); jpayne@68: assert(!shutdown); jpayne@68: if(shutdown){return;} jpayne@68: // assert(keys!=poison); jpayne@68: if(verbose){System.err.println(" + Adding keys to wt"+num+".");} jpayne@68: boolean success=false; jpayne@68: while(!success){ jpayne@68: try { jpayne@68: writeQueue.put(keys); jpayne@68: success=true; jpayne@68: } catch (InterruptedException e) { jpayne@68: // TODO Auto-generated catch block jpayne@68: e.printStackTrace(); jpayne@68: } jpayne@68: } jpayne@68: if(verbose){System.err.println(" ++ Added keys to wt"+num+". 
(success)");} jpayne@68: } jpayne@68: jpayne@68: private int incrementRawLocal(long rawKey){ jpayne@68: // verbose=(rawKey==32662670693L); jpayne@68: if(verbose){System.err.println("\n*** Incrementing raw key "+rawKey+" ***");} jpayne@68: // verbose=true; jpayne@68: assert(1>0); jpayne@68: jpayne@68: long key2=rawKey; jpayne@68: if(hashes==1){ jpayne@68: key2=hash(key2, 0); jpayne@68: // int x=incrementHashedIfAtMost(key2, 1, maxValue-1); jpayne@68: int x=incrementHashedLocal(key2); jpayne@68: assert(x>=1) : "original=?, new should be >="+(1)+", new="+read(rawKey)+", max="+maxValue+", key="+rawKey; jpayne@68: return x; jpayne@68: } jpayne@68: jpayne@68: int min=0; jpayne@68: // final int min=read(rawKey); jpayne@68: // if(min>=maxValue){return maxValue;} jpayne@68: jpayne@68: assert(key2==rawKey); jpayne@68: for(int i=0; i