Monitoring Kafka Consumption Progress


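When Kafka serves as the messaging middleware for consuming data, it is important to monitor how far the consumers have progressed. The main metric to watch is consumer lag.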

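There are three common ways to monitor Kafka consumption progress: the command-line tools that ship with Kafka, the Kafka Consumer API, and Kafka's built-in JMX metrics. This article covers the first two.
Note: internal IPs: 10.12.100.126, 10.12.100.127, 10.12.100.128; external IPs: 47.90.133.76, 47.90.133.77, 47.90.133.78; usernames: server1, server2, server3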

1 Using Kafka's built-in command-line tool

For consumers that use Kafka's high-level consumer API, the kafka-consumer-groups.sh script that ships with Kafka shows consumption progress directly.

1.1 Listing all existing consumer groups

(base) root@node3:/opt/kafka/kafka_2.11-0.10.2.2/bin# kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.12.100.126:9092,10.12.100.127:9092,10.12.100.128:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka/kafka_2.11-0.10.2.2/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
consumer
consumers

1.2 Checking consumption progress with kafka-consumer-groups.sh

(base) root@node3:/opt/kafka/kafka_2.11-0.10.2.2/bin# kafka-consumer-groups.sh --bootstrap-server 10.12.100.126:9092,10.12.100.127:9092,10.12.100.128:9092 --describe --group consumers
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka/kafka_2.11-0.10.2.2/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST           CLIENT-ID
test   0          17734           17734           0    consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1
test   1          17736           17736           0    consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1
test   2          17735           17735           0    consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1

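Column meanings (older Kafka tools label the same fields GROUP, TOPIC, PID, OFFSET, LOGSIZE, LAG):
TOPIC: topic name
PARTITION: partition id
CURRENT-OFFSET: number of messages the group has consumed so far
LOG-END-OFFSET: total number of messages in the partition
LAG: number of messages not yet consumed (LOG-END-OFFSET minus CURRENT-OFFSET)
The consumer group itself is the one passed with --group.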

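Note: LAG is measured in messages. A LAG of 0 means the consumer is keeping up with the producer in real time, with no backlog. The larger the LAG, the further the consumer has fallen behind the messages being produced.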
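The table above can also be read by a script, for example to raise an alert when the total lag grows. The following is a minimal sketch, not part of the original article: the function name parse_describe_output is hypothetical, and it assumes the 0.10.2 column layout shown above, where the first four columns are TOPIC, PARTITION, CURRENT-OFFSET and LOG-END-OFFSET. Other Kafka versions may print a different layout and would need the column positions adjusted.

```python
def parse_describe_output(text):
    """Parse `kafka-consumer-groups.sh --describe` output into per-partition rows.

    Assumes the column order TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET
    (the layout shown in this article). Lines before the header row, such as
    SLF4J warnings, are ignored.
    """
    rows = []
    in_table = False
    for line in text.splitlines():
        fields = line.split()
        if not in_table:
            # The table starts right after the header row.
            in_table = fields[:2] == ["TOPIC", "PARTITION"]
            continue
        if len(fields) < 5:
            continue
        topic, partition, current, log_end = fields[:4]
        # Skip rows whose offsets are not plain numbers (assumption of this sketch).
        if not (partition.isdigit() and current.isdigit() and log_end.isdigit()):
            continue
        rows.append({
            "topic": topic,
            "partition": int(partition),
            "current_offset": int(current),
            "log_end_offset": int(log_end),
            # Recompute lag from the two offsets rather than trusting the LAG column.
            "lag": int(log_end) - int(current),
        })
    return rows


# Sample taken from the output shown in this article.
SAMPLE = """\
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   0  17734  17734  0  consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1
test   1  17736  17736  0  consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1
test   2  17735  17735  0  consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1
"""

rows = parse_describe_output(SAMPLE)
total_lag = sum(row["lag"] for row in rows)
print(total_lag)
```

In practice the text would come from running the script, for example through subprocess, and total_lag would be compared against a threshold chosen for the workload.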

2 Using the Kafka Consumer API

# Requires kafka-python 1.x: SimpleClient was removed in kafka-python 2.0.
from kafka import SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition

BROKERS = "47.90.133.76:9092,47.90.133.77:9092,47.90.133.78:9092"


def get_topic_offset(brokers, topic):
    """Return the sum of the latest offsets over all partitions of the topic."""
    client = SimpleClient(brokers)
    partitions = client.topic_partitions[topic]
    # time=-1 asks for the latest offset; max_offsets=1 returns one value per partition.
    offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
    offsets_responses = client.send_offset_request(offset_requests)
    return sum(r.offsets[0] for r in offsets_responses)


def get_group_offset(brokers, group_id, topic):
    """Return the sum of the offsets the group has committed for the topic."""
    consumer = KafkaConsumer(bootstrap_servers=brokers, group_id=group_id)
    pts = [TopicPartition(topic=topic, partition=i)
           for i in consumer.partitions_for_topic(topic)]
    # _coordinator is a private attribute of KafkaConsumer and may change between releases.
    result = consumer._coordinator.fetch_committed_offsets(pts)
    return sum(r.offset for r in result.values())


if __name__ == '__main__':
    topic_offset = get_topic_offset(BROKERS, "test")
    group_offset = get_group_offset(BROKERS, "consumers", "test")
    lag = topic_offset - group_offset
    print(topic_offset)  # sum of the topic's latest offsets
    print(group_offset)  # sum of the offsets this group has consumed for the topic
    print(lag)           # number of messages not yet consumed

(base) root@server3:~# python  getKafkaLag.py
17735
17735
0

Code reference: https://www.jianshu.com/p/e48af92e199d
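The script above sums offsets over the whole topic, so a single slow partition can be hidden by the others. As a possible variation, the sketch below reports lag per partition and uses only public KafkaConsumer methods (partitions_for_topic, end_offsets, committed), so it does not depend on SimpleClient. The function names compute_lag and fetch_lag are hypothetical, and treating a partition without a committed offset as consumed up to offset 0 is an assumption of this sketch that can overstate lag when old log segments have been deleted.

```python
def compute_lag(end_offsets, committed_offsets):
    """Return {partition: lag} from log-end offsets and committed offsets.

    A partition whose committed offset is None or missing is treated as
    consumed up to offset 0 (an assumption of this sketch).
    """
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        lag[partition] = end - (committed if committed is not None else 0)
    return lag


def fetch_lag(brokers, group_id, topic):
    """Query the cluster and return per-partition lag for one group and topic."""
    # Imported here so compute_lag can be used without kafka-python installed.
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    # Auto-commit is disabled so this monitoring client never writes offsets.
    consumer = KafkaConsumer(bootstrap_servers=brokers,
                             group_id=group_id,
                             enable_auto_commit=False)
    try:
        partitions = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in sorted(partitions)]
        end_offsets = consumer.end_offsets(tps)
        committed = {tp: consumer.committed(tp) for tp in tps}
        return compute_lag(end_offsets, committed)
    finally:
        consumer.close()


# Offline illustration with made-up offsets; no broker is contacted here.
example = compute_lag({0: 10, 1: 7}, {0: 4, 1: None})
print(example)
```

Against the cluster used in this article, the call would look like fetch_lag(["47.90.133.76:9092", "47.90.133.77:9092", "47.90.133.78:9092"], "consumers", "test"), and the values of the returned dictionary can be summed to get the same total as the script above.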

