kafka消費端提交offset的方式


Kafka 提供了 3 種提交 offset 的方式

  1. 自動提交
1
2
3
4
// 自動提交,默認true
props.put("enable.auto.commit", "true");
// 設置自動每1s提交一次
props.put("auto.commit.interval.ms", "1000");
  1. 手動同步提交 offset
1
consumer.commitSync();
  1. 手動異步提交 offset
1
consumer.commitAsync();

上面說了既然異步提交 offset 可能會重復消費, 那么我使用同步提交是否就可以表明這個問題呢?

1
2
3
4
5
6
7
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
insertIntoDB(record);
consumer.commitSync();
});
}

很明顯不行, 因為 insertIntoDB 和 commitSync() 做不到原子操作, 如果 insertIntoDB() 成功了,但是提交 offset 的時候 consumer 掛掉了,然后服務器重啟,仍然會導致重復消費問題。

如何做到不重復消費?

只要保證處理消息和提交 offset 得操作是原子操作,就可以做到不重復消費。我們可以自己管理 committed offset, 而不讓 kafka 來進行管理。

比如如下使用方式:

  1. 如果消費的數據剛好需要存儲在數據庫,那么可以把 offset 也存在數據庫,就可以就可以在一個事物中提交這兩個結果,保證原子操作。
  2. 借助搜索引擎,把 offset 和數據一起放到索引里面,比如 Elasticsearch

每條記錄都有自己的 offset, 所以如果要管理自己的 offset 還得要做下面事情

  1. 設置 enable.auto.commit=false
  2. 使用每個 ConsumerRecord 提供的 offset 來保存消費的位置。
  3. 在重新啟動時使用 seek(TopicPartition, long) 恢復上次消費的位置。

通過上面的方式就可以在消費端實現”Exactly Once” 的語義, 即保證只消費一次。但是是否真的需要保證不重復消費呢?這個得看具體業務, 重復消費數據對整體有什么影響在來決定是否需要做到不重復消費。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM