// Single node: queue 1000 SET commands on one pipeline and send them as a batch.
// try-with-resources guarantees the connection is closed even if a command fails.
try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    Pipeline pipeline = jedis.pipelined();
    for (int i = 0; i < 1000; i++) {
        // Key and value are both the decimal form of the loop index.
        String content = String.valueOf(i);
        pipeline.set(content, content);
    }
    // Read back (and discard) the replies of all queued commands.
    pipeline.sync();
}
// Single node: queue 1000 GET commands on one pipeline, then print every key/value pair.
// try-with-resources guarantees the connection is closed even if a command fails.
try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    Pipeline pipeline = jedis.pipelined();

    // LinkedHashMap keeps the keys in the order the commands were queued.
    Map<String, Response<String>> responses = new LinkedHashMap<>();
    for (int i = 0; i < 1000; i++) {
        String content = String.valueOf(i);
        responses.put(content, pipeline.get(content));
    }

    // A Response only holds a value once the pipeline has been synced.
    pipeline.sync();

    for (Map.Entry<String, Response<String>> entry : responses.entrySet()) {
        System.out.println("key:" + entry.getKey() + ",value:" + entry.getValue().get());
    }
}
二、集群
1、方式一
import org.apache.ibatis.reflection.MetaObject; import org.apache.ibatis.reflection.SystemMetaObject; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisClusterInfoCache; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; import redis.clients.util.JedisClusterCRC16; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @Auther: lyl * @Date: 2019/10/23 17:06 * @Description: */ public class BatchUtil { public static Map<String, String> mget(JedisCluster jc, String... keys){ Map<String, String> resMap = new HashMap<>(); if(keys == null || keys.length == 0){ return resMap; } //如果只有一條,直接使用get即可 if(keys.length == 1){ resMap.put(keys[0], jc.get(keys[0])); return resMap; } //JedisCluster繼承了BinaryJedisCluster //BinaryJedisCluster的JedisClusterConnectionHandler屬性 //里面有JedisClusterInfoCache,根據這一條繼承鏈,可以獲取到JedisClusterInfoCache //從而獲取slot和JedisPool直接的映射 MetaObject metaObject = SystemMetaObject.forObject(jc); JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache"); //保存地址+端口和命令的映射 Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>(); List<String> keyList = null; JedisPool currentJedisPool = null; Pipeline currentPipeline = null; for(String key : keys){ //計算哈希槽 int crc = JedisClusterCRC16.getSlot(key); //通過哈希槽獲取節點的連接 currentJedisPool = cache.getSlotPool(crc); //由於JedisPool作為value保存在JedisClusterInfoCache中的一個map對象中,每個節點的 //JedisPool在map的初始化階段就是確定的和唯一的,所以獲取到的每個節點的JedisPool都是一樣 //的,可以作為map的key if(jedisPoolMap.containsKey(currentJedisPool)){ jedisPoolMap.get(currentJedisPool).add(key); }else{ keyList = new ArrayList<>(); keyList.add(key); jedisPoolMap.put(currentJedisPool, keyList); } } //保存結果 List<Object> res = new ArrayList<>(); //執行 for(Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()){ try { currentJedisPool = entry.getKey(); keyList = entry.getValue(); //獲取pipeline currentPipeline = 
currentJedisPool.getResource().pipelined(); for(String key : keyList){ currentPipeline.get(key); } //從pipeline中獲取結果 res = currentPipeline.syncAndReturnAll(); currentPipeline.close(); for(int i=0; i<keyList.size(); i++){ resMap.put(keyList.get(i), res.get(i)==null ? null : res.get(i).toString()); } } catch (Exception e) { e.printStackTrace(); return new HashMap<>(); } } return resMap; } }
2、方式二
import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.BinaryJedisCluster; import redis.clients.jedis.Client; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisClusterConnectionHandler; import redis.clients.jedis.JedisClusterInfoCache; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisSlotBasedConnectionHandler; import redis.clients.jedis.PipelineBase; import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.exceptions.JedisRedirectionException; import redis.clients.util.JedisClusterCRC16; import redis.clients.util.SafeEncoder; /** * @Auther: lyl * @Date: 2019/10/23 16:12 * @Description: */ public class JedisClusterPipeline extends PipelineBase implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class); // 部分字段沒有對應的獲取方法,只能采用反射來做 // 你也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問接口 private static final Field FIELD_CONNECTION_HANDLER; private static final Field FIELD_CACHE; static { FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler"); FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache"); } private JedisSlotBasedConnectionHandler connectionHandler; private JedisClusterInfoCache clusterInfoCache; private Queue<Client> clients = new LinkedList<Client>(); // 根據順序存儲每個命令對應的Client private Map<JedisPool, Jedis> jedisMap = new HashMap<>(); // 用於緩存連接 private boolean hasDataInBuf = false; // 是否有數據在緩存區 /** * 根據jedisCluster實例生成對應的JedisClusterPipeline * @param * @return */ public static JedisClusterPipeline 
pipelined(JedisCluster jedisCluster) { JedisClusterPipeline pipeline = new JedisClusterPipeline(); pipeline.setJedisCluster(jedisCluster); return pipeline; } public JedisClusterPipeline() { } public void setJedisCluster(JedisCluster jedis) { connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER); clusterInfoCache = getValue(connectionHandler, FIELD_CACHE); } /** * 刷新集群信息,當集群信息發生變更時調用 * @param * @return */ public void refreshCluster() { connectionHandler.renewSlotCache(); } /** * 同步讀取所有數據. 與syncAndReturnAll()相比,sync()只是沒有對數據做反序列化 */ public void sync() { innerSync(null); } /** * 同步讀取所有數據 並按命令順序返回一個列表 * * @return 按照命令的順序返回所有的數據 */ public List<Object> syncAndReturnAll() { List<Object> responseList = new ArrayList<Object>(); innerSync(responseList); return responseList; } private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<Client>(); try { for (Client client : clients) { // 在sync()調用時其實是不需要解析結果數據的,但是如果不調用get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要調用get()來觸發錯誤。 // 其實如果Response的data屬性可以直接獲取,可以省掉解析數據的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同說明所有的client都已經添加,就不用再調用add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred, rebuilds cluster's slot cache, // recommended by Redis cluster specification refreshCluster(); } throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有還沒有執行過的client要保證執行(flush),防止放回連接池后后面的命令被污染 for (Jedis jedis : jedisMap.values()) { if (clientSet.contains(jedis.getClient())) { continue; } flushCachedData(jedis); } } hasDataInBuf = false; close(); } } @Override public void close() { clean(); clients.clear(); for (Jedis jedis : jedisMap.values()) { if (hasDataInBuf) { flushCachedData(jedis); } jedis.close(); } jedisMap.clear(); hasDataInBuf 
= false; } private void flushCachedData(Jedis jedis) { try { jedis.getClient().getAll(); } catch (RuntimeException ex) { } } @Override protected Client getClient(String key) { byte[] bKey = SafeEncoder.encode(key); return getClient(bKey); } @Override protected Client getClient(byte[] key) { Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key)); Client client = jedis.getClient(); clients.add(client); return client; } private Jedis getJedis(int slot) { JedisPool pool = clusterInfoCache.getSlotPool(slot); // 根據pool從緩存中獲取Jedis Jedis jedis = jedisMap.get(pool); if (null == jedis) { jedis = pool.getResource(); jedisMap.put(pool, jedis); } hasDataInBuf = true; return jedis; } private static Field getField(Class<?> cls, String fieldName) { try { Field field = cls.getDeclaredField(fieldName); field.setAccessible(true); return field; } catch (NoSuchFieldException | SecurityException e) { throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e); } } @SuppressWarnings({"unchecked" }) private static <T> T getValue(Object obj, Field field) { try { return (T)field.get(obj); } catch (IllegalArgumentException | IllegalAccessException e) { LOGGER.error("get value fail", e); throw new RuntimeException(e); } } public static void main(String[] args) throws IOException { Set<HostAndPort> nodes = new HashSet<HostAndPort>(); nodes.add(new HostAndPort("127.0.0.1", 7000)); nodes.add(new HostAndPort("127.0.0.1", 7001)); nodes.add(new HostAndPort("127.0.0.1", 7002)); JedisCluster jc = new JedisCluster(nodes); long s = System.currentTimeMillis(); JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jc); jcp.refreshCluster(); List<Object> batchResult = null; try { // batch write for (int i = 0; i < 100; i++) { jcp.set("k" + i, "v1" + i); } jcp.sync(); // batch read for (int i = 0; i < 10000; i++) { jcp.get("k" + i); } batchResult = jcp.syncAndReturnAll(); } finally { jcp.close(); } // output time long t = System.currentTimeMillis() - s; 
System.out.println(t); System.out.println(batchResult.size()); // 實際業務代碼中,close要在finally中調,這里之所以沒這么寫,是因為懶 jc.close(); } }