消費者組
kafka 使用消費者組來控制消費組消費,每個消費組都可以消費 topic 的全量信息,消費組內部的消費者再各自進行引流,消費不同的分區信息。
消費者從屬於消費組,一個組里的消費者訂閱的是相同的主題,每個消費者接收主題一部分的分區的消息。
一個消費者組只有一個消費者的情況
如上圖,主題 T1 只有一個消費者組訂閱,並且消費者組只有一個消費者,那么這個消費者消費 T1 的所有分區消息。
一個消費者組的消費者數量小於分區數量的情況
如上圖,當一個消費者組里的消費者數量小於主題分區的數量,那么每個消費者都會負責消費多個分區的消息
一個消費組的消費者數量等於分區的數量的情況
如圖,當一個消費組里的消費者的數量與主題的分區數持平的時候,每個消費者負責一個分區
消費組的消費者數量大於分區數量的情況
如上圖,當消費者組里的消費者數量大於分區數量,那么多出來的那一部分消費者就會被閑置,接收不到消息。
因此,最好的情況是組里的消費者的數量和主題分區數持平的情況,這種情況吞吐量最高
多個消費組的情況
當有多個消費組的時候,每個消費組都會全量消費主題的所有分區的消息,如上圖,組 1 全量消費主題 T1 的消息,由於組1 的消費者數量和分區數持平,所以每個消費者負責消費一個分區的消息。組 2 也全量消費主題 T1 的消息,但是組 2 只有兩個消費者,因此每個消費者負責 2 個分區的消息。
消費者代表什么
在線上環境中,消費者一般是由一個單獨的線程開起來的,但是更常見的是在分布式服務場景中,一台服務器開一個線程負責消費消息,多台服務器組成一個消費組。也就是說一個集群組成一個組,集群中的每台機器都是一個消費者,這種組合是最常見的。
分區再均衡
當我們的消費組里的消費者數量少於分區的數量的時候,一個消費者負責多個分區,這時候如果消費速度過慢,我們可以對消費組進行水平擴容,就是增加組內消費者的數量,可以多開幾個線程也可以多加幾台機器,當然,消費者數量最好和分區數持平,不要超。
當然,在由新的消費者加入群組或者舊的消費者由於某些原因宕機了離開了消費組,就會觸發分區再均衡,重新分區每個消費者應該負責的分區,這里要介紹幾個概念。
- 群組協調器
每個消費者組都有一個群組協調器,由某一台 broker 組成,不同的消費者組可以擁有不同的協調器。
- 群主
第一個加入消費者組的消費者會自動成為群主,負責平衡各個消費者的分區的情況,同時與群組協調器進行協調。
具體的工作場景是這樣的:
群主會從協調器獲取到所有活躍的消費者進行,通過自己的分區策略來給每個消費者進行分區分配,分配好后告訴協調器,協調器再把這些消息告訴每個消費者,消費者只能看到自己負責的消息,群主知道所有消費者的一個情況。這個過程會在再均衡的時候重新發生。
再均衡時機
- 當有新的消費者加入到組內的時候
- 消費者崩潰
- 消費者主動離開群組
獨立於群組之外的消費者
kafka 並沒有規定必須加入某個群組才能消費消息,消費者可以不指定群組,可以自己指定要消費的主題分區,可以消費某個主題的特定分區,或者多個主題的多個分區。當然,脫離了群組后自然也就沒有分區再均衡的概念了。
訂閱與消費消息
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
如上代碼,前面的 Properties 對象很熟悉。和創建生產者時候類似,有個變動是 group.id 參數,代表消費組的名字。
KafkaConsumer 是生成一個消費者對象,subscribe 方法是開始訂閱消息,入參是 topic 名稱,這里是 foo 和 bar 兩個主題。
輪詢
當然上面只是簡單的創建消費者和訂閱主題,並沒有開始消費,拉取數據是在輪詢的時候做的
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
如上代碼,消費者是一個長期運行的程序,所以在一個無限循環里拉取數據梳理數據,kafka 客戶端有它優雅的退出方式。
poll(time)方法很重要,很多事情都是在這里做的,后面會介紹。這里的參數是一個超時時間,當分區沒有數據的時候,會阻塞等待數據,當超時了,不管有沒有數據都會返回。這里展示的就是通過 poll 拉取到數據,每條消息都被構造成了 ConsumerRecord 對象,有偏移量、key、value等信息。
再來說說 poll 方法,在第一次調用 poll 的時候,會找到群主協調器,加入群組,獲取到自己的分區。如果發生了再均衡,重新規划分區也是再 poll 的過程中進行的。
輪詢與心跳
在老版本中,心跳是在輪詢的時候發送的,這樣會有一個弊端,當消費處理過程太過復雜,每批消息都要很久才能消費處理完,這樣兩次輪詢就會間隔很久,超過一個會話超時時間,broker 會認為這個消費者死亡,觸發再均衡,處理完后重新請求,broker 又會讓他加入群組,觸發再均衡,就會頻繁的觸發再均衡。
新版本中,心跳線程和輪詢線程獨立開了,兩者沒有關系了,心跳線程維持一個會話時間,控制會話不超時。輪詢之間的間隔也有自己的參數控制,如果間隔太久了,就認為掛了,不管心跳有沒有正常發送。當然,心跳停止發送了,會話超時了,也會認為掛了。
偏移量
講到這里,我們會有一個疑惑,觸發再均衡的時候,新的消費者拿到新的分區,怎么知道從哪里開始消費呢,是從最新位置開始消費還是從最早位置開始消費。其實都不是,從最早位置開始消費的話,由於中間很多消息分區的上一個擁有者已經消費過了,會造成重復消費的情況。從最新位置開始消費,上一個消費者停止消費的消息到最新的消息中間的消息就會丟失了。因此,kafka 維護着一個偏移量,用來控制該分區被某個消費者組消費到了哪個位置了。
老版本中這個偏移量是維護再 zookeeper 中的,但是由於提交偏移量的頻率很頻繁,所以為了降低對 zk 的壓力,在新版本中可以將偏移量維護在 broker 中。
偏移量有以下幾種提交方式
- 自動提交
- 同步提交
- 異步提交
- 提交特定偏移量
自動提交
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
這是一個自動提交偏移量的例子,參數中的 enable.auto.commit 需要設置為 true。
自動提交是每過 5s 消費者就會把從 poll 中拉取到的最大偏移量提交上去,這個時間是由 auto.commit.interval.ms 進行控制的,默認是 5s。當然,自動提交也是在輪詢中進行的,消費者在每次進行輪詢的時候都會檢查一下是否該提交偏移量了,如果是,它就提交上一次返回的偏移量。
當然,這種情況會造成重復消費的情況,5s 提交一次偏移量,如果再第 3 秒時候消費者宕機了,偏移量沒有提交,但是這 3s 的數據已經處理了,下次消費的時候這 3 秒數據就會被重復消費,可以調低 auto.commit.interval.ms 參數,當然,無法根治這個問題。正常來說是不會有什么問題的,就是出現異常的時候或提前退出輪詢的時候容易出現問題。
同步提交
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
我們可以讓應用程序來決定什么時候提交偏移量,通常是在處理完一批消息的時候提交。通過 commitSync() 方法會提交由 poll 返回的最新偏移量。成功后馬上返回,失敗后拋出異常。當成功提交或者碰到不可恢復的異常前,commitSync 會一直不斷的重試。
手動提交有個弊端,在 broker 對消費者做出回應之前會阻塞住,對吞吐量不太友好。
異步提交
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitAsync();
buffer.clear();
}
}
異步提交失敗的時候不會進行重試,提交就提交了,但是也可以像生產者異步發送消息一樣有回調信息,用來記錄錯誤日志等。不重試的原因是因為就算這次提交失敗了,可能有偏移量更高的提交成功了。
異步和同步組合提交
一般來說異步和同步是可以組合起來使用的,如果在運行期間,沒有問題產生的時候使用異步提交,提高吞吐,在直接關閉消費者的時候,使用同步提交,保證偏移量的提交成功。
提交特定偏移量
有的時候我們需要提交特定的偏移量,自己控制提交的速度,在批次中間提交偏移量,而不是每次 poll 的時候提交偏移量,這時候就需要我們自己記錄偏移量,然后手動提交。可以使用帶參數的 commitSync 和 commitAsync,參數就是分區偏移量信息。
再均衡監聽器
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"), new ConsumerRebalanceListener() {
// 在再均衡開始前和消費者停止讀取消息時調用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
// 重新分配分區后,消費者開始消費前調用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
}
});
如上代碼,在訂閱消息的時候傳入一個 ConsumerRebalanceListener 接口的實現者,這個接口就是再均衡監聽器,其中有兩個方法:
onPartitionsRevoked:在再均衡開始前和消費者停止讀取消息時調用
onPartitionsAssigned:重新分配分區后,消費者開始消費前調用
這兩個方法可以在開始消費前和停止消費后進行一些自定化的操作。
從特定位置開始消費
-
seek(TopicPartition, long):從指定位置開始消費
-
seekToBeginning(Collection):從分區頭部開始消費
-
seekToEnd(Collection):從分區尾部開始消費
有時候我們想要自己保存偏移量,不使用 kafka 內置的 offset 倉庫,比如消費的結果存儲在數據庫中,可以讓 offset 也存儲在數據庫中,這時候 offset 的提交時處理結果的提交就是在一個事務中,一起成功,一起失敗。
這時候我們在再均衡的時候就需要手動的指定分區的 offset,從指定的位置開始消費這時候就可以通過上面的 ConsumerRebalanceListener 再均衡監聽器,訂閱的時候,開始消費前,指定要開始消費的分區的位置。
配置
fetch.min.bytes
消費組從服務器獲取記錄的最小字節數,不到這個數量,則阻塞等待數據
fetch.max.wait.ms
如果沒有足夠的數據流入 broker,等待 broke 的時間,超時將返回
上面兩個參數共同作用,誰先滿足就停止阻塞,返回消息。
max.partition.fetch.bytes
從每個分區返回的最大字節數,可以控制消費速度,過大會導致一批數據處理的時間過長,導致下次 poll 的時間過久。
session.timeout.ms
會話超時時間,超過這個時間沒有發送心跳,那么就認為死亡。觸發再均衡。
enable.auto.commit
消費組是否自動提交
partition.assignment.strategy
分區分配策略,指定給群主分區分區使用的,有兩種模式
- range:把主題的若干個連續分區分配給消費者,可能會出現某個消費者分區遠遠大於其他消費者的情況
- roundRobin:把主題的所有分區逐個分配給消費者,每個消費者的分區比較均勻
max.poll.records
單車poll返回的記錄數量
反序列化
生產者序列化消息,那么消費者肯定也會反序列化消息啦,這塊不做過多描述,有興趣的自己查閱資料。
總結
消費者消費的時候也是有很多自己可以控制的項目,比如提交偏移量的時機、從分區開始讀取消息的位置等等,有很大的配置空間,但是其實大部分使用場景使用默認的配置就已經滿足條件了,真的出現了問題了解原理定位問題解決問題也能夠很快。
ps
文章為本人學習過程中的一些個人見解,漏洞是必不可少的,希望各位大佬多多指教,幫忙修復修復漏洞!!!
通過本人語雀文檔閱讀體驗更好哦
你可能還想了解:
kafka 概述
kafka 生產者
參考資料
《kafka 權威指南》