redis集群使用pipeline


前言

redis的pipeline可以一次性發送多個命令去執行,在執行大量命令時,可以減少網絡通信次數提高效率。但是很可惜,redis的集群並不支持pipeline語法(只是不提供相應的方法而已)。不過只要稍稍看下jedis的源碼,就可以發現雖然沒有現成的輪子,但是卻很好造。

一、簡介

先說下redis集群的簡單結構和數據的定位規則(見下圖)。redis提供了16384個槽點,並為每個節點分配若干槽位,操作redis數據時會根據key進行hash,然后找到對應的節點進行操作,這也解釋了為什么jedisCluster不支持pipeline。因為pipeline中若干個需要操作的key可能位於不同的分片,如果想要獲取數據就必須進行一次請求的轉發(可能這個詞不標准,但是好理解,或者稱之為漂移吧),這與pipeline為了減少網絡通信次數的本意沖突。那我們只要根據key進行hash運算,然后再根據hash值獲取連接,接着按照連接對所有的key進行分組,保證同一pipeline內所有的key都對應一個節點就好了,最后通過pipeline執行。試試吧,萬一好使了呢

二、思路

既然知道了原因和流程,我們就試下能不能造輪子吧。首先我們需要hash算法以及根據hash結果獲取JedisPool的方法,很不巧的是,jedis都提供了。

為了實現上面的功能,我們需要一個類和兩個屬性,類是JedisClusterCRC16(hash算法),兩個屬性分別是connectionHandler(用於獲取cache)和cache(根據hash值獲取連接)

有興趣的同學可以看下JedisCluster的源碼,它集成自BinaryJedisCluster,在BinaryJedisCluster有個connectionHandler屬性,恰巧它又是protected修飾的,這不分明就是讓你繼承么。而cache屬性在JedisClusterConnectionHandler中,注意這個類是抽象類,輕易的重寫會導致整個功能都不能用了(如果內部實現不是很了解,繼承往往優於重寫,因為可以super嘛),我們發現它有一個實現類JedisSlotBasedConnectionHandler,那我們繼承這個類就好了。詳細的設計如下圖:

三、實現

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;

/**
 * @author zhangyining on 18/12/1 001.
 */
@Slf4j
public class JedisPipelineCluster extends JedisCluster {

    static {
        cluster = init();
    }

    private static JedisPipelineCluster cluster;

    public static JedisPipelineCluster getCluster(){
        return cluster;
    }
    private static JedisPipelineCluster init(){
        //todo 連接代碼省略...
        return jedisCluster;
    }

    public JedisPool getJedisPoolFromSlot(String redisKey) {
        return getConnectionHandler().getJedisPoolFromSlot(redisKey);
    }

    private JedisPipelineCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts,final GenericObjectPoolConfig poolConfig){
        super(jedisClusterNode, timeout, maxAttempts, poolConfig);
        //繼承可以添加個性化的方法,仔細看構造方法其實和父類是一樣的
        connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig,
                timeout, timeout);
    }

    private JedisSlotAdvancedConnectionHandler getConnectionHandler() {
        return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
    }

    private class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler {

        private JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {
            super(nodes, poolConfig, connectionTimeout, soTimeout);
        }
      
        private JedisPool getJedisPoolFromSlot(String redisKey) {
            int slot = JedisClusterCRC16.getSlot(redisKey);
            JedisPool connectionPool = cache.getSlotPool(slot);
            if (connectionPool != null) {
                return connectionPool;
            } else {
                renewSlotCache();
                connectionPool = cache.getSlotPool(slot);
                if (connectionPool != null) {
                    return connectionPool;
                } else {
                    throw new RuntimeException("No reachable node in cluster for slot " + slot);
                }
            }
        }
    }
}

 

四、測試

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author zhangyining on 18/12/1 001.
 */
public class Test {

    public static void main(String[] args) {
        JedisPipelineCluster cluster = JedisPipelineCluster.getCluster();
        String[] testKeys = {"USER_FEAT_10013425884935", "USER_FEAT_10006864229638", "USER_FEAT_10008005187846"};

        Map<JedisPool, List<String>> poolKeys = new HashMap<>();

        for (String key : testKeys) {
            JedisPool jedisPool = cluster.getJedisPoolFromSlot(key);
            if (poolKeys.keySet().contains(jedisPool)){
                List<String> keys = poolKeys.get(jedisPool);
                keys.add(key);
            }else {
                List<String> keys = new ArrayList<>();
                keys.add(key);
                poolKeys.put(jedisPool, keys);
            }
        }

        for (JedisPool jedisPool : poolKeys.keySet()) {
            Jedis jedis = jedisPool.getResource();
            Pipeline pipeline = jedis.pipelined();

            List<String> keys = poolKeys.get(jedisPool);
            keys.forEach(key ->pipeline.get(key));
            List result = pipeline.syncAndReturnAll();
            System.out.println(result);
            jedis.close();
        }
    }
}

 五、總結

之前看到過有人通過反射來獲取connectionHandler和cache屬性的,個人覺得反射雖然強大,但是明明可以繼承卻反射,有點怪怪的,看個人習慣吧。總之不管是那種方式,重點是熟悉redis集群是怎么分配數據以及執行請求的,剩下的不過是用不同的地方話(不用方式的代碼)說出來而已


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM