上一篇文章《redis pipeline批量處理提高性能》中我們講到redis pipeline模式在批量數據處理上帶來了很大的性能提升,我們先來回顧一下pipeline的原理,redis client與server之間采用的是請求應答的模式,如下所示:
Client: command1
Server: response1
Client: command2
Server: response2
…
在這種情況下,如果要完成10個命令,則需要20次交互才能完成。因此,即使redis處理能力很強,仍然會受到網絡傳輸影響,導致吞吐量上不去。而在管道(pipeline)模式下,多個請求可以變成這樣:
Client: command1,command2…
Server: response1,response2…
在這種情況下,完成命令只需要2次交互。這樣網絡傳輸上能夠更加高效,加上redis本身強勁的處理能力,給數據處理帶來極大的性能提升。但實際上遇到的問題是,項目上所用到的是Redis集群,初始化的時候使用的類是JedisCluster而不是Jedis。去查了JedisCluster的文檔,並沒有發現提供有像Jedis一樣的獲取Pipeline對象的 pipelined()方法。
為什么RedisCluster無法使用pipeline?
我們知道,Redis 集群的鍵空間被分割為 16384 個槽(slot),集群的最大節點數量也是 16384 個。每個主節點都負責處理 16384 個哈希槽的其中一部分。具體的redis命令,會根據key計算出一個槽位(slot),然后根據槽位去特定的節點redis上執行操作。如下所示:
master1(slave1): 0~5460
master2(slave2):5461~10922
master3(slave3):10923~16383
集群有三個master節點組成,其中master1分配了 0~5460的槽位,master2分配了 5461~10922的槽位,master3分配了 10923~16383的槽位。
一次pipeline會批量執行多個命令,那么每個命令都需要根據“key”運算一個槽位(JedisClusterCRC16.getSlot(key)),然后根據槽位去特定的機器執行命令,也就是說一次pipeline操作會使用多個節點的redis連接,而目前JedisCluster是無法支持的。
如何基於JedisCluster擴展pipeline?
設計思路
1.首先要根據key計算出此次pipeline會使用到的節點對應的連接(也就是jedis對象,通常每個節點對應一個Pool)。
2.相同槽位的key,使用同一個jedis.pipeline去執行命令。
3.合並此次pipeline所有的response返回。
4.連接釋放返回到池中。
也就是將一個JedisCluster下的pipeline分解為每個單節點下獨立的jedisPipeline操作,最后合並response返回。具體實現就是通過JedisClusterCRC16.getSlot(key)計算key的slot值,通過每個節點的slot分布,就知道了哪些key應該在哪些節點上。再獲取這個節點的JedisPool就可以使用pipeline進行讀寫了。
實現上面的過程可以有很多種方式,本文將介紹一種也許是代碼量最少的一種解決方案。
解決方案
上面提到的過程,其實在JedisClusterInfoCache對象中都已經幫助開發人員實現了,但是這個對象在JedisClusterConnectionHandler中為protected並沒有對外開放,而且通過JedisCluster的API也無法拿到JedisClusterConnectionHandler對象。所以通過下面兩個類將這些對象暴露出來,這樣使用getJedisPoolFromSlot就可以知道每個key對應的JedisPool了。
Maven依賴
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
JedisClusterPipeline
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import java.util.Set;
public class JedisClusterPipeline extends JedisCluster {
public JedisClusterPipeline(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, final GenericObjectPoolConfig poolConfig) {
super(jedisClusterNode,connectionTimeout, soTimeout, maxAttempts, password, poolConfig);
super.connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig,
connectionTimeout, soTimeout ,password);
}
public JedisSlotAdvancedConnectionHandler getConnectionHandler() {
return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
}
/**
* 刷新集群信息,當集群信息發生變更時調用
* @param
* @return
*/
public void refreshCluster() {
connectionHandler.renewSlotCache();
}
}
JedisSlotAdvancedConnectionHandler
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;
import java.util.Set;
public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler {
public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout,String password) {
super(nodes, poolConfig, connectionTimeout, soTimeout, password);
}
public JedisPool getJedisPoolFromSlot(int slot) {
JedisPool connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
// It can't guaranteed to get valid connection because of node
// assignment
return connectionPool;
} else {
renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
return connectionPool;
} else {
throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
}
}
}
}
編寫測試類,向redis集群寫入10000條數據,分別測試調用普通JedisCluster模式和調用上面實現的JedisCluster Pipeline模式的性能對比,測試類如下:
import redis.clients.jedis.*;
import redis.clients.util.JedisClusterCRC16;
import java.io.UnsupportedEncodingException;
import java.util.*;
public class PipelineTest {
public static void main(String[] args) throws UnsupportedEncodingException {
PipelineTest client = new PipelineTest();
Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("node1",20249));
nodes.add(new HostAndPort("node2",20508));
nodes.add(new HostAndPort("node3",20484));
String redisPassword = "123456";
//測試
client.jedisCluster(nodes,redisPassword);
client.clusterPipeline(nodes,redisPassword);
}
//普通JedisCluster 批量寫入測試
public void jedisCluster(Set<HostAndPort> nodes,String redisPassword) throws UnsupportedEncodingException {
JedisCluster jc = new JedisCluster(nodes, 2000, 2000,100,redisPassword, new JedisPoolConfig());
List<String> setKyes = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
setKyes.add("single"+i);
}
long start = System.currentTimeMillis();
for(int j = 0;j < setKyes.size();j++){
jc.setex(setKyes.get(j),100,"value"+j);
}
System.out.println("JedisCluster total time:"+(System.currentTimeMillis() - start));
}
//JedisCluster Pipeline 批量寫入測試
public void clusterPipeline(Set<HostAndPort> nodes,String redisPassword) {
JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(nodes, 2000, 2000,10,redisPassword, new JedisPoolConfig());
JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPipeline.getConnectionHandler();
Map<JedisPool, List<String>> poolKeys = new HashMap<>();
List<String> setKyes = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
setKyes.add("pipeline"+i);
}
long start = System.currentTimeMillis();
//查詢出 key 所在slot ,通過 slot 獲取 JedisPool ,將key 按 JedisPool 分組
jedisClusterPipeline.refreshCluster();
for(int j = 0;j < setKyes.size();j++){
String key = setKyes.get(j);
int slot = JedisClusterCRC16.getSlot(key);
JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
if (poolKeys.keySet().contains(jedisPool)){
List<String> keys = poolKeys.get(jedisPool);
keys.add(key);
}else {
List<String> keys = new ArrayList<>();
keys.add(key);
poolKeys.put(jedisPool, keys);
}
}
//調用Jedis pipeline進行單點批量寫入
for (JedisPool jedisPool : poolKeys.keySet()) {
Jedis jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
List<String> keys = poolKeys.get(jedisPool);
for(int i=0;i<keys.size();i++){
pipeline.setex(keys.get(i),100, "value" + i);
}
pipeline.sync();//同步提交
jedis.close();
}
System.out.println("JedisCluster Pipeline total time:"+(System.currentTimeMillis() - start));
}
}
測試結果如下:
JedisCluster total time:29147
JedisCluster Pipeline total time:190
結論:對於批量操作,JedisCluster Pipeline有明顯的性能提升。
總結
本文旨在介紹一種在Redis集群模式下提供Pipeline批量操作的功能。基本思路就是根據redis cluster對數據哈希取模的算法,先計算數據存放的slot位置, 然后根據不同的節點將數據分成多批,對不同批的數據進行單點pipeline處理。
但是需要注意的是,由於集群模式存在節點的動態添加刪除,且client不能實時感知(只有在執行命令時才可能知道集群發生變更),因此,該實現不保證一定成功,建議在批量操作之前調用 refreshCluster() 方法重新獲取集群信息。應用需要保證不論成功還是失敗都會調用close() 方法,否則可能會造成泄露。如果失敗需要應用自己去重試,因此每個批次執行的命令數量需要控制,防止失敗后重試的數量過多。
基於以上說明,建議在集群環境較穩定(增減節點不會過於頻繁)的情況下使用,且允許失敗或有對應的重試策略。
