一、什么是緩存穿透
當用戶想要查詢一個數據,發現redis內存數據庫沒有,出現緩存未命中,於是轉向持久層數據庫查詢。發現也沒有,於是本次查詢失敗。當用戶很多的時候,緩存都沒有命中,於是都去請求了持久層數據庫,給持久層數據庫造成很大的壓力,這就是緩存穿透。
於是我們就需要有一個能實現“快速判斷是否存在”的方案,在確定不存在時就不在去后台查詢數據庫了,避免了緩存穿透,布隆過濾器應運而生。
二、什么是布隆過濾器
Bloom Filter是一種空間效率很高的概率型數據結構,它利用位數組很簡潔地表示一個集合,並能判斷一個元素是否屬於這個集合。Bloom Filter的這種高效是有一定代價的:在判斷一個元素是否屬於某個集合時,有可能會把不屬於這個集合的元素誤認為屬於這個集合(false positive)。因此,Bloom Filter不適合那些“零錯誤”的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter通過極少的錯誤換取了存儲空間的極大節省。
那么它的誕生契機是什么呢?我們平常在檢測集合中是否存在某元素時,都會采用比較的方法。考慮以下情況:
- 如果集合用線性表存儲,查找的時間復雜度為O(n)。
- 如果用平衡BST(如AVL樹、紅黑樹)存儲,時間復雜度為O(logn)。
- 如果用哈希表存儲,並用鏈地址法與平衡BST解決哈希沖突(參考JDK8的HashMap實現方法),時間復雜度也要有O[log(n/m)],m為哈希分桶數。
三、布隆過濾器原理
BF是由一個長度為m比特的位數組(bit array)與k個哈希函數(hash function)組成的數據結構。位數組均初始化為0,所有哈希函數都可以分別把輸入數據盡量均勻地散列。
- 當要插入一個元素時,將其數據分別輸入k個哈希函數,產生k個哈希值。以哈希值作為位數組中的下標,將所有k個對應的比特置為1。
- 當要查詢(即判斷是否存在)一個元素時,同樣將其數據輸入哈希函數,然后檢查對應的k個比特。如果有任意一個比特為0,表明該元素一定不在集合中。如果所有比特均為1,表明該集合有(較大的)可能性在集合中。為什么不是一定在集合中呢?因為一個比特被置為1有可能會受到其他元素的影響,這就是所謂“假陽性”(false positive)。相對地,“假陰性”(false negative)在BF中是絕不會出現的。
下圖示出一個m=18, k=3的BF示例。集合中的x、y、z三個元素通過3個不同的哈希函數散列到位數組中。當查詢元素w時,因為有一個比特為0,因此w不在該集合中。

BF的優點是顯而易見的:
- 不需要存儲數據本身,只用比特表示,因此空間占用相對於傳統方式有巨大的優勢,並且能夠保密數據;
- 時間效率也較高,插入和查詢的時間復雜度均為O(k);
- 哈希函數之間相互獨立,可以在硬件指令層面並行計算。
但是,它的缺點也同樣明顯:
- 存在假陽性的概率,不適用於任何要求100%准確率的情境;
- 只能插入和查詢元素,不能刪除元素,這與產生假陽性的原因是相同的。我們可以簡單地想到通過計數(即將一個比特擴展為計數值)來記錄元素數,但仍然無法保證刪除的元素一定在集合中。
- 哈希函數個數k越多,假陽性概率越低;
- 位數組長度m越大,假陽性概率越低;
- 已插入元素的個數n越大,假陽性概率越高。
四、Guava中的布隆過濾器實現
1、Bloom Filter成員變量
Guava中,布隆過濾器的實現主要涉及到2個類,BloomFilter
和BloomFilterStrategies
,首先來看一下BloomFilter
的成員變量。需要注意的是不同Guava版本的BloomFilter
實現不同。
/** guava實現的以CAS方式設置每個bit位的bit數組 */ private final LockFreeBitArray bits; /** hash函數的個數 */ private final int numHashFunctions; /** guava中將對象轉換為byte的通道 */ private final Funnel<? super T> funnel; /** * 將byte轉換為n個bit的策略,也是bloomfilter hash映射的具體實現 */ private final Strategy strategy;
這是它的4個成員變量:
- LockFreeBitArray是定義在BloomFilterStrategies中的內部類,封裝了布隆過濾器底層bit數組的操作。
- numHashFunctions表示哈希函數的個數。
- Funnel,它和PrimitiveSink配套使用,能將任意類型的對象轉化成Java基本數據類型,默認用java.nio.ByteBuffer實現,最終均轉化為byte數組。
- Strategy是布隆過濾器的哈希策略,即數據如何映射到位數組,其具體方法在BloomFilterStrategies枚舉中,主要有2個:put和mightContain。
2、Bloom Filter構造
創建布隆過濾器,BloomFilter
並沒有公有的構造函數,只有一個私有構造函數,而對外它提供了5個重載的create
方法,在缺省情況下誤判率設定為3%,采用BloomFilterStrategies.MURMUR128_MITZ_64
的實現。
@VisibleForTesting static <T> BloomFilter<T> create( Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) { checkNotNull(funnel); checkArgument( expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions); checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp); checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp); checkNotNull(strategy); if (expectedInsertions == 0) { expectedInsertions = 1; } /* * TODO(user): Put a warning in the javadoc about tiny fpp values, since the resulting size * is proportional to -log(p), but there is not much of a point after all, e.g. * optimalM(1000, 0.0000000000000001) = 76680 which is less than 10kb. Who cares! */ long numBits = optimalNumOfBits(expectedInsertions, fpp); int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits); try { return new BloomFilter<T>(new LockFreeBitArray(numBits), numHashFunctions, funnel, strategy); } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e); } }
該方法接受4個參數:funnel是插入數據的Funnel,expectedInsertions是期望插入的元素總個數n,fpp即期望假陽性率p,strategy即哈希策略。由上可知,位數組的長度m和哈希函數的個數k分別通過optimalNumOfBits()方法和optimalNumOfHashFunctions()方法來估計。
3、估計最優m值和k值
@VisibleForTesting static long optimalNumOfBits(long n, double p) { if (p == 0) { p = Double.MIN_VALUE; } return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); } @VisibleForTesting static int optimalNumOfHashFunctions(long n, long m) { // (m / n) * log(2), but avoid truncation due to division! return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); }
以上計算方式是基於推演得出的,此處不再詳述。
4、哈希函數
enum BloomFilterStrategies implements BloomFilter.Strategy { MURMUR128_MITZ_32() {//....} MURMUR128_MITZ_64() {//....} }
MURMUR128_MITZ_64() { @Override public <T> boolean put( T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) { long bitSize = bits.bitSize(); // 先利用murmur3 hash對輸入的funnel計算得到128位的哈希值,funnel現將object轉換為byte數組, // 然后在使用哈希函數轉換為long byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal(); // 根據hash值的高低位算出hash1和hash2 long hash1 = lowerEight(bytes); long hash2 = upperEight(bytes); boolean bitsChanged = false; // 循環體內采用了2個函數模擬其他函數的思想,相當於每次累加hash2 long combinedHash = hash1; for (int i = 0; i < numHashFunctions; i++) { // Make the combined hash positive and indexable // 通過基於bitSize取模的方式獲取bit數組中的索引,然后調用set函數設置。 bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize); combinedHash += hash2; } return bitsChanged; } @Override public <T> boolean mightContain( T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) { long bitSize = bits.bitSize(); byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal(); long hash1 = lowerEight(bytes); long hash2 = upperEight(bytes); long combinedHash = hash1; for (int i = 0; i < numHashFunctions; i++) { // Make the combined hash positive and indexable // 和put的區別就在這里,從set轉換為get,來判斷是否存在 if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) { return false; } combinedHash += hash2; } return true; } private /* static */ long lowerEight(byte[] bytes) { return Longs.fromBytes( bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]); } private /* static */ long upperEight(byte[] bytes) { return Longs.fromBytes( bytes[15], bytes[14], bytes[13], bytes[12], bytes[11], bytes[10], bytes[9], bytes[8]); } };
其中put()方法負責向布隆過濾器中插入元素,mightContain()方法負責判斷元素是否存在。以put()方法為例講解一下流程吧。
- 使用MurmurHash算法對funnel的輸入數據進行散列,得到128bit(16B)的字節數組。
- 取低8字節作為第一個哈希值hash1,取高8字節作為第二個哈希值hash2。
- 進行k次循環,每次循環都用hash1與hash2的復合哈希做散列,然后對m取模,將位數組中的對應比特設為1。
這里需要注意兩點:
-
在循環中實際上應用了雙重哈希(double hashing)的思想,即可以用兩個哈希函數來模擬k個,其中i為步長:
這種方法在開放定址的哈希表中,也經常用來減少沖突。
- 哈希值有可能為負數,而負數是不能在位數組中定位的。所以哈希值需要與Long.MAX_VALUE做bitwise AND,直接將其最高位(符號位)置為0,就變成正數了。
put
方法中,先是將索引位置上的二進制置為1,然后用
bitsChanged
記錄插入結果,如果返回true表明沒有重復插入成功,而
mightContain
方法則是將索引位置上的數值取出,並判斷是否為0,只要其中出現一個0,那么立即判斷為不存在。
5、位數組具體實現
Guava
為了提供效率,自己實現了
LockFreeBitArray
來提供bit數組的無鎖設置和讀取,我們來看看LockFreeBitArray類的部分代碼:
static final class LockFreeBitArray { private static final int LONG_ADDRESSABLE_BITS = 6; final AtomicLongArray data; private final LongAddable bitCount; LockFreeBitArray(long bits) { this(new long[Ints.checkedCast(LongMath.divide(bits, 64, RoundingMode.CEILING))]); } // Used by serialization LockFreeBitArray(long[] data) { checkArgument(data.length > 0, "data length is zero!"); this.data = new AtomicLongArray(data); this.bitCount = LongAddables.create(); long bitCount = 0; for (long value : data) { bitCount += Long.bitCount(value); } this.bitCount.add(bitCount); } /** Returns true if the bit changed value. */ boolean set(long bitIndex) { if (get(bitIndex)) { return false; } int longIndex = (int) (bitIndex >>> LONG_ADDRESSABLE_BITS); long mask = 1L << bitIndex; // only cares about low 6 bits of bitIndex long oldValue; long newValue; // 經典的CAS自旋重試機制 do { oldValue = data.get(longIndex); newValue = oldValue | mask; if (oldValue == newValue) { return false; } } while (!data.compareAndSet(longIndex, oldValue, newValue)); // We turned the bit on, so increment bitCount. bitCount.increment(); return true; } boolean get(long bitIndex) { return (data.get((int) (bitIndex >>> 6)) & (1L << bitIndex)) != 0; } // .... }
它是采用原子類型AtomicLongArray作為位數組的存儲的,確實不需要加鎖。另外還有一個Guava中特有的LongAddable類型的計數器,用來統計置為1的比特數。
采用AtomicLongArray除了有並發上的優勢之外,更主要的是它可以表示非常長的位數組。一個長整型數占用64bit,因此data[0]可以代表第0~63bit,data[1]代表64~127bit,data[2]代表128~191bit……依次類推。這樣設計的話,將下標i無符號右移6位就可以獲得data數組中對應的位置,再在其基礎上左移i位就可以取得對應的比特了。
上面的代碼中用到了Long.bitCount()方法計算long型二進制表示中1的數量:
public static int bitCount(long i) { // HD, Figure 5-14 i = i - ((i >>> 1) & 0x5555555555555555L); i = (i & 0x3333333333333333L) + ((i >>> 2) & 0x3333333333333333L); i = (i + (i >>> 4)) & 0x0f0f0f0f0f0f0f0fL; i = i + (i >>> 8); i = i + (i >>> 16); i = i + (i >>> 32); return (int)i & 0x7f; }
五、Redis實現布隆過濾器
上面使用guava實現布隆過濾器是把數據放在本地內存中,無法實現布隆過濾器的共享,我們還可以把數據放在redis中,用 redis來實現布隆過濾器,我們要使用的數據結構是bitmap,你可能會有疑問,redis支持五種數據結構:String,List,Hash,Set,ZSet,沒有bitmap呀。沒錯,實際上bitmap的本質還是String。
要用redis來實現布隆過濾器,我們需要自己設計映射函數,自己度量二進制向量的長度。
public class RedisMain { static final int expectedInsertions = 100;//要插入多少數據 static final double fpp = 0.01;//期望的誤判率 //bit數組長度 private static long numBits; //hash函數數量 private static int numHashFunctions; static { numBits = optimalNumOfBits(expectedInsertions, fpp); numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits); } public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); for (int i = 0; i < 100; i++) { long[] indexs = getIndexs(String.valueOf(i)); for (long index : indexs) { jedis.setbit("codebear:bloom", index, true); } } for (int i = 0; i < 100; i++) { long[] indexs = getIndexs(String.valueOf(i)); for (long index : indexs) { Boolean isContain = jedis.getbit("codebear:bloom", index); if (!isContain) { System.out.println(i + "肯定沒有重復"); } } System.out.println(i + "可能重復"); } } /** * 根據key獲取bitmap下標 */ private static long[] getIndexs(String key) { long hash1 = hash(key); long hash2 = hash1 >>> 16; long[] result = new long[numHashFunctions]; for (int i = 0; i < numHashFunctions; i++) { long combinedHash = hash1 + i * hash2; if (combinedHash < 0) { combinedHash = ~combinedHash; } result[i] = combinedHash % numBits; } return result; } private static long hash(String key) { Charset charset = Charset.forName("UTF-8"); return Hashing.murmur3_128().hashObject(key, Funnels.stringFunnel(charset)).asLong(); } //計算hash函數個數 private static int optimalNumOfHashFunctions(long n, long m) { return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); } //計算bit數組長度 private static long optimalNumOfBits(long n, double p) { if (p == 0) { p = Double.MIN_VALUE; } return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); } }