Kafka 提供了 3 種提交 offset 的方式
- 自動提交
1 |
// 自動提交,默認true |
- 手動同步提交 offset
1 |
consumer.commitSync(); |
- 手動異步提交 offset
1 |
consumer.commitAsync(); |
上面說了既然異步提交 offset 可能會重復消費, 那么我使用同步提交是否就可以表明這個問題呢?
1 |
while(true) { |
很明顯不行, 因為 insertIntoDB 和 commitSync() 做不到原子操作, 如果 insertIntoDB() 成功了,但是提交 offset 的時候 consumer 掛掉了,然后服務器重啟,仍然會導致重復消費問題。
如何做到不重復消費?
只要保證處理消息和提交 offset 得操作是原子操作,就可以做到不重復消費。我們可以自己管理 committed offset, 而不讓 kafka 來進行管理。
比如如下使用方式:
- 如果消費的數據剛好需要存儲在數據庫,那么可以把 offset 也存在數據庫,就可以就可以在一個事物中提交這兩個結果,保證原子操作。
- 借助搜索引擎,把 offset 和數據一起放到索引里面,比如 Elasticsearch
每條記錄都有自己的 offset, 所以如果要管理自己的 offset 還得要做下面事情
- 設置 enable.auto.commit=false
- 使用每個 ConsumerRecord 提供的 offset 來保存消費的位置。
- 在重新啟動時使用 seek(TopicPartition, long) 恢復上次消費的位置。
通過上面的方式就可以在消費端實現”Exactly Once” 的語義, 即保證只消費一次。但是是否真的需要保證不重復消費呢?這個得看具體業務, 重復消費數據對整體有什么影響在來決定是否需要做到不重復消費。