Kafka Consumer 原理與實踐


1、Kafka 版本說明

 

 

 

從上面截圖已經很清楚的看到,針對版本 kafka_2.12-2.6.0.tgz ,2.12是編譯Kafka Server端的Scala版本,2.6.0是Kafka Server的版本!

另外,目前Kafka客戶端從某個版本開始已經用Java重寫了,服務端依然還是Scala版本!

2、消費者組(Consumer Group)

  • 一個consumer group包含多個consumer(進程或線程),每個消費者組用一個group id唯一標識;
  • consumer group訂閱的某個topic(可以訂閱多個topic)下的每個partition只能被組里的一個consumer所消費;更深層次的講,consumer group訂閱的topic下的消息是以partition為單位在組里的多個consumer上進行負載均衡的,這點也保證了partition層面消息被消費的順序性;如果consumer group訂閱的所有topic下總的partition數量大於組內consumer數量,則會出現一個消費者消費多個partition的情況,如果partition數量小於組內consumer數量,則會出現有的consumer消費不到partition的情況;
  • consumer group是一個邏輯訂閱者,多個consumer group可以消費相同的topic中的消息,相當於同一個topic下的消息被廣播給多個consumer group;

 

 

 

舉個簡單的例子,如果某位名人正在給粉絲進行簽名活動,但是限制過來簽名的人必須以家庭為單位,且給每個家庭的簽名數量都是固定的4次,如果家庭成員數量多於4個,就會存在有的家庭成員得不到簽名,如果家庭成員少於3個,就會出現有的家庭成員得到一個以上的簽名(簽名數總是要用完的嘛,畢竟除了收藏還可以賣),在這里,這個名人就類似於一個topic,每個家庭就是一個consumer group,家庭成員就是consumer,簽名數就是topic下的partition數目,再擴展一種更寬泛的場景,如果這次活動來的不是一個名人,而是多個,那么一個家庭就可以得到多個名人的簽名,這就是consumer group消費多個topic的場景。

3、位移管理(offset management)

3.1、自動VS手動

Kafka默認是定期幫你自動提交位移的(enable.auto.commit = true),你當然可以選擇手動提交位移實現自己控制。另外kafka會定期把group消費情況保存起來,做成一個offset map,如下圖所示:

 

 上圖中表明了test-group這個組當前的消費情況。

3.2、位移提交

老版本的位移是提交到zookeeper中的,目錄結構是:/consumers/<group.id>/offsets/<topic>/<partitionId>,但是zookeeper其實並不適合進行大批量的讀寫操作,尤其是寫操作。因此kafka提供了另一種解決方案:增加__consumer_offsets topic,將offset信息寫入這個topic,擺脫對zookeeper的依賴(指保存offset這件事情)。__consumer_offsets中的消息保存了每個consumer group某一時刻提交的offset信息。依然以上圖中的consumer group為例,格式大概如下:

 

 

 

 __consumers_offsets topic配置了compact策略,使得它總是能夠保存最新的位移信息,既控制了該topic總體的日志容量,也能實現保存最新offset的目的。compact的具體原理請參見:Log Compaction

 至於每個group保存到__consumers_offsets的哪個分區,如何查看的問題請參見這篇文章:Kafka 如何讀取offset topic內容 (__consumer_offsets)

 

4、Rebalance

 

4.1 什么是rebalance?

rebalance本質上是一種協議,規定了一個consumer group下的所有consumer如何達成一致來分配訂閱topic的每個分區。比如某個group下有20個consumer,它訂閱了一個具有100個分區的topic。正常情況下,Kafka平均會為每個consumer分配5個分區。這個分配的過程就叫rebalance。

 

4.2 什么時候rebalance?

這也是經常被提及的一個問題。rebalance的觸發條件有三種:

  • 組成員發生變更(新consumer加入組、已有consumer主動離開組或已有consumer崩潰了——這兩者的區別后面會談到)
  • 訂閱主題數發生變更——這當然是可能的,如果你使用了正則表達式的方式進行訂閱,那么新建匹配正則表達式的topic就會觸發rebalance
  • 訂閱主題的分區數發生變更

 

4.3 如何進行組內分區分配?

之前提到了group下的所有consumer都會協調在一起共同參與分配,這是如何完成的?Kafka新版本consumer默認提供了兩種分配策略:range和round-robin。當然Kafka采用了可插拔式的分配策略,你可以創建自己的分配器以實現不同的分配策略。實際上,由於目前range和round-robin兩種分配器都有一些弊端,Kafka社區已經提出第三種分配器來實現更加公平的分配策略,只是目前還在開發中。我們這里只需要知道consumer group默認已經幫我們把訂閱topic的分區分配工作做好了就行了。

簡單舉個例子,假設目前某個consumer group下有兩個consumer: A和B,當第三個成員加入時,kafka會觸發rebalance並根據默認的分配策略重新為A、B和C分配分區,如下圖所示:

 

4.4 誰來執行rebalance和consumer group管理?

Kafka提供了一個角色:coordinator來執行對於consumer group的管理。坦率說kafka對於coordinator的設計與修改是一個很長的故事。最新版本的coordinator也與最初的設計有了很大的不同。這里我只想提及兩次比較大的改變。

首先是0.8版本的coordinator,那時候的coordinator是依賴zookeeper來實現對於consumer group的管理的。Coordinator監聽zookeeper的/consumers/<group>/ids的子節點變化以及/brokers/topics/<topic>數據變化來判斷是否需要進行rebalance。group下的每個consumer都自己決定要消費哪些分區,並根據自己的決定搶先在zookeeper中的/consumers/<group>/owners/<topic>/<partition>下注冊。很明顯,這種方案要依賴於zookeeper的幫助,而且每個consumer是單獨做決定的,沒有那種“大家屬於一個組,要協商做事情”的精神。

基於這些潛在的弊端,0.9版本的kafka改進了coordinator的設計,提出了group coordinator——每個consumer group都會被分配一個這樣的coordinator用於組管理和位移管理。這個group coordinator比原來承擔了更多的責任,比如組成員管理、位移提交保護機制等。當新版本consumer group的第一個consumer啟動的時候,它會去和kafka server確定誰是它們組的coordinator。之后該group內的所有成員都會和該coordinator進行協調通信。顯而易見,這種coordinator設計不再需要zookeeper了,性能上可以得到很大的提升。后面的所有部分我們都將討論最新版本的coordinator設計。

4.5 如何確定coordinator?

上面簡單討論了新版coordinator的設計,那么consumer group如何確定自己的coordinator是誰呢? 簡單來說分為兩步:

  • 確定consumer group位移信息寫入__consumers_offsets的哪個分區。具體計算公式:
    •   __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。
  • 該分區leader所在的broker就是被選定的coordinator

4.6 Rebalance Generation

JVM GC的分代收集就是這個詞,我這里把它翻譯成“屆”好了,它表示了rebalance之后的一屆成員,主要是用於保護consumer group,隔離無效offset提交的。比如上一屆的consumer成員是無法提交位移到新一屆的consumer group中。我們有時候可以看到ILLEGAL_GENERATION的錯誤,就是kafka在抱怨這件事情。每次group進行rebalance之后,generation號都會加1,表示group進入到了一個新的版本,如下圖所示: Generation 1時group有3個成員,隨后成員2退出組,coordinator觸發rebalance,consumer group進入Generation 2,之后成員4加入,再次觸發rebalance,group進入Generation 3.

 

 

 

4.7 協議(protocol)

前面說過了, rebalance本質上是一組協議。group與coordinator共同使用它來完成group的rebalance。目前kafka提供了5個協議來處理與consumer group coordination相關的問題:

  • Heartbeat請求:consumer需要定期給coordinator發送心跳來表明自己還活着
  • LeaveGroup請求:主動告訴coordinator我要離開consumer group
  • SyncGroup請求:group leader把分配方案告訴組內所有成員
  • JoinGroup請求:成員請求加入組
  • DescribeGroup請求:顯示組的所有信息,包括成員信息,協議名稱,分配方案,訂閱信息等。通常該請求是給管理員使用

Coordinator在rebalance的時候主要用到了前面4種請求。

4.8 liveness

consumer如何向coordinator證明自己還活着? 通過定時向coordinator發送Heartbeat請求。如果超過了設定的超時時間,那么coordinator就認為這個consumer已經掛了。一旦coordinator認為某個consumer掛了,那么它就會開啟新一輪rebalance,並且在當前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告訴其他consumer:不好意思各位,你們重新申請加入組吧!

4.9 Rebalance過程

終於說到consumer group執行rebalance的具體流程了。很多用戶估計對consumer內部的工作機制也很感興趣。下面就跟大家一起討論一下。當然我必須要明確表示,rebalance的前提是coordinator已經確定了

總體而言,rebalance分為2步:Join和Sync

1 Join, 顧名思義就是加入組。這一步中,所有成員都向coordinator發送JoinGroup請求,請求入組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員信息以及訂閱信息發給leader——注意leader和coordinator不是一個概念。leader負責消費分配方案的制定

2 Sync,這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之后會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分區了。

還是拿幾張圖來說明吧,首先是加入組的過程:

 

 

值得注意的是, 在coordinator收集到所有成員請求前,它會把已收到請求放入一個叫purgatory(煉獄)的地方。記得國內有篇文章以此來證明kafka開發人員都是很有文藝范的,寫得也是比較有趣,有興趣可以去搜搜。
然后是分發分配方案的過程,即SyncGroup請求:

 

 

注意!! consumer group的分區分配方案是在客戶端執行的!Kafka將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性。比如這種機制下我可以實現類似於Hadoop那樣的機架感知(rack-aware)分配方案,即為consumer挑選同一個機架下的分區數據,減少網絡傳輸的開銷。Kafka默認為你提供了兩種分配策略:range和round-robin。由於這不是本文的重點,這里就不再詳細展開了,你只需要記住你可以覆蓋consumer的參數:partition.assignment.strategy來實現自己分配策略就好了。

4.10 consumer group狀態機

和很多kafka組件一樣,group也做了個狀態機來表明組狀態的流轉。coordinator根據這個狀態機會對consumer group做不同的處理,如下圖所示(完全是根據代碼注釋手動畫的,多見諒吧)

 

 

簡單說明下圖中的各個狀態:

  • Dead:組內已經沒有任何成員的最終狀態,組的元數據也已經被coordinator移除了。這種狀態響應各種請求都是一個response: UNKNOWN_MEMBER_ID
  • Empty:組內無成員,但是位移信息還沒有過期。這種狀態只能響應JoinGroup請求
  • PreparingRebalance:組准備開啟新的rebalance,等待成員加入
  • AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
  • Stable:rebalance完成!可以開始消費了~

至於各個狀態之間的流程條件以及action,這里就不具體展開了。

 

5、rebalance場景剖析

上面詳細闡述了consumer group是如何執行rebalance的,可能依然有些雲里霧里。這部分對其中的三個重要的場景做詳盡的時序展開,進一步加深對於consumer group內部原理的理解。由於圖比較直觀,所有的描述都將以圖的方式給出,不做過多的文字化描述了。

 

1 新成員加入組(member join) 

 

 

 

 

2 組成員崩潰(member failure)

前面說過了,組成員崩潰和組成員主動離開是兩個不同的場景。因為在崩潰時成員並不會主動地告知coordinator此事,coordinator有可能需要一個完整的session.timeout周期才能檢測到這種崩潰,這必然會造成consumer的滯后。可以說離開組是主動地發起rebalance;而崩潰則是被動地發起rebalance。直接上圖: 

 

3 組成員主動離組(member leave group)

 

 4 提交位移(member commit offset)

 

 總結一下,本文着重討論了一下新版本的consumer group的內部設計原理,特別是consumer group與coordinator之間的交互過程,希望對各位有所幫助。

 

6 Consumer Fetch Message

 

 

上圖中,Consumer A、B分屬於不用的Consumer Group。Consumer B讀取到offset =11,Consumer A讀取到offset=9 。這個值表示Consumer Group中的某個Consumer 在下次讀取該partition時會從哪個offset的 message開始讀取,即 Consumer Group A 中的Consumer下次會從offset = 9 的message 讀取, Consumer Group B 中的Consumer下次會從offset = 11 的message 讀取。

這里並沒有說是Consumer A 下次會從offset = 9 的message讀取,原因是Consumer A可能會退出Group ,然后Group A 進行rebalance,即重新分配分區

6.1 poll 方法

  poll方法內部通過調用fetch方法來間接地從partition拉取消息,大致流程就是,fetch方法通過網絡請求從broker中拉取一定量的消息(通過配置項max.partition.fetch.bytes來限制)放到consumer端的緩存中。poll方法時再從緩存中讀取一定量的消息用來消費處理(通過配置項max.poll.records來限制一次最多poll多少個record)。

  舉個例子: 在滿足max.partition.fetch.bytes限制的情況下,假如fetch到了100個record,放到本地緩存后,由於max.poll.records限制每次只能poll出15個record。那么KafkaConsumer就需要執行7次才能將這一次通過網絡發起的fetch請求所fetch到的這100個record消費完畢。其中前6次是每次pool中15個record,最后一次是poll出10個record.。所以poll方法只是可能會發起fetch請求,即在本地緩存中沒有消息的情況下再調用fetch方法去拉取消息。

  在consumer中,還有另外一個配置項:max.poll.interval.ms ,它表示最大的poll數據間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該consumer處於 livelock狀態。就會將該consumer退出consumer group。所以為了不使Consumer 自己被退出,Consumer 應該不停的發起poll(timeout)操作。而這個動作 KafkaConsumer Client是不會幫我們做的,這就需要自己在程序中不停的調用poll方法了。

 

6.2 commit offset

    當一個consumer因某種原因退出Group時,進行重新分配partition后,同一group中的另一個consumer在讀取該partition時,怎么能夠知道上一個consumer該從哪個offset的message讀取呢?也就是如何保證同一個group內的consumer不重復消費消息呢?上面說了一次走網絡的fetch請求會拉取到一定量的數據,但是這些數據還沒有被消息完畢,Consumer就掛掉了,下一次進行數據fetch時,是否會從上次讀到的數據開始讀取,而導致Consumer消費的數據丟失嗎?

    為了做到這一點,當使用poll從本地緩存拉取到數據之后,需要client調用commitSync方法(或者commitAsync方法)去commit 下一次開始讀取的offset 。

    而這個commit方法會通過走網絡的commit請求將offset在coordinator中保留,這樣就能夠保證下一次讀取(不論進行了rebalance)時,既不會重復消費消息,也不會遺漏消息。

 

    對於offset的commit,Kafka Consumer Java Client支持兩種模式:由KafkaConsumer自動提交,或者是用戶通過調用commitSync、commitAsync方法的方式完成offset的提交。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}

 

手動提交的例子: 

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}

在手動提交時,需要注意的一點是:要提交的是下一次要讀取的offset,例如: 

try {
    while (true) {
        // 取得消息
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        // 根據分區來遍歷數據:
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            // 數據處理
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            // 取得當前讀取到的最后一條記錄的offset
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // 提交offset,記得要 + 1
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}

7、Consumer的線程安全性

KafkaProducer是線程安全的,上一節已經了解到。但Consumer卻沒有設計成線程安全的。當用戶想要在在多線程環境下使用kafkaConsumer時,需要自己來保證synchronized。如果沒有這樣的保證,就會拋出ConcurrentModificatinException的。

當你想要關閉Consumer或者為也其它的目的想要中斷Consumer的處理時,可以調用consumer的wakeup方法。這個方法會拋出WakeupException。

class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public KafkaConsumerRunner(KafkaConsumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

8、Consumer Configuration

在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置如下:

·bootstrap.servers

在啟動consumer時配置的broker地址的。不需要將cluster中所有的broker都配置上,因為啟動后會自動的發現cluster所有的broker。它配置的格式是:host1:port1;host2:port2…

 

·key.descrializervalue.descrializer

Message record 的key, value的反序列化類。

 

·group.id

用於表示該consumer想要加入到哪個group中。默認值是 “”。

 

·heartbeat.interval.ms

心跳間隔。心跳是在consumer與coordinator之間進行的。心跳是確定consumer存活,加入或者退出group的有效手段。

這個值必須設置的小於session.timeout.ms,因為:

當Consumer由於某種原因不能發Heartbeat到coordinator時,並且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。

通常設置的值要低於session.timeout.ms的1/3。默認值是:3000 (3s)

 

·session.timeout.ms

Consumer session 過期時間。這個值必須設置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。

其默認值是:10000 (10 s)

 

·enable.auto.commit

Consumer 在commit offset時有兩種模式:自動提交,手動提交。手動提交在前面已經說過。自動提交:是Kafka Consumer會在后台周期性的去commit。

默認值是true。

 

·auto.commit.interval.ms

自動提交間隔。范圍:[0,Integer.MAX],默認值是 5000 (5 s)

 

·auto.offset.reset

這個配置項,是告訴Kafka Broker在發現kafka在沒有初始offset,或者當前的offset是一個不存在的值(如果一個record被刪除,就肯定不存在了)時,該如何處理。它有4種處理方式:

1) earliest:自動重置到最早的offset。

2) latest:自動重置到最晚的offset。

3) none:如果連更早的offset也沒有的話,就拋出異常給consumer,告訴consumer在整個consumer group中都沒有發現有這樣的offset。

4) 如果不是上述3種,只拋出異常給consumer。

默認值是latest。

 

·connections.max.idle.ms

連接空閑超時時間。因為consumer只與broker有連接(coordinator也是一個broker),所以這個配置的是consumer到broker之間的。

默認值是:540000 (9 min)

 

·fetch.max.wait.ms

Fetch請求發給broker后,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。這個配置就是來配置consumer最多等待response多久。

 

·fetch.min.bytes

當consumer向一個broker發起fetch請求時,broker返回的records的大小最小值。如果broker中數據量不夠的話會wait,直到數據大小滿足這個條件。

取值范圍是:[0, Integer.Max],默認值是1。

默認值設置為1的目的是:使得consumer的請求能夠盡快的返回。

 

·fetch.max.bytes

一次fetch請求,從一個broker中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這種情況下,只會返回這一條record。

broker、topic都會對producer發給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。

取值范圍是:[0, Integer.Max],默認值是:52428800 (5 MB)

 

·max.partition.fetch.bytes

一次fetch請求,從一個partition中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。

broker、topic都會對producer發給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。

 

·max.poll.interval.ms

前面說過要求程序中不間斷的調用poll()。如果長時間沒有調用poll,且間隔超過這個值時,就會認為這個consumer失敗了。

 

·max.poll.records

Consumer每次調用poll()時取到的records的最大數。

 

·receive.buffer.byte

Consumer receiver buffer (SO_RCVBUF)的大小。這個值在創建Socket連接時會用到。

取值范圍是:[-1, Integer.MAX]。默認值是:65536 (64 KB)

如果值設置為-1,則會使用操作系統默認的值。

 

·request.timeout.ms

請求發起后,並不一定會很快接收到響應信息。這個配置就是來配置請求超時時間的。默認值是:305000 (305 s)

 

·client.id

Consumer進程的標識。如果設置一個人為可讀的值,跟蹤問題會比較方便。

 

·interceptor.classes

用戶自定義interceptor。

 

·metadata.max.age.ms

Metadata數據的刷新間隔。即便沒有任何的partition訂閱關系變更也行執行。

范圍是:[0, Integer.MAX],默認值是:300000 (5 min)

 

參考:

https://www.cnblogs.com/huxi2b/p/6223228.html

https://www.cnblogs.com/f1194361820/p/6054148.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM