Jedis源碼分析:JedisClusterConnectionHandler


JedisClusterConnectionHandler

   JedisClusterConnectionHandler提供了JedisCluster接口獲取資源池中Jedis連接對象的一個門面類,JedisClusterConnectionHandler提供了初始化集群,獲取資源池中連接對象,刷新資源池等各種方法。JedisClusterConnectionHandler必須依賴於JedisClusterInfoCache類,因為 JedisClusterConnectionHandler本身的方法都是基於JedisClusterInfoCache類實現的,下列所講解的方法的底層實現會在JedisClusterInfoCache進行講述

   JedisClusterConnectionHandler實例化時必須提供以下的參數(見代碼1-1),這些參數本身對於JedisClusterConnectionHandler沒什么用處,完全就是在該類的構造器中走了一遭,因為JedisClusterConnectionHandler內部維護着JedisClusterInfoCache實例。JedisClusterConnectionHandler將這些參數傳遞給cache實例后,通過initializeSlotsCache()方法獲取集群信息,並調用cache.discoverClusterNodesAndSlots()將獲取到的集群信息分別放入nodes緩存和slots緩存中。這兩個緩存的存儲形式為 Map<String, JedisPool>,Map<Integer, JedisPool> ,雖然都是Map,但是存儲的數據內容完全不同,nodes存儲着集群所有節點信息,存儲形式為<"host:port",JedisPool>,而slots存儲的信息為<slot,JedisPool>,並且slots只存儲主節點信息。

   從代碼1-2中看出,通過遍歷節點集合,實例化Jedis對象,再調用cache.discoverClusterNodesAndSlots(jedis)方法將節點放入緩存Map中,獲取集群信息的方法也很簡單,通過發送'cluster slots'便知道槽和節點的對應情況,再將Redis服務端返回來的輸入流解析成Java對象即可。這里注明下,發現集群狀況只需要一個可用的Jedis實例即可,但是集群節點總會發生"掛掉"的情況,建議在使用JedisCluster時配上所有的節點信息。

代碼1-1

public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) { //實例化集群信息緩存
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap); initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier); }

 

代碼1-2 集群發現及初始化緩存池

 private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) { //遍歷節點集合,拿出一個可用的節點獲取集群信息
      for (HostAndPort hostAndPort : startNodes) { Jedis jedis = null; try { jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier); //添加密碼認證
        if (password != null) { jedis.auth(password); } //設置客戶端名稱
        if (clientName != null) { jedis.clientSetname(clientName); } //TODO 根據節點發現集群信息
 cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes
      } finally { if (jedis != null) { jedis.close(); } } } }

 

集群節點的獲取

   JedisClusterConnectionHandler提供了四種獲取可用節點的方法,其中 getConnection()和 getConnectionFromSlot(int slot);被定義為抽象方法,交由JedisSlotBasedConnectionHandler子類進行實現。值得注意的是,在獲取Jedis連接時,有兩個比較重要的點需要說明一下。前面說過JedisClusterInfoCache存放着nodes和slots兩個map對象,通常集群模式都會做主從分離,即master節點負責讀寫操作,slave節點負責讀和備份.如果嘗試對slave節點進行寫操作,那么會發生寫入錯誤。所以提供了getConnectionFromSlot(int slot) 方法來根據槽點值獲取對應的主節點信息,如果此時后端集群發生主從切換,該方法會嘗試刷新集群狀態來保證獲取可用的Jedis對象。如果集群頻繁的出現切換,那么即使刷新了集群信息也不能保證獲取到的Jedis對象一定是有效的,代碼中也做了說明"It can't guaranteed to get valid connection because of node assignment(因為節點的分配問題並不能獲取到有效連接)",所以代碼中采用了重試機制和隨機數來盡量保證獲取到的是有效的Jedis連接。第二個點是Jedis包依賴於Apache-CommonsPool2來做Jedis的二級緩存,為了保證Jedis是有效的,Jedis包下的JedisFactory實現了PooledObjectFactory相關方法,通過配置DEFAULT_TEST_ON_CREATE,DEFAULT_TEST_ON_BORROW,DEFAULT_TEST_ON_RETURN,DEFAULT_NUM_TESTS_PER_EVICTION_RUN 參數保證在創建階段,借用階段,歸還階段時Jedis對象都是可用的。

  • Jedis getConnectionFromNode(HostAndPort node)  使用二級緩存,先從JedisClusterInfoCache中獲取緩存對象,再去commons-pool包下獲取Jedis對象,如果commons-pool2中沒有存儲該對象,那么先將該對象放入通用緩存池中再進行獲取。
  • Map<String, JedisPool> getNodes() 直接從JedisClusterInfoCache 的nodes緩存中獲取所有JedisPool對象
  • Jedis getConnection() 遍歷nodes中的節點,如果存在一個jedis對象能夠成功執行ping命令,那么返回該Jedis對象。如果遍歷完所有的node節點仍然不能找到一個有效的節點,那么拋出JedisNoReachableClusterNodeException異常
  • Jedis getConnectionFromSlot(int slot) 根據槽點值獲取對應的主節點信息,slots緩存map中存放着槽點值->master node的對應關系。

代碼2-1:從緩存池中獲取Jedis對象

public Jedis getConnectionFromNode(HostAndPort node) { //使用二級緩存,先從JedisClusterInfoCache中獲取緩存對象,再去commons-pool包下獲取Jedis對象
    return cache.setupNodeIfNotExist(node).getResource(); } public Map<String, JedisPool> getNodes() { //直接從JedisClusterInfoCache 的nodes緩存中獲取所有JedisPool對象
    return cache.getNodes(); } public Jedis getConnection() { // In antirez's redis-rb-cluster implementation, // getRandomConnection always return valid connection (able to // ping-pong) // or exception if all connections are invalid //隨機獲取
    List<JedisPool> pools = cache.getShuffledNodesPool(); /** * 遍歷緩存池中的每個池對象 */
    for (JedisPool pool : pools) { Jedis jedis = null; try { jedis = pool.getResource(); if (jedis == null) { continue; } String result = jedis.ping();//嘗試進行ping
        if (result.equalsIgnoreCase("pong")) return jedis;//返回可用連接
 jedis.close(); } catch (JedisException ex) { if (jedis != null) { jedis.close(); } } } //遍歷完成后仍然無法獲取有效實例
    throw new JedisNoReachableClusterNodeException("No reachable node in cluster"); } public Jedis getConnectionFromSlot(int slot) { //從slots中獲取
    JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { // It can't guaranteed to get valid connection because of node // assignment
      return connectionPool.getResource(); } else { //刷新集群信息
      renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
      connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { //TODO
        return connectionPool.getResource(); } else { //no choice, fallback to new connection to random node //回到隨機獲取
        return getConnection(); } } }

 

集群信息的重置與關閉

JedisClusterConnectionHandler提供了三個方法來進行集群信息的刷新和關閉操作,詳細的說明見《Jedis源碼分析:JedisClusterInfoCache》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM