Redis實現布隆過濾器


前面講到bloomfilter的原理及guava實現的bloomfilter的用法,現在看看redis如何實現:

 

一、bitmaps

  我們知道計算機是以二進制位作為底層存儲的基礎單位,一個字節等於8位。

  比如“big”字符串是由三個字符組成的,這三個字符對應的ASCII碼分為是98、105、103,對應的二進制存儲如下:

  

在Redis中,Bitmaps 提供了一套命令用來操作類似上面字符串中的每一個位,這種位存儲可以極大的降低redis的內存。位操作常用的命令為:

SETBIT KEY OFFSET VALUE 
GETBIT KEY OFFSET

一、設置值

setbit key offset value

我們知道"b"的二進制表示為0110 0010,我們將第7位(從0開始)設置為1,那0110 0011 表示的就是字符“c”,所以最后的字符 “big”變成了“cig”。

二、獲取值

gitbit key offset

 

三、獲取位圖指定范圍值為1的個數

bitcount key [start end]

  如果不指定,那就是獲取全部值為1的個數。

  注意:start和end指定的是字節的個數,而不是位數組下標。

  

二、redis中實現bloomfilter方式

這種結構如果和BloomFilter 結合起來就可以實現分布式的布隆過濾器了。

2.1、Redisson

  Redis 實現布隆過濾器的底層就是通過 bitmap 這種數據結構,至於如何實現,這里就不重復造輪子了,介紹業界比較好用的一個客戶端工具——Redisson。

  Redisson 是用於在 Java 程序中操作 Redis 的庫,利用Redisson 我們可以在程序中輕松地使用 Redis。

  下面我們就通過 Redisson 來構造布隆過濾器。

import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedissonBloomFilter {

    public static void main(String[] args) {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.14.104:6379");
        config.useSingleServer().setPassword("123");
        //構造Redisson
        RedissonClient redisson = Redisson.create(config);

        RBloomFilter<String> bloomFilter = redisson.getBloomFilter("phoneList");
        //初始化布隆過濾器:預計元素為100000000L,誤差率為3%
        bloomFilter.tryInit(100000000L,0.03);
        //將號碼10086插入到布隆過濾器中
        bloomFilter.add("10086");

        //判斷下面號碼是否在布隆過濾器中
        System.out.println(bloomFilter.contains("123456"));//false
        System.out.println(bloomFilter.contains("10086"));//true
    }
}

這是單節點的Redis實現方式,如果數據量比較大,期望的誤差率又很低,那單節點所提供的內存是無法滿足的,這時候可以使用分布式布隆過濾器,同樣也可以用 Redisson 來實現,這里我就不做代碼演示了,大家有興趣可以試試。

springboot使用bloomfilter的方法:

2.2、redisTemplate + Lua(redis4.0)

Redis 4.0的布隆過濾器插件

步驟一:編寫兩個 Lua 腳本

bloomFilterAdd.lua

local bloomName = KEYS[1]
local value = KEYS[2]

-- bloomFilter
local result_1 = redis.call('BF.ADD', bloomName, value)
return result_1

bloomFilterExist.lua

local bloomName = KEYS[1]
local value = KEYS[2]

-- bloomFilter
local result_1 = redis.call('BF.EXISTS', bloomName, value)
return result_1

步驟二:新建兩個方法

1)添加數據到指定名稱的布隆過濾器(bloomFilterAdd)

2)從指定名稱的布隆過濾器獲取 key 是否存在的腳本(bloomFilterExists)

java封裝:

    public Boolean bloomFilterAdd(int value){
        DefaultRedisScript<Boolean> bloomAdd = new DefaultRedisScript<>();
        bloomAdd.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterAdd.lua")));
        bloomAdd.setResultType(Boolean.class);
        List<Object> keyList= new ArrayList<>();
        keyList.add(bloomFilterName);
        keyList.add(value+"");
        Boolean result = (Boolean) redisTemplate.execute(bloomAdd,keyList);
        return result;
    }
    public Boolean bloomFilterAdd(String bloomName,int value){
        DefaultRedisScript<Boolean> bloomAdd = new DefaultRedisScript<>();
        bloomAdd.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterAdd.lua")));
        bloomAdd.setResultType(Boolean.class);
        List<Object> keyList= new ArrayList<>();
        keyList.add(bloomName);
        keyList.add(value+"");
        Boolean result = (Boolean) redisTemplate.execute(bloomAdd,keyList);
        return result;
    }


    public Boolean bloomFilterExists(int value) {
        DefaultRedisScript<Boolean> bloomExists = new DefaultRedisScript<>();
        bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterExist.lua")));
        bloomExists.setResultType(Boolean.class);
        List<Object> keyList = new ArrayList<>();
        keyList.add(bloomFilterName);
        keyList.add(value + "");
        Boolean result = (Boolean) redisTemplate.execute(bloomExists, keyList);
        return result;
    }

    public Boolean bloomFilterExists(String bloomName,int value) {
        DefaultRedisScript<Boolean> bloomExists = new DefaultRedisScript<>();
        bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterExist.lua")));
        bloomExists.setResultType(Boolean.class);
        List<Object> keyList = new ArrayList<>();
        keyList.add(bloomName);
        keyList.add(value + "");
        Boolean result = (Boolean) redisTemplate.execute(bloomExists, keyList);
        return result;
    }

步驟三:進行測試

@RestController
public class BloomFilterController {
    @Resource
    private RedisService redisService;

    @RequestMapping("/bloom/redisIdExists")
    public boolean redisidExists(int id){
        return redisService.bloomFilterExists(id);
    }

    @RequestMapping("/bloom/redisIdAdd")
    public boolean redisidAdd(int id){
        return redisService.bloomFilterAdd(id);
    }
}

2.3、通過JedisCluster(集群)或redisTemplate的setbit/getbit來操作bitmap 模擬guava的bloomfilter實現

如果自己實現的話,借鑒guava的bloomfilter思路為:

  1. 對校驗的對象做K次hash得到位移offset
  2. 調用getbit 命令檢查是不是每次返回的值都是1
  3. 如果返回K個1表示這個對象已經被存儲過
  4. 如果沒有的話, 可以對該對象進行存儲

經過上述講解, 流程和邏輯基本都差不多了,萬事俱備開始擼碼:
因為我們在使用布隆過濾器之前, 我們可以預先預估誤判率P和想要插入的個數n

2.3.1、關鍵方法說明

2.3.1.1、計算獲bitMap預分配的長度

從《Bloom Filter(布隆過濾器)的概念和原理》中公式可以推算bit 的長度, 但是需要注意的是公式計算出來的是浮點數:

    /**
     * 計算bit數組的長度,
     * m = -n * Math.log(p)/Math.pow(ln2,2)
     * @param n 插入條數
     * @param p 誤判概率
     */
    private int numOfBits(int n, double p) {
        if (p == 0) {
            p = Double.MIN_VALUE;
        }
        int sizeOfBitArray = (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
        return sizeOfBitArray;
    }

 

2.3.1.2、計算hash的次數
    /**
     * 計算hash方法執行次數
     * k = m/n*ln2
     * @param n 插入的數據條數
     * @param m 數據位數
     */
    private int numberOfHashFunctions(long n, long m) {
        int countOfHash = Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
        return countOfHash;
    }

 

2.3.1.3、獲取hash函數計算之后的位移集合

這個hash函數采用的是guava中的murmur函數

    public int[] murmurHashOffset(T value) {
        int[] offset = new int[numHashFunctions];
 
        long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong();
        int hash1 = (int) hash64;
        int hash2 = (int) (hash64 >>> 32);
        for (int i = 1; i <= numHashFunctions; i++) {
            int nextHash = hash1 + i * hash2;
            if (nextHash < 0) {
                nextHash = ~nextHash;
            }
            offset[i - 1] = nextHash % bitSize;
        }
 
        return offset;
    }

 

2.3.2、完整代碼

完整的bloomfilter的封裝如下:

import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import org.springframework.beans.factory.annotation.Configurable;

@Configurable
public class BloomFilterHelper<T> {
    // hash函數的次數
    private int numHashFunctions;
    // bit長度
    private int bitSize;
    private Funnel<T> funnel;

    public BloomFilterHelper(Funnel<T> funnel, int expectedInsertions, double fpp) {
        Preconditions.checkArgument(funnel != null, "funnel不能為空");
        this.funnel = funnel;
        bitSize = optimalNumOfBits(expectedInsertions, fpp);
        numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize);
    }

    public int[] murmurHashOffset(T value) {
        int[] offset = new int[numHashFunctions];

        long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong();
        int hash1 = (int) hash64;
        int hash2 = (int) (hash64 >>> 32);
        for (int i = 1; i <= numHashFunctions; i++) {
            int nextHash = hash1 + i * hash2;
            if (nextHash < 0) {
                nextHash = ~nextHash;
            }
            offset[i - 1] = nextHash % bitSize;
        }

        return offset;
    }

    /**
     * 計算bit數組的長度, m = -n * Math.log(p)/Math.pow(ln2,2)
     * 
     * @param n
     *            插入條數
     * @param p
     *            誤判概率
     */
    private int optimalNumOfBits(int n, double p) {
        if (p == 0) {
            p = Double.MIN_VALUE;
        }
        // sizeOfBitArray
        return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /**
     * 計算hash方法執行次數 k = m/n*ln2
     * 
     * @param n
     *            插入的數據條數
     * @param m
     *            數據位數
     */
    private int optimalNumOfHashFunctions(long n, long m) {
        // countOfHash
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

}

單機的布隆過濾器已經建好了, 接下來就是和redis整合了, 

先看不用lua的封裝的類:

import com.config.BloomFilterHelper;
import com.google.common.base.Preconditions;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCluster;
 
@Component
public class RedisBloomFilter<T> {
    private JedisCluster cluster;
 
    public RedisBloomFilter(JedisCluster jedisCluster) {
        this.cluster = jedisCluster;
    }
 
    /**
     * 根據給定的布隆過濾器添加值
     */
    public <T> void addByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空");
        int[] offset = bloomFilterHelper.murmurHashOffset(value);
        for (int i : offset) {
            //redisTemplate.opsForValue().setBit(key, i, true);
            cluster.setbit(key, i, true); }
    }
 
    /**
     * 根據給定的布隆過濾器判斷值是否存在
     */
    public <T> boolean includeByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空");
        int[] offset = bloomFilterHelper.murmurHashOffset(value);
        for (int i : offset) {
            //if (!redisTemplate.opsForValue().getBit(key, i)) {
            if (!cluster.getbit(key, i)) {
                return false;
            }
        }
 
        return true;
    }
}

原子性問題,上面的紅色代碼段 由於可能會有多次的setbit操作,這樣可能會發生多次的網絡請求, 不一定執行成功。

所以考慮的是用lua腳本來執行:

  private static final String GET_BIT_LUA = "for i=1,#ARGV\n" +
            "do\n" +
            "    local value =  redis.call(\"GETBIT\", KEYS[1], ARGV[i])\n" +
            "    if value == 0\n" +
            "    then\n" +
            "        return 0\n" +
            "    end\n" +
            "end\n" +
            "return 1";

    private static final String SET_BIT_LUA = "for i=1, #ARGV\n" +
            "do\n" +
            "    redis.call(\"SETBIT\",KEYS[1], ARGV[i],1)\n" +
            "end\n";

布隆過濾器的插入和判斷操作分別如下:

public static <T> void addByBloomFilter(IRedisHelper redisHelper, BloomFilterHelper<T> bloomFilterHelper, Object key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空");
        List<Long> offsetList = bloomFilterHelper.murmurHashOffset(value);
        if(CollectionUtils.isEmpty(offsetList)){
            return ;
        } 
       redisHelper.eval(routeKey, SET_BIT_LUA, Lists.newArrayList(key.getRawKey()), offsetList);
    }

    /**
     * 根據給定的布隆過濾器判斷值是否存在
     */
    public static <T> boolean includeByBloomFilter(IRedisHelper redisHelper, BloomFilterHelper<T> bloomFilterHelper, Object key, T value) {
        Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能為空");
        List<Long> offsetList = bloomFilterHelper.murmurHashOffset(value);
        if(CollectionUtils.isEmpty(offsetList)){
            return false;
        }

      String result = String.valueOf(eval);
      if("1".equalsIgnoreCase(result)){
        return true;
       }        
    return false;
    }

 

對於redis的bitmap 存在一個問題,就是內存初始化的問題,
下面是來自官方的原話:

 When setting the last possible bit (offset equal to 2^32 -1) and the string value stored at key does not yet 
hold a string value, or holds a small string value, Redis needs to allocate all intermediate memory which can
 block the server for some time. On a 2010 MacBook Pro, setting bit number 2^32 -1 (512MB allocation) 
takes ~300ms, setting bit number 2^30 -1 (128MB allocation) takes ~80ms, 
setting bit number 2^28 -1 (32MB allocation) takes ~30ms and setting bit number 2^26 -1 (8MB allocation) takes ~8ms.

 

如果bitmap的長度是2^32的話,可能需要300ms 分配內存, 2^30 需要80ms, 2^28需要30ms, 2&26只需要8ms, 假如項目需要對性能和延遲有要求, 那么如何分配這個bitmap是個需要考慮的問題。

redis集群配置:

spring.redis.cluster.nodes=127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005
spring.redis.password=
#連接池最大連接數(使用負值表示沒有限制)
spring.redis.pool.max-active=8
#連接池最大阻塞等待時間(使用負值表示沒有限制)
spring.redis.pool.max-wait=-1
#連接池中的最大空閑連接
spring.redis.pool.max-idle=8
#連接池中的最小空閑連接
spring.redis.pool.min-idle=0
#連接超時時間(毫秒)
spring.redis.timeout=0

 

@Configuration
public class RedisConfig {
    private Logger logger = LoggerFactory.getLogger(RedisConfig.class);
 
    @Value("${spring.redis.cluster.nodes}")
    private String clusterNodes;
    @Value("${spring.redis.timeout}")
    private int timeout;
    @Value("${spring.redis.pool.max-idle}")
    private int maxIdle;
    @Value("${spring.redis.pool.max-wait}")
    private long maxWaitMillis;
    @Value("${spring.redis.commandTimeout}")
    private int commandTimeout;
 
    @Bean
    public JedisCluster getJedisCluster() {
        String[] cNodes = clusterNodes.split(",");
        Set<HostAndPort> nodes = new HashSet<>();
        // 分割出集群節點
        for (String node : cNodes) {
            String[] hp = node.split(":");
            nodes.add(new HostAndPort(hp[0], Integer.parseInt(hp[1])));
        }
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(maxIdle);
        jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
        return new JedisCluster(nodes, commandTimeout, jedisPoolConfig);
 
    }
 
 
    /**
     * redis序列化
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setConnectionFactory(connectionFactory);
        return redisTemplate;
    }
 
}

測試

@SpringBootApplication
@EnableDiscoveryClient
@ComponentScan(value = {"com.annotaion", "cn.springcloud", "com.config", "com.redislock"})
public class Ch34EurekaClientApplication implements ApplicationRunner {
 
    private static BloomFilterHelper<CharSequence> bloomFilterHelper;
 
    @Autowired
    RedisBloomFilter redisBloomFilter;
 
    public static void main(String[] args) {
        SpringApplication.run(Ch34EurekaClientApplication.class, args);
 
    }
 
    @PostConstruct
    public void init() {
        bloomFilterHelper = new BloomFilterHelper<>(Funnels.stringFunnel(Charset.defaultCharset()), 1000, 0.1);
    }
 
    @Override
    public void run(ApplicationArguments args) throws Exception {
 
 
 
        //******* Redis集群測試布隆方法*********
        int j = 0;
        for (int i = 0; i < 100; i++) {
            redisBloomFilter.addByBloomFilter(bloomFilterHelper, "bloom", i+"");
        }
        for (int i = 0; i < 1000; i++) {
            boolean result = redisBloomFilter.includeByBloomFilter(bloomFilterHelper, "bloom", i+"");
            if (!result) {
                j++;
            }
        }
        System.out.println("漏掉了" + j + "個");
    }
}

完全符合我上面測試的預期結果,大家可以可以自行調節數量進行測試,另外實際生產中聲明過濾器的時候 size 設置大一點,一般一百萬,錯誤率設置 0.001。

 

三、總結

布隆過濾器

       巧妙的使用hash算法和bitmap位存儲的方式,極大的節約了空間。

       由於主要用的是hash算法的特點,所有滿足和hash算法相同的規則:當過濾器返回 true時(表示很有可能該值是存在的),有一定概率是誤判的,即可能不存在;當過濾器返回false時(表示確定不存在),是可以完全相信的。

       我們換個數據的角度來看規則:當數據添加到布隆過濾器中時,對該數據的查詢一定會返回true;當數據沒有插入過濾器時,對該數據的查詢大部分情況返回false,但有小概率返回true,也就是誤判。

   我們知道它最終滿足的規則和hash的規則是一致的,只是組合了多個hash,使用了bitmap來存儲,大大優化了存儲的空間和判斷的效率。

 

參考:

 https://www.cnblogs.com/jwen1994/p/12264717.html

https://blog.csdn.net/weixin_38003389/article/details/89918747

https://www.jianshu.com/p/550278d10546


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM