前面講到bloomfilter的原理及guava實現的bloomfilter的用法,現在看看redis如何實現:
一、bitmaps
我們知道計算機是以二進制位作為底層存儲的基礎單位,一個字節等於8位。
比如“big”字符串是由三個字符組成的,這三個字符對應的ASCII碼分為是98、105、103,對應的二進制存儲如下:
在Redis中,Bitmaps 提供了一套命令用來操作類似上面字符串中的每一個位,這種位存儲可以極大的降低redis的內存。位操作常用的命令為:
SETBIT KEY OFFSET VALUE
GETBIT KEY OFFSET
一、設置值
setbit key offset value
我們知道"b"的二進制表示為0110 0010,我們將第7位(從0開始)設置為1,那0110 0011 表示的就是字符“c”,所以最后的字符 “big”變成了“cig”。
二、獲取值
gitbit key offset
三、獲取位圖指定范圍值為1的個數
bitcount key [start end]
如果不指定,那就是獲取全部值為1的個數。
注意:start和end指定的是字節的個數,而不是位數組下標。
二、redis中實現bloomfilter方式
這種結構如果和BloomFilter 結合起來就可以實現分布式的布隆過濾器了。
2.1、Redisson
Redis 實現布隆過濾器的底層就是通過 bitmap 這種數據結構,至於如何實現,這里就不重復造輪子了,介紹業界比較好用的一個客戶端工具——Redisson。
Redisson 是用於在 Java 程序中操作 Redis 的庫,利用Redisson 我們可以在程序中輕松地使用 Redis。
下面我們就通過 Redisson 來構造布隆過濾器。
import org.redisson.Redisson; import org.redisson.api.RBloomFilter; import org.redisson.api.RedissonClient; import org.redisson.config.Config; public class RedissonBloomFilter { public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.14.104:6379"); config.useSingleServer().setPassword("123"); //構造Redisson RedissonClient redisson = Redisson.create(config); RBloomFilter<String> bloomFilter = redisson.getBloomFilter("phoneList"); //初始化布隆過濾器:預計元素為100000000L,誤差率為3% bloomFilter.tryInit(100000000L,0.03); //將號碼10086插入到布隆過濾器中 bloomFilter.add("10086"); //判斷下面號碼是否在布隆過濾器中 System.out.println(bloomFilter.contains("123456"));//false System.out.println(bloomFilter.contains("10086"));//true } }
這是單節點的Redis實現方式,如果數據量比較大,期望的誤差率又很低,那單節點所提供的內存是無法滿足的,這時候可以使用分布式布隆過濾器,同樣也可以用 Redisson 來實現,這里我就不做代碼演示了,大家有興趣可以試試。
springboot使用bloomfilter的方法:
2.2、redisTemplate + Lua(redis4.0)
步驟一:編寫兩個 Lua 腳本
bloomFilterAdd.lua
local bloomName = KEYS[1] local value = KEYS[2] -- bloomFilter local result_1 = redis.call('BF.ADD', bloomName, value) return result_1
bloomFilterExist.lua
local bloomName = KEYS[1] local value = KEYS[2] -- bloomFilter local result_1 = redis.call('BF.EXISTS', bloomName, value) return result_1
步驟二:新建兩個方法
1)添加數據到指定名稱的布隆過濾器(bloomFilterAdd)
2)從指定名稱的布隆過濾器獲取 key 是否存在的腳本(bloomFilterExists)
java封裝:
public Boolean bloomFilterAdd(int value){ DefaultRedisScript<Boolean> bloomAdd = new DefaultRedisScript<>(); bloomAdd.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterAdd.lua"))); bloomAdd.setResultType(Boolean.class); List<Object> keyList= new ArrayList<>(); keyList.add(bloomFilterName); keyList.add(value+""); Boolean result = (Boolean) redisTemplate.execute(bloomAdd,keyList); return result; } public Boolean bloomFilterAdd(String bloomName,int value){ DefaultRedisScript<Boolean> bloomAdd = new DefaultRedisScript<>(); bloomAdd.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterAdd.lua"))); bloomAdd.setResultType(Boolean.class); List<Object> keyList= new ArrayList<>(); keyList.add(bloomName); keyList.add(value+""); Boolean result = (Boolean) redisTemplate.execute(bloomAdd,keyList); return result; } public Boolean bloomFilterExists(int value) { DefaultRedisScript<Boolean> bloomExists = new DefaultRedisScript<>(); bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterExist.lua"))); bloomExists.setResultType(Boolean.class); List<Object> keyList = new ArrayList<>(); keyList.add(bloomFilterName); keyList.add(value + ""); Boolean result = (Boolean) redisTemplate.execute(bloomExists, keyList); return result; } public Boolean bloomFilterExists(String bloomName,int value) { DefaultRedisScript<Boolean> bloomExists = new DefaultRedisScript<>(); bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterExist.lua"))); bloomExists.setResultType(Boolean.class); List<Object> keyList = new ArrayList<>(); keyList.add(bloomName); keyList.add(value + ""); Boolean result = (Boolean) redisTemplate.execute(bloomExists, keyList); return result; }
步驟三:進行測試
@RestController public class BloomFilterController { @Resource private RedisService redisService; @RequestMapping("/bloom/redisIdExists") public boolean redisidExists(int id){ return redisService.bloomFilterExists(id); } @RequestMapping("/bloom/redisIdAdd") public boolean redisidAdd(int id){ return redisService.bloomFilterAdd(id); } }
2.3、通過JedisCluster(集群)或redisTemplate的setbit/getbit來操作bitmap 模擬guava的bloomfilter實現
如果自己實現的話,借鑒guava的bloomfilter思路為:
- 對校驗的對象做K次hash得到位移offset
- 調用getbit 命令檢查是不是每次返回的值都是1
- 如果返回K個1表示這個對象已經被存儲過
- 如果沒有的話, 可以對該對象進行存儲
經過上述講解, 流程和邏輯基本都差不多了,萬事俱備開始擼碼:
因為我們在使用布隆過濾器之前, 我們可以預先預估誤判率P和想要插入的個數n
2.3.1、關鍵方法說明
2.3.1.1、計算獲bitMap預分配的長度
從《Bloom Filter(布隆過濾器)的概念和原理》中公式可以推算bit 的長度, 但是需要注意的是公式計算出來的是浮點數:
/** * 計算bit數組的長度, * m = -n * Math.log(p)/Math.pow(ln2,2) * @param n 插入條數 * @param p 誤判概率 */ private int numOfBits(int n, double p) { if (p == 0) { p = Double.MIN_VALUE; } int sizeOfBitArray = (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); return sizeOfBitArray; }
2.3.1.2、計算hash的次數
/** * 計算hash方法執行次數 * k = m/n*ln2 * @param n 插入的數據條數 * @param m 數據位數 */ private int numberOfHashFunctions(long n, long m) { int countOfHash = Math.max(1, (int) Math.round((double) m / n * Math.log(2))); return countOfHash; }
2.3.1.3、獲取hash函數計算之后的位移集合
這個hash函數采用的是guava中的murmur函數
public int[] murmurHashOffset(T value) { int[] offset = new int[numHashFunctions]; long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong(); int hash1 = (int) hash64; int hash2 = (int) (hash64 >>> 32); for (int i = 1; i <= numHashFunctions; i++) { int nextHash = hash1 + i * hash2; if (nextHash < 0) { nextHash = ~nextHash; } offset[i - 1] = nextHash % bitSize; } return offset; }
2.3.2、完整代碼
完整的bloomfilter的封裝如下:
import com.google.common.base.Preconditions; import com.google.common.hash.Funnel; import com.google.common.hash.Hashing; import org.springframework.beans.factory.annotation.Configurable; @Configurable public class BloomFilterHelper<T> { // hash函數的次數 private int numHashFunctions; // bit長度 private int bitSize; private Funnel<T> funnel; public BloomFilterHelper(Funnel<T> funnel, int expectedInsertions, double fpp) { Preconditions.checkArgument(funnel != null, "funnel不能為空"); this.funnel = funnel; bitSize = optimalNumOfBits(expectedInsertions, fpp); numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize); } public int[] murmurHashOffset(T value) { int[] offset = new int[numHashFunctions]; long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong(); int hash1 = (int) hash64; int hash2 = (int) (hash64 >>> 32); for (int i = 1; i <= numHashFunctions; i++) { int nextHash = hash1 + i * hash2; if (nextHash < 0) { nextHash = ~nextHash; } offset[i - 1] = nextHash % bitSize; } return offset; } /** * 計算bit數組的長度, m = -n * Math.log(p)/Math.pow(ln2,2) * * @param n * 插入條數 * @param p * 誤判概率 */ private int optimalNumOfBits(int n, double p) { if (p == 0) { p = Double.MIN_VALUE; } // sizeOfBitArray return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); } /** * 計算hash方法執行次數 k = m/n*ln2 * * @param n * 插入的數據條數 * @param m * 數據位數 */ private int optimalNumOfHashFunctions(long n, long m) { // countOfHash return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); } }
單機的布隆過濾器已經建好了, 接下來就是和redis整合了,
先看不用lua的封裝的類:
import com.config.BloomFilterHelper; import com.google.common.base.Preconditions; import org.springframework.stereotype.Component; import redis.clients.jedis.JedisCluster; @Component public class RedisBloomFilter<T> { private JedisCluster cluster; public RedisBloomFilter(JedisCluster jedisCluster) { this.cluster = jedisCluster; } /** * 根據給定的布隆過濾器添加值 */ public <T> void addByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) { Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空"); int[] offset = bloomFilterHelper.murmurHashOffset(value); for (int i : offset) { //redisTemplate.opsForValue().setBit(key, i, true); cluster.setbit(key, i, true); } } /** * 根據給定的布隆過濾器判斷值是否存在 */ public <T> boolean includeByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) { Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空"); int[] offset = bloomFilterHelper.murmurHashOffset(value); for (int i : offset) { //if (!redisTemplate.opsForValue().getBit(key, i)) { if (!cluster.getbit(key, i)) { return false; } } return true; } }
原子性問題,上面的紅色代碼段 由於可能會有多次的setbit操作,這樣可能會發生多次的網絡請求, 不一定執行成功。
所以考慮的是用lua腳本來執行:
private static final String GET_BIT_LUA = "for i=1,#ARGV\n" + "do\n" + " local value = redis.call(\"GETBIT\", KEYS[1], ARGV[i])\n" + " if value == 0\n" + " then\n" + " return 0\n" + " end\n" + "end\n" + "return 1"; private static final String SET_BIT_LUA = "for i=1, #ARGV\n" + "do\n" + " redis.call(\"SETBIT\",KEYS[1], ARGV[i],1)\n" + "end\n";
布隆過濾器的插入和判斷操作分別如下:
public static <T> void addByBloomFilter(IRedisHelper redisHelper, BloomFilterHelper<T> bloomFilterHelper, Object key, T value) { Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空"); List<Long> offsetList = bloomFilterHelper.murmurHashOffset(value); if(CollectionUtils.isEmpty(offsetList)){ return ; } redisHelper.eval(routeKey, SET_BIT_LUA, Lists.newArrayList(key.getRawKey()), offsetList); } /** * 根據給定的布隆過濾器判斷值是否存在 */ public static <T> boolean includeByBloomFilter(IRedisHelper redisHelper, BloomFilterHelper<T> bloomFilterHelper, Object key, T value) { Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空"); List<Long> offsetList = bloomFilterHelper.murmurHashOffset(value); if(CollectionUtils.isEmpty(offsetList)){ return false; } String result = String.valueOf(eval); if("1".equalsIgnoreCase(result)){ return true; } return false; }
對於redis的bitmap 存在一個問題,就是內存初始化的問題,
下面是來自官方的原話:
When setting the last possible bit (offset equal to 2^32 -1) and the string value stored at key does not yet hold a string value, or holds a small string value, Redis needs to allocate all intermediate memory which can block the server for some time. On a 2010 MacBook Pro, setting bit number 2^32 -1 (512MB allocation) takes ~300ms, setting bit number 2^30 -1 (128MB allocation) takes ~80ms, setting bit number 2^28 -1 (32MB allocation) takes ~30ms and setting bit number 2^26 -1 (8MB allocation) takes ~8ms.
redis集群配置:
spring.redis.cluster.nodes=127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005 spring.redis.password= #連接池最大連接數(使用負值表示沒有限制) spring.redis.pool.max-active=8 #連接池最大阻塞等待時間(使用負值表示沒有限制) spring.redis.pool.max-wait=-1 #連接池中的最大空閑連接 spring.redis.pool.max-idle=8 #連接池中的最小空閑連接 spring.redis.pool.min-idle=0 #連接超時時間(毫秒) spring.redis.timeout=0
@Configuration public class RedisConfig { private Logger logger = LoggerFactory.getLogger(RedisConfig.class); @Value("${spring.redis.cluster.nodes}") private String clusterNodes; @Value("${spring.redis.timeout}") private int timeout; @Value("${spring.redis.pool.max-idle}") private int maxIdle; @Value("${spring.redis.pool.max-wait}") private long maxWaitMillis; @Value("${spring.redis.commandTimeout}") private int commandTimeout; @Bean public JedisCluster getJedisCluster() { String[] cNodes = clusterNodes.split(","); Set<HostAndPort> nodes = new HashSet<>(); // 分割出集群節點 for (String node : cNodes) { String[] hp = node.split(":"); nodes.add(new HostAndPort(hp[0], Integer.parseInt(hp[1]))); } JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMaxWaitMillis(maxWaitMillis); return new JedisCluster(nodes, commandTimeout, jedisPoolConfig); } /** * redis序列化 * * @param connectionFactory * @return */ @Bean public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory) { RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>(); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); redisTemplate.setConnectionFactory(connectionFactory); return redisTemplate; } }
測試
@SpringBootApplication @EnableDiscoveryClient @ComponentScan(value = {"com.annotaion", "cn.springcloud", "com.config", "com.redislock"}) public class Ch34EurekaClientApplication implements ApplicationRunner { private static BloomFilterHelper<CharSequence> bloomFilterHelper; @Autowired RedisBloomFilter redisBloomFilter; public static void main(String[] args) { SpringApplication.run(Ch34EurekaClientApplication.class, args); } @PostConstruct public void init() { bloomFilterHelper = new BloomFilterHelper<>(Funnels.stringFunnel(Charset.defaultCharset()), 1000, 0.1); } @Override public void run(ApplicationArguments args) throws Exception { //******* Redis集群測試布隆方法********* int j = 0; for (int i = 0; i < 100; i++) { redisBloomFilter.addByBloomFilter(bloomFilterHelper, "bloom", i+""); } for (int i = 0; i < 1000; i++) { boolean result = redisBloomFilter.includeByBloomFilter(bloomFilterHelper, "bloom", i+""); if (!result) { j++; } } System.out.println("漏掉了" + j + "個"); } }
完全符合我上面測試的預期結果,大家可以可以自行調節數量進行測試,另外實際生產中聲明過濾器的時候 size 設置大一點,一般一百萬,錯誤率設置 0.001。
三、總結
布隆過濾器
巧妙的使用hash算法和bitmap位存儲的方式,極大的節約了空間。
由於主要用的是hash算法的特點,所有滿足和hash算法相同的規則:當過濾器返回 true時(表示很有可能該值是存在的),有一定概率是誤判的,即可能不存在;當過濾器返回false時(表示確定不存在),是可以完全相信的。
我們換個數據的角度來看規則:當數據添加到布隆過濾器中時,對該數據的查詢一定會返回true;當數據沒有插入過濾器時,對該數據的查詢大部分情況返回false,但有小概率返回true,也就是誤判。
我們知道它最終滿足的規則和hash的規則是一致的,只是組合了多個hash,使用了bitmap來存儲,大大優化了存儲的空間和判斷的效率。
參考:
https://www.cnblogs.com/jwen1994/p/12264717.html
https://blog.csdn.net/weixin_38003389/article/details/89918747
https://www.jianshu.com/p/550278d10546