Springboot Redis pipeline


工作中經常使用redis作為隊列,但redis隊列彈出值時,只能逐個彈出,無法批量獲取數據,在數據量很大時,在連接的獲取和釋放占用了較多的時間,效率上不是很好,只能逐個入庫。Redis pipeline可以解決該問題,允許發送多個請求,批量獲取數據

Springboot pipeline

 springboot pipeline使用比較簡單,直接調用方法即可,如下

public List<String> getQueueValues(final String queueKey, final int getCount) {
        List<Object> result = stringRedisTemplate.executePipelined(new RedisCallback<List<String>>() {

            @Override
            public List<String> doInRedis(RedisConnection connection) throws DataAccessException {
                for (int i = 0; i < getCount; i++) {
                    connection.lPop(queueKey.getBytes());
                }
                return null;
            }

        });

        List<String> collect = null;
        if (result.size() > 0) {
            collect = result.stream().filter(item -> item != null).map(item -> item.toString())
                    .collect(Collectors.toList());
        }

        return collect;
    }

此處使用的為RedisConnection,但其真實類型為StringRedisConnection,如果有需要,可以將其轉換為StringRedisConnection,在進行后面操作。這里的返回值必須為null,否則將會出現如下錯誤

Caused by: org.springframework.dao.InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
	at org.springframework.data.redis.core.RedisTemplate.lambda$executePipelined$1(RedisTemplate.java:330) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:175) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE]
	at org.springframework.data.redis.core.RedisTemplate.executePipelined(RedisTemplate.java:324) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE]
	at org.springframework.data.redis.core.RedisTemplate.executePipelined(RedisTemplate.java:314) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE]
	at com.redispro.pipleline.RedisUtil.getQueueValues(RedisUtil.java:30) ~[classes/:na]
	at com.redispro.pipleline.test.RedisUtilTest.run(RedisUtilTest.java:34) ~[classes/:na]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
	... 5 common frames omitted

但上述代碼在使用時需要進行一些處理,比如現在隊列中只有1個值,但是每次都批量獲取500條數據,則也會組織500個pop命令,並且在返回的集合中也只有1個值,其他的都是null。可以在使用之前先判斷隊列大小,隊列大於500,則按照500,小於等於500,則按照實際大小來獲取,這樣可以避免每次都組織很多多余的命令。

添加入隊列方法

public void pushQueue(String queueKey, List<String> queueValues) {
        stringRedisTemplate.opsForList().rightPushAll(queueKey, queueValues);
    }

測試

List<String> list = new ArrayList<String>();

int count = 5;
for (int i = 0; i < count; i++) {
     list.add(StringUtils.join("queue_value", i));
}

redisUtil.pushQueue("testQueue", list);
logger.info("Pipeline result: {}", redisUtil.getQueueValues("testQueue", count));       

結果,先進先出,正常。

2020-08-16 18:34:26.459  INFO 8880 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : Pipeline result: [queue_value0, queue_value1, queue_value2, queue_value3, queue_value4]

Stringboot pipeline和逐個獲取隊列值用時對比

redis性能受到很多因素的影響,本次只是簡單的做下測試,可能不是很准確,不過可以簡單說明pipeline比單個獲取效率更高。

逐個獲取隊列值

public List<String> getQueueValuesSingle(String queueKey, int getCount) {

        List<String> result = new ArrayList<String>(getCount);
        String leftPop = null;
        for (int i = 0; i < getCount; i++) {
            leftPop = stringRedisTemplate.opsForList().leftPop(queueKey);
            result.add(leftPop);
        }
        return result;
    }

分別測試pipeline和單次逐個獲取

 1 List<String> list = new ArrayList<String>();
 2 
 3 int count = 100000;
 4 for (int i = 0; i < count; i++) {
 5      list.add(StringUtils.join("queue_value", i));
 6 }
 7 
 8 redisUtil.pushQueue("testQueue", list);
 9 long start = System.currentTimeMillis()
10 
11 redisUtil.getQueueValues("testQueue", count);
12 logger.info("pipe line use time: {}", System.currentTimeMillis() - start);
13 
14 //        redisUtil.getQueueCountSingle("testQueue", count);
15 //        logger.info("Single get batch use time: {}", System.currentTimeMillis() - start);

以上,先執行11和12行pipeline批量獲取,在注釋掉這兩行,放開14,15行注冊,測試單次逐個獲取用時

最終結果如下,不用關注具體的用時,因為不同的服務器,用時差別會很大,本次測試只是在一個vmware虛擬機上進行測試,pipeline比單詞逐個獲取快了18倍

2020-08-16 18:48:19.119  INFO 11292 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : pipe line use time: 2552
2020-08-16 18:49:44.255  INFO 2748 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : Single get batch use time: 47468

如果將上面條數改為100萬,效果如下,pipeline快了20倍左右

2020-08-16 18:53:43.731  INFO 1748 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : pipe line use time: 24179
2020-08-16 19:02:45.684  INFO 19632 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : Single get batch use time: 490412

但一般一次獲取不了這么多的數據,使用200

2020-08-16 19:06:23.569  INFO 17852 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : pipe line use time: 79
2020-08-16 19:06:52.363  INFO 16404 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : Single get batch use time: 113

100個

2020-08-16 19:12:10.427  INFO 11036 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : pipe line use time: 108
2020-08-16 19:12:37.291  INFO 20136 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : Single get batch use time: 65

10個

2020-08-16 19:07:25.762  INFO 16252 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : pipe line use time: 50
2020-08-16 19:08:18.523  INFO 19368 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : Single get batch use time: 8

1個

2020-08-16 19:10:16.268  INFO 7600 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : pipe line use time: 46
2020-08-16 19:11:03.790  INFO 19380 --- [           main] c.redispro.pipleline.test.RedisUtilTest  : Single get batch use time: 2

通過以上的測試,可以簡單看出,單從時間上來看,也不是在任何時候都適合使用pipeline的,只有在數據量大到一定程度,使用pipeline才會達到想要的目的,但是也不能根據上面的測試就斷定要超過多少個值,使用pipelne,具體還需要根據自己的服務器性能來確定。

備注

理論上,多個命令節省了線程創建和銷毀的開銷,只要是大於1個,就應該是pipeline執行的快,可能是pipeline在初始化時需要一些處理,后面有時間在測試一下,先跑一下pipeline,然后再執行批量。pipeline其實並不適合於不確定大小的隊列,因為pipeline是批量發送的腳本,所有的腳本也是批量返回,假設現在隊列中只有1個數據,每次都批量獲取500個,則有499個命令是多余執行的,且返回的集合中,有499個null值,如果在獲取之前先判斷下隊列長度,大於500,則按照500,小於500,按照真實長度獲取,則多線程情況下不安全,故pipeline在使用時,建議在獲取固定長度的集合中獲取數據或者沒有現成安全的情況下使用,如果是多線程從隊列獲取數據,還是需要使用lua腳本來控制,將獲取長度和獲取隊列值放到一個原子性的操作中比較合適

以下為參考資料

https://redis.io/topics/pipelining

https://docs.spring.io/spring-data/redis/docs/current/reference/html/#pipeline


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM