前言
redis的pipeline可以一次性發送多個命令去執行,在執行大量命令時,可以減少網絡通信次數提高效率。但是很可惜,redis的集群並不支持pipeline語法(只是不提供相應的方法而已)。不過只要稍稍看下jedis的源碼,就可以發現雖然沒有現成的輪子,但是卻很好造。
一、簡介
先說下redis集群的簡單結構和數據的定位規則(見下圖)。redis提供了16384個槽點,並為每個節點分配若干槽位,操作redis數據時會根據key進行hash,然后找到對應的節點進行操作,這也解釋了為什么jedisCluster不支持pipeline。因為pipeline中若干個需要操作的key可能位於不同的分片,如果想要獲取數據就必須進行一次請求的轉發(可能這個詞不標准,但是好理解,或者稱之為漂移吧),這與pipeline為了減少網絡通信次數的本意沖突。那我們只要根據key進行hash運算,然后再根據hash值獲取連接,接着按照連接對所有的key進行分組,保證同一pipeline內所有的key都對應一個節點就好了,最后通過pipeline執行。試試吧,萬一好使了呢
二、思路
既然知道了原因和流程,我們就試下能不能造輪子吧。首先我們需要hash算法以及根據hash結果獲取JedisPool的方法,很不巧的是,jedis都提供了。
為了實現上面的功能,我們需要一個類和兩個屬性,類是JedisClusterCRC16(hash算法),兩個屬性分別是connectionHandler(用於獲取cache)和cache(根據hash值獲取連接)
有興趣的同學可以看下JedisCluster的源碼,它集成自BinaryJedisCluster,在BinaryJedisCluster有個connectionHandler屬性,恰巧它又是protected修飾的,這不分明就是讓你繼承么。而cache屬性在JedisClusterConnectionHandler中,注意這個類是抽象類,輕易的重寫會導致整個功能都不能用了(如果內部實現不是很了解,繼承往往優於重寫,因為可以super嘛),我們發現它有一個實現類JedisSlotBasedConnectionHandler,那我們繼承這個類就好了。詳細的設計如下圖:
三、實現
import lombok.extern.slf4j.Slf4j; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.*; import redis.clients.util.JedisClusterCRC16; /** * @author zhangyining on 18/12/1 001. */ @Slf4j public class JedisPipelineCluster extends JedisCluster { static { cluster = init(); } private static JedisPipelineCluster cluster; public static JedisPipelineCluster getCluster(){ return cluster; } private static JedisPipelineCluster init(){ //todo 連接代碼省略... return jedisCluster; } public JedisPool getJedisPoolFromSlot(String redisKey) { return getConnectionHandler().getJedisPoolFromSlot(redisKey); } private JedisPipelineCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts,final GenericObjectPoolConfig poolConfig){ super(jedisClusterNode, timeout, maxAttempts, poolConfig); //繼承可以添加個性化的方法,仔細看構造方法其實和父類是一樣的 connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig, timeout, timeout); } private JedisSlotAdvancedConnectionHandler getConnectionHandler() { return (JedisSlotAdvancedConnectionHandler)this.connectionHandler; } private class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler { private JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) { super(nodes, poolConfig, connectionTimeout, soTimeout); } private JedisPool getJedisPoolFromSlot(String redisKey) { int slot = JedisClusterCRC16.getSlot(redisKey); JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { return connectionPool; } else { renewSlotCache(); connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { return connectionPool; } else { throw new RuntimeException("No reachable node in cluster for slot " + slot); } } } } }
四、測試
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author zhangyining on 18/12/1 001. */ public class Test { public static void main(String[] args) { JedisPipelineCluster cluster = JedisPipelineCluster.getCluster(); String[] testKeys = {"USER_FEAT_10013425884935", "USER_FEAT_10006864229638", "USER_FEAT_10008005187846"}; Map<JedisPool, List<String>> poolKeys = new HashMap<>(); for (String key : testKeys) { JedisPool jedisPool = cluster.getJedisPoolFromSlot(key); if (poolKeys.keySet().contains(jedisPool)){ List<String> keys = poolKeys.get(jedisPool); keys.add(key); }else { List<String> keys = new ArrayList<>(); keys.add(key); poolKeys.put(jedisPool, keys); } } for (JedisPool jedisPool : poolKeys.keySet()) { Jedis jedis = jedisPool.getResource(); Pipeline pipeline = jedis.pipelined(); List<String> keys = poolKeys.get(jedisPool); keys.forEach(key ->pipeline.get(key)); List result = pipeline.syncAndReturnAll(); System.out.println(result); jedis.close(); } } }
五、總結
之前看到過有人通過反射來獲取connectionHandler和cache屬性的,個人覺得反射雖然強大,但是明明可以繼承卻反射,有點怪怪的,看個人習慣吧。總之不管是那種方式,重點是熟悉redis集群是怎么分配數據以及執行請求的,剩下的不過是用不同的地方話(不用方式的代碼)說出來而已