每个broker都会维护一个对应的_consumer_offsets系统主题,用来记录消费者消费分区数据的位置。0.9版本以前是记录在zk中的。
_consumer_offsets主题采用key和value的方式存储数据,key是groupid + topic + partition号,value对应当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是每个groupid + topic + partition号根据key去重,只保留最新数据。
自动提交offset(kafka默认使用该模式):
为了使我们能够专注于自己的业务逻辑,kafka提供了自动提交offset的功能。
手动提交offset:
虽然自动提交offset十分简单遍历,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此kafka还提供了手动提交offset的api。
自动提交分为:(通常使用异步提交方式多一些,追求效率)
同步提交 commitSync:拉取数据提交offset会进入阻塞状态,直到提交成功后才开始下一次的拉取。
异步提交 commitAsync:拉取数据和提交offset异步进行,不会管offset是否提交成功
指定offset消费:
当kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
消费方式有:auto.offset.reset = latest
earliest:自动将偏移量重置为最早的偏移量,--from-bcginning
latest(默认):自动将偏移量重置为最新偏移量
none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
指定具体的offset:kafkaConsumer.seek(topicPartition,offsetNum)
指定时间消费:
在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如按照时间消费前一天的数据。
kafkaConsumer.offsetForTimes(Map); 通过时间获取对应的offset
漏消费和重复消费:
重复消费:已经消费了数据,但offset没有提交成功
漏消费:先提交offset后消费,可能会造成数据的漏消费
如果想完成consumer端的精准一次性消费,那么需要kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如mysql)。
数据积压:
Kafka Eagle监控,需要自己手动安装
Kafka-Kraft模式,不再依赖于zk