Kafka消費者組三種分區分配策略roundrobin,range,StickyAssignor


一個consumer group中有多個consumer,一個 topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。

Kafka有兩種分配策略,一是roundrobin,一是range。最新還有一個StickyAssignor策略

將分區的所有權從一個消費者移到另一個消費者稱為重新平衡(rebalance)。當以下事件發生時,Kafka 將會進行一次分區分配:

  • 同一個 Consumer Group 內新增消費者

  • 消費者離開當前所屬的Consumer Group,包括shuts down 或 crashes

  • 訂閱的主題新增分區

目前我們還不能自定義分區分配策略,只能通過partition.assignment.strategy參數選擇 range 或 roundrobin。partition.assignment.strategy參數默認的值是range。

Kafka提供了消費者客戶端參數partition.assignment.strategy用來設置消費者與訂閱主題之間的分區分配策略。默認情況下,此參數的值為:org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka中還提供了另外兩種分配策略: RoundRobinAssignor和StickyAssignor。消費者客戶端參數partition.asssignment.strategy可以配置多個分配策略,彼此之間以逗號分隔。

本文假設我們有個名為T1的主題,其包含了10個分區,然后我們有兩個消費者(C1,C2)來消費這10個分區里面的數據,而且C1的num.streams = 1,C2的num.streams = 2。

1.Range(默認策略)

2.3.x版本API介紹:http://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

0.10版本API介紹: http://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

Range是對每個Topic而言的(即一個Topic一個Topic分),首先對同一個Topic里面的分區按照序號進行排序,並對消費者按照字母順序進行排序。然后用Partitions分區的個數除以消費者線程的總數來決定每個消費者線程消費幾個分區。如果除不盡,那么前面幾個消費者線程將會多消費一個分區。

假設n=分區數/消費者數量,m=分區數%消費者數量,那么前m個消費者每個分配n+1個分區,后面的(消費者數量-m)個消費者每個分配n個分區。

假如有10個分區,3個消費者線程,把分區按照序號排列0,1,2,3,4,5,6,7,8,9;消費者線程為C1-0,C2-0,C2-1,那么用partition數除以消費者線程的總數來決定每個消費者線程消費幾個partition,如果除不盡,前面幾個消費者將會多消費一個分區。在我們的例子里面,我們有10個分區,3個消費者線程,10/3 = 3,而且除除不盡,那么消費者線程C1-0將會多消費一個分區,所以最后分區分配的結果看起來是這樣的:

C1-0:0,1,2,3
C2-0:4,5,6
C2-1:7,8,9

如果有11個分區將會是:

C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10

假如我們有兩個主題T1,T2,分別有10個分區,最后的分配結果將會是這樣:

C1-0:T1(0,1,2,3) T2(0,1,2,3)
C2-0:T1(4,5,6) T2(4,5,6)
C2-1:T1(7,8,9) T2(7,8,9)

可以看出, C1-0消費者線程比其他消費者線程多消費了2個分區

如上,只是針對 1 個 topic 而言,C1-0消費者多消費1個分區影響不是很大。如果有 N 多個 topic,那么針對每個 topic,消費者 C1-0 都將多消費 1 個分區,topic越多,C1-0 消費的分區會比其他消費者明顯多消費 N 個分區。這就是 Range 范圍分區的一個很明顯的弊端了

2.RoundRobin

0.10版本API:http://kafka.apache.org/0102/javadoc/allclasses-noframe.html

2.3.x版本API:http://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html

RoundRobin介紹

RoundRobinAssignor策略的原理是將消費組內所有消費者以及消費者所訂閱的所有topic的partition按照字典序排序,然后通過輪詢方式逐個將分區以此分配給每個消費者。RoundRobinAssignor策略對應的partition.assignment.strategy參數值為:org.apache.kafka.clients.consumer.RoundRobinAssignor。

使用RoundRobin策略有兩個前提條件必須滿足:

  1. 同一個消費者組里面的所有消費者的num.streams(消費者消費線程數)必須相等;
  2. 每個消費者訂閱的主題必須相同。

所以這里假設前面提到的2個消費者的num.streams = 2。RoundRobin策略的工作原理:將所有主題的分區組成 TopicAndPartition 列表,然后對 TopicAndPartition 列表按照 hashCode 進行排序,這里文字可能說不清,看下面的代碼應該會明白:

val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
  info("Consumer %s rebalancing the following partitions for topic %s: %s"
       .format(ctx.consumerId, topic, partitions))
  partitions.map(partition => {
    TopicAndPartition(topic, partition)
  })
}.toSeq.sortWith((topicPartition1, topicPartition2) => {
  /*
   * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
   * up on one consumer (if it has a high enough stream count).
   */
  topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
})

最后按照round-robin風格將分區分別分配給不同的消費者線程。

在我們的例子里面,加入按照 hashCode 排序完的topic-partitions組依次為T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我們的消費者線程排序為C1-0, C1-1, C2-0, C2-1,最后分區分配的結果為:

C1-0 將消費 T1-5, T1-2, T1-6 分區;
C1-1 將消費 T1-3, T1-1, T1-9 分區;
C2-0 將消費 T1-0, T1-4 分區;
C2-1 將消費 T1-8, T1-7 分區;
RoundRobin的兩種情況
  1. 如果同一個消費組內所有的消費者的訂閱信息都是相同的,那么RoundRobinAssignor策略的分區分配會是均勻的。

    舉例,假設消費組中有2個消費者C0和C1,都訂閱了主題t0和t1,並且每個主題都有3個分區,那么所訂閱的所有分區可以標識為:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最終的分配結果為:

    消費者C0:t0p0、t0p2、t1p1
    消費者C1:t0p1、t1p0、t1p2
    
  2. 如果同一個消費組內的消費者所訂閱的信息是不相同的,那么在執行分區分配的時候就不是完全的輪詢分配,有可能會導致分區分配的不均勻。如果某個消費者沒有訂閱消費組內的某個topic,那么在分配分區的時候此消費者將分配不到這個topic的任何分區。

    舉例,假設消費組內有3個消費者C0、C1和C2,它們共訂閱了3個主題:t0、t1、t2,這3個主題分別有1、2、3個分區,即整個消費組訂閱了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2這6個分區。具體而言,消費者C0訂閱的是主題t0,消費者C1訂閱的是主題t0和t1,消費者C2訂閱的是主題t0、t1和t2,那么最終的分配結果為:

    消費者C0:t0p0
    消費者C1:t1p0
    消費者C2:t1p1、t2p0、t2p1、t2p2
    

可以看到RoundRobinAssignor策略也不是十分完美,這樣分配其實並不是最優解,因為完全可以將分區t1p1分配給消費者C1。

3.StickyAssignor

我們再來看一下StickyAssignor策略,“sticky”這個單詞可以翻譯為“粘性的”,Kafka從0.11.x版本開始引入這種分配策略,它主要有兩個目的:

  1. 分區的分配要盡可能的均勻,分配給消費者者的主題分區數最多相差一個;
  2. 分區的分配盡可能的與上次分配的保持相同。

當兩者發生沖突時,第一個目標優先於第二個目標。鑒於這兩個目標,StickyAssignor策略的具體實現要比RangeAssignor和RoundRobinAssignor這兩種分配策略要復雜很多。我們舉例來看一下StickyAssignor策略的實際效果。

假設消費組內有3個消費者:C0、C1和C2,它們都訂閱了4個主題:t0、t1、t2、t3,並且每個主題有2個分區,也就是說整個消費組訂閱了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1這8個分區。最終的分配結果如下:

消費者C0:t0p0、t1p1、t3p0
消費者C1:t0p1、t2p0、t3p1
消費者C2:t1p0、t2p1

這樣初看上去似乎與采用RoundRobinAssignor策略所分配的結果相同,但事實是否真的如此呢?

此時假設消費者C1脫離了消費組,那么消費組就會執行再平衡操作,進而消費分區會重新分配。如果采用RoundRobinAssignor策略,那么此時的分配結果如下:

消費者C0:t0p0、t1p0、t2p0、t3p0
消費者C2:t0p1、t1p1、t2p1、t3p1

如分配結果所示,RoundRobinAssignor策略會按照消費者C0和C2進行重新輪詢分配。而如果此時使用的是StickyAssignor策略,那么分配結果為:

消費者C0:t0p0、t1p1、t3p0、t2p0
消費者C2:t1p0、t2p1、t0p1、t3p1

可以看到分配結果中保留了上一次分配中對於消費者C0和C2的所有分配結果,並將原來消費者C1的“負擔”分配給了剩余的兩個消費者C0和C2,最終C0和C2的分配還保持了均衡。

如果發生分區重分配,那么對於同一個分區而言有可能之前的消費者和新指派的消費者不是同一個,對於之前消費者進行到一半的處理還要在新指派的消費者中再次復現一遍,這顯然很浪費系統資源。StickyAssignor策略如同其名稱中的“sticky”一樣,讓分配策略具備一定的“粘性”,盡可能地讓前后兩次分配相同,進而減少系統資源的損耗以及其它異常情況的發生。

到目前為止所分析的都是消費者的訂閱信息都是相同的情況,我們來看一下訂閱信息不同的情況下的處理。

舉例,同樣消費組內有3個消費者:C0、C1和C2,集群中有3個主題:t0、t1和t2,這3個主題分別有1、2、3個分區,也就是說集群中有t0p0、t1p0、t1p1、t2p0、t2p1、t2p2這6個分區。消費者C0訂閱了主題t0,消費者C1訂閱了主題t0和t1,消費者C2訂閱了主題t0、t1和t2。

如果此時采用RoundRobinAssignor策略,那么最終的分配結果如下所示(和講述RoundRobinAssignor策略時的一樣,這樣不妨贅述一下):

消費者C0:t0p0
消費者C1:t1p0
消費者C2:t1p1、t2p0、t2p1、t2p2

如果此時采用的是StickyAssignor策略,那么最終的分配結果為:

消費者C0:t0p0
消費者C1:t1p0、t1p1
消費者C2:t2p0、t2p1、t2p2

可以看到這是一個最優解(消費者C0沒有訂閱主題t1和t2,所以不能分配主題t1和t2中的任何分區給它,對於消費者C1也可同理推斷)。

假如此時消費者C0脫離了消費組,那么RoundRobinAssignor策略的分配結果為:

消費者C1:t0p0、t1p1
消費者C2:t1p0、t2p0、t2p1、t2p2

可以看到RoundRobinAssignor策略保留了消費者C1和C2中原有的3個分區的分配:t2p0、t2p1和t2p2(針對結果集1)。而如果采用的是StickyAssignor策略,那么分配結果為:

消費者C1:t1p0、t1p1、t0p0
消費者C2:t2p0、t2p1、t2p2

可以看到StickyAssignor策略保留了消費者C1和C2中原有的5個分區的分配:t1p0、t1p1、t2p0、t2p1、t2p2。

從結果上看StickyAssignor策略比另外兩者分配策略而言顯得更加的優異,這個策略的代碼實現也是異常復雜。

4.Range策略演示

package com.cw.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/** * * @author 陳小哥cw * @date 2020/6/19 17:07 */
public class CustomOffsetConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // kafka集群,broker-list
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cm1:9092,cm2:9092,cm3:9092");

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 消費者組,只要group.id相同,就屬於同一個消費者組
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 關閉自動提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // 1.創建一個消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 消費者訂閱topic
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
            // 重新分配完分區之前調用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("==============回收的分區=============");
                for (TopicPartition partition : partitions) {
                    System.out.println("partition = " + partition);
                }
            }

            // 重新分配完分區后調用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("==============重新得到的分區==========");
                for (TopicPartition partition : partitions) {
                    System.out.println("partition = " + partition);
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {

                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                commitOffset(topicPartition, record.offset() + 1);
            }
        }

    }

    private static void commitOffset(TopicPartition topicPartition, long l) {

    }

    private static Long getPartitionOffset(TopicPartition partition) {
        return null;
    }
}

此時先啟動一次程序,此時結果為

==============回收的分區=============
==============重新得到的分區==========
partition = first-2
partition = first-1
partition = first-0

此時在不關閉已開啟的程序的情況下,再啟動一次程序

第一次運行的程序結果

==============回收的分區=============
partition = first-2
partition = first-1
partition = first-0
==============重新得到的分區==========
partition = first-2

第二次運行的程序結果

==============回收的分區=============
==============重新得到的分區==========
partition = first-1
partition = first-0

這是因為兩次運行的程序的消費者組id都是test,為同一個消費者組,當第二次運行程序時,對原來的分區進行回收,進行了分區的rebalance重新分配(默認range分配)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM