當我們要操作一批key時,可以通過 redis pipline 再執行完后一次性讀取所有結果來較少網絡傳輸的消耗; 很明顯,這有個限制條件 => 這批key的執行必須在同一個連接上
當部署的redis為 standalone 或 master-slave 結構的時候還好,可以從 pool 取出來的連接都是一個 master 節點的, 那要是 redis cluster 的時候怎么辦? 這批key 可能在同一個 redis node 也可能分散在多個 redis nodes 這樣就是多個連接了
redis cluster 雖然自動對 key 進行了分片,但是它對 client 的要求比較高,需要客戶端連接所有 cluster 內的節點(這個和 db client方案類似)並緩存 slots分配信息,然后在客戶端采用同樣的算法進行hash后定位 key 的 slot 進而定位 slot 所屬的 redis 節點,然后獲取對應節點的連接發送命令
cluster pipeline 實現思路
java 常用的客戶端 jedis,雖然提供了 redis-cluster 功能,但是並沒有提供 cluster 下的 pipeline 能力,我們借助它封裝好的 JedisClusterCRC16 工具去計算 slot 定位對應 redis node 的連接,按照 redis node 將這批 key 進行分組 ,那么每組 key 就能分別進行 pipeline 邏輯了
偽代碼
static List<Integer, HostAndPort> slot2NodeMap; // 可以通過主動調用Jedis.clusterNodes獲取slot映射關系,並緩存在本地
List<Object> clusterPipeline(List keys) {
Map<HostAndPort, List<String>> node2Keys= new HashMap<>(); // 節點對應keys分組
for(String key : keys) {
// 計算key對應的slot
int slot = JedisClusterCRC16.getSlot(key);
// 根據slot獲取對應的節點信息,將同一節點的key收在一組
node2Keys.get(slot2NodeMap.get(slot)).add(key);
}
List<Object> results = new ArrayList();
// 分組執行
for (Map.Entry<HostAndPort, List<String>> group : node2Keys) {
Jedis jedis = JedisClusterConnectionHandler.getConnectionFromNode(group.key);
PipeLine pipeline = jedis.pipelined();
// 執行本組keys
result.addAll(jedis.syncAndReturnAll());
}
return results;
}
注意:在 cluster 上執行 pipeline 可能會由於 redis 節點擴縮容 中途 redirection 切換連接導致結果丟失; 可以把 attempts 重試次數設為0 不允許自動切換連接 以感知到異常,然后業務主動進行重試,也可以直接支持重試,遇到重定向的時候,刷新 slots和節點對應關系后重試; 總之,cluster pipeline 的操作應該是冪等的,不然不大安全
jedis 官方支持?
github 上其實2017年就有人提交了 cluster pipeline 的pr,維護人員也很樂意 merge 但是~~ 后續跟進比較慢,然后19年 merge review的時候有些異常,提交人也沒再跟進,導致一直沒有合並成功;
https://github.com/redis/jedis/pull/1455


實現 cluster pipeline 也可以參考這個pr 的提交代碼
