Lettuce之RedisClusterClient使用以及源碼分析


  

  Redis Cluster模式簡介

    redis集群並沒有使用一致性hash算法而引入了哈希槽概念,Redis 集群有16384個哈希槽,每個key通過CRC16校驗后對16384取模來決定放置哪個槽.集群的每個節點負責一部分hash槽.也就是說如果key是不變的對應的slot也是不變的

  Redis 服務器命令    

  • cluster info

可以通過cluster info 命令查看集群信息

cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:12
  • cluster nodes 

通過cluster nodes命令查看當前節點以及該節點分配的slot,如下圖可以發現當前redis集群有12個節點,每個節點大約管理1365個slot

xx.xxx.xxx.xx:6959> cluster nodes 45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018 e40080f32a3fb89e34b7622038ce490682428fdf xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652 a749bba5614680dea9f47e3c8fe595aa8be71a2c xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460 1096e2a8737401b66c7d4ee0addcb10d7ff14088 xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730 fbc76f3481271241c1a89fabeb5139905e1ec2a6 xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383 85601fa67820a5af0de0cc21d102d72575709ec6 xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287 c00d86999c98f97d697f3a2b33ba26fbf50e46eb xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826 0b09a5c4c9e9158520389dd2672bd711d55085c6 xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095 9f26d208fa8772449d5c322eb63786a1cf9937e0 xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922 274294a88758fcb674e1a0292db0e36a66a0bf48 xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364 369780bdf56d483a0f0a92cb2baab786844051f3 xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556 71ed0215356c664cc56d4579684e86a83dba3a92 xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191

  

 

  • client list

Redis Client List 命令用於返回所有連接到服務器的客戶端信息和統計數據。

redis 127.0.0.1:6379> CLIENT LIST 
addr=127.0.0.1:43143 fd=6 age=183 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client 
addr=127.0.0.1:43163 fd=5 age=35 idle=15 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
addr=127.0.0.1:43167 fd=7 age=24 idle=6 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=get

  

 

  •  cluster slots 

  Redis Client Slots 命令用於當前的集群狀態

redis 127.0.0.1:6379> cluster slots
1) 1) (integer) 0
   2) (integer) 4095
   3) 1) "127.0.0.1"
      2) (integer) 7000
   4) 1) "127.0.0.1"
      2) (integer) 7004
2) 1) (integer) 12288
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 7003
   4) 1) "127.0.0.1"
      2) (integer) 7007
3) 1) (integer) 4096
   2) (integer) 8191
   3) 1) "127.0.0.1"
      2) (integer) 7001
   4) 1) "127.0.0.1"
      2) (integer) 7005
4) 1) (integer) 8192
   2) (integer) 12287
   3) 1) "127.0.0.1"
      2) (integer) 7002
   4) 1) "127.0.0.1"
      2) (integer) 7006

  

  • cluster keyslot

cluster keyslot key  返回一個整數,用於標識指定鍵所散列到的哈希槽

cluster keyslot test
(integer) 6918

  

請求重定向

由於每個節點只負責部分slot,以及slot可能從一個節點遷移到另一節點,造成客戶端有可能會向錯誤的節點發起請求。因此需要有一種機制來對其進行發現和修正,這就是請求重定向。有兩種不同的重定向場景:

  • MOVED

         聲明的是slot所有權的轉移,收到的客戶端需要更新其key-node映射關系

  • ASK

         申明的是一種臨時的狀態.在重新進行分片期間,源節點向目標節點遷移一個slot過程中,可能會出現這樣一種情況:屬於被遷移slot的一部分鍵值對保存在源節點里面,一部分保存在目標節點里面.當客戶端向源節點發送一個與鍵有關的命令,並且這個鍵企恰好被遷移到目標節點,則向客戶端返回一個ASK錯誤.因為這個節點還在處於遷移過程中,所有權還沒有轉移,所以客戶端在接收到ASK錯誤后,需要在目標節點執行命令前,先發送一個ASKING命令,如果不發放該命令到話,則會返回MOVED錯誤,ASKING表示已經知道遷移狀態,則會執行該命令.

 

 

通過集群查詢數據key為test的值 redis-cli為單機模式;如果為集群模式時(redis-cli -c) 接收到MOVED 錯誤時是不會打印MOVED錯誤,而是根據MOVED信息自動重定向到正確節點,並打印出重定向信息

xx.xxx.xxx.xx:6959> get test
(error) MOVED 6918 xx.xxx.xx.xxx:6956  

  此時返回的結果表示該key在6956這個實例上,通過這個實例可以獲取到緩存值

xx.xxx.xx.xxx:6956> get test
"cluster"

  通過上文的示例可以發現獲取緩存值的過程需要訪問cluster兩次,既然key到slot值的算法是已知的,如果可以通過key直接計算slot,在通過每個節點的管理的slot范圍就可以知道這個key對應哪個節點了,這樣不就可以一次獲取到了嗎?其實lettuce中就是這樣處理的.下文會有詳細介紹

    如果mget操作值跨slot時會怎樣呢? 

mget test test1
(error) CROSSSLOT Keys in request don't hash to the same slot

 

Lettuce使用

    @Bean(name="clusterRedisURI")
    RedisURI clusterRedisURI(){
        return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build();
    }
  //配置集群選項,自動重連,最多重定型1次
    @Bean
    ClusterClientOptions clusterClientOptions(){
        return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build();
    }

//創建集群客戶端 @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } /** * 集群連接 */ @Bean(destroyMethod = "close") StatefulRedisClusterConnection<String,String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }

  Lettuce在Spring 中的使用通過上文中的配置方式進行配置后就可以使用了

  1. 通過StatefulRedisClusterConnection獲取命令處理方式,同步,異步以及響應式
  2. 執行redis相關命令

  

Lettuce相關源碼

     lettuce的使用方式還是很簡單的那么它的處理過程到底是怎樣的呢?下面將通過源碼進行解析.

通過上文可以知道連接是通過RedisClusterClient創建的,它默認使用了StringCodec(LettuceCharsets.UTF8)作為編碼器創建連接

 public StatefulRedisClusterConnection<String, String> connect() {
        return connect(newStringStringCodec());
    }

  

     在創建連接時就會主動發現集群拓撲信息,在第一次創建的時候partitions一定為null則此時需要初始化分區信息

  <K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {
         //如果分區信息為null則初始化分區信息
        if (partitions == null) {
            initializePartitions();
        }
        //如果需要就激活拓撲刷新
        activateTopologyRefreshIfNeeded();

 初始化集群分片信息,就是將加載分片信息賦值給partitions屬性 

 protected void initializePartitions() {
        this.partitions = loadPartitions();
    }

  具體加載分片信息處理過程如下:

  protected Partitions loadPartitions() {
        //獲取拓撲刷新信息,
        Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();

        String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;
        try {
            //加載拓撲信息
            Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());

第一次可以知道partitions為null則此時需要初始化種子節點的,那么它的種子節點又是什么呢?通過代碼可以發現種子節點就是初始化的URI,那么它又是什么時候設置的呢?

protected Iterable<RedisURI> getTopologyRefreshSource() {

        //是否初始化種子節點
        boolean initialSeedNodes = !useDynamicRefreshSources();

        Iterable<RedisURI> seed;
        //如果需要初始化種子節點或分區信息為null或分區信息為空 則將初始URI賦值給種子
        if (initialSeedNodes || partitions == null || partitions.isEmpty()) {
            seed = RedisClusterClient.this.initialUris;
        } else {//不需要初始化種子節點
            List<RedisURI> uris = new ArrayList<>();
            for (RedisClusterNode partition : TopologyComparators.sortByUri(partitions)) {
                uris.add(partition.getUri());
            }
            seed = uris;
        }
        return seed;
    }

  通過如下代碼可以發現種子節點是在創建redisClusterClient的時候指定的

 protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs) {

        super(clientResources);

        assertNotEmpty(redisURIs);
        assertSameOptions(redisURIs);
        //初始化節點
        this.initialUris = Collections.unmodifiableList(LettuceLists.newList(redisURIs));
         //根據第一個URI的超時時間作為默認超時時間
        setDefaultTimeout(getFirstUri().getTimeout());
        setOptions(ClusterClientOptions.builder().build());
    }

  默認使用動態刷新

 protected boolean useDynamicRefreshSources() {

        //如果集群客戶端選項不為null
        if (getClusterClientOptions() != null) {
            //獲取集群拓撲刷新選項
            ClusterTopologyRefreshOptions topologyRefreshOptions = getClusterClientOptions().getTopologyRefreshOptions();
            //返回集群拓撲刷新選項中配置到是否使用動態刷新
            return topologyRefreshOptions.useDynamicRefreshSources();
        }
        //默認動態刷新
        return true;
    }

  下面看看加載分區信息的處理過程,第一次則根據種子節點的連接獲取整個集群的拓撲信息

 public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {

        //獲取超時時間,默認60秒
        long commandTimeoutNs = getCommandTimeoutNs(seed);

        Connections connections = null;
        try {
            //獲取所有種子連接
            connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);
            //Requests將異步執行命令封裝到多個節點
   //cluster nodes Requests requestedTopology = connections.requestTopology();
//client list Requests requestedClients = connections.requestClients(); //獲取節點拓撲視圖 NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); if (discovery) {//是否查找額外節點 //獲取集群節點 Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes(); //排除種子節點,得到需要發現節點 Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed)); //如果需要發現節點不為空 if (!discoveredNodes.isEmpty()) { //需要發現節點連接 Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs, TimeUnit.NANOSECONDS); //合並連接 connections = connections.mergeWith(discoveredConnections); //合並請求 requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology()); requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients()); //獲取節點視圖 nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); //返回uri對應分區信息 return nodeSpecificViews.toMap(); } } return nodeSpecificViews.toMap(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RedisCommandInterruptedException(e); } finally { if (connections != null) { connections.close(); } } }

  

 

     這樣在創建connection的時候就已經知道集群中的所有有效節點.根據之前的文章可以知道對於集群命令的處理是在ClusterDistributionChannelWriter中處理的.其中有一些信息在初始化writer的時候就初始化了

class ClusterDistributionChannelWriter implements RedisChannelWriter {
    //默認寫入器
    private final RedisChannelWriter defaultWriter;
    //集群事件監聽器
    private final ClusterEventListener clusterEventListener;
    private final int executionLimit;
    //集群連接提供器
    private ClusterConnectionProvider clusterConnectionProvider;
    //異步集群連接提供器
    private AsyncClusterConnectionProvider asyncClusterConnectionProvider;
    //是否關閉
    private boolean closed = false;
    //分區信息
    private volatile Partitions partitions;

  寫命令的處理如下,會根據key計算出slot,進而找到這個slot對應的node,直接訪問這個node,這樣可以有效減少訪問cluster次數

public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

        LettuceAssert.notNull(command, "Command must not be null");
        //如果連接已經關閉則拋出異常
        if (closed) {
            throw new RedisException("Connection is closed");
        }
        //如果是集群命令且命令沒有處理完畢
        if (command instanceof ClusterCommand && !command.isDone()) {
            //類型轉換, 轉換為ClusterCommand
            ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;
            if (clusterCommand.isMoved() || clusterCommand.isAsk()) {

                HostAndPort target;
                boolean asking;
                //如果集群命令已經遷移,此時通過ClusterCommand中到重試操作進行到此
                if (clusterCommand.isMoved()) {
                    //獲取命令遷移目標節點
                    target = getMoveTarget(clusterCommand.getError());
                    //觸發遷移事件
                    clusterEventListener.onMovedRedirection();
                    asking = false;
                } else {//如果是ask
                    target = getAskTarget(clusterCommand.getError());
                    asking = true;
                    clusterEventListener.onAskRedirection();
                }

                command.getOutput().setError((String) null);
                //連接遷移后的目標節點
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider
                        .getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort());
                //成功建立連接,則向該節點發送命令
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(command, asking, connectFuture.join(), null);
                } else {
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable));
                }

                return command;
            }
        }
        //不是集群命令就是RedisCommand,第一個請求命令就是非ClusterCommand
         //將當前命令包裝為集群命令
        ClusterCommand<K, V, T> commandToSend = getCommandToSend(command);
        //獲取命令參數
        CommandArgs<K, V> args = command.getArgs();

        //排除集群路由的cluster命令
        if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) {
            //獲取第一個編碼后的key
            ByteBuffer encodedKey = args.getFirstEncodedKey();
            //如果encodedKey不為null
            if (encodedKey != null) {
                //獲取slot值
                int hash = getSlot(encodedKey);
                //根據命令類型獲取命令意圖 是讀還是寫
                ClusterConnectionProvider.Intent intent = getIntent(command.getType());
                //根據意圖和slot獲取連接
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider)
                        .getConnectionAsync(intent, hash);
                //如果成功獲取連接
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(commandToSend, false, connectFuture.join(), null);
                } else {//如果連接尚未處理完,或有異常,則添加完成處理器
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection,
                            throwable));
                }

                return commandToSend;
            }
        }

        writeCommand(commandToSend, defaultWriter);

        return commandToSend;
    }

  但是如果計算出的slot因為集群擴展導致這個slot已經不在這個節點上lettuce是如何處理的呢?通過查閱ClusterCommand源碼可以發現在complete方法中對於該問題進行了處理;如果響應是MOVED則會繼續訪問MOVED目標節點,這個重定向的此時可以指定的,默認為5次,通過上文的配置可以發現,在配置中只允許一次重定向

 @Override
    public void complete() {
        //如果響應是MOVED或ASK
        if (isMoved() || isAsk()) {
            //如果最大重定向次數大於當前重定向次數則可以進行重定向
            boolean retryCommand = maxRedirections > redirections;
            //重定向次數自增
            redirections++;

            if (retryCommand) {
                try {
                    //重定向
                    retry.write(this);
                } catch (Exception e) {
                    completeExceptionally(e);
                }
                return;
            }
        }
        super.complete();
        completed = true;
    }

  如果是ask向重定向目標發送命令前需要同步發送asking

 private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking,
            StatefulRedisConnection<K, V> connection, Throwable throwable) {

        if (throwable != null) {
            command.completeExceptionally(throwable);
            return;
        }

        try {
            //如果需要發送asking請求,即接收到ASK錯誤消息,則在重定向到目標主機后需要發送asking命令
            if (asking) {
                connection.async().asking();
            }
            //發送命令
            writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter());
        } catch (Exception e) {
            command.completeExceptionally(e);
        }
    }

  

  上文主要介紹了lettuce對於單個key的處理,如果存在多個key,如mget lettuce又是如何處理的呢?其主要思路是將key根據slot進行分組,將在同一個slot的命令一起發送到對應的節點,再將所有請求的返回值合並作為最終結果.源碼如下:

  @Override
    public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
        //獲取分區和key的映射關系
        Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
        //如果分區數小於2也就是只有一個分區即所有key都落在一個分區就直接獲取
        if (partitioned.size() < 2) {
            return super.mget(keys);
        }
        //每個key與slot映射關系
        Map<K, Integer> slots = SlotHash.getSlots(partitioned);

        Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>();
        //遍歷分片信息,逐個發送
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<List<KeyValue<K, V>>> mget = super.mget(entry.getValue());
            executions.put(entry.getKey(), mget);
        }

        //恢復key的順序
        return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
            List<KeyValue<K, V>> result = new ArrayList<>();
            for (K opKey : keys) {
                int slot = slots.get(opKey);

                int position = partitioned.get(slot).indexOf(opKey);
                RedisFuture<List<KeyValue<K, V>>> listRedisFuture = executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));
            }

            return result;
        });
    }

  

 

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM