對於kafka消費者來說,最重要的事情就是監控它們的消費進度了,或者說是監控它們消費的滯后程度。這個滯后程度有個專門的名稱:消費者Lag或Consumer Lag。
所謂滯后程度,就是指消費者當前落后生產者的程度。比方說,kafka生產者想某主題成功生產了100萬條消息,你的消息者當前消費了80萬條消息,那么我們就說你的消費者滯后了20萬條消息,即Lag等於20萬。
通常來說,Lag的單位是消息數,而且我們一般是在主題這個級別討論Lag的,但實際上,kafka監控Lag的層級實在分區上的。如果要計算主題級別的,你需要手動匯總所有主題分區的Lag,將它們累加起來,合並成最終的Lag值。
一般來說,正常的工作的消費者,它的Lag值應該很小,甚至是接近於0,這表示該消費者能夠及時地消費生產者產生的消息。如果Lag值很大,通常表明它無法跟上生產者的速度,最終Lag會越來越大,從而拖慢下游消息的處理速度。
既然消費進度這么重要,我們應該怎么監控它?簡單來說,有3中方法:
- 使用kafka自帶的命令行工具kafka-consumer-groups.sh腳本查看
- 使用kafka Consumer API編程
- 使用kafka自帶的JMX監控指標
kafka自帶命令:
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ad-kafka02-aliuk-00146:9092 --describe --group platform_ig-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
platform_ig-group platform_ig 1 20 20 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
platform_ig-group platform_ig 4 16 16 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
platform_ig-group platform_ig 2 13 13 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
platform_ig-group platform_ig 3 28 28 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
platform_ig-group platform_ig 0 17 17 0 rdkafka-d93a07d5-c87f-4465-8030-2cd50ead37f5 /172.31.4.133 rdkafka
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ad-kafka02-aliuk-00146:9092 --describe --group price-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
price-group price 2 2145 3404636 3402491 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
price-group price 1 4032 4916799 4912767 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
price-group price 4 10274 3401565 3391291 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
price-group price 0 12707 4921711 4909004 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
price-group price 3 11612 4916577 4904965 sarama-00084857-dae4-478b-a302-b42f6e76cb23 /172.17.2.167 sarama
kakfa Java Consumer API:
很多時候,你可能對運⾏命令⾏⼯具查詢 Lag 這種⽅式並不滿意,⽽是希望⽤程序的⽅式⾃動化 監控。幸運的是,社區的確為我們提供了這樣的⽅法。這就是我們今天要講的第⼆種⽅法。 簡單來說,社區提供的 Java Consumer API 分別提供了查詢當前分區最新消息位移和消費者組 最新消費消息位移兩組⽅法,我們使⽤它們就能計算出對應的 Lag。 下⾯這段代碼展示了如何利⽤ Consumer 端 API 監控給定消費者組的 Lag 值:
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 處理中斷異常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 處理 ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
你不⽤完全了解上⾯這段代碼每⼀⾏的具體含義,只需要記住我標為橙⾊的 3 處地⽅即可:第 1 處是調⽤ AdminClient.listConsumerGroupOffsets ⽅法獲取給定消費者組的最新消費消息的位 移;第 2 處則是獲取訂閱分區的最新消息位移;最后 1 處就是執⾏相應的減法操作,獲取 Lag 值並封裝進⼀個 Map 對象。 我把這段代碼送給你,你可以將 lagOf ⽅法直接應⽤於你的⽣產環境,以實現程序化監控消費者 Lag 的⽬的。不過請注意,這段代碼只適⽤於 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中 沒有 AdminClient.listConsumerGroupOffsets ⽅法。
kafka JMX監控指標:
我們來看下第三種方法,使用kafka默認提供的JMX監控指標來監控消費者的Lag值。並且可以集成到監控框架中
當前,kafka消費者提供了一個名為kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"的JMX指標,里面有很多屬性。和我們今天所講的內容相關的有倆組屬性: records-lag=max和records-lead-min,他們分別表示此消費者在測試窗口時間內曾經達到的最大的Lag值和最小的Lead值。
Lag值的含義我們已經反復講過了。這里的Lead值是指消費者最新消費消息的位移與分區當前第一條消息位移的差值。
很顯然,Lag和Lead是一體的倆個方面:Lag越大的話,Lead就越小,反之也是同理。
你可能會問,為什么要引⼊ Lead 呢?我只監控 Lag 不就⾏了嗎?這⾥提 Lead 的原因就在於這 部分功能是我實現的。開個玩笑,其實社區引⼊ Lead 的原因是,只看 Lag 的話,我們也許不能 及時意識到可能出現的嚴重問題。 試想⼀下,監控到 Lag 越來越⼤,可能只會給你⼀個感受,那就是消費者程序變得越來越慢了, ⾄少是追不上⽣產者程序了,除此之外,你可能什么都不會做。畢竟,有時候這也是能夠接受 的。但反過來,⼀旦你監測到 Lead 越來越⼩,甚⾄是快接近於 0 了,你就⼀定要⼩⼼了,這可 能預示着消費者端要丟消息了。 為什么?我們知道 Kafka 的消息是有留存時間設置的,默認是 1 周,也就是說 Kafka 默認刪除 1 周前的數據。倘若你的消費者程序⾜夠慢,慢到它要消費的數據快被 Kafka 刪除了,這時你就 必須⽴即處理,否則⼀定會出現消息被刪除,從⽽導致消費者程序重新調整位移值的情形。這可 能產⽣兩個后果:⼀個是消費者從頭消費⼀遍數據,另⼀個是消費者從最新的消息位移處開始消 費,之前沒來得及消費的消息全部被跳過了,從⽽造成丟消息的假象。 這兩種情形都是不可忍受的,因此必須有⼀個 JMX 指標,清晰地表征這種情形,這就是引⼊ Lead 指標的原因。所以,Lag 值從 100 萬增加到 200 萬這件事情,遠不如 Lead 值從 200 減 少到 100 這件事來得重要。在實際⽣產環境中,請你⼀定要同時監控 Lag 值和 Lead 值。當然 了,這個 lead JMX 指標的確也是我開發的,這⼀點倒是事實。
