使用方式
創建一個 KafkaConsumer 對象訂閱主題並開始接收消息:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("enable.auto.commit", "false"); // 禁止自動提交
properties.setProperty("group.id", "my-group-id"); // 設置消費者組群ID
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("topic")); // 訂閱主題
try {
while (! Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
records.forEach(System.out::println);
consumer.commitAsync(); // 異步提交偏移量
}
} catch (WakeupException ignore) {
// 忽略關閉異常
} finally {
try {
consumer.commitSync(); // 同步提交偏移量
} finally {
consumer.close();
}
}
Thread thread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup();
try {
thread.join();
} catch (InterruptedException ignore) {}
}));
也可以通過手工管理消費邏輯,實現更為復雜的功能:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("enable.auto.commit", "false"); // 禁止自動提交
// commit 機制依賴於 group.id
// 當使用 assign 人工分配主題與分片時,可以不指定 group.id
// 但是此時仍然可以執行 commit 操作,底層使用空字符串作為 group.id
properties.setProperty("group.id", "my-group-id"); // 仍然設置消費者組群ID,避免出現意外情況
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
List<PartitionInfo> partitions = consumer.partitionsFor("topic"); // 查詢訂閱主題對應的分片信息
List<TopicPartition> topicPartitions = partitions.stream().map(partition -> new TopicPartition(partition.topic(), partition.partition())).collect(Collectors.toList());
// consumer 會在啟動時,查找最近一次 commit 記錄,並從此處開始消費
// 無論 subscribe 與 assign 均有該效果
consumer.assign(topicPartitions); // 人工為 consumer 指定消費分片
// seek 可以指定消費的起始位置
// 需要注意,當 seek 超前於 beginningOffset 時,會導致 kafkaConsumer 一直無法讀取到數據
// 可以使用 seekToBeginning 來規避這一問題
// consumer.seek(partition, 518811150L); // 從 518811150 這條消息開始消費(包含)
// 使用 assign 的情況下,commit 仍然是有效的,可以通過 committed 獲取最近一次提交記錄
// consumer.committed(partition)
try {
while (! Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
records.forEach(System.out::println);
// 消費過程中,可以使用 commit 來保存消費位置,方便下一次進行恢復
// 更為常見的情況是將 offset 保存外部存儲中,比如:MySQL
// consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
// 消費過程中,可以使用 seek 直接修改 poll 讀取位置,達到回溯的效果
// consumer.seek(partition, offset);
// 通過 pause 與 resume 可以控制下一次 poll 是否返回指定的 partition 的數據
// 可以實現類似延遲隊列的效果
// consumer.pause(partitions);
// consumer.resume(partitions);
}
} catch (WakeupException ignore) {
// 忽略關閉異常
} finally {
consumer.close();
}
消費者群組
為了實現橫向擴展,應用程序需要創建一個消費者群組,然后往群組里添加消費者來提高處理效率,群組里的每個消費者只處理一部分消息:
一個群組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息(左圖)。
消費者無法跟上數據生成的速度,可以向組群內增加更多的消費者分擔負載,是消費端橫向伸縮的主要手段(中圖)。
預先為主題預留的分區可以在負載增長時增加更多的消費者,不過當消費者的數量超過主題分區時,多余的消費者只會被閑置(右圖)。
只要保證每個應用程序有獨立的消費者群組,就可以讓它們獲取到主題所有的消息,而不只是其中的一部分:
提交和偏移量
為了保證調用KafkaConsumer.poll()方法時總能返回未被被消費者讀取過的記錄,消費者需要維護每個分區中已讀消息對應的偏移量offset。
一旦消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡rebalance,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要知道每個分區的已讀偏移量,然后從偏移量指定的地方繼續處理。
為了保證這些信息不丟失,消費者需要定期向一個名為 _consumer_offset 的特殊主題發送消息,消息里包含每個分區的偏移量。Kafka 中將這一更新的操作稱作提交commit。
邊界情況
如果提交的偏移量小於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息就會被重復處理:
如果提交的偏移量大於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息將會丟失:
處理偏移量的方式對客戶端會有很大的影響,下面分析幾種常見的提交方式:
自動提交
最簡單的提交方式是讓消費者自動提交偏移量:設置 enable.auto.commit = true,那么每過 auto.commit.interval.ms,消費者會自動把從KafkaConsumer.poll()方法接收到的最大偏移量提交上去。
自動提交是在輪詢里進行的:消費者每次在進行輪詢時會檢查是否該提交偏移量,並且自動提交最近一次輪詢返回的偏移量。
不過,在使用這種簡便的方式之前,需要知道它將會帶來怎樣的結果:
-
假設在兩次提交間隔之間發生了再均衡,期間的消息會被重復處理。
可以通過修改提交時間間隔來減小可能出現重復消息的時間窗,不過無法完全避免這種情況。 -
每次調用輪詢方法都會提交上一次調用返回的偏移量,而並不關心具體哪些消息已經被處理了。
在再次調用之前,最好確保所有當前已返回消息都已經處理完畢(調用KafkaConsumer.close()前也會進行自動提交)。
手動提交
自動提交雖然方便,不過並沒有為開發者留有余地來避免重復處理消息。為了提高可控性,開發者可以設置 enable.auto.commit = false,讓應用程序決定何時提交偏移量。
-
同步提交:使用
KafkaConsumer.commitSync()會提交最新偏移量並等待 broker 對提交請求作出回應。
在成功提交或碰到無法恢復的錯誤之前會不斷重試,會導致應用程序一直阻塞,限制了應用程序的吞吐量。 -
異步提交:使用
KafkaConsumer.commitAsync()會提交最新偏移量但無需等待 broker 的響應並且不進行重試。
不進行重試,是因為可能有一個更大的偏移量已經提交成功,重試可能會覆蓋到最新的值,導致再均衡后出現重復消息。
該方法在 broker 作出響應時會執行用戶指定的回調,回調經常被用於記錄提交錯誤或生成度量指標。
不過如果要在其中進行重試,一定要注意提交的順序。
一般情況下,偶爾異步提交失敗不會有太大問題,后續的提交總會有成功的。但在關閉消費者或再均衡前的最后一次提交,必須確保提交成功。為了保證可靠性與吞吐量,比較常見的方式是將兩者組合使用(具體參考開頭的代碼示例)。
提交特定偏移量
上面討論的提交方式中,提交偏移量的頻率與處理消息批次的頻率是一樣的,但某些場景需要在更細的粒度上控制提交:如果KafkaConsumer.poll()方法返回一大批數據,為了避免因再均衡引起的重復處理整批消息,我們希望能在批次處理中間提交部分偏移量。
此時,可以在調用KafkaConsumer.commitSync()或KafkaConsumer.commitAsync()時,通過參數指定具體的分區及其對應的偏移量,人為地控制提交內容。
分區再均衡
群組里的消費者共同讀取主題的分區,消費過程中可能出現以下情況:
- 新的消費者加入群組,它會被分配到一個原本由其他消費者讀取的分區
- 當前消費者離開群組,原本由它讀取的分區將分配給群組里的其他消費者
- 訂閱主題發生變化時,比如管理員添加了新的分區,會發生分區重分配
分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡rebalance。
再均衡非常重要,它為消費者群組帶來了高可用性和伸縮性(支持動態添加或移除消費者)。
但是,再均衡整個群組會一小段時間不可用,期間消費者無法讀取消息,在恢復之前會拖慢應用程序。
消費者分代
消費者組群可以進行任意次再均衡,為了更好地隔離已失效的狀態(比如:避免僵屍實例提交過期的偏移量),Kafka 消費者端引入了分代generation的概念。
消費者的分代信息是一個整數,每當組群進行一次 rebalance 操作,組內所有消費者的 generation 都會遞增。當消費者提交偏移時會伴隨着 generation 信息,當 broker 會據此判斷消息是否來源於一個上一代的消費者,並拒絕過期的提交。
在均衡監聽器
如果消費者想在再均衡前后做一些清理工作或准備工作,只需在調用 KafkaConsumer.subscribe() 方法時傳進去一個 ConsumerRebalanceListener 實例即可:
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 訂閱時注冊再均衡監聽器
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener(){
/** 方法會在重新分配分區之后和消費者開始讀取消息之前被調用 */
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 從特定的偏移量處開始讀取消息
offsets.entrySet().stream().
filter(e -> partitions.contains(e.getKey())).
forEach(e -> consumer.seek(e.getKey(), e.getValue().offset()));
}
/** 方法會在再均衡開始之前和消費者停止讀取消息之后被調用 */
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(offsets); // 提交已經數據對應的偏移量
}
});
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// 消息處理過程中,實時更新偏移量變化
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
消費者管理
消費者管理由兩部分組成:
- 成員管理:管理組內所有 consumer 實例生命周期並負責下發分區分配方案到每個 consumer。
該功能依賴於一個被選為組協調者coordinator的 broker。 - 分區分配:根據指定的分區分配策略制定分配方案並上報 coordinator。
該功能由一個 leader consumer 負責。
分區分配的操作是在 consumer 端執行的好處主要有以下兩點:
- 便於維護與升級: 調整分配策略時,無需重啟整 broker 集群。
- 便於實現自定義策略:代碼實現更為靈活,可以依賴外部存儲維護復雜策略,甚至實現機架感知。
變更流程
初始化
- 根據 $\tiny \texttt{_consumer_offset}$ 主題的分區數量 $\tiny \texttt{n}$ 計算 $\tiny \texttt{hashmod(group.id, n)}$
- 選擇第 $\tiny \texttt{hashmod(group.id, n)}$ 個 $\tiny \texttt{_consumer_offset}$ 分區的 leader broker 作為 coordinator
加入組群
JoinGroup請求。
coordinator 會從中選擇一個組群 leader consumer,並把所有成員信息以及它們的訂閱信息發送給這個 leader consumer。
規划分配方案
SyncGroup請求將方案中發送給 coordinator。
其他 consumer 也會向 coordinator 發送不包含分區方案的
SyncGroup請求,然后 coordinator 會將分區方案返回給這些 consumer。
發送心跳
Heartbeat請求,維持它們和群組會話
session以及對分區的所有權。
當 coordinator 長時間未接收到 consumer 的心請求時,會認為 consumer 已失效並觸發一次再均衡。
若 coordinator 在心跳響應中返回了 REBALANCE_IN_PROGRESS 信息,意味着當前組群已經開啟了新一輪的再均衡。
離開組群
LeaveGroup告知 coordinator 它將要離開群組,並立即觸發一次再均衡,盡可能降低處理停頓。
配置解析
client.id
該屬性可以是任意字符串,broker 用它來標識從客戶端發送過來的消息,通常被用在日志、度量指標和配額里。
group.id
標識消費者所屬的消費者組群的唯一字符串。
如果消費者需要基於組群的對訂閱進行管理,或基於 Kafka 管理偏移量時,需要指定此屬性。
bootstrap.servers
該屬性指定 broker 的地址列表。
清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找到其他 broker 的信息。
不過建議至少要提供兩個 broker 的信息,一旦其中一個宕機,生產者仍然能夠連接到集群上。
key.deserializer & value.deserializer
這兩個屬性必須被設置為一個實現了org.apache.kafka.common.serialization.Deserializer接口的類。
生產者會使用這個類把鍵值字節數組反序列化成對象。
receive.buffer.bytes & send.buffer.bytes
設置 socket 讀寫數據時用到的 TCP 緩沖區大小。如果它們被設為 -1,就使用操作系統的默認值。
當生產者或消費者與 broker 處於不同的機房時,可以適當增大這些值。
fetch.min.bytes
該屬性指定了消費者從服務器獲取記錄的最小字節數。
broker 在收到消費者的數據請求時,如果可用的數據量小於該配置,那么它會等到有足夠的可用數據時一並返回給消費者。
如果消費者的數量比較多,把該屬性的值設置得大一點可以降低 broker 的工作負載。
或當消費者訂閱的主題不活躍時,消費者的 CPU 使用率卻很高,可以適當調大該值。
fetch.max.wait.ms
該屬性指定了可用數據量不足時 broker 的等待時間。
如果沒有足夠的數據流入 Kafka,消費者獲取最小數據量的要求就得不到滿足,最終導致fetch.max.wait.ms的延遲。
如果要降低潛在的延遲(為了滿足 SLA),可以把該參數值設置得小一些。
max.poll.records
該屬性用於控制單次調用KafkaConsumer.poll()方法能夠返回的記錄數量。
可以借助該配置控制在輪詢里需要處理的數據量。
max.partition.fetch.bytes
該屬性指定了KafkaConsumer.poll() 方法從每個分區里返回的最大字節數。
消費者需要保證 max.partition.fetch.bytes * 消費分區數量 可用內存來接收記錄。
在為消費者分配內存時,可以給它們多分配一些,因為如果群組里有消費者發生崩潰,剩下的消費者需要處理更多的分區。
該值必須比 broker 能夠接收的最大消息的字節數max.message.size大,否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。
在設置該屬性時,另一個需要考慮的因素是消費者處理數據的時間。
消費者需要頻繁調用 KafkaConsumer.poll() 方法來避免會話過期和發生分區再均衡,如果單次調用返回的數據太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。
如果出現這種情況,可以把該值改小,或者延長會話過期時間。
session.timeout.ms
該屬性指定了消費者在被認為下線之前可以與服務器斷開連接的時間。
如果消費者沒有在指定的時間內發送心跳給群組協調器,會被認為已經下線。
協調器就會觸發再均衡,把它的分區分配給群組里的其他消費者。
該值越小,可以更快地檢測和恢復崩潰的節點,但可能導致非預期的再均衡。
該值越大,可以減少意外的再均衡,不過檢測節點崩潰需要更長的時間。
該屬性與heartbeat.interval.ms緊密相關:
heartbeat.interval.ms指定了 poll() 方法向協調器發送心跳的頻率。session.timeout.ms則指定了消費者可以多久不發送心跳。
一般需要同時修改這兩個屬性,heartbeat.interval.ms 一般設置為 session.timeout.ms 的三分之一。
auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量失效時(包含偏移量的記錄已過時並被刪除)該作何處理。
latest在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)。earliest在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。
enable.auto.commit
該屬性指定了消費者是否自動提交偏移量(可以通過配置auto.commit.interval.ms來控制提交的頻率)。
為了盡量避免出現重復數據和數據丟失,可以把它設為 false ,由自己控制何時提交偏移量。
partition.assignment.strategy
這兩個屬性必須被設置為一個實現了org.apache.kafka.clients.consumer.internals.PartitionAssignor接口的類。
消費協調者ConsumerCoordinator會使用這個類,根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者。
Kafka 默認的分配策略:
org.apache.kafka.clients.consumer.RangeAssignor把主題的若干個連續的分區分配給消費者org.apache.kafka.clients.consumer.RoundRobinAssignor把主題的所有分區逐個分配給消費者org.apache.kafka.clients.consumer.StickyAssignor盡可保證分配平衡的前提下減少再均衡造成的變更
