試想一下這樣的場景,當黑客故意訪問不存在的數據,導致程序不斷訪問DB數據庫的數據,數據庫會不會掛掉?答案是會的。所以為了避免這種情況發生,當黑客訪問不存在的緩存時能夠迅速返回避免緩存及DB掛掉,引出了今天講的布隆過濾器。
布隆過濾器(Bloom Filter)是1970年由布隆提出的。它實際上是一個很長的二進制向量和一系列隨機映射函數。布隆過濾器可以用於檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的算法,缺點是有一定的誤識別率和刪除困難。
優點:相比於其它的數據結構,布隆過濾器在空間和時間方面都有巨大的優勢。布隆過濾器存儲空間和插入/查詢時間都是常數。另外,散列函數相互之間沒有關系,方便由硬件並行實現。布隆過濾器不需要存儲元素本身,在某些對保密要求非常嚴格的場合有優勢
缺點:布隆過濾器的缺點和優點一樣明顯。誤算率是其中之一。隨着存入的元素數量增加,誤算率隨之增加。但是如果元素數量太少,則使用散列表足矣
Spring Boot 實現谷歌布隆過濾器——以會員抽獎為例
步驟一:引入依賴
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency>
步驟二:將需要判斷數據是否存在的key值
@Service public class BloomFilterService { @Resource private SysUserMapper sysUserMapper; private BloomFilter<Integer> bf; /*** * PostConstruct 程序啟動時候加載此方法 */ @PostConstruct public void initBloomFilter() { SysUserExample sysUserExample = new SysUserExample(); List<SysUser> sysUserList = sysUserMapper.selectByExample(sysUserExample); if(CollectionUtils.isEmpty(sysUserList)){ return; } //創建布隆過濾器(默認3%誤差) bf = BloomFilter.create(Funnels.integerFunnel(),sysUserList.size()); for (SysUser sysUser:sysUserList) { bf.put(sysUser.getId()); } } /*** * 判斷id可能存在於布隆過濾器里面 * @param id * @return */ public boolean userIdExists(int id){ return bf.mightContain(id); } }
步驟三:進行測試
@RestController public class BloomFilterController { @Resource private BloomFilterService bloomFilterService; @RequestMapping("/bloom/idExists") public boolean ifExists(int id){ return bloomFilterService.userIdExists(id); } }
基於內存的 google 布隆過濾器的缺陷與思考
- 重啟即失效
- 本地內存無法用在分布式場景
- 不支持大數據量存儲
為了解決這些問題,我們可以使用 Redis 布隆過濾器,它的好處有:
- 可擴展性Bloom過濾器
- 一旦Bloom過濾器達到容量,就會在其上創建一個新的過濾器
- 不存在重啟即失效或者定時任務維護的成本
- 基於goole實現的布隆過濾器需要啟動之后初始化布隆過濾器
它的缺點:需要網絡 IO,性能比基於內存的過濾器低
優先基於數據量進行考慮選擇哪個布隆過濾器
基於 Lua 腳本實現 Spring Boot 和布隆過濾器的整合
步驟一:編寫兩個 Lua 腳本
bloomFilterAdd.lua
local bloomName = KEYS[1] local value = KEYS[2] -- bloomFilter local result_1 = redis.call('BF.ADD', bloomName, value) return result_1
bloomFilterExist.lua
local bloomName = KEYS[1] local value = KEYS[2] -- bloomFilter local result_1 = redis.call('BF.EXISTS', bloomName, value) return result_1
步驟二:新建兩個方法
1)添加數據到指定名稱的布隆過濾器(bloomFilterAdd)
2)從指定名稱的布隆過濾器獲取 key 是否存在的腳本(bloomFilterExists)
@Service public class RedisService { @Autowired private RedisTemplate redisTemplate; private static final String bloomFilterName = "isVipBloom"; public Boolean bloomFilterAdd(int value){ DefaultRedisScript<Boolean> bloomAdd = new DefaultRedisScript<>(); bloomAdd.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterAdd.lua"))); bloomAdd.setResultType(Boolean.class); List<Object> keyList= new ArrayList<>(); keyList.add(bloomFilterName); keyList.add(value+""); Boolean result = (Boolean) redisTemplate.execute(bloomAdd,keyList); return result; } public Boolean bloomFilterExists(int value){ DefaultRedisScript<Boolean> bloomExists= new DefaultRedisScript<>(); bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterExist.lua"))); bloomExists.setResultType(Boolean.class); List<Object> keyList= new ArrayList<>(); keyList.add(bloomFilterName); keyList.add(value+""); Boolean result = (Boolean) redisTemplate.execute(bloomExists,keyList); return result; } }
步驟三:進行測試
@RestController public class BloomFilterController { @Resource private RedisService redisService; @RequestMapping("/bloom/redisIdExists") public boolean redisidExists(int id){ return redisService.bloomFilterExists(id); } @RequestMapping("/bloom/redisIdAdd") public boolean redisidAdd(int id){ return redisService.bloomFilterAdd(id); } }
實現一個秒殺業務
1)利用 Redis 緩存 incr 攔截流量
首先通過數據控制模塊,提前將秒殺商品緩存到讀寫分離 Redis,並設置秒殺開始標記如下:
- skuId_start: 0 開始標記,0表示秒殺還沒開始
- skuId_count: 10000 表示總數
- skuId_access: 12000 表示接受搶購數
秒殺開始前,服務集群讀取 skuId_start 為 0,直接返回未開始。之所以設置這個值而不是根據時間判斷是否開始,是因為服務時間可能不一致(相差幾百毫秒)這樣可能導致流量傾斜(其他服務沒開始,會將大量的流量堆積到開始的服務上)
數據控制模塊將 skuId_start 改為1,標志秒殺開始。
當接受下單數達到 skuId_count*1.2 后,繼續攔截所有請求。
2)利用 Redis 緩存加速庫存扣量
- skuId_booked: 0 表示沒有搶購
3)將用戶訂單數據寫入mq
4)監聽mq入庫
代碼實現
@Service public class SeckillService { private static final String secStartPrefix = "skuId_start_"; private static final String secAccess = "skuId_access_"; private static final String secCount = "skuId_count_"; private static final String filterName = "skuId_bloomfilter_"; private static final String bookedName = "skuId_booked_"; @Resource private RedisService redisService; public String seckill(int uid, int skuId) { //流量攔截層 //1、判斷秒殺是否開始 0_1554045087 開始標識_開始時間 String isStart = (String) redisService.get(secStartPrefix + skuId); if (StringUtils.isBlank(isStart)) { return "還未開始"; } if (isStart.contains("_")) { Integer isStartInt = Integer.parseInt(isStart.split("_")[0]); Integer startTime = Integer.parseInt(isStart.split("_")[1]); if (isStartInt == 0) { if (startTime > getNow()) { return "還未開始"; } else { //代表秒殺已經開始 redisService.set(secStartPrefix + skuId, 1 + ""); } } else { return "系統異常"; } } else { if (Integer.parseInt(isStart) != 1) { return "系統異常"; } } //2、流量攔截 String skuIdAccessName = secAccess + skuId; Integer accessNumInt = 0; String accessNum = (String) redisService.get(skuIdAccessName); if (StringUtils.isNotBlank(accessNum)) { accessNumInt = Integer.parseInt(accessNum); } String skuIdCountName = secCount + skuId; Integer countNumInt = Integer.parseInt((String) redisService.get(skuIdCountName)); if (countNumInt * 1.2 < accessNumInt) { return "搶購已經完成,歡迎下次參與"; } else { redisService.incr(skuIdAccessName); } //信息校驗層 if (redisService.bloomFilterExists(filterName, uid)) { return "您已經搶購過該商品,請勿重復下發!"; } else { redisService.bloomFilterAdd(filterName, uid); } Boolean isSuccess = redisService.getAndIncrLua(bookedName + skuId); if (isSuccess) { return "恭喜您搶購成功!!!"; } else { return "搶購結束,歡迎下次參與"; } } private long getNow() { return System.currentTimeMillis() / 1000; } }
RedisService

@Service public class RedisService { @Autowired private RedisTemplate redisTemplate; private static double size = Math.pow(2, 32); /** * 寫入緩存 * * @param key * @param offset 位 8Bit=1Byte * @return */ public boolean setBit(String key, long offset, boolean isShow) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.setBit(key, offset, isShow); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 寫入緩存 * * @param key * @param offset * @return */ public boolean getBit(String key, long offset) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); result = operations.getBit(key, offset); } catch (Exception e) { e.printStackTrace(); } return result; } /** * 寫入緩存 * * @param key * @param value * @return */ public boolean set(final String key, Object value) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); redisTemplate.opsForList(); operations.set(key, value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 寫入緩存 * * @param key * @return */ public Object get(final String key) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); return operations.get(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 寫入緩存 * * @param key * @param value * @return */ public boolean decr(final String key, int value) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.increment(key, -value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 寫入緩存 * * @param key * @return */ public boolean incr(final String key) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.increment(key, 1); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 寫入緩存設置時效時間 * * @param key * @param value * @return */ public boolean set(final String key, Object value, Long expireTime) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.set(key, value); redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 批量刪除對應的value * * @param keys */ public void remove(final String... keys) { for (String key : keys) { remove(key); } } /** * 刪除對應的value * * @param key */ public void remove(final String key) { if (exists(key)) { redisTemplate.delete(key); } } /** * 判斷緩存中是否有對應的value * * @param key * @return */ public boolean exists(final String key) { return redisTemplate.hasKey(key); } /** * 讀取緩存 * * @param key * @return */ public Object genValue(final String key) { Object result = null; ValueOperations<String, String> operations = redisTemplate.opsForValue(); result = operations.get(key); return result; } /** * 哈希 添加 * * @param key * @param hashKey * @param value */ public void hmSet(String key, Object hashKey, Object value) { HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); hash.put(key, hashKey, value); } /** * 哈希獲取數據 * * @param key * @param hashKey * @return */ public Object hmGet(String key, Object hashKey) { HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); return hash.get(key, hashKey); } /** * 列表添加 * * @param k * @param v */ public void lPush(String k, Object v) { ListOperations<String, Object> list = redisTemplate.opsForList(); list.rightPush(k, v); } /** * 列表獲取 * * @param k * @param l * @param l1 * @return */ public List<Object> lRange(String k, long l, long l1) { ListOperations<String, Object> list = redisTemplate.opsForList(); return list.range(k, l, l1); } /** * 集合添加 * * @param key * @param value */ public void add(String key, Object value) { SetOperations<String, Object> set = redisTemplate.opsForSet(); set.add(key, value); } /** * 集合獲取 * * @param key * @return */ public Set<Object> setMembers(String key) { SetOperations<String, Object> set = redisTemplate.opsForSet(); return set.members(key); } /** * 有序集合添加 * * @param key * @param value * @param scoure */ public void zAdd(String key, Object value, double scoure) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); zset.add(key, value, scoure); } /** * 有序集合獲取 * * @param key * @param scoure * @param scoure1 * @return */ public Set<Object> rangeByScore(String key, double scoure, double scoure1) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); redisTemplate.opsForValue(); return zset.rangeByScore(key, scoure, scoure1); } //第一次加載的時候將數據加載到redis中 public void saveDataToRedis(String name) { double index = Math.abs(name.hashCode() % size); long indexLong = new Double(index).longValue(); boolean availableUsers = setBit("availableUsers", indexLong, true); } //第一次加載的時候將數據加載到redis中 public boolean getDataToRedis(String name) { double index = Math.abs(name.hashCode() % size); long indexLong = new Double(index).longValue(); return getBit("availableUsers", indexLong); } /** * 有序集合獲取排名 * * @param key 集合名稱 * @param value 值 */ public Long zRank(String key, Object value) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); return zset.rank(key, value); } /** * 有序集合獲取排名 * * @param key */ public Set<ZSetOperations.TypedTuple<Object>> zRankWithScore(String key, long start, long end) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); Set<ZSetOperations.TypedTuple<Object>> ret = zset.rangeWithScores(key, start, end); return ret; } /** * 有序集合添加 * * @param key * @param value */ public Double zSetScore(String key, Object value) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); return zset.score(key, value); } /** * 有序集合添加分數 * * @param key * @param value * @param scoure */ public void incrementScore(String key, Object value, double scoure) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); zset.incrementScore(key, value, scoure); } /** * 有序集合獲取排名 * * @param key */ public Set<ZSetOperations.TypedTuple<Object>> reverseZRankWithScore(String key, long start, long end) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); Set<ZSetOperations.TypedTuple<Object>> ret = zset.reverseRangeByScoreWithScores(key, start, end); return ret; } /** * 有序集合獲取排名 * * @param key */ public Set<ZSetOperations.TypedTuple<Object>> reverseZRankWithRank(String key, long start, long end) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); Set<ZSetOperations.TypedTuple<Object>> ret = zset.reverseRangeWithScores(key, start, end); return ret; } public Boolean bloomFilterAdd(String filterName, int value) { DefaultRedisScript<Boolean> bloomAdd = new DefaultRedisScript<>(); bloomAdd.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterAdd.lua"))); bloomAdd.setResultType(Boolean.class); List<Object> keyList = new ArrayList<>(); keyList.add(filterName); keyList.add(value + ""); Boolean result = (Boolean) redisTemplate.execute(bloomAdd, keyList); return result; } public Boolean bloomFilterExists(String filterName, int value) { DefaultRedisScript<Boolean> bloomExists = new DefaultRedisScript<>(); bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterExist.lua"))); bloomExists.setResultType(Boolean.class); List<Object> keyList = new ArrayList<>(); keyList.add(filterName); keyList.add(value + ""); Boolean result = (Boolean) redisTemplate.execute(bloomExists, keyList); return result; } public Boolean getAndIncrLua(String key) { DefaultRedisScript<Boolean> bloomExists = new DefaultRedisScript<>(); bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("secKillIncr.lua"))); bloomExists.setResultType(Boolean.class); List<Object> keyList = new ArrayList<>(); keyList.add(key); Boolean result = (Boolean) redisTemplate.execute(bloomExists, keyList); return result; } }
secKillIncr.lua
local lockKey = KEYS[1] -- get info local result_1 = redis.call('GET', lockKey) if tonumber(result_1) <10000 then local result_2= redis.call('INCR', lockKey) return result_1 else return result_1 end
測試:
@RestController public class SeckillController { @Resource private SeckillService seckillService; @RequestMapping("/redis/seckill") public String secKill(int uid,int skuId){ return seckillService.seckill(uid,skuId); } }