redis管道pipeline


     Jedis jedis = new Jedis("127.0.0.1",6379);
        Pipeline pipeline = jedis.pipelined();
        for(int i = 0;i<1000;i++){
            String content = i + "";
            pipeline.set(content,content);
        }
        pipeline.sync();

  

Jedis jedis = new Jedis("127.0.0.1",6379);
Pipeline pipeline = jedis.pipelined();
Map<String, Response> responses = new LinkedHashMap<String, Response>();
        for(int i = 0;i<1000;i++){
            String content = i + "";
            Response<String> response = pipeline.get(content);
            responses.put(content,response);
        }
        pipeline.sync();
        for(String key:responses.keySet()){
            System.out.println("key:"+key + ",value:" + responses.get(key).get());
        }

  二、集群

1、方式一

import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.util.JedisClusterCRC16;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @Auther: lyl
 * @Date: 2019/10/23 17:06
 * @Description:
 */
public class BatchUtil {
    public static Map<String, String> mget(JedisCluster jc, String... keys){
        Map<String, String> resMap = new HashMap<>();
        if(keys == null || keys.length == 0){
            return resMap;
        }
        //如果只有一條,直接使用get即可
        if(keys.length == 1){
            resMap.put(keys[0], jc.get(keys[0]));
            return resMap;
        }

        //JedisCluster繼承了BinaryJedisCluster
        //BinaryJedisCluster的JedisClusterConnectionHandler屬性
        //里面有JedisClusterInfoCache,根據這一條繼承鏈,可以獲取到JedisClusterInfoCache
        //從而獲取slot和JedisPool直接的映射
        MetaObject metaObject = SystemMetaObject.forObject(jc);
        JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
        //保存地址+端口和命令的映射
        Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();
        List<String> keyList = null;
        JedisPool currentJedisPool = null;
        Pipeline currentPipeline = null;

        for(String key : keys){
            //計算哈希槽
            int crc = JedisClusterCRC16.getSlot(key);
            //通過哈希槽獲取節點的連接
            currentJedisPool = cache.getSlotPool(crc);
            //由於JedisPool作為value保存在JedisClusterInfoCache中的一個map對象中,每個節點的
            //JedisPool在map的初始化階段就是確定的和唯一的,所以獲取到的每個節點的JedisPool都是一樣
            //的,可以作為map的key
            if(jedisPoolMap.containsKey(currentJedisPool)){
                jedisPoolMap.get(currentJedisPool).add(key);
            }else{
                keyList = new ArrayList<>();
                keyList.add(key);
                jedisPoolMap.put(currentJedisPool, keyList);
            }
        }

        //保存結果
        List<Object> res = new ArrayList<>();
        //執行
        for(Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()){
            try {
                currentJedisPool = entry.getKey();
                keyList = entry.getValue();
                //獲取pipeline
                currentPipeline = currentJedisPool.getResource().pipelined();
                for(String key : keyList){
                    currentPipeline.get(key);
                }
                //從pipeline中獲取結果
                res = currentPipeline.syncAndReturnAll();
                currentPipeline.close();
                for(int i=0; i<keyList.size(); i++){
                    resMap.put(keyList.get(i), res.get(i)==null ? null : res.get(i).toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
                return new HashMap<>();
            }
        }
        return resMap;
    }
}

  2、方式二

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Client;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;
/**
 * @Auther: lyl
 * @Date: 2019/10/23 16:12
 * @Description:
 */
public class JedisClusterPipeline extends PipelineBase implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);

    // 部分字段沒有對應的獲取方法,只能采用反射來做
    // 你也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問接口
    private static final Field FIELD_CONNECTION_HANDLER;
    private static final Field FIELD_CACHE;
    static {
        FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
        FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
    }

    private JedisSlotBasedConnectionHandler connectionHandler;
    private JedisClusterInfoCache clusterInfoCache;
    private Queue<Client> clients = new LinkedList<Client>();   // 根據順序存儲每個命令對應的Client
    private Map<JedisPool, Jedis> jedisMap = new HashMap<>();   // 用於緩存連接
    private boolean hasDataInBuf = false;   // 是否有數據在緩存區

    /**
     * 根據jedisCluster實例生成對應的JedisClusterPipeline
     * @param
     * @return
     */
    public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
        JedisClusterPipeline pipeline = new JedisClusterPipeline();
        pipeline.setJedisCluster(jedisCluster);
        return pipeline;
    }

    public JedisClusterPipeline() {
    }

    public void setJedisCluster(JedisCluster jedis) {
        connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
        clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
    }

    /**
     * 刷新集群信息,當集群信息發生變更時調用
     * @param
     * @return
     */
    public void refreshCluster() {
        connectionHandler.renewSlotCache();
    }

    /**
     * 同步讀取所有數據. 與syncAndReturnAll()相比,sync()只是沒有對數據做反序列化
     */
    public void sync() {
        innerSync(null);
    }

    /**
     * 同步讀取所有數據 並按命令順序返回一個列表
     *
     * @return 按照命令的順序返回所有的數據
     */
    public List<Object> syncAndReturnAll() {
        List<Object> responseList = new ArrayList<Object>();

        innerSync(responseList);

        return responseList;
    }

    private void innerSync(List<Object> formatted) {
        HashSet<Client> clientSet = new HashSet<Client>();

        try {
            for (Client client : clients) {
                // 在sync()調用時其實是不需要解析結果數據的,但是如果不調用get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要調用get()來觸發錯誤。
                // 其實如果Response的data屬性可以直接獲取,可以省掉解析數據的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了
                Object data = generateResponse(client.getOne()).get();
                if (null != formatted) {
                    formatted.add(data);
                }

                // size相同說明所有的client都已經添加,就不用再調用add方法了
                if (clientSet.size() != jedisMap.size()) {
                    clientSet.add(client);
                }
            }
        } catch (JedisRedirectionException jre) {
            if (jre instanceof JedisMovedDataException) {
                // if MOVED redirection occurred, rebuilds cluster's slot cache,
                // recommended by Redis cluster specification
                refreshCluster();
            }

            throw jre;
        } finally {
            if (clientSet.size() != jedisMap.size()) {
                // 所有還沒有執行過的client要保證執行(flush),防止放回連接池后后面的命令被污染
                for (Jedis jedis : jedisMap.values()) {
                    if (clientSet.contains(jedis.getClient())) {
                        continue;
                    }

                    flushCachedData(jedis);
                }
            }

            hasDataInBuf = false;
            close();
        }
    }

    @Override
    public void close() {
        clean();

        clients.clear();

        for (Jedis jedis : jedisMap.values()) {
            if (hasDataInBuf) {
                flushCachedData(jedis);
            }

            jedis.close();
        }

        jedisMap.clear();

        hasDataInBuf = false;
    }

    private void flushCachedData(Jedis jedis) {
        try {
            jedis.getClient().getAll();
        } catch (RuntimeException ex) {
        }
    }

    @Override
    protected Client getClient(String key) {
        byte[] bKey = SafeEncoder.encode(key);

        return getClient(bKey);
    }

    @Override
    protected Client getClient(byte[] key) {
        Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));

        Client client = jedis.getClient();
        clients.add(client);

        return client;
    }

    private Jedis getJedis(int slot) {
        JedisPool pool = clusterInfoCache.getSlotPool(slot);

        // 根據pool從緩存中獲取Jedis
        Jedis jedis = jedisMap.get(pool);
        if (null == jedis) {
            jedis = pool.getResource();
            jedisMap.put(pool, jedis);
        }

        hasDataInBuf = true;
        return jedis;
    }

    private static Field getField(Class<?> cls, String fieldName) {
        try {
            Field field = cls.getDeclaredField(fieldName);
            field.setAccessible(true);

            return field;
        } catch (NoSuchFieldException | SecurityException e) {
            throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
        }
    }

    @SuppressWarnings({"unchecked" })
    private static <T> T getValue(Object obj, Field field) {
        try {
            return (T)field.get(obj);
        } catch (IllegalArgumentException | IllegalAccessException e) {
            LOGGER.error("get value fail", e);

            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Set<HostAndPort> nodes = new HashSet<HostAndPort>();
        nodes.add(new HostAndPort("127.0.0.1", 7000));
        nodes.add(new HostAndPort("127.0.0.1", 7001));
        nodes.add(new HostAndPort("127.0.0.1", 7002));
        JedisCluster jc = new JedisCluster(nodes);

        long s = System.currentTimeMillis();

        JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jc);
        jcp.refreshCluster();
        List<Object> batchResult = null;
        try {
            // batch write
            for (int i = 0; i < 100; i++) {
                jcp.set("k" + i, "v1" + i);
            }
            jcp.sync();

            // batch read
            for (int i = 0; i < 10000; i++) {
                jcp.get("k" + i);
            }
            batchResult = jcp.syncAndReturnAll();
        } finally {
            jcp.close();
        }

        // output time
        long t = System.currentTimeMillis() - s;
        System.out.println(t);

        System.out.println(batchResult.size());

        // 實際業務代碼中,close要在finally中調,這里之所以沒這么寫,是因為懶
        jc.close();
    }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM