Kafka(三) —— 集群監控


任何應用功能再強大、性能再優越,如果沒有與之匹配的監控,那么一切都是虛無縹緲的。監控不僅可以為應用提供運行時的數據作為依據參考,還可以迅速定位問題,提供預防及告警等功能,很大程度上增強了整體服務的魯棒性。

一、Kafka監控指標與獲取

Kafka監控的4個維度:

  • 集群信息
  • broker信息
  • topic信息
  • consumer group信息

使用JConsole訪問JMX

(1)終端輸入jconsole,啟動Java監視和管理控制台。

(2)修改kafka-run-class.sh,使JConsole可以通過遠程連接。

KAFKA_JMX_OPTS="

-Dcom.sun.management.jmxremote 

-Dcom.sun.management.jmxremote.authenticate=false  

-Dcom.sun.management.jmxremote.ssl=false 

-Djava.rmi.server.hostname=服務器的IP地址或者域名"

(3)修改kafka-server-start.sh,增加export JMX_PORT="9999"

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

(4)檢查是否開啟JMX

broker在啟動過程中,始終會將JMX端口信息寫入Kafka對應的位置.

(5)連接

(6)查看MBean

MBean的名稱,xxx.type=yyy,{attr} = zzz

其中xxx指的是組件名,如xxx = kafka.server
zzz 和 attr 指的是MBean的范圍,例如topic = test,表示該MBean的作用范圍是名為test的topic。

指標分類:

  • kafka.server 服務器端JMX指標
  • kafka.network 網絡相關JMX指標
  • kafka.log 分區日志相關JMX指標
  • kafka.controller controller相關指標

使用Java程序訪問JMX

(1)監控broker一分鍾消息流入的速度

kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec

OneMinuteRate 表示某個broker一分鍾消息流入的速度(messages/s)。


public class KafkaJmxDemo {

    private MBeanServerConnection conn;

    private String jmxUrl;

    private String ipAndPort;

    public KafkaJmxDemo(String ipAndPort) {
        this.ipAndPort = ipAndPort;
    }

    /**
     * 初始化JMX連接
     *
     * @return
     */
    public boolean init() {
        jmxUrl = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
        try {
            JMXServiceURL serviceURL = new JMXServiceURL(jmxUrl);
            JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
            conn = connector.getMBeanServerConnection();
            if (conn == null) {
                return false;
            }
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }

    public double getMsgInPerSec() {
        String objectName = "kafka.server:type=BrokerTopicMetrics," +
                "name=MessagesInPerSec";
        Object val = getAttribute(objectName, "OneMinuteRate");
        if (val != null) {
            return (double) (Double) val;
        }
        return 0.0;
    }

    private Object getAttribute(String objName, String objAttr) {
        ObjectName objectName;
        try {
            objectName = new ObjectName(objName);
            return conn.getAttribute(objectName, objAttr);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        KafkaJmxDemo kafkaJmxDemo = new KafkaJmxDemo("127.0.0.1:9999");
        kafkaJmxDemo.init();
        System.out.println(kafkaJmxDemo.getMsgInPerSec());
    }

}


(2)獲取指定Topic、指定分區的LEO值


    public long getTopicPatitionLeo(String topic, int partition) {
        String objectName = "kafka.log:type=Log,name=LogEndOffset,topic=" + topic + ",partition=" + partition;
        Object val = getAttribute(objectName, "Value");
        if (val != null) {
            return (long) (Long) val;
        }
        return 0L;
    }

(3)監控指定Topic的消息流入的速度


public double getBrokerTopicMetrics(String topic) {
        String objectName = "kafka.server:type=BrokerTopicMetrics," +
                "name=BytesInPerSec,topic=" + topic;
        Object val = getAttribute(objectName, "OneMinuteRate");
        if (val != null) {
            return (double) (Double) val;
        }
        return 0.0;
    }

輸出

Kafka重要監控參數

(1)消息入站、出站速率

## 入站速率
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec

## 出站速率
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
屬性名 含義
Count broker處理過的總消息字節數
OneMinuteRate 統計過去1分鍾內的消息速率
MeanRate 統計平均消息速率

二、監控系統kafka-manager

注意每一行后面不要留空格。

[repositories] 
local
aliyun: http://maven.aliyun.com/nexus/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots

Add Cluster時,會提示異常。

Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]. Message of type [kafka.manager.model.ActorModel$KMAddCluster]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

三、監控系統kafka-eagle

安裝參考

官方參考文檔:https://docs.kafka-eagle.org/

https://www.cnblogs.com/yinzhengjie/p/9957389.html

下載


wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.2.0.tar.gz

效果圖

創建Topic

監控Broker的消息出入站速率

參考文檔

kafka-manager Github
Kafka集群管理工具kafka-manager的安裝使用
kafka manager的使用,kafka manager頁面參數說明
Kafka Manager幾個指標含義


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM