工作中經常使用redis作為隊列,但redis隊列彈出值時,只能逐個彈出,無法批量獲取數據,在數據量很大時,在連接的獲取和釋放占用了較多的時間,效率上不是很好,只能逐個入庫。Redis pipeline可以解決該問題,允許發送多個請求,批量獲取數據
Springboot pipeline
springboot pipeline使用比較簡單,直接調用方法即可,如下
public List<String> getQueueValues(final String queueKey, final int getCount) { List<Object> result = stringRedisTemplate.executePipelined(new RedisCallback<List<String>>() { @Override public List<String> doInRedis(RedisConnection connection) throws DataAccessException { for (int i = 0; i < getCount; i++) { connection.lPop(queueKey.getBytes()); } return null; } }); List<String> collect = null; if (result.size() > 0) { collect = result.stream().filter(item -> item != null).map(item -> item.toString()) .collect(Collectors.toList()); } return collect; }
此處使用的為RedisConnection,但其真實類型為StringRedisConnection,如果有需要,可以將其轉換為StringRedisConnection,在進行后面操作。這里的返回值必須為null,否則將會出現如下錯誤
Caused by: org.springframework.dao.InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline at org.springframework.data.redis.core.RedisTemplate.lambda$executePipelined$1(RedisTemplate.java:330) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:175) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.data.redis.core.RedisTemplate.executePipelined(RedisTemplate.java:324) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.data.redis.core.RedisTemplate.executePipelined(RedisTemplate.java:314) ~[spring-data-redis-2.3.3.RELEASE.jar:2.3.3.RELEASE] at com.redispro.pipleline.RedisUtil.getQueueValues(RedisUtil.java:30) ~[classes/:na] at com.redispro.pipleline.test.RedisUtilTest.run(RedisUtilTest.java:34) ~[classes/:na] at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE] ... 5 common frames omitted
但上述代碼在使用時需要進行一些處理,比如現在隊列中只有1個值,但是每次都批量獲取500條數據,則也會組織500個pop命令,並且在返回的集合中也只有1個值,其他的都是null。可以在使用之前先判斷隊列大小,隊列大於500,則按照500,小於等於500,則按照實際大小來獲取,這樣可以避免每次都組織很多多余的命令。
添加入隊列方法
public void pushQueue(String queueKey, List<String> queueValues) { stringRedisTemplate.opsForList().rightPushAll(queueKey, queueValues); }
測試
List<String> list = new ArrayList<String>(); int count = 5; for (int i = 0; i < count; i++) { list.add(StringUtils.join("queue_value", i)); } redisUtil.pushQueue("testQueue", list); logger.info("Pipeline result: {}", redisUtil.getQueueValues("testQueue", count));
結果,先進先出,正常。
2020-08-16 18:34:26.459 INFO 8880 --- [ main] c.redispro.pipleline.test.RedisUtilTest : Pipeline result: [queue_value0, queue_value1, queue_value2, queue_value3, queue_value4]
Stringboot pipeline和逐個獲取隊列值用時對比
redis性能受到很多因素的影響,本次只是簡單的做下測試,可能不是很准確,不過可以簡單說明pipeline比單個獲取效率更高。
逐個獲取隊列值
public List<String> getQueueValuesSingle(String queueKey, int getCount) { List<String> result = new ArrayList<String>(getCount); String leftPop = null; for (int i = 0; i < getCount; i++) { leftPop = stringRedisTemplate.opsForList().leftPop(queueKey); result.add(leftPop); } return result; }
分別測試pipeline和單次逐個獲取
1 List<String> list = new ArrayList<String>(); 2 3 int count = 100000; 4 for (int i = 0; i < count; i++) { 5 list.add(StringUtils.join("queue_value", i)); 6 } 7 8 redisUtil.pushQueue("testQueue", list); 9 long start = System.currentTimeMillis() 10 11 redisUtil.getQueueValues("testQueue", count); 12 logger.info("pipe line use time: {}", System.currentTimeMillis() - start); 13 14 // redisUtil.getQueueCountSingle("testQueue", count); 15 // logger.info("Single get batch use time: {}", System.currentTimeMillis() - start);
以上,先執行11和12行pipeline批量獲取,在注釋掉這兩行,放開14,15行注冊,測試單次逐個獲取用時
最終結果如下,不用關注具體的用時,因為不同的服務器,用時差別會很大,本次測試只是在一個vmware虛擬機上進行測試,pipeline比單詞逐個獲取快了18倍
2020-08-16 18:48:19.119 INFO 11292 --- [ main] c.redispro.pipleline.test.RedisUtilTest : pipe line use time: 2552
2020-08-16 18:49:44.255 INFO 2748 --- [ main] c.redispro.pipleline.test.RedisUtilTest : Single get batch use time: 47468
如果將上面條數改為100萬,效果如下,pipeline快了20倍左右
2020-08-16 18:53:43.731 INFO 1748 --- [ main] c.redispro.pipleline.test.RedisUtilTest : pipe line use time: 24179
2020-08-16 19:02:45.684 INFO 19632 --- [ main] c.redispro.pipleline.test.RedisUtilTest : Single get batch use time: 490412
但一般一次獲取不了這么多的數據,使用200
2020-08-16 19:06:23.569 INFO 17852 --- [ main] c.redispro.pipleline.test.RedisUtilTest : pipe line use time: 79 2020-08-16 19:06:52.363 INFO 16404 --- [ main] c.redispro.pipleline.test.RedisUtilTest : Single get batch use time: 113
100個
2020-08-16 19:12:10.427 INFO 11036 --- [ main] c.redispro.pipleline.test.RedisUtilTest : pipe line use time: 108 2020-08-16 19:12:37.291 INFO 20136 --- [ main] c.redispro.pipleline.test.RedisUtilTest : Single get batch use time: 65
10個
2020-08-16 19:07:25.762 INFO 16252 --- [ main] c.redispro.pipleline.test.RedisUtilTest : pipe line use time: 50 2020-08-16 19:08:18.523 INFO 19368 --- [ main] c.redispro.pipleline.test.RedisUtilTest : Single get batch use time: 8
1個
2020-08-16 19:10:16.268 INFO 7600 --- [ main] c.redispro.pipleline.test.RedisUtilTest : pipe line use time: 46 2020-08-16 19:11:03.790 INFO 19380 --- [ main] c.redispro.pipleline.test.RedisUtilTest : Single get batch use time: 2
通過以上的測試,可以簡單看出,單從時間上來看,也不是在任何時候都適合使用pipeline的,只有在數據量大到一定程度,使用pipeline才會達到想要的目的,但是也不能根據上面的測試就斷定要超過多少個值,使用pipelne,具體還需要根據自己的服務器性能來確定。
備注
理論上,多個命令節省了線程創建和銷毀的開銷,只要是大於1個,就應該是pipeline執行的快,可能是pipeline在初始化時需要一些處理,后面有時間在測試一下,先跑一下pipeline,然后再執行批量。pipeline其實並不適合於不確定大小的隊列,因為pipeline是批量發送的腳本,所有的腳本也是批量返回,假設現在隊列中只有1個數據,每次都批量獲取500個,則有499個命令是多余執行的,且返回的集合中,有499個null值,如果在獲取之前先判斷下隊列長度,大於500,則按照500,小於500,按照真實長度獲取,則多線程情況下不安全,故pipeline在使用時,建議在獲取固定長度的集合中獲取數據或者沒有現成安全的情況下使用,如果是多線程從隊列獲取數據,還是需要使用lua腳本來控制,將獲取長度和獲取隊列值放到一個原子性的操作中比較合適
以下為參考資料
https://redis.io/topics/pipelining
https://docs.spring.io/spring-data/redis/docs/current/reference/html/#pipeline