kafka05-offset自动提交和手动提交-漏消费和重复消费


offset的默认维护位置:

每个broker都会维护一个对应的_consumer_offsets系统主题,用来记录消费者消费分区数据的位置。0.9版本以前是记录在zk中的。

_consumer_offsets主题采用key和value的方式存储数据,key是groupid + topic + partition号,value对应当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是每个groupid + topic + partition号根据key去重,只保留最新数据。

自动提交offset(kafka默认使用该模式):

为了使我们能够专注于自己的业务逻辑,kafka提供了自动提交offset的功能。

 

 

手动提交offset:

虽然自动提交offset十分简单遍历,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此kafka还提供了手动提交offset的api。

自动提交分为:(通常使用异步提交方式多一些,追求效率)

  1. 同步提交 commitSync:拉取数据提交offset会进入阻塞状态,直到提交成功后才开始下一次的拉取。

  2. 异步提交 commitAsync:拉取数据和提交offset异步进行,不会管offset是否提交成功

 

 

指定offset消费:

当kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

消费方式有:auto.offset.reset = latest

  1. earliest:自动将偏移量重置为最早的偏移量,--from-bcginning

  2. latest(默认):自动将偏移量重置为最新偏移量

  3. none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

  4. 指定具体的offset:kafkaConsumer.seek(topicPartition,offsetNum)

指定时间消费:

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如按照时间消费前一天的数据。

kafkaConsumer.offsetForTimes(Map); 通过时间获取对应的offset

漏消费和重复消费:

重复消费:已经消费了数据,但offset没有提交成功

漏消费:先提交offset后消费,可能会造成数据的漏消费

如果想完成consumer端的精准一次性消费,那么需要kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如mysql)。

 

 

 

 

数据积压:

 

 

 

Kafka Eagle监控,需要自己手动安装

 

Kafka-Kraft模式,不再依赖于zk

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM