redis集群JedisCluster連接關閉問題


JedisCluster連接關閉問題

set方法為例

//偽代碼
JedisCluster jedisCluster = new JedisCluster();
jedisCluster.set("testKey", "testValue");

進入到set方法

  • 類JedisCluster中;

  • 初始化一個JedisClusterCommand對象,調用run方法;

  • 需要實現一個execute方法,通過Jedis調用set方法(這里又回到單節點調用set的方式了);

public String set(final String key, final String value) {
   return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
     @Override
     public String execute(Jedis connection) {
       return connection.set(key, value);
     }
   }.run(key);
 }

進入到run方法

  • 類JedisClusterCommand中;
public T run(String key) {
   return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
 }

進入到runWithRetries方法

  • 類JedisClusterCommand中;
  • 只需要關注2個地方即可;
    • return execute(connection),這里調用了之前實現的execute方法;
    • releaseConnection(connection),在finally中釋放了連接;
private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
    if (attempts <= 0) {
      throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
    }

    Jedis connection = null;
    try {
		//此處為空,走else
      if (redirect != null) {
        connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
        if (redirect instanceof JedisAskDataException) {
          // TODO: Pipeline asking with the original command to make it faster....
          connection.asking();
        }
      } else {
        //此處是false,走else
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          //這里會從池中獲取一個Jedis對象
          connection = connectionHandler.getConnectionFromSlot(slot);
        }
      }
		//這里調用最開始實現的execute方法
      return execute(connection);

    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
      // release current connection before recursion
      releaseConnection(connection);
      connection = null;

      if (attempts <= 1) {
        //We need this because if node is not reachable anymore - we need to finally initiate slots
        //renewing, or we can stuck with cluster state without one node in opposite case.
        //But now if maxAttempts = [1 or 2] we will do it too often.
        //TODO make tracking of successful/unsuccessful operations for node - do renewing only
        //if there were no successful responses from this node last few seconds
        this.connectionHandler.renewSlotCache();
      }

      return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
    } catch (JedisRedirectionException jre) {
      // if MOVED redirection occurred,
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }

      // release current connection before recursion
      releaseConnection(connection);
      connection = null;

      return runWithRetries(slot, attempts - 1, false, jre);
    } finally {
      //此處釋放了連接
      releaseConnection(connection);
    }
  }

進入到releaseConnection方法

  • 類JedisClusterCommand中;
  • 實際上是通過Jedis.close()關閉的,和我們用單節點時,是一樣的關閉方式;
private void releaseConnection(Jedis connection) {
    if (connection != null) {
      connection.close();
    }
  }

總結

  • 使用JedisCluster時,不需要手動釋放連接;
  • 在調用的過程中,會自動釋放連接;
  • 實際上是JedisCluster中通過JedisPool獲取Jedis來執行命令;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM