Redis集群批量操作


Redis在3.0版正式引入了集群這個特性,擴展變得非常簡單。然而當你開心的升級到3.0后,卻發現有些很好用的功能現在工作不了了, 比如我們今天要聊的pipeline功能等批量操作。

Redis集群是沒法執行批量操作命令的,如mget,pipeline等。這是因為redis將集群划分為16383個哈希槽,不同的key會划分到不同的槽中。但是,Jedis客戶端提供了計算key的slot方法,已經slot和節點之間的映射關系,通過這兩個數據,就可以計算出每個key所在的節點,然后使用pipeline獲取數據。

/**
 * 根據key計算slot,
 * 再根據slot計算node,
 * 獲取pipeline
 * 進行批量操作 
 */
public class BatchUtil {
    public static Map<String, String> mget(JedisCluster jc, String... keys){
        Map<String, String> resMap = new HashMap<>();
        if(keys == null || keys.length == 0){
            return resMap;
        }
        //如果只有一條,直接使用get即可
        if(keys.length == 1){
            resMap.put(keys[0], jc.get(keys[0]));
            return resMap;
        }
        
        //JedisCluster繼承了BinaryJedisCluster
        //BinaryJedisCluster的JedisClusterConnectionHandler屬性
        //里面有JedisClusterInfoCache,根據這一條繼承鏈,可以獲取到JedisClusterInfoCache
        //從而獲取slot和JedisPool直接的映射
        MetaObject metaObject = SystemMetaObject.forObject(jc);
        JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
        //保存地址+端口和命令的映射
        Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();
        List<String> keyList = null;
        JedisPool currentJedisPool = null;
        Pipeline currentPipeline = null;
        
        for(String key : keys){
            //計算哈希槽
            int crc = JedisClusterCRC16.getSlot(key);
            //通過哈希槽獲取節點的連接
            currentJedisPool = cache.getSlotPool(crc);
            //由於JedisPool作為value保存在JedisClusterInfoCache中的一個map對象中,每個節點的
            //JedisPool在map的初始化階段就是確定的和唯一的,所以獲取到的每個節點的JedisPool都是一樣
            //的,可以作為map的key
            if(jedisPoolMap.containsKey(currentJedisPool)){
                jedisPoolMap.get(currentJedisPool).add(key);
            }else{
                keyList = new ArrayList<>();
                keyList.add(key);
                jedisPoolMap.put(currentJedisPool, keyList);
            }
        }
        
        //保存結果
        List<Object> res = new ArrayList<>();
        //執行
        for(Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()){
            try {
                currentJedisPool = entry.getKey();
                keyList = entry.getValue();
                //獲取pipeline
                currentPipeline = currentJedisPool.getResource().pipelined();
                for(String key : keyList){
                    currentPipeline.get(key);
                }
                //從pipeline中獲取結果
                res = currentPipeline.syncAndReturnAll();
                currentPipeline.close();
                for(int i=0; i<keyList.size(); i++){
                    resMap.put(keyList.get(i), res.get(i)==null ? null : res.get(i).toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
                return new HashMap<>();
            }
        }
        return resMap;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM