RedisTemplate使用PipeLine的總結


    最近做一個統計項目,數據量非常大,之前使用scan命令對redis中指定key進行掃描,一次100條,執行穩定、效率低,同時tcp關閉連接的time-wait增速相當的快,對性能造成了極大的浪費同時執行時間也很慢,而且當數據量進一步增大可能會影響其他服務。為了減少tcp連接數量,將redis的scan修改為pipeLine操作。

一、為什么要使用Pipeline?

    Redis是采用基於C/S模式的請求/響應協議的TCP服務器。
               性能問題一:redis客戶端發送多條請求,后面的請求需要等待前面的請求處理完后,才能進行處理,而且每個請求都存在往返時間RRT(Round Trip Time),即使redis性能極高,當數據量足夠大,也會極大影響性能,還可能會引起其他意外情況。
               性能問題二:性能問題一,我們可以通過scan命令來解決,如何來設置count又是一個問題,設置不好,同樣會有大量請求存在,即使設置到1w(推薦最大值),如果掃描的數據量太大,這個問題同樣不能避免。每個請求都會經歷三次握手、四次揮手,在處理大量連接時,處理完后,揮手會產生大量time-wait,如果該服務器提供其他服務,可能對其他服務造成影響。

使用Pipeline可以解決以上問題。
二、如何在使用Pipeline?

    本文使用的是RedisTemplate調用execute,connection使用的redis原生的操作,這里簡單使用了zCount來計算keys中集合數據的長度。獲取結果為result,這里要使用Pipeline需要調用Connection.openPipeline()。Connection.closePipeline()返回值為List<Object>是執行后的結果,相當簡單。

   redisTemplate.execute(new RedisCallback<Long>() {
                @Nullable
                @Override
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    connection.openPipeline();
                    for (int i = 0; i < 1000000; i++) {
                        String key = "123" + i;
                        connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
                    }
                    List<Object> result=connection.closePipeline();
                    return null;
                }
            });

  

    這個list是放在匿名類內部,對於數據處理不太友好,代碼會看起來相當難受,想取出來使用還是不可變的。如果要獲取返回值,我們可以調用如下代碼executePipelined,這樣就可以返回我們需要的結果,下面我們可以對得到list進行操作。

   

  List<Long> List = redisTemplate.executePipelined(new RedisCallback<Long>() {
                @Nullable
                @Override
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    connection.openPipeline();
                   for (int i = 0; i < 1000000; i++) {
                        String key = "123" + i;
                        connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
                    }
                    return null;
                }
            });

在這里需要注意4點內容:

    1.這里的connect是redis原生鏈接,所以connection的返回結果是基本上是byte數組,如果需要存儲的數據,需要對byte[]數組反序列化。
    2.在doInRedis中返回值必須返回為null,為什么返回為空?可以定位到內部代碼去查看詳情,這里不再贅述。3.connection.openPipeline()可以調用,也可以不調用,但是connection.closePipeline()不能調用,調用了拿不到返回值。因為調用的時候會直接將結果返回,同時也不會對代碼進行反序列化。
    4.反序列化需要傳入反序列化對象,這些對象都可以進行相應的實例化,如下圖所示。

    根據你的項目需求選擇合適的反序列化對象。比如我在項目中key使用的是StringRedisSerializer,而值通常使用的是GenerJackson2JsonRedisSerializer。所以在初始化redisTemplate的時候會這樣做,代碼如下,將序列化的實例化對象放入redisTemplate中,當使用的時候就可以直接redis.getKeySerializer()或者redis.getValueSerializer(),這樣就不用在實例化一個對象,造成浪費和冗余。

  

  public class MyRedisUtil {
     
        private RedisTemplate redisTemplate;
     
        @Autowired
        public void setRedisTemplate(RedisTemplate redisTemplate) {
            RedisSerializer keySerializer = new StringRedisSerializer();
            RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
            redisTemplate.setKeySerializer(keySerializer);
            redisTemplate.setValueSerializer(valueSerializer);
            this.redisTemplate = redisTemplate;
     
        }
     
        public RedisTemplate getRedisTemplate() {
            return redisTemplate;
        }
     
    }

    通過這樣的掉用方式,我們就可以不用進行強制轉換,直接獲得我們想要的對象了。

     List<User> List = redisTemplate.executePipelined(new RedisCallback<User>() {
                @Nullable
                @Override
                public User doInRedis(RedisConnection connection) throws DataAccessException {
                    connection.openPipeline();
                   for (int i = 0; i < 1000000; i++) {
                        String key = "123" + i;
                        connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
                    }
                    return null;
                }
            }, myRedisComponent.getRedisTemplate().getValueSerializer());

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM