Redis集群擴容導致的Jedis客戶端報JedisMovedDataException異常


  注:這是我們線上遇到的問題,這里講同事的總結直接粘過來僅做一個記錄。

0 問題的產生

由於線上Redis集群內存使用量已經接近達到預警閾值,需要對Redis集群擴容。(使用的是Redis自帶的Redis-Cluster)

目前有6台主節點,6台從節點。

暫時稱為:

  • redis-master001 ~ redis-master006
  • redis-slave001 ~ redis-slave006

需要增加3主3從。

  • redis-master007 ~ redis-master009
  • redis-slave007 ~ redis-master009

之前Redis集群的16384個槽均勻分配在6台主節點中,每個節點2730個槽。

為保證擴容后,槽依然均勻分布,需要將之前6台的每台機器上遷移出910個槽,方案如下:

  • redis-master001的910個slot遷移到redis-master007
  • redis-master002的910個slot遷移到redis-master007
  • redis-master003的910個slot遷移到redis-master008
  • redis-master004的910個slot遷移到redis-master008
  • redis-master005的910個slot遷移到redis-master009
  • redis-master006的910個slot遷移到redis-master009

分配完之后,每台節點1820個slot。

當將redis-master001的910個slot遷移到redis-master007后,業務上開始報下面的異常

在馬賽克的上一行,可以看到是調Jedis的get方法出的問題。

1 原因及解決方案

問題的原因在於使用了Jedis客戶端,改為使用JedisCluster客戶端即可解決問題。

出問題的get方法是這樣寫的(在Jedis原生基礎上包裝了一層)

// 自己封裝的get方法 public String get(String key) { String result = ";  // 為了打印獲取連接耗時的日志,這里單獨獲取了一下Jedis連接  try (Jedis jedis = this.getResourceLog(key)) {  TimeCost timeCost = new TimeCost();  result = jedis.get(key); // 這里報錯  debugLogger.debug("redis cluster get TimeCost={}", timeCost.getCostMillSeconds()); } // 其實改為下面這樣get就可以解決一直報JedisMovedDataException問題  // return jedisCluster.get(key);  return result; }

getResourceLog方法的作用是根據key計算出這個key所在的slot,再通過slot獲取Redis連接。代碼如下

private Jedis getResourceLog(String key) { TimeCost tc = new TimeCost(); int slot = JedisClusterCRC16.getSlot(key); // CRC計算slot  debugLogger.debug("calc slot TimeCost={}", tc.getCostMillSeconds()); tc.reset(); Jedis jedis = connectionHandler.getConnectionFromSlot(slot); // 通過slot獲取連接  debugLogger.debug("get connection TimeCost={}", tc.getCostMillSeconds()); return jedis; }

上面的get方法可以直接改為JedisCluster的get方法解決。

再考慮另外一種情況,如果必須通過Jedis操作呢?比如watch方法,JedisCluster是不提供watch的,那么只能通過上述方法在Redis集群中根據key獲取到slot,再通過slot獲取到jedis鏈接,然后調用watch。這樣一來,在調watch的地方也會報JedisMovedDataException。

例如下面的代碼,在業務上需要保證事務的情況下(或樂觀鎖),可能會這樣實現:

Jedis jedis = null; String key = ...; // redis key try { // 通過上面的getResource方法獲取jedis鏈接  jedis = getResource(userId); // 通過jedis watch key  if (RedisConstants.SAVE_TO_REDIS_OK.equals(jedis.watch(key))) { // .... 業務邏輯 ....  // ....  // 通過jedis鏈接開始事務  Transaction transaction = jedis.multi(); // ...  // ... 執行一些transaction操作...  // ...  // 提交事務  List<Object> execResult = transaction.exec(); return ...; } } catch (Exception ex) { // do something ... } finally { if (jedis != null) { try { if (!flag) { jedis.unwatch(); } } finally { jedis.close(); } } }

此時如果發生slot遷移,就會報JedisMovedDataException。

那這種情況下的解決方案是什么呢?

其實,優先catch住JedisMovedDataException,然后通過JedisCluster.get(key);一下就行,如下:

Jedis jedis = null; String key = ...; // redis key try { // 通過上面的getResource方法獲取jedis鏈接  jedis = getResource(userId); // 通過jedis watch key  if (RedisConstants.SAVE_TO_REDIS_OK.equals(jedis.watch(key))) { // .... 業務邏輯 ....  // ....  // 通過jedis鏈接開始事務  Transaction transaction = jedis.multi(); // ...  // ... 執行一些transaction操作...  // ...  // 提交事務  List<Object> execResult = transaction.exec(); return ...; } } catch (JedisMovedDataException jmde) { jmde.printStackTrace(); // redisClusterService中維護着jedisCluster實例,這個get實際上調用的是jedisCluster的get  redisClusterService.get(key); return ...; } catch (Exception ex) { // do something ... } finally { if (jedis != null) { try { if (!flag) { jedis.unwatch(); } } finally { jedis.close(); } } }

需要注意的是,用Jedis的get是不能解決的。

2 JedisCluster類圖

JedisCluster整體的UML關系如下,先有個整體的印象,在后面的源碼分析中,可以再回來看。

 

 

3 為什么通過RedisCluster.get一下可以解決?

下面通過JedisCluster源碼解釋為什么這么做可以解決問題,注釋中會有詳細說明。

JedisCluster.get源碼如下:

@Override public String get(final String key) { return new JedisClusterCommand<String>(connectionHandler, maxAttempts) { @Override public String execute(Jedis connection) { return connection.get(key); } }.run(key); }

發現他是委托給JedisClusterCommand來完成get操作的,也可以發現execute方法實際上是使用Jedis來執行的get。這個Jedis實際上就是通過上述方法,先計算出slot,再通過slot獲取到Jedis鏈接的。關鍵在於最下面run方法的執行,下面具體看一下。

Run方法源碼如下:

public T run(String key) { // JedisClusterCRC16.getSlot(key) 計算出slot  return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null); }

runWithRetries源碼如下

private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) { // 這里是一個重試機制,報異常時觸發  if (attempts <= 0) { throw new JedisClusterMaxAttemptsException("No more cluster attempts left."); } Jedis connection = null; try { if (redirect != null) { connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode()); if (redirect instanceof JedisAskDataException) { // TODO: Pipeline asking with the original command to make it faster....  connection.asking(); } } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { // 執行到這里,通過slot獲取到Jedis connection  // 內部是通過一個map維護的slot到JedisPool的映射關系  connection = connectionHandler.getConnectionFromSlot(slot); } } // 執行上面JedisClusterCommand定義的execute方法。  return execute(connection); } catch (JedisNoReachableClusterNodeException jnrcne) { throw jnrcne; } catch (JedisConnectionException jce) { // release current connection before recursion  releaseConnection(connection); connection = null; if (attempts <= 1) { //We need this because if node is not reachable anymore - we need to finally initiate slots  //renewing, or we can stuck with cluster state without one node in opposite case.  //But now if maxAttempts = [1 or 2] we will do it too often.  //TODO make tracking of successful/unsuccessful operations for node - do renewing only  //if there were no successful responses from this node last few seconds  this.connectionHandler.renewSlotCache(); } return runWithRetries(slot, attempts - 1, tryRandomNode, redirect); } catch (JedisRedirectionException are) { // *** 關鍵在這 ***  // if MOVED redirection occurred,  // JedisMovedDataException是JedisRedirectionException的子類,所以會執行下面if中的代碼  if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache recommended by Redis cluster specification  // 重新通過這個jedis鏈接獲取RedisCluster中的Node信息以及slot信息  this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion  releaseConnection(connection); connection = null; return runWithRetries(slot, attempts - 1, false, jre); } finally { releaseConnection(connection); } }

注釋中說到了最終會通過this.connectionHandler.renewSlotCache(connection);來重新獲取slot信息。下面來看下這個方法。

public void renewSlotCache(Jedis jedis) { cache.renewClusterSlots(jedis); }

調用了cache的renewClusterSlots方法來重新獲取slot信息,這個cache是JedisClusterInfoCache類的實例,他里面維護這Node和Slot信息,如下:

public class JedisClusterInfoCache { private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>(); private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>(); // .. }

renewClusterSlots方法如下

public void renewClusterSlots(Jedis jedis) { //If rediscovering is already in process - no need to start one more same rediscovering, just return  if (!rediscovering) { try { w.lock(); if (!rediscovering) { rediscovering = true; try { if (jedis != null) { try { // 關鍵在於這一步,這個方法會重新從遠程集群中獲取最新的slot信息  discoverClusterSlots(jedis); return; } catch (JedisException e) { //try nodes from all pools  } } for (JedisPool jp : getShuffledNodesPool()) { Jedis j = null; try { j = jp.getResource(); discoverClusterSlots(j); return; } catch (JedisConnectionException e) { // try next nodes  } finally { if (j != null) { j.close(); } } } } finally { rediscovering = false; } } } finally { w.unlock(); } } }

關鍵在於discoverClusterSlots方法,這個方法的實現如下:

private void discoverClusterSlots(Jedis jedis) { // 通過slots命令從遠程獲取slot信息  List<Object> slots = jedis.clusterSlots(); this.slots.clear(); // 清除本地緩存slot信息  // 每個slotInfoObj包含集群中某一節點的slot信息  for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } // 計算當前節點的slot信息  List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos  // 獲取這組slot所在的節點信息  List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX); if (hostInfos.isEmpty()) { continue; } // at this time, we just use master, discard slave information  HostAndPort targetNode = generateHostAndPort(hostInfos); // 重新關聯這組slot到遠程節點的映射,至此,完成slot信息的刷新  assignSlotsToNode(slotNums, targetNode); } }

4 為什么Jedis的get不行?

首先我們來對比一下JedisCluster的get和Jedis的get

JedisCluster.get

@Override public String get(final String key) { return new JedisClusterCommand<String>(connectionHandler, maxAttempts) { @Override public String execute(Jedis connection) { return connection.get(key); // 這里追蹤進去,就是Jedis.get  } }.run(key); }

Jedis.get

@Override public String get(final String key) { checkIsInMultiOrPipeline(); client.get(key); return client.getBulkReply(); }

由此可知,Jedis.get沒有了run方法中的異常重試和重新發現機制,所以Jedis.get不行。

5 總結

本文從一次線上擴容引發問題的討論,由擴容引出了slot的遷移,由slot的遷移引出線上報錯-JedisMovedDataException,然后說明了引發這個異常的原因,是因為我們使用了Jedis客戶端,導致無法自動發現遠程集群slot的變化。

然后提出了解決方案,通過使用JedisCluster來解決無法自動發現slot變化的問題。並從源碼的角度說明了為什么JedisCluster的get方法可以自動發現遠程slot的變化。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM