布隆算法最詳解
本文源地址: http://www.fullstackyang.com/...,轉發請注明該地址或segmentfault地址,謝謝!
1. 背景知識
在網上已經有很多關於布隆過濾器的介紹了,這里就不再贅述,下面簡單地提煉幾個要點:
- 布隆過濾器是用來判斷一個元素是否出現在給定集合中的重要工具,具有快速,比哈希表更節省空間等優點,而缺點在於有一定的誤識別率(false-positive,假陽性),亦即,它可能會把不是集合內的元素判定為存在於集合內,不過這樣的概率相當小,在大部分的生產環境中是可以接受的;
- 其原理比較簡單,如下圖所示,S集合中有n個元素,利用k個哈希函數,將S中的每個元素映射到一個長度為m的位(bit)數組B中不同的位置上,這些位置上的二進制數均置為1,如果待檢測的元素經過這k個哈希函數的映射后,發現其k個位置上的二進制數不全是1,那么這個元素一定不在集合S中,反之,該元素可能是S中的某一個元素(參考1);
- 綜上描述,那么到底需要多少個哈希函數,以及創建長度為多少的bit數組比較合適,為了估算出k和m的值,在構造一個布隆過濾器時,需要傳入兩個參數,即可以接受的誤判率fpp和元素總個數n(不一定完全精確)。至於參數估計的方法,有興趣的同學可以參考維基英文頁面,下面直接給出公式:
- 哈希函數的要求盡量滿足平均分布,這樣既降低誤判發生的概率,又可以充分利用bit數組的空間;
- 根據論文《Less Hashing, Same Performance: Building a Better Bloom Filter》提出的一個技巧,可以用2個哈希函數來模擬k個哈希函數,即gi(x) = h1(x) + ih2(x) ,其中0<=i<=k-1;
- 在吳軍博士的《數學之美》一書中展示了不同情況下的誤判率,例如,假定一個元素用16位比特,8個哈希函數,那么假陽性的概率是萬分之五,這已經相當小了。
目前已經有相應實現的開源類庫,如Google的Guava類庫,Twitter的Algebird類庫,和ScalaNLP breeze等等,其中Guava 11.0版本中增加了BloomFilter類,它使用了Funnel和Sink的設計,增強了泛化的能力,使其可以支持任何數據類型,其利用murmur3 hash來做哈希映射函數,不過它底層並沒有使用傳統的java.util.BitSet來做bit數組,而是用long型數組進行了重新封裝,大部分操作均基於位的運算,因此能達到一個非常好的性能;下面我們就Guava類庫中實現布隆過濾器的源碼作詳細分析,最后出於靈活性和解耦等因素的考慮,我們想要把布隆過濾器從JVM中拿出來,於是利用了Redis自帶的Bitmaps作為底層的bit數組進行重構,另外隨着插入的元素越來越多,當實際數量遠遠大於創建時設置的預計數量時,布隆過濾器的誤判率會越來越高,因此在重構的過程中增加了自動擴容的特性,最后通過測試驗證其正確性。
2. 布隆過濾器在Guava中的實現
Guava中,布隆過濾器的實現主要涉及到2個類,BloomFilter和BloomFilterStrategies,首先來看一下BloomFilter:
/** The bit set of the BloomFilter (not necessarily power of 2!) */
private final BitArray bits;
/** Number of hashes per element */
private final int numHashFunctions;
/** The funnel to translate Ts to bytes */
private final Funnel<? super T> funnel;
/**
* The strategy we employ to map an element T to {@code numHashFunctions} bit indexes.
*/
private final Strategy strategy;
這是它的4個成員變量:
- BitArrays是定義在BloomFilterStrategies中的內部類,封裝了布隆過濾器底層bit數組的操作,后文詳述;
- numHashFunctions表示哈希函數的個數,即上文提到的k;
- Funnel,這是Guava中定義的一個接口,它和PrimitiveSink配套使用,主要是把任意類型的數據轉化成Java基本數據類型(primitive value,如char,byte,int……),默認用java.nio.ByteBuffer實現,最終均轉化為byte數組;
- Strategy是定義在BloomFilter類內部的接口,代碼如下,有3個方法,put(插入元素),mightContain(判定元素是否存在)和ordinal方法(可以理解為枚舉類中那個默認方法)
interface Strategy extends java.io.Serializable {
/**
* Sets {@code numHashFunctions} bits of the given bit array, by hashing a user element.
*
* <p>Returns whether any bits changed as a result of this operation.
*/
<T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits);
/**
* Queries {@code numHashFunctions} bits of the given bit array, by hashing a user element;
* returns {@code true} if and only if all selected bits are set.
*/
<T> boolean mightContain(
T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits);
/**
* Identifier used to encode this strategy, when marshalled as part of a BloomFilter. Only
* values in the [-128, 127] range are valid for the compact serial form. Non-negative values
* are reserved for enums defined in BloomFilterStrategies; negative values are reserved for any
* custom, stateful strategy we may define (e.g. any kind of strategy that would depend on user
* input).
*/
int ordinal();
}
對於創建布隆過濾器,BloomFilter並沒有公有的構造函數,只有一個私有構造函數,而對外它提供了5個重載的create方法,在缺省情況下誤判率設定為3%,采用BloomFilterStrategies.MURMUR128_MITZ_64的實現。其中4個create方法最終都調用了同一個create方法,由它來負責調用私有構造函數,其源碼如下:
static <T> BloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
checkNotNull(funnel);
checkArgument(
expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
checkNotNull(strategy);
if (expectedInsertions == 0) {
expectedInsertions = 1;
}
/*
* TODO(user): Put a warning in the javadoc about tiny fpp values, since the resulting size
* is proportional to -log(p), but there is not much of a point after all, e.g.
* optimalM(1000, 0.0000000000000001) = 76680 which is less than 10kb. Who cares!
*/
long numBits = optimalNumOfBits(expectedInsertions, fpp);
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
try {
return new BloomFilter<T>(new BitArray(numBits), numHashFunctions, funnel, strategy);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
}
}
在create中接受了4個參數,funnel(輸入的數據),expectedInsertions(預計插入的元素總數),fpp(期望誤判率),strategy(實現Strategy的實例),然后它計算了bit數組的長度以及哈希函數的個數(公式參考前文),最后用numBits創建了BitArray,並調用了構造函數完成賦值操作。
static long optimalNumOfBits(long n, double p) {
if (p == 0) {
p = Double.MIN_VALUE;
}
return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}
static int optimalNumOfHashFunctions(long n, long m) {
// (m / n) * log(2), but avoid truncation due to division!
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
接着再來看一下BloomFilterStrategies類,首先它是實現了BloomFilter.Strategy 接口的一個枚舉類,其次它有兩個2枚舉值,MURMUR128_MITZ_32和MURMUR128_MITZ_64,分別對應了32位哈希映射函數,和64位哈希映射函數,后者使用了murmur3 hash生成的所有128位,具有更大的空間,不過原理是相通的,我們選擇默認的MURMUR128_MITZ_64來分析:
MURMUR128_MITZ_64() {
@Override
public <T> boolean put(
T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) {
long bitSize = bits.bitSize();
byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal();
long hash1 = lowerEight(bytes);
long hash2 = upperEight(bytes);
boolean bitsChanged = false;
long combinedHash = hash1;
for (int i = 0; i < numHashFunctions; i++) {
// Make the combined hash positive and indexable
bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize);
combinedHash += hash2;
}
return bitsChanged;
}
@Override
public <T> boolean mightContain(
T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) {
long bitSize = bits.bitSize();
byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal();
long hash1 = lowerEight(bytes);
long hash2 = upperEight(bytes);
long combinedHash = hash1;
for (int i = 0; i < numHashFunctions; i++) {
// Make the combined hash positive and indexable
if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
return false;
}
combinedHash += hash2;
}
return true;
}
抽象來看,put是寫,mightContain是讀,兩個方法的代碼有一點相似,都是先利用murmur3 hash對輸入的funnel計算得到128位的字節數組,然后高低分別取8個字節(64位)創建2個long型整數hash1,hash2作為哈希值。循環體內采用了2個函數模擬其他函數的思想,即上文提到的gi(x) = h1(x) + ih2(x) ,這相當於每次累加hash2,然后通過基於bitSize取模的方式在bit數組中索引。
這里之所以要和Long.MAX_VALUE進行按位與的操作,是因為在除數和被除數符號不一致的情況下計算所得的結果是有差別的,在程序語言里,“%”准確來說是取余運算(C,C++和Java均如此,python是取模),如-5%3=-2,而取模的數學定義是x
mod y=x-y[x/y](向下取整),所以-5 mod 3=
-5-3*(-2)=1,因此當哈希值為負數的時候,其取余的結果為負(bitSize始終為正數),這樣就不方便在bit數組中取值,因此通過Long.MAX_VALUE(二進制為0111…1111),直接將開頭的符號位去掉,從而轉變為正數。當然也可以取絕對值,在另一個MURMUR128_MITZ_32的實現中就是這么做的。
在put方法中,先是將索引位置上的二進制置為1,然后用bitsChanged記錄插入結果,如果返回true表明沒有重復插入成功,而mightContain方法則是將索引位置上的數值取出,並判斷是否為0,只要其中出現一個0,那么立即判斷為不存在。
最后再說一下底層bit數組的實現,主要代碼如下:
static final class BitArray {
final long[] data;
long bitCount;
BitArray(long bits) {
this(new long[Ints.checkedCast(LongMath.divide(bits, 64, RoundingMode.CEILING))]);
}
// Used by serialization
BitArray(long[] data) {
checkArgument(data.length > 0, "data length is zero!");
this.data = data;
long bitCount = 0;
for (long value : data) {
bitCount += Long.bitCount(value);
}
this.bitCount = bitCount;
}
/** Returns true if the bit changed value. */
boolean set(long index) {
if (!get(index)) {
data[(int) (index >>> 6)] |= (1L << index);
bitCount++;
return true;
}
return false;
}
boolean get(long index) {
return (data[(int) (index >>> 6)] & (1L << index)) != 0;
}
/** Number of bits */
long bitSize() {
return (long) data.length * Long.SIZE;
}
...
}
之前也提到了Guava沒有使用java.util.BitSet,而是封裝了一個long型的數組,另外還有一個long型整數,用來統計數組中已經占用(置為1)的數量,在第一個構造函數中,它把傳入的long型整數按長度64分段(例如129分為3段),段數作為數組的長度,你可以想象成由若干個64位數組拼接成一個超長的數組,它的長度就是64乘以段數,即bitSize,在第二個構造函數中利用Long.bitCount方法來統計對應二進制編碼中的1個數,這個方法在JDK1.5中就有了,其算法設計得非常精妙,有精力的同學可以自行研究。
另外兩個重要的方法是set和get,在get方法中,參考put和mightContain方法,傳入的參數index是經過bitSize取模的,因此一定能落在這個超長數組的范圍之內,為了獲取index對應索引位置上的值,首先將其無符號右移6位,並且強制轉換成int型,這相當於除以64向下取整的操作,也就是換算成段數,得到該段上的數值之后,又將1左移index位,最后進行按位與的操作,如果結果等於0,那么返回false,從而在mightContain中判斷為不存在。在set方法中,首先調用了get方法判斷是否已經存在,如果不存在,則用同樣的邏輯取出data數組中對應索引位置的數值,然后按位或並賦值回去。
到這里,對Guava中布隆過濾器的實現就基本討論完了,簡單總結一下:
- BloomFilter類的作用在於接收輸入,利用公式完成對參數的估算,最后初始化Strategy接口的實例;
- BloomFilterStrategies是一個枚舉類,具有兩個實現了Strategy接口的成員,分別為MURMUR128_MITZ_32和MURMUR128_MITZ_64,另外封裝了long型的數組作為布隆過濾器底層的bit數組,其中在get和set方法中完成核心的位運算。
3. 利用Redis Bitmaps進行重構
通過上面的分析,主要算法和邏輯的部分大體都是一樣的,真正需要重構的部分是底層位數組的實現,在Guava中是封裝了一個long型的數組,而對於redis來說,本身自帶了Bitmaps的“數據結構”(本質上還是一個字符串),已經提供了位操作的接口,因此重構本身並不復雜,相對比較復雜的是,之前提到的實現自動擴容特性。
這里實現自動擴容的思想是,在redis中記錄一個自增的游標cursor,如果當前key對應的Bitmaps已經達到飽和狀態,則cursor自增,同時用其生成一個新的key,並創建規模同等的Bitmaps。然后在get的時候,需要判斷該元素是否存在於任意一個Bitmaps中。於是整個的邏輯就變成,一個元素在每個Bitmaps中都不存在時,才能插入當前cursor對應key的Bitmaps中。
下面是代碼的實現部分。
首先,為了簡化redis的操作,定義了2個函數式接口,分別執行單條命令和pipeline,另外還實現了一個簡單的工具類
@FunctionalInterface
public interface JedisExecutor<T> {
T execute(Jedis jedis);
}
@FunctionalInterface
public interface PipelineExecutor {
void load(Pipeline pipeline);
}
public class JedisUtils {
private static final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
private JedisPool jedisPool;
public JedisUtils() {
jedisPool = new JedisPool(poolConfig, "localhost", 6379);
}
public <T> T execute(JedisExecutor<T> jedisExecutor) {
try (Jedis jedis = jedisPool.getResource()) {
return jedisExecutor.execute(jedis);
}
}
public List<Object> pipeline(List<PipelineExecutor> pipelineExecutors) {
try (Jedis jedis = jedisPool.getResource()) {
Pipeline pipeline = jedis.pipelined();
for (PipelineExecutor executor : pipelineExecutors)
executor.load(pipeline);
return pipeline.syncAndReturnAll();
}
}
}
其次在Strategy中,對put和mightContain作了一點修改,其中被注釋的部分是Guava中的實現。為了簡化,這里我們只接受String對象。
這里先把所有的隨機函數對應的索引位置收集到一個數組中,然后交由底層的RedisBitmaps處理get或set,具體過程后面會詳細說明。
bits.ensureCapacityInternal()方法,即表示自動擴容,這個函數名是從ArrayList中搬過來的。
@Override
public boolean put(String string, int numHashFunctions, RedisBitmaps bits) {
long bitSize = bits.bitSize();
byte[] bytes = Hashing.murmur3_128().hashString(string, Charsets.UTF_8).asBytes();
long hash1 = lowerEight(bytes);
long hash2 = upperEight(bytes);
boolean bitsChanged = false;
long combinedHash = hash1;
// for (int i = 0; i < numHashFunctions; i++) {
// bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize);
// combinedHash += hash2;
// }
long[] offsets = new long[numHashFunctions];
for (int i = 0; i < numHashFunctions; i++) {
offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
combinedHash += hash2;
}
bitsChanged = bits.set(offsets);
bits.ensureCapacityInternal();//自動擴容
return bitsChanged;
}
@Override
public boolean mightContain(String object, int numHashFunctions, RedisBitmaps bits) {
long bitSize = bits.bitSize();
byte[] bytes = Hashing.murmur3_128().hashString(object, Charsets.UTF_8).asBytes();
long hash1 = lowerEight(bytes);
long hash2 = upperEight(bytes);
long combinedHash = hash1;
// for (int i = 0; i < numHashFunctions; i++) {
// if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
// return false;
// }
// combinedHash += hash2;
// }
// return true;
long[] offsets = new long[numHashFunctions];
for (int i = 0; i < numHashFunctions; i++) {
offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
combinedHash += hash2;
}
return bits.get(offsets);
}
最后,也是最重要的RedisBitmaps,其中bitSize用了Guava布隆過濾器中計算Long型數組長度的方法,得到bitSize之后使用setbit命令初始化一個全部為0的位數組。get(long offset)和set(long offset),這兩個與Guava布隆過濾器中的邏輯類似,這里就不再贅述了,而get(long[] offsets)方法中,所有的offset要與每一個cursor對應的Bitmaps進行判斷,若全部命中,那么這個元素就可能存在於該Bitmaps,反之若不能完全命中,則表示該元素不存在於任何一個Bitmaps,所以當滿足這個條件,在set(long[] offsets)方法中,就可以插入到當前key的Bitmaps中了。
在ensureCapacityInternal方法,需要擴容的判斷條件是bitCount*2>bitSize,bitCount表示一個Bitmaps中“1”出現的個數,也就是當“1”出現的個數超過總數的一半時,進行擴容操作——首先使用incr命令對cursor自增,然后使用新的key創建一個新的Bitmaps。
RedisBitmapsJava
class RedisBitmaps {
private static final String BASE_KEY = "bloomfilter";
private static final String CURSOR = "cursor";
private JedisUtils jedisUtils;
private long bitSize;
RedisBitmaps(long bits) {
this.jedisUtils = new JedisUtils();
this.bitSize = LongMath.divide(bits, 64, RoundingMode.CEILING) * Long.SIZE;//位數組的長度,相當於n個long的長度
if (bitCount() == 0) {
jedisUtils.execute((jedis -> jedis.setbit(currentKey(), bitSize - 1, false)));
}
}
boolean get(long[] offsets) {
for (long i = 0; i < cursor() + 1; i++) {
final long cursor = i;
//只要有一個cursor對應的bitmap中,offsets全部命中,則表示可能存在
boolean match = Arrays.stream(offsets).boxed()
.map(offset -> jedisUtils.execute(jedis -> jedis.getbit(genkey(cursor), offset)))
.allMatch(b -> (Boolean) b);
if (match)
return true;
}
return false;
}
boolean get(final long offset) {
return jedisUtils.execute(jedis -> jedis.getbit(currentKey(), offset));
}
boolean set(long[] offsets) {
if (cursor() > 0 && get(offsets)) {
return false;
}
boolean bitsChanged = false;
for (long offset : offsets)
bitsChanged |= set(offset);
return bitsChanged;
}
boolean set(long offset) {
if (!get(offset)) {
jedisUtils.execute(jedis -> jedis.setbit(currentKey(), offset, true));
return true;
}
return false;
}
long bitCount() {
return jedisUtils.execute(jedis -> jedis.bitcount(currentKey()));
}
long bitSize() {
return this.bitSize;
}
private String currentKey() {
return genkey(cursor());
}
private String genkey(long cursor) {
return BASE_KEY + "-" + cursor;
}
private Long cursor() {
String cursor = jedisUtils.execute(jedis -> jedis.get(CURSOR));
return cursor == null ? 0 : Longs.tryParse(cursor);
}
void ensureCapacityInternal() {
if (bitCount() * 2 > bitSize())
grow();
}
void grow() {
Long cursor = jedisUtils.execute(jedis -> jedis.incr(CURSOR));
jedisUtils.execute((jedis -> jedis.setbit(genkey(cursor), bitSize - 1, false)));
}
void reset() {
String[] keys = LongStream.range(0, cursor() + 1).boxed().map(this::genkey).toArray(String[]::new);
jedisUtils.execute(jedis -> jedis.del(keys));
jedisUtils.execute(jedis -> jedis.set(CURSOR, "0"));
jedisUtils.execute(jedis -> jedis.setbit(currentKey(), bitSize - 1, false));
}
private PipelineExecutor apply(PipelineExecutor executor) {
return executor;
}
}
下面我們做一個單元測試來驗證其正確性。
如果我們插入的數量等於原預計總數,RedisBloomFilter擴容了1次,而兩個布隆過濾器的結果一致,都為false,true,false。
如果插入的數量為原預計總數的3倍,RedisBloomFilter擴容了3次,並且仍判斷正確,而Guava布隆過濾器則在判斷str3時出現誤判。
public class TestRedisBloomFilter {
private static final int TOTAL = 10000;
private static final double FPP = 0.0005;
@Test
public void test() {
RedisBloomFilter redisBloomFilter = RedisBloomFilter.create(TOTAL, FPP);
redisBloomFilter.resetBitmap();
BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), TOTAL, FPP);
IntStream.range(0, /* 3* */TOTAL).boxed()
.map(i -> Hashing.md5().hashInt(i).toString())
.collect(toList()).forEach(s -> {
redisBloomFilter.put(s);
bloomFilter.put(s);
});
String str1 = Hashing.md5().hashInt(99999).toString();
String str2 = Hashing.md5().hashInt(9999).toString();
String str3 = "abcdefghijklmnopqrstuvwxyz123456";
System.out.println(redisBloomFilter.mightContain(str1) + ":" + bloomFilter.mightContain(str1));
System.out.println(redisBloomFilter.mightContain(str2) + ":" + bloomFilter.mightContain(str2));
System.out.println(redisBloomFilter.mightContain(str3) + ":" + bloomFilter.mightContain(str3));
}
}
>>
grow bloomfilter-1
false:false
true:true
false:false
>>
grow bloomfilter-1
grow bloomfilter-2
grow bloomfilter-3
false:false
true:true
false:true
綜上,本文利用了Guava布隆過濾器的思想,並結合Redis中的Bitmaps等特性實現了支持動態擴容的布隆過濾器,它將布隆過濾器底層的位數據裝載到了Redis數據庫中,這樣的好處在於可以部署在更復雜的多應用或分布式系統中,還可以利用Redis完成持久化,定時過期等功能。