Jedis cluster集群初始化源碼剖析
環境
jar版本: spring-data-redis-1.8.4-RELEASE.jar、jedis-2.9.0.jar
測試環境: Redis 3.2.8,八個集群節點
applicationContext-redis-cluster.xml 配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- 連接池配置. --> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <!-- 連接池中最大連接數。高版本:maxTotal,低版本:maxActive --> <property name="maxTotal" value="8" /> <!-- 連接池中最大空閑的連接數. --> <property name="maxIdle" value="4" /> <!-- 連接池中最少空閑的連接數. --> <property name="minIdle" value="1" /> <!-- 當連接池資源耗盡時,調用者最大阻塞的時間,超時將跑出異常。單位,毫秒數;默認為-1.表示永不超時。高版本:maxWaitMillis,低版本:maxWait --> <property name="maxWaitMillis" value="5000" /> <!-- 連接空閑的最小時間,達到此值后空閑連接將可能會被移除。負值(-1)表示不移除. --> <property name="minEvictableIdleTimeMillis" value="300000" /> <!-- 對於“空閑鏈接”檢測線程而言,每次檢測的鏈接資源的個數。默認為3 --> <property name="numTestsPerEvictionRun" value="3" /> <!-- “空閑鏈接”檢測線程,檢測的周期,毫秒數。如果為負值,表示不運行“檢測線程”。默認為-1. --> <property name="timeBetweenEvictionRunsMillis" value="60000" /> <!-- testOnBorrow:向調用者輸出“鏈接”資源時,是否檢測是有有效,如果無效則從連接池中移除,並嘗試獲取繼續獲取。默認為false。建議保持默認值. --> <!-- testOnReturn:向連接池“歸還”鏈接時,是否檢測“鏈接”對象的有效性。默認為false。建議保持默認值. --> <!-- testWhileIdle:向調用者輸出“鏈接”對象時,是否檢測它的空閑超時;默認為false。如果“鏈接”空閑超時,將會被移除。建議保持默認值. --> <!-- whenExhaustedAction:當“連接池”中active數量達到閥值時,即“鏈接”資源耗盡時,連接池需要采取的手段, 默認為1(0:拋出異常。1:阻塞,直到有可用鏈接資源。2:強制創建新的鏈接資源) --> </bean> <bean id="n1" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6379" type="int" /> </bean> <bean id="n2" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6380" type="int" /> </bean> <bean id="n3" class="org.springframework.data.redis.connection.RedisNode"> <constructor-arg value="127.0.0.1" /> <constructor-arg value="6381" type="int" /> </bean> <bean id="redisClusterConfiguration" class="org.springframework.data.redis.connection.RedisClusterConfiguration"> <property name="clusterNodes"> <set> <ref bean="n1" /> <ref bean="n2" /> <ref bean="n3" /> </set> </property> <property name="maxRedirects" value="5" /> </bean> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <constructor-arg ref="redisClusterConfiguration" /> <constructor-arg ref="jedisPoolConfig" /> </bean> <!-- Spring提供的訪問Redis類. --> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="jedisConnectionFactory" /> <property name="KeySerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="ValueSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="hashKeySerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> <property name="hashValueSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> </bean> <!-- Redis配置結束 --> </beans>
Jedis集群類說明
Jedis與Redis集群交互時,涉及的類可以分為兩類,分別如下:
1、Redis集群信息配置類:
| 類名 | 說明 |
|---|---|
| redis.clients.jedis.JedisPoolConfig | 保存Jedis連接池配置信息 |
| org.springframework.data.redis.connection.RedisNode | 保存Redis集群節點信息 |
| org.springframework.data.redis.connection.RedisClusterConfiguration | 保存Redis集群配置信息 |
| org.springframework.data.redis.connection.jedis.JedisConnectionFactory | Jedis連接工廠,負責創建JedisCluster集群操作類,獲取Redis連接對象 |
| org.springframework.data.redis.connection.jedis.JedisClusterConnection | 在JedisCluster基礎上實現,根據key類型使用具體的Jedis類與Redis進行交互 |
2、Redis集群信息操作類:
| 類名 | 說明 |
|---|---|
| redis.clients.jedis.JedisCluster | 擴展了BinaryJedisCluster類,負責與Redis集群進行String類型的key交互 |
| redis.clients.jedis.BinaryJedisCluster | JedisCluster的父類,負責與Redis集群進行byte[]類型的key交互 |
| redis.clients.jedis.JedisSlotBasedConnectionHandler | JedisClusterConnectionHandler類的子類,負責根據key的slot值獲取Redis連接 |
| redis.clients.jedis.JedisClusterConnectionHandler | 一個抽象類,負責初始化、重建、重置Redis slot槽緩存 |
| redis.clients.jedis.JedisClusterInfoCache | Redis slot緩存類,負責保存、重建和自動發現Redis slot槽與集群節點的關系 |
Jedis集群初始化流程
集群初始化入口:JedisConnectionFactory類
從上面的配置文件applicationContext-redis-cluster.xml中我們聲明了JedisConnectionFactory這個類:
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <constructor-arg ref="redisClusterConfiguration" /> <constructor-arg ref="jedisPoolConfig" /> </bean>
這個類是用來創建、管理和銷毀Jedis與Redis集群的連接的。由於我們在Spring配置文件中聲明了這個類,因此當應用啟動時,Spring會自動加載該類,Jedis集群信息初始化的動作也由此開始。該類初始化的方法代碼如下:
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory { private JedisPoolConfig poolConfig = new JedisPoolConfig(); private RedisClusterConfiguration clusterConfig; public JedisConnectionFactory(RedisClusterConfiguration clusterConfig, JedisPoolConfig poolConfig) { this.clusterConfig = clusterConfig; this.poolConfig = poolConfig; } /* * (non-Javadoc) * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ public void afterPropertiesSet() { if (shardInfo == null) { shardInfo = new JedisShardInfo(hostName, port); if (StringUtils.hasLength(password)) { shardInfo.setPassword(password); } if (timeout > 0) { setTimeoutOn(shardInfo, timeout); } } if (usePool && clusterConfig == null) { this.pool = createPool(); } //如果集群配置信息不為空,則創建JedisCluster對象 if (clusterConfig != null) { this.cluster = createCluster(); } } }
在上面的配置文件中,我們使用構造函數注入的方式初始化了JedisConnectionFactory,由於該類實現了InitializingBean接口,因此在它被初始化之后會調用afterPropertiesSet()方法,在該方法中會根據clusterConfig集群配置信息是否為空來創建JedisCluster對象。createCluster()代碼定義如下:
private JedisCluster createCluster() { JedisCluster cluster = createCluster(this.clusterConfig, this.poolConfig); this.clusterCommandExecutor = new ClusterCommandExecutor( new JedisClusterConnection.JedisClusterTopologyProvider(cluster), new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster), EXCEPTION_TRANSLATION); return cluster; } /** * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}. * * @param clusterConfig must not be {@literal null}. * @param poolConfig can be {@literal null}. * @return * @since 1.7 */ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) { Assert.notNull(clusterConfig, "Cluster configuration must not be null!"); Set<HostAndPort> hostAndPort = new HashSet<HostAndPort>(); for (RedisNode node : clusterConfig.getClusterNodes()) { hostAndPort.add(new HostAndPort(node.getHost(), node.getPort())); } int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects().intValue() : 5; return StringUtils.hasText(getPassword()) ? new JedisCluster(hostAndPort, timeout, timeout, redirects, password, poolConfig) : new JedisCluster(hostAndPort, timeout, redirects, poolConfig); }
上面的代碼調用了JedisCluster的構造函數來創建JedisCluster對象,JedisCluster使用super關鍵字調用父類的構造函數:
public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode, timeout, maxAttempts, poolConfig); }
BinaryJedisCluster構造函數:
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout); this.maxAttempts = maxAttempts; }
集群信息獲取:JedisClusterInfoCache類
初始化流程到這里,主要的部分就要浮出水面了。在BinaryJedisCluster類的構造函數中初始化了JedisSlotBasedConnectionHandler類,該類的出現說明Jedis要開始獲取Redis集群的slot槽和Redis集群節點信息了,該類也是使用super關鍵字調用父類構造函數來初始化的,它的父類JedisClusterConnectionHandler構造函數如下:
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) { this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password); //這里是關鍵 initializeSlotsCache(nodes, poolConfig, password); }
JedisClusterConnectionHandler類的構造函數中創建了JedisClusterInfoCache對象,並調用initializeSlotsCache()方法對Redis集群信息進行初始化。該類的主要方法如下:
public Jedis getConnectionFromNode(HostAndPort node) { return cache.setupNodeIfNotExist(node).getResource(); } public Map<String, JedisPool> getNodes() { return cache.getNodes(); } private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); if (password != null) { jedis.auth(password); } try { cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } public void renewSlotCache() { cache.renewClusterSlots(null); } public void renewSlotCache(Jedis jedis) { cache.renewClusterSlots(jedis); } @Override public void close() { cache.reset(); }
可以看到,該類主要還是調用JedisClusterInfoCache對象的方法來完成slot的相關操作。因此我們重點看一下JedisClusterInfoCache類。
JedisClusterInfoCache類主要負責發送cluster slots命令來獲取Redis集群節點的槽和Redis集群節點信信息,並將相應信息保存到Map緩存中。我們使用redis-cli客戶端工具連接上任意一個Redis中的集群節點,向Redis發送該命令之后,獲得的結果如下:
127.0.0.1:6379> cluster slots
1) 1) (integer) 12288 2) (integer) 16383 3) 1) "127.0.0.1" 2) (integer) 6382 3) "65aea5fc4485bc7c0c3c4425fb3f500c562ee243" 4) 1) "127.0.0.1" 2) (integer) 6386 3) "4061e306b094e707b6f4a7c8cd8e82bd61155060" 2) 1) (integer) 4096 2) (integer) 8191 3) 1) "127.0.0.1" 2) (integer) 6380 3) "c6e1b3691b968b009357dcac3349afbcd557fd8c" 4) 1) "127.0.0.1" 2) (integer) 6384 3) "f915c7e6812a7d8fbe637c782ad261cd453022b2" 3) 1) (integer) 0 2) (integer) 4095 3) 1) "127.0.0.1" 2) (integer) 6379 3) "91bb43a956a04a9812e4d6950efebbb2e0f646fd" 4) 1) "127.0.0.1" 2) (integer) 6383 3) "c1d9d907f6905dd826dad774d127b75484ef8ea8" 4) 1) (integer) 8192 2) (integer) 12287 3) 1) "127.0.0.1" 2) (integer) 6381 3) "745936c1192bc1b136fd1f5df842bc1dd517ef36" 4) 1) "127.0.0.1" 2) (integer) 6385 3) "1c07bd8406156122eb4855d2e8b36e785e7901c7"
我現在本地的Redis集群有八個節點,四個主節點,四個從節點,通過cluster slots命令的結果都可以清楚地看到這些節點信息。這個命令的每一組結果由四個部分組成:起始槽節點、終止槽節點、主節點IP和端口加節點ID、從節點IP和端口加節點ID。
在JedisClusterInfoCache類中,相關的源碼如下:
public class JedisClusterInfoCache { // 保存Redis集群節點和節點連接池信息:key為節點地址、value為連接池 private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>(); // 保存Redis集群節點槽和槽所在的主節點連接池信息:key為節點槽、value為連接池 private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>(); // 使用讀寫鎖保證nodes和slots兩個map的寫安全 private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); // 重建緩存的標識變量,false為未進行,true為正在進行 private volatile boolean rediscovering; private final GenericObjectPoolConfig poolConfig; private int connectionTimeout; private int soTimeout; private String password; // 主節點索引位置標識,遍歷cluster slots結果時使用 private static final int MASTER_NODE_INDEX = 2; public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) { this(poolConfig, timeout, timeout, null); } public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final String password) { this.poolConfig = poolConfig; this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; this.password = password; } /** * 在jedis封裝的redis集群節點信息上發送cluster slots命令,獲取所有集群節點信息和槽信息 * * @param jedis */ public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock();// 由當前線程獲得寫鎖,在當前線程操作未結束之前,其他線程只能等待 try { reset();// 重置nodes、slots兩個Map,釋放JedisPool連接池資源 List<Object> slots = jedis.clusterSlots();// 在redis集群節點信息上發送cluster slots命令,獲取所有集群節點信息和槽信息 // 遍歷slots集合,保存Redis集群節點和節點連接池信息到nodes Map中,保存Redis集群節點槽和槽所在的主節點連接池信息到slots Map中 for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } // 獲取槽節點集合 List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos int size = slotInfo.size(); // 遍歷slots集合元素中的主從節點信息,保存Redis集群節點和節點連接池信息到nodes Map中,保存Redis集群節點槽和槽所在的主節點連接池信息到slots Map中 for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); if (hostInfos.size() <= 0) { continue; } // 獲取集群節點的服務器地址和端口 HostAndPort targetNode = generateHostAndPort(hostInfos); // 保存Redis集群節點和節點連接池信息到nodes Map中 setupNodeIfNotExist(targetNode); // 如果當前遍歷的是主節點信息,則保存Redis集群節點槽和槽所在的主節點連接池信息到slots Map中 if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } } } finally { w.unlock();// 釋放寫鎖,使其他線程使用 } } /** * 重建Cluster集群節點和Slot槽緩存 * * @param jedis */ public void renewClusterSlots(Jedis jedis) { // 如果重建操作未進行,則開始重建緩存操作 if (!rediscovering) { try { w.lock(); rediscovering = true;// 設重建緩存標識變量的值為true,表示重建操作正在進行 // 如果封裝redis連接信息的jedis對象不為空,則使用該節點進行重建緩存操作並返回 if (jedis != null) { try { discoverClusterSlots(jedis); return; } catch (JedisException e) { // try nodes from all pools } } // 如果封裝redis連接信息的jedis對象為空,則打亂nodes Map中保存的jedis連接池信息,遍歷連接池中的節點進行重建緩存操作並返回 for (JedisPool jp : getShuffledNodesPool()) { try { jedis = jp.getResource(); discoverClusterSlots(jedis); return; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } finally { rediscovering = false;// 設重建緩存標識變量的值為false,表示重建操作未進行 w.unlock(); } } } /** * 邏輯類似discoverClusterNodesAndSlots方法 * * @param jedis */ private void discoverClusterSlots(Jedis jedis) { List<Object> slots = jedis.clusterSlots(); this.slots.clear(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX); if (hostInfos.isEmpty()) { continue; } // at this time, we just use master, discard slave information HostAndPort targetNode = generateHostAndPort(hostInfos); assignSlotsToNode(slotNums, targetNode); } } private HostAndPort generateHostAndPort(List<Object> hostInfos) { return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)), ((Long) hostInfos.get(1)).intValue()); } /** * 保存Redis集群節點和節點連接池信息到nodes Map中 * * @param node * @return */ public JedisPool setupNodeIfNotExist(HostAndPort node) { w.lock(); try { // 獲取節點key,形式為"服務器地址:端口" String nodeKey = getNodeKey(node); // 如果節點已存在nodes Map中,則直接返回 JedisPool existingPool = nodes.get(nodeKey); if (existingPool != null) return existingPool; // 創建節點相應的JedisPool連接池對象,並保存到nodes Map中,然后返回JedisPool連接池對象 JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), connectionTimeout, soTimeout, password, 0, null, false, null, null, null); nodes.put(nodeKey, nodePool); return nodePool; } finally { w.unlock(); } } /** * 遍歷槽集合,保存Redis集群節點槽和槽所在的主節點連接池信息到slots Map中 * * @param targetSlots * @param targetNode */ public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = setupNodeIfNotExist(targetNode); for (Integer slot : targetSlots) { slots.put(slot, targetPool); } } finally { w.unlock(); } } /** * 根據節點key獲取JedisPool連接池對象 * * @param nodeKey * @return */ public JedisPool getNode(String nodeKey) { r.lock(); try { return nodes.get(nodeKey); } finally { r.unlock(); } } /** * 根據slot槽值獲取JedisPool連接池對象 * * @param slot * @return */ public JedisPool getSlotPool(int slot) { r.lock(); try { return slots.get(slot); } finally { r.unlock(); } } /** * 獲取節點信息和節點對象對應的連接池信息 * * @return */ public Map<String, JedisPool> getNodes() { r.lock(); try { return new HashMap<String, JedisPool>(nodes); } finally { r.unlock(); } } /** * 獲取nodes Map打亂順序后的Redis集群節點連接池信息 * * @return */ public List<JedisPool> getShuffledNodesPool() { r.lock(); try { List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values()); Collections.shuffle(pools); return pools; } finally { r.unlock(); } } /** * 清空集群節點集合和槽集合,釋放JedisPool資源 */ public void reset() { w.lock(); try { for (JedisPool pool : nodes.values()) { try { if (pool != null) { pool.destroy(); } } catch (Exception e) { // pass } } nodes.clear(); slots.clear(); } finally { w.unlock(); } } public static String getNodeKey(HostAndPort hnp) { return hnp.getHost() + ":" + hnp.getPort(); } /** * 遍歷槽區間,獲取槽節點集合 * * @param slotInfo * @return */ private List<Integer> getAssignedSlotArray(List<Object> slotInfo) { List<Integer> slotNums = new ArrayList<Integer>(); for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); slot++) { slotNums.add(slot); } return slotNums; } }
總結
Jedis初始化Redis集群信息時,先使用JedisConnectionFactory獲取JedisCluster對象,再根據JedisCluster去逐步引出JedisClusterInfoCache對象完成Redis集群信息的獲取。在這個類中,主要有以下幾點:
- 整個類的核心是discoverClusterNodesAndSlots方法,它在jedis封裝的redis集群節點上發送cluster slots命令,來獲取所有集群節點信息和槽信息,然后分別緩存在nodes和slots兩個HashMap中
- 讀寫鎖一般在讀多寫少的場景下使用。進行Redis集群信息保存和獲取操作時,使用了讀寫鎖ReentrantReadWriteLock,保證寫和寫之間互斥,避免一個寫操作影響另外一個寫操作,引發線程安全問題
- 在定義重建緩存標識變量rediscovering時,使用了volatile關鍵字,保證重建緩存的操作對於其他線程的內存可見性,使JVM主內存與方法線程工作內存狀態同步
- 客戶端內部維護slots緩存表,並且針對每個節點維護連接池,當集群規模非常大時,客戶端會維護非常多的連接並消耗更多的內存
