// Single-node example: write keys "0".."999" (each value equal to its key) through one pipeline.
// try-with-resources closes the connection even if a command throws.
try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    Pipeline pipeline = jedis.pipelined();
    for (int i = 0; i < 1000; i++) {
        String content = String.valueOf(i);
        pipeline.set(content, content);
    }
    // Synchronise the pipeline with the server; the individual replies are not inspected here.
    pipeline.sync();
}
// Single-node example: read keys "0".."999" through one pipeline and print them in request order.
// try-with-resources closes the connection even if a command throws.
try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    Pipeline pipeline = jedis.pipelined();
    // LinkedHashMap keeps the keys in the order the GET commands were queued.
    Map<String, Response<String>> responses = new LinkedHashMap<String, Response<String>>();
    for (int i = 0; i < 1000; i++) {
        String content = String.valueOf(i);
        responses.put(content, pipeline.get(content));
    }
    // The Response objects are only read after the pipeline has been synchronised.
    pipeline.sync();
    for (Map.Entry<String, Response<String>> entry : responses.entrySet()) {
        System.out.println("key:" + entry.getKey() + ",value:" + entry.getValue().get());
    }
}
二、集群
1、方式一
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.util.JedisClusterCRC16;
/**
 * Batch-read helper for a Redis Cluster accessed through {@link JedisCluster}.
 *
 * <p>Keys are grouped by the connection pool that serves their hash slot, and each group is
 * fetched with a single pipelined round trip.
 *
 * <p>Created 2019/10/23 17:06.
 *
 * @author lyl
 */
public class BatchUtil {

    /**
     * Reads several string keys from the cluster, using one pipeline per node.
     *
     * @param jc   cluster client; its slot cache is read reflectively via the
     *             {@code connectionHandler.cache} property path
     * @param keys keys to read; {@code null} or empty yields an empty map
     * @return a map from key to value, where a key whose reply is {@code null} maps to
     *         {@code null}. If any node fails, the error is printed and a new EMPTY map is
     *         returned, so partial results are discarded
     */
    public static Map<String, String> mget(JedisCluster jc, String... keys) {
        Map<String, String> resMap = new HashMap<>();
        if (keys == null || keys.length == 0) {
            return resMap;
        }
        // A single key needs no pipelining; a plain GET is enough.
        if (keys.length == 1) {
            resMap.put(keys[0], jc.get(keys[0]));
            return resMap;
        }

        // JedisCluster extends BinaryJedisCluster, whose connectionHandler field holds the
        // JedisClusterInfoCache. Walking that chain gives access to the slot -> JedisPool mapping.
        MetaObject metaObject = SystemMetaObject.forObject(jc);
        JedisClusterInfoCache cache =
                (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");

        // Group the keys by the pool of the node that owns their slot. The original author notes
        // that each node's JedisPool instance is created once when the cache is initialised, so
        // the pool object itself is usable as a map key.
        Map<JedisPool, List<String>> keysByPool = new HashMap<>();
        for (String key : keys) {
            int slot = JedisClusterCRC16.getSlot(key);
            JedisPool pool = cache.getSlotPool(slot);
            List<String> poolKeys = keysByPool.get(pool);
            if (poolKeys == null) {
                poolKeys = new ArrayList<>();
                keysByPool.put(pool, poolKeys);
            }
            poolKeys.add(key);
        }

        for (Map.Entry<JedisPool, List<String>> entry : keysByPool.entrySet()) {
            List<String> poolKeys = entry.getValue();
            // try-with-resources hands the borrowed connection back to its pool on every path;
            // closing only the Pipeline does not release the underlying Jedis.
            try (Jedis jedis = entry.getKey().getResource()) {
                Pipeline pipeline = jedis.pipelined();
                for (String key : poolKeys) {
                    pipeline.get(key);
                }
                // Replies come back in the same order the GET commands were queued.
                List<Object> replies = pipeline.syncAndReturnAll();
                pipeline.close();
                for (int i = 0; i < poolKeys.size(); i++) {
                    Object reply = replies.get(i);
                    resMap.put(poolKeys.get(i), reply == null ? null : reply.toString());
                }
            } catch (Exception e) {
                // Deliberate best effort kept from the original: report and return nothing.
                e.printStackTrace();
                return new HashMap<>();
            }
        }
        return resMap;
    }
}
2、方式二
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Client;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;
/**
 * Pipeline for a Redis Cluster, layered on a {@link JedisCluster}.
 *
 * <p>Each queued command is routed to the connection of the node that serves the key's hash
 * slot. One connection per node pool is borrowed lazily and kept until {@link #close()} (which
 * {@link #sync()} and {@link #syncAndReturnAll()} call when they finish).
 *
 * <p>The instance keeps its state in unsynchronized collections and performs no locking, so it
 * should not be shared between threads.
 *
 * <p>Created 2019/10/23 16:12.
 *
 * @author lyl
 */
public class JedisClusterPipeline extends PipelineBase implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);

    // Some fields have no accessor, so reflection is the only way to reach them.
    // An alternative is to subclass JedisCluster and JedisSlotBasedConnectionHandler and expose them.
    private static final Field FIELD_CONNECTION_HANDLER;
    private static final Field FIELD_CACHE;

    static {
        FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
        FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
    }

    private JedisSlotBasedConnectionHandler connectionHandler;
    private JedisClusterInfoCache clusterInfoCache;
    // The Client used by each queued command, in command order.
    private final Queue<Client> clients = new LinkedList<Client>();
    // Connections borrowed so far, keyed by the pool they came from.
    private final Map<JedisPool, Jedis> jedisMap = new HashMap<>();
    // Whether commands have been queued whose replies have not been read yet.
    private boolean hasDataInBuf = false;

    /**
     * Creates a pipeline bound to the given cluster client.
     *
     * @param jedisCluster cluster client whose connection handler and slot cache are reused
     * @return a new pipeline for that cluster
     */
    public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
        JedisClusterPipeline pipeline = new JedisClusterPipeline();
        pipeline.setJedisCluster(jedisCluster);
        return pipeline;
    }

    /** Creates an unbound pipeline; call {@link #setJedisCluster(JedisCluster)} before use. */
    public JedisClusterPipeline() {
    }

    /**
     * Binds this pipeline to a cluster client by reading its connection handler and slot cache
     * through reflection.
     *
     * @param jedis cluster client to bind to
     */
    public void setJedisCluster(JedisCluster jedis) {
        connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
        clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
    }

    /** Refreshes the slot cache; call this when the cluster topology has changed. */
    public void refreshCluster() {
        connectionHandler.renewSlotCache();
    }

    /**
     * Reads the replies of all queued commands and discards them, then releases the connections.
     * The replies are still parsed (see {@code innerSync}) so that errors surface to the caller.
     */
    public void sync() {
        innerSync(null);
    }

    /**
     * Reads the replies of all queued commands, then releases the connections.
     *
     * @return the replies, in the order the commands were queued
     */
    public List<Object> syncAndReturnAll() {
        List<Object> responseList = new ArrayList<Object>();
        innerSync(responseList);
        return responseList;
    }

    /**
     * Reads one reply per queued command, in command order.
     *
     * @param formatted list that receives each reply, or {@code null} to discard them
     * @throws JedisRedirectionException rethrown after the slot cache has been refreshed when the
     *         redirection is a MOVED
     */
    private void innerSync(List<Object> formatted) {
        boolean completed = false;
        try {
            for (Client client : clients) {
                // sync() does not need the parsed value, but without get() an error such as
                // JedisMovedDataException would go unnoticed by the application, so get() is
                // called to surface it. Response offers no accessor for its raw data, and the
                // original author preferred not to add more reflection just to skip the parsing.
                Object data = generateResponse(client.getOne()).get();
                if (null != formatted) {
                    formatted.add(data);
                }
            }
            completed = true;
        } catch (JedisRedirectionException jre) {
            if (jre instanceof JedisMovedDataException) {
                // if MOVED redirection occurred, rebuilds cluster's slot cache,
                // recommended by Redis cluster specification
                refreshCluster();
            }
            throw jre;
        } finally {
            if (!completed) {
                // A failure can leave unread replies on ANY connection, including ones that were
                // already partially read. Drain them all so that a connection going back to the
                // pool cannot hand stale replies to its next user.
                for (Jedis jedis : jedisMap.values()) {
                    flushCachedData(jedis);
                }
            }
            hasDataInBuf = false;
            close();
        }
    }

    /**
     * Discards queued state and releases every borrowed connection. If commands were queued but
     * never synced, their replies are drained first.
     */
    @Override
    public void close() {
        clean();
        clients.clear();
        for (Jedis jedis : jedisMap.values()) {
            if (hasDataInBuf) {
                flushCachedData(jedis);
            }
            jedis.close();
        }
        jedisMap.clear();
        hasDataInBuf = false;
    }

    /** Best-effort drain of the replies still pending on a connection. */
    private void flushCachedData(Jedis jedis) {
        try {
            jedis.getClient().getAll();
        } catch (RuntimeException ex) {
            // Deliberately not rethrown: this runs during cleanup and must not mask the
            // exception that triggered it.
            LOGGER.debug("failed to drain pending replies from connection", ex);
        }
    }

    @Override
    protected Client getClient(String key) {
        byte[] bKey = SafeEncoder.encode(key);
        return getClient(bKey);
    }

    @Override
    protected Client getClient(byte[] key) {
        Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
        Client client = jedis.getClient();
        // Remember which client each command went to, so replies can be read back in order.
        clients.add(client);
        return client;
    }

    /**
     * Returns the connection for a hash slot, borrowing one from the slot's pool on first use.
     *
     * @throws IllegalStateException if the slot cache has no pool for the slot
     */
    private Jedis getJedis(int slot) {
        JedisPool pool = clusterInfoCache.getSlotPool(slot);
        if (null == pool) {
            throw new IllegalStateException("no connection pool is mapped to slot " + slot);
        }
        // Reuse the connection already borrowed from this pool, if any.
        Jedis jedis = jedisMap.get(pool);
        if (null == jedis) {
            jedis = pool.getResource();
            jedisMap.put(pool, jedis);
        }
        hasDataInBuf = true;
        return jedis;
    }

    /** Looks up a declared field and makes it accessible. */
    private static Field getField(Class<?> cls, String fieldName) {
        try {
            Field field = cls.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field;
        } catch (NoSuchFieldException | SecurityException e) {
            throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
        }
    }

    /** Reads a field reflectively; the caller determines the expected type. */
    @SuppressWarnings({"unchecked" })
    private static <T> T getValue(Object obj, Field field) {
        try {
            return (T)field.get(obj);
        } catch (IllegalArgumentException | IllegalAccessException e) {
            LOGGER.error("get value fail", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Demo: batch-writes and batch-reads against a local three-node cluster, then prints the
     * elapsed milliseconds and the number of replies.
     */
    public static void main(String[] args) throws IOException {
        Set<HostAndPort> nodes = new HashSet<HostAndPort>();
        nodes.add(new HostAndPort("127.0.0.1", 7000));
        nodes.add(new HostAndPort("127.0.0.1", 7001));
        nodes.add(new HostAndPort("127.0.0.1", 7002));
        // try-with-resources closes the cluster client (and the pipeline) even on failure.
        try (JedisCluster jc = new JedisCluster(nodes)) {
            long s = System.currentTimeMillis();
            List<Object> batchResult;
            try (JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jc)) {
                jcp.refreshCluster();
                // batch write
                for (int i = 0; i < 100; i++) {
                    jcp.set("k" + i, "v1" + i);
                }
                jcp.sync();
                // batch read (only k0..k99 are written above)
                for (int i = 0; i < 10000; i++) {
                    jcp.get("k" + i);
                }
                batchResult = jcp.syncAndReturnAll();
            }
            // output time
            long t = System.currentTimeMillis() - s;
            System.out.println(t);
            System.out.println(batchResult.size());
        }
    }
}
