Monitoring Kafka Consumer Lag with Python


When I set out to implement this, I had roughly two approaches in mind.

 

Approach 1:

Use the bin tools that ship with Kafka to print the lag values we care about, then post-process the output with a bit of code and raise alerts. For example:

 

We can set up a remote command script that periodically runs the kafka-consumer-groups.sh tool, pipes the output through awk '{print $1,$2,$5}' to pull out the consumer's topic, partitions, and lag values, and then has the script decide what to alert on and what to ignore.
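A minimal sketch of this approach in Python: shell out to kafka-consumer-groups.sh and pick out the same columns the awk command would. The install path, bootstrap server, group name, alert threshold, and column positions are all placeholders; the column layout of --describe output differs between Kafka versions, so adjust the indices to match yours.

import subprocess

KAFKA_CONSUMER_GROUPS = '/opt/kafka/bin/kafka-consumer-groups.sh'  # placeholder install path
BOOTSTRAP_SERVER = 'localhost:9092'                                # placeholder

def consumer_group_lag(group):
    """Return {(topic, partition): lag} parsed from kafka-consumer-groups.sh output."""
    out = subprocess.check_output(
        [KAFKA_CONSUMER_GROUPS, '--bootstrap-server', BOOTSTRAP_SERVER,
         '--describe', '--group', group],
        universal_newlines=True)
    lags = {}
    for line in out.splitlines():
        fields = line.split()
        # Older releases print: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
        # which is what awk '{print $1,$2,$5}' assumes; newer ones prepend a GROUP column.
        if len(fields) < 5 or not (fields[1].isdigit() and fields[4].lstrip('-').isdigit()):
            continue  # header, blank line, or a partition with no recorded offset
        lags[(fields[0], int(fields[1]))] = int(fields[4])
    return lags

# decide what to alert on and what to ignore
for (topic, partition), lag in consumer_group_lag('my-consumer-group').items():
    if lag > 10000:
        print('WARNING: %s[%s] lag is %s' % (topic, partition, lag))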

This works, but the downside is that it depends on connecting to the physical hosts. In many cases we cannot access the machines Kafka runs on, only the service itself, and going through SSH is also subject to all kinds of permission restrictions. So this approach may be fine for people with broad access, but it is not a very portable solution.

 

Approach 2:

The idea is to use the KafkaAdmin tooling from the Kafka client library to fetch the current high watermark of each topic, then subtract the consumer group's current position from it.
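For a single known group and topic, the plain KafkaConsumer API can already do this: end_offsets() returns the high watermark and committed() the group's last committed position. A minimal sketch with placeholder group/topic/server names follows; note this is not the approach used below, which goes through the admin client so it can scan every group without joining any of them.

from kafka import KafkaConsumer, TopicPartition

# Placeholders: adjust the group, topic and bootstrap servers to your environment.
consumer = KafkaConsumer(group_id='my-consumer-group',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)

partitions = [TopicPartition('my-topic', p)
              for p in consumer.partitions_for_topic('my-topic')]
highwater = consumer.end_offsets(partitions)   # high watermark per partition

for tp in partitions:
    committed = consumer.committed(tp) or 0    # None if the group never committed
    print('%s[%d] lag = %d' % (tp.topic, tp.partition, highwater[tp] - committed))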

What surprised me is that kafka-python's KafkaAdminClient does not provide a ready-made method for getting lag; you have to roll your own, and it does not even expose a way to fetch the high watermark. I ended up digging the approach out of Datadog's integration code, and on top of that it cannot be used as-is with the latest 1.4.7 release. I am not sure whether clients in other languages have the same gap.

from collections import defaultdict

from six import iteritems
from kafka import KafkaAdminClient
from kafka import errors as kafka_errors
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
from kafka.structs import TopicPartition

# The exception and warning string below stand in for the ones used in the
# Datadog check referenced in the docstring; define them however fits your codebase.
WARNING_BROKER_LESS_THAN_0_10_2 = (
    'Reading consumer offsets from all groups requires brokers >= 0.10.2')


class BadKafkaConsumerConfiguration(Exception):
    pass


class MonitorKafkaInfra(object):
    """Reference:
    https://github.com/DataDog/integrations-core/pull/2730/files
    https://github.com/dpkp/kafka-python/issues/1673
    https://github.com/dpkp/kafka-python/issues/1501
    """
    # A KafkaAdminClient for the target cluster; the bootstrap servers here
    # are a placeholder for your own environment.
    kafka_admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

    @classmethod
    def get_highwater_offsets(cls, kafka_admin_client, topics=None):
        """Fetch highwater offsets for topic_partitions in the Kafka cluster.
        Do this for all partitions in the cluster because even if it has no
        consumers, we may want to measure whether producers are successfully
        producing. No need to limit this for performance because fetching
        broker offsets from Kafka is a relatively inexpensive operation.

        Internal Kafka topics like __consumer_offsets are excluded.
        Sends one OffsetRequest per broker to get offsets for all partitions
        where that broker is the leader:
        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)

        Arguments:
            topics (set): The set of topics (as strings) for which to fetch
                          highwater offsets. If set to None, will fetch highwater offsets
                          for all topics in the cluster.
        """

        highwater_offsets = {}
        topic_partitions_without_a_leader = set()
        # No sense fetching highwater offsets for internal topics
        internal_topics = {
            '__consumer_offsets',
            '__transaction_state',
            '_schema',  # Confluent registry topic
        }

        for broker in kafka_admin_client._client.cluster.brokers():
            broker_led_partitions = kafka_admin_client._client.cluster.partitions_for_broker(broker.nodeId)
            # Take the partitions for which this broker is the leader and group
            # them by topic in order to construct the OffsetRequest.
            # Any partitions that don't currently have a leader will be skipped.
            partitions_grouped_by_topic = defaultdict(list)
            if broker_led_partitions is None:
                continue
            for topic, partition in broker_led_partitions:
                if topic in internal_topics or (topics is not None and topic not in topics):
                    continue
                partitions_grouped_by_topic[topic].append(partition)

            # Construct the OffsetRequest
            max_offsets = 1
            request = OffsetRequest[0](
                replica_id=-1,
                topics=[
                    (topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
                    for topic, partitions in iteritems(partitions_grouped_by_topic)])

            # Note: the behaviour of _send_request_to_node changed in kafka-python 1.4.7
            future = kafka_admin_client._send_request_to_node(node_id=broker.nodeId, request=request)
            kafka_admin_client._client.poll(future=future)
            response = future.value

            offsets, unled = cls._process_highwater_offsets(response)
            highwater_offsets.update(offsets)
            topic_partitions_without_a_leader.update(unled)

        return highwater_offsets, topic_partitions_without_a_leader

    @classmethod
    def _process_highwater_offsets(cls, response):
        """Convert OffsetFetchResponse to a dictionary of offsets.

            Returns: A dictionary with TopicPartition keys and integer offsets:
                    {TopicPartition: offset}. Also returns a set of TopicPartitions
                    without a leader.
        """
        highwater_offsets = {}
        topic_partitions_without_a_leader = set()

        assert isinstance(response, OffsetResponse[0])

        for topic, partitions_data in response.topics:
            for partition, error_code, offsets in partitions_data:
                topic_partition = TopicPartition(topic, partition)
                error_type = kafka_errors.for_code(error_code)
                if error_type is kafka_errors.NoError:
                    highwater_offsets[topic_partition] = offsets[0]
                # Valid error codes:
                # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-PossibleErrorCodes.2
                elif error_type is kafka_errors.NotLeaderForPartitionError:
                    topic_partitions_without_a_leader.add(topic_partition)
                elif error_type is kafka_errors.UnknownTopicOrPartitionError:
                    pass
                else:
                    raise error_type("Unexpected error encountered while "
                                     "attempting to fetch the highwater offsets for topic: "
                                     "%s, partition: %s." % (topic, partition))
        assert topic_partitions_without_a_leader.isdisjoint(highwater_offsets)
        return highwater_offsets, topic_partitions_without_a_leader

    @classmethod
    def get_kafka_consumer_offsets(cls, kafka_admin_client, consumer_groups=None):
        """Fetch Consumer Group offsets from Kafka.
        Also fetch consumer_groups, topics, and partitions if not
        already specified in consumer_groups.
        Arguments:
            consumer_groups (dict): The consumer groups, topics, and partitions
                for which you want to fetch offsets. If consumer_groups is
                None, will fetch offsets for all consumer_groups. For examples
                of what this dict can look like, see
                _validate_explicit_consumer_groups().

        Returns:
            dict: {(consumer_group, topic, partition): consumer_offset} where
                consumer_offset is an integer.
        """
        consumer_offsets = {}
        old_broker = kafka_admin_client.config['api_version'] < (0, 10, 2)
        if consumer_groups is None:  # None signals to fetch all from Kafka
            if old_broker:
                raise BadKafkaConsumerConfiguration(WARNING_BROKER_LESS_THAN_0_10_2)
            for broker in kafka_admin_client._client.cluster.brokers():
                for consumer_group, group_type in kafka_admin_client.list_consumer_groups(broker_ids=[broker.nodeId]):
                    # consumer groups from Kafka < 0.9 that store their offset
                    # in Kafka don't use Kafka for group-coordination so
                    # group_type is empty
                    if group_type in ('consumer', ''):
                        # Typically the consumer group offset fetch sequence is:
                        # 1. For each broker in the cluster, send a ListGroupsRequest
                        # 2. For each consumer group, send a FindGroupCoordinatorRequest
                        # 3. Query the group coordinator for the consumer's offsets.
                        # However, since Kafka brokers only include consumer
                        # groups in their ListGroupsResponse when they are the
                        # coordinator for that group, we can skip the
                        # FindGroupCoordinatorRequest.
                        this_group_offsets = kafka_admin_client.list_consumer_group_offsets(
                            group_id=consumer_group, group_coordinator_id=broker.nodeId)
                        for (topic, partition), (offset, metadata) in iteritems(this_group_offsets):
                            key = (consumer_group, topic, partition)
                            consumer_offsets[key] = offset
        else:
            for consumer_group, topics in iteritems(consumer_groups):
                if topics is None:
                    if old_broker:
                        raise BadKafkaConsumerConfiguration(WARNING_BROKER_LESS_THAN_0_10_2)
                    topic_partitions = None
                else:
                    topic_partitions = []
                    # transform from [("t1", [1, 2])] to [TopicPartition("t1", 1), TopicPartition("t1", 2)]
                    for topic, partitions in iteritems(topics):
                        if partitions is None:
                            # If partitions aren't specified, fetch all
                            # partitions in the topic from Kafka
                            partitions = kafka_admin_client._client.cluster.partitions_for_topic(topic)
                        topic_partitions.extend([TopicPartition(topic, p) for p in partitions])
                this_group_offsets = kafka_admin_client.list_consumer_group_offsets(consumer_group, partitions=topic_partitions)
                for (topic, partition), (offset, metadata) in iteritems(this_group_offsets):
                    # when we are explicitly specifying partitions, the offset
                    # could be returned as -1, meaning there is no recorded offset
                    # for that partition... for example, if the partition
                    # doesn't exist in the cluster. So ignore it.
                    if offset != -1:
                        key = (consumer_group, topic, partition)
                        consumer_offsets[key] = offset

        return consumer_offsets

 

Below I also implemented a function that only looks at the topics and consumers we care about and reports any anomaly straight to DingTalk. Roughly:

def monitor_lag_to_dingding_client(topics, consumers, warning_offsets, dingding_client):
    msg = ''
    warning_msg = []

    # e.g. {TopicPartition(topic=u'online-events', partition=48): 314061}
    highwater_offsets_dict, _ = MonitorKafkaInfra.get_highwater_offsets(
        MonitorKafkaInfra.kafka_admin_client, topics)

    # e.g. {TopicPartition(topic=u'online-events', partition=49): (314735, u'illidan-c')}
    consumer_offsets_dict = {}
    all_offsets = MonitorKafkaInfra.get_kafka_consumer_offsets(MonitorKafkaInfra.kafka_admin_client)
    for (group, topic, partition), offset in all_offsets.items():
        if group in consumers:
            consumer_offsets_dict[TopicPartition(topic, partition)] = (offset, group)

    for key, highwater in highwater_offsets_dict.items():
        if key not in consumer_offsets_dict:
            # None of the watched consumers has a recorded offset for this partition
            continue
        offset, group = consumer_offsets_dict[key]
        lag = highwater - offset
        if lag > warning_offsets:
            # WARNING
            warning_msg.append(u"Consumer: {} \n"
                               u"Topic: {} \n"
                               u"Partition {} \n"
                               u"OVER WARNING OFFSETS {} \n"
                               u"If this is not expected, please check「KAFKA MANAGER」right now\n\n ".format(
                                   group, key.topic, key.partition, lag))

    for i in warning_msg:
        msg += i

    dingding_client.request_to_text(msg)
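Wiring it together looks roughly like this. The topic and consumer names come from the comments above; the threshold and the DingTalk client are placeholders (request_to_text is simply whatever method your own webhook wrapper exposes).

# Hypothetical invocation; my_dingding_client is any wrapper around a DingTalk webhook.
monitor_lag_to_dingding_client(
    topics={'online-events'},
    consumers={'illidan-c'},
    warning_offsets=10000,
    dingding_client=my_dingding_client)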

 

One thing worth noting in these two snippets is this part of the first one:

# Note: the behaviour of _send_request_to_node changed in kafka-python 1.4.7
future = kafka_admin_client._send_request_to_node(node_id=broker.nodeId, request=request)
kafka_admin_client._client.poll(future=future)
response = future.value

The _send_request_to_node method changed in recent client releases, so keep an eye on it.
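If the code has to run against both older and newer kafka-python releases, one way around this (borrowed from the Datadog check referenced above) is to skip the admin client's private helper entirely and send the request through the underlying KafkaClient yourself. A minimal sketch, assuming the same request object as in the code above:

def send_offset_request(kafka_admin_client, node_id, request):
    """Send a request to one broker and block until the response arrives.

    Bypasses KafkaAdminClient._send_request_to_node, whose behaviour differs
    between kafka-python releases, and uses the underlying KafkaClient instead.
    """
    client = kafka_admin_client._client
    # Poll until the connection to the broker is ready, otherwise send()
    # fails with NodeNotReadyError.
    while not client.ready(node_id):
        client.poll()
    future = client.send(node_id, request)
    client.poll(future=future)
    if future.failed():
        raise future.exception
    return future.value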

In addition, enabling a JMX port on Kafka lets us pull much more of the information we care about from the JMX endpoint.

Setting JMX_PORT inside bin/kafka-run-class.sh will clash with ZooKeeper if you are running ZooKeeper on the same node. It is best to set the JMX port individually inside the corresponding server-start scripts:
  1. Insert line “export JMX_PORT=${JMX_PORT:-9998}” before last line in $KAFKA_HOME/bin/zookeeper-server-start.sh file.
  2. Restart the Zookeeper server.
  3. Repeat steps 1 and 2 for all zookeeper nodes in the cluster.
  4. Insert line “export JMX_PORT=${JMX_PORT:-9999}” before last line in $KAFKA_HOME/bin/kafka-server-start.sh file.
  5. Restart the Kafka Broker.
  6. Repeat steps 4 and 5 for all brokers in the cluster.

Once the JMX ports are open this way, Kafka Manager can surface much more of Kafka's runtime state, including per-topic inbound and outbound traffic per second. That makes it easier to understand how the cluster is currently behaving, evaluate performance, and run benchmarks.
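If you would rather sample those MBeans without Kafka Manager, the JmxTool bundled with Kafka can read them once the port is open. A rough sketch driven from Python; the install path, broker host, the 9999 port from the steps above, and the exact JmxTool flags are assumptions that may vary between Kafka versions.

import subprocess

# Placeholders: Kafka install path, broker host, and the JMX port configured above.
subprocess.call([
    '/opt/kafka/bin/kafka-run-class.sh', 'kafka.tools.JmxTool',
    '--jmx-url', 'service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi',
    '--object-name', 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
    '--reporting-interval', '5000',
])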

 

 

References:

https://github.com/DataDog/integrations-core/pull/2730/files
https://github.com/dpkp/kafka-python/issues/1673
https://github.com/dpkp/kafka-python/issues/1501

https://stackoverflow.com/questions/36708384/how-to-enable-remote-jmx-on-kafka-brokers-for-jmxtool

https://github.com/DataDog/integrations-core/blob/master/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py (the up-to-date approach to checking offsets)

