kafka入门 第十篇 消费者组进度监控都怎么实现?


对于kafka消费者来说,最重要的事情就是监控它们的消费进度了,或者说是监控它们消费的滞后程度。这个滞后程度有个专门的名称:消费者Lag或Consumer Lag。
所谓滞后程度,就是指消费者当前落后生产者的程度。比方说,kafka生产者想某主题成功生产了100万条消息,你的消息者当前消费了80万条消息,那么我们就说你的消费者滞后了20万条消息,即Lag等于20万。
通常来说,Lag的单位是消息数,而且我们一般是在主题这个级别讨论Lag的,但实际上,kafka监控Lag的层级实在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的Lag,将它们累加起来,合并成最终的Lag值。
一般来说,正常的工作的消费者,它的Lag值应该很小,甚至是接近于0,这表示该消费者能够及时地消费生产者产生的消息。如果Lag值很大,通常表明它无法跟上生产者的速度,最终Lag会越来越大,从而拖慢下游消息的处理速度。
既然消费进度这么重要,我们应该怎么监控它?简单来说,有3中方法:
  1. 使用kafka自带的命令行工具kafka-consumer-groups.sh脚本查看
  2. 使用kafka Consumer API编程
  3. 使用kafka自带的JMX监控指标
kafka自带命令:
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ad-kafka02-aliuk-00146:9092 --describe --group platform_ig-group
 
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
platform_ig-group platform_ig 1 20 20 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
platform_ig-group platform_ig 4 16 16 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
platform_ig-group platform_ig 2 13 13 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
platform_ig-group platform_ig 3 28 28 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
platform_ig-group platform_ig 0 17 17 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
 
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ad-kafka02-aliuk-00146:9092 --describe --group price-group
 
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
price-group price 2 2145 3404636 3402491 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
price-group price 1 4032 4916799 4912767 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
price-group price 4 10274 3401565 3391291 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
price-group price 0 12707 4921711 4909004 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
price-group price 3 11612 4916577 4904965 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
kakfa Java Consumer API:
很多时候,你可能对运⾏命令⾏⼯具查询 Lag 这种⽅式并不满意,⽽是希望⽤程序的⽅式⾃动化 监控。幸运的是,社区的确为我们提供了这样的⽅法。这就是我们今天要讲的第⼆种⽅法。 简单来说,社区提供的 Java Consumer API 分别提供了查询当前分区最新消息位移和消费者组 最新消费消息位移两组⽅法,我们使⽤它们就能计算出对应的 Lag。 下⾯这段代码展示了如何利⽤ Consumer 端 API 监控给定消费者组的 Lag 值:
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient client = AdminClient.create(props)) {
            ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
            try {
                Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                    return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                            entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // 处理中断异常
                // ...
                return Collections.emptyMap();
            } catch (ExecutionException e) {
                // 处理 ExecutionException
                // ...
                return Collections.emptyMap();
            } catch (TimeoutException e) {
                throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
            }
        }
    }
 
 
你不⽤完全了解上⾯这段代码每⼀⾏的具体含义,只需要记住我标为橙⾊的 3 处地⽅即可:第 1 处是调⽤ AdminClient.listConsumerGroupOffsets ⽅法获取给定消费者组的最新消费消息的位 移;第 2 处则是获取订阅分区的最新消息位移;最后 1 处就是执⾏相应的减法操作,获取 Lag 值并封装进⼀个 Map 对象。 我把这段代码送给你,你可以将 lagOf ⽅法直接应⽤于你的⽣产环境,以实现程序化监控消费者 Lag 的⽬的。不过请注意,这段代码只适⽤于 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中 没有 AdminClient.listConsumerGroupOffsets ⽅法。
 
 
kafka JMX监控指标:
我们来看下第三种方法,使用kafka默认提供的JMX监控指标来监控消费者的Lag值。并且可以集成到监控框架中
当前,kafka消费者提供了一个名为kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"的JMX指标,里面有很多属性。和我们今天所讲的内容相关的有俩组属性: records-lag=max和records-lead-min,他们分别表示此消费者在测试窗口时间内曾经达到的最大的Lag值和最小的Lead值。
Lag值的含义我们已经反复讲过了。这里的Lead值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。
很显然,Lag和Lead是一体的俩个方面:Lag越大的话,Lead就越小,反之也是同理。
你可能会问,为什么要引⼊ Lead 呢?我只监控 Lag 不就⾏了吗?这⾥提 Lead 的原因就在于这 部分功能是我实现的。开个玩笑,其实社区引⼊ Lead 的原因是,只看 Lag 的话,我们也许不能 及时意识到可能出现的严重问题。 试想⼀下,监控到 Lag 越来越⼤,可能只会给你⼀个感受,那就是消费者程序变得越来越慢了, ⾄少是追不上⽣产者程序了,除此之外,你可能什么都不会做。毕竟,有时候这也是能够接受 的。但反过来,⼀旦你监测到 Lead 越来越⼩,甚⾄是快接近于 0 了,你就⼀定要⼩⼼了,这可 能预示着消费者端要丢消息了。 为什么?我们知道 Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若你的消费者程序⾜够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就 必须⽴即处理,否则⼀定会出现消息被删除,从⽽导致消费者程序重新调整位移值的情形。这可 能产⽣两个后果:⼀个是消费者从头消费⼀遍数据,另⼀个是消费者从最新的消息位移处开始消 费,之前没来得及消费的消息全部被跳过了,从⽽造成丢消息的假象。 这两种情形都是不可忍受的,因此必须有⼀个 JMX 指标,清晰地表征这种情形,这就是引⼊ Lead 指标的原因。所以,Lag 值从 100 万增加到 200 万这件事情,远不如 Lead 值从 200 减 少到 100 这件事来得重要。在实际⽣产环境中,请你⼀定要同时监控 Lag 值和 Lead 值。当然 了,这个 lead JMX 指标的确也是我开发的,这⼀点倒是事实。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM