Kafka消費進度監控


Kakfa消費滯后程度有個專門的名稱:消費者 Lag 或 Consumer Lag。所謂滯后程度,就是指消費者當前落后於生產者的程度。

比方說,Kafka 生產者向某主題成功生產了 100 萬條消息,你的消費者當前消費了 80 萬條消息,那么我們就說你的消費者滯后了 20 萬條消息,即 Lag 等於 20 萬。

一個正常工作的消費者,它的 Lag 值應該很小,甚至是接近於 0 的,這表示該消費者能夠及時地消費生產者生產出來的消息,滯后程度很小。反之,如果一個消費者 Lag 值很大,通常就表明它無法跟上生產者的速度,最終 Lag 會越來越大,從而拖慢下游消息的處理速度。

更可怕的是,由於消費者的速度無法匹及生產者的速度,極有可能導致它消費的數據已經不在操作系統的頁緩存中了,那么這些數據就會失去享有 Zero Copy 技術的資格。這樣的話,消費者就不得不從磁盤上讀取它們,這就進一步拉大了與生產者的差距,進而出現馬太效應,即那些 Lag 原本就很大的消費者會越來越慢,Lag 也會越來越大。

 

有 3 種方法監控kafka消費進度。

1.使用 Kafka 自帶的命令行工具 kafka-consumer-groups 腳本。

Kafka 連接信息就是 < 主機名:端口 > 對,而 group 名稱就是你的消費者程序中設置的 group.id 值。

bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 連接信息 > --describe --group <group 名稱 >

主題、分區等信息外,它會匯報每個分區當前最新生產的消息的位移值(即 LOG-END-OFFSET 列值)、該消費者組當前最新消費消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前兩者的差值)、消費者實例 ID、消費者連接 Broker 的主機名以及消費者的 CLIENT-ID 信息

 

 

有時候會有這種,說明運行 kafka-consumer-groups 腳本時沒有啟動消費者程序,消費者組里沒有活躍的成員。但一樣可以看到lag值。

 

 

 

 

 

2.使用 Kafka Java Consumer API 編程。

public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient client = AdminClient.create(props)) {
            ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
            try {
                Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                    return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                            entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // 處理中斷異常
                // ...
                return Collections.emptyMap();
            } catch (ExecutionException e) {
                // 處理 ExecutionException
                // ...
                return Collections.emptyMap();
            } catch (TimeoutException e) {
                throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
            }
        }
    }

注意第 1 處是調用 AdminClient.listConsumerGroupOffsets 方法獲取給定消費者組的最新消費消息的位移;第 2 處則是獲取訂閱分區的最新消息位移;最后 1 處就是執行相應的減法操作,獲取 Lag 值並封裝進一個 Map 對象。這段代碼只適用於 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中沒有 AdminClient.listConsumerGroupOffsets 方法。

3.使用 Kafka 自帶的 JMX 監控指標。

Kafka 消費者提供了一個名為 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標,里面有很多屬性。和我們今天所講內容相關的有兩組屬性:records-lag-max 和 records-lead-min,它們分別表示此消費者在測試窗口時間內曾經達到的最大的 Lag 值和最小的 Lead 值。

Lag 值的含義我們已經反復講過了,我就不再重復了。這里的 Lead 值是指消費者最新消費消息的位移與分區當前第一條消息位移的差值。很顯然,Lag 和 Lead 是一體的兩個方面:Lag 越大的話,Lead 就越小,反之也是同理。

為什么要有lead值?監控到 Lag 越來越大,可能只會給你一個感受,那就是消費者程序變得越來越慢了,至少是追不上生產者程序了,除此之外,你可能什么都不會做。畢竟,有時候這也是能夠接受的。但反過來,一旦你監測到 Lead 越來越小,甚至是快接近於 0 了,你就一定要小心了,這可能預示着消費者端要丟消息了。

Kafka 消費者還在分區級別提供了額外的 JMX 指標,用於單獨監控分區級別的 Lag 和 Lead 值。JMX 名稱為:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。

所以比較推薦用自帶的 JMX 監控指標監控


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM