Kafka Monitoring and JMX


JMX

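JMX (Java Management Extensions) is a framework for adding management and monitoring capabilities to applications, devices, and systems. It works across heterogeneous operating systems, system architectures, and network transport protocols, so system, network, and service management applications can be built flexibly and integrate seamlessly.

Put simply, it lets you monitor a Java program's basic information and runtime state.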
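As a minimal sketch of that idea, the snippet below reads two values from the MBeans that every JVM registers for itself, without any remote connection. The class name LocalJmxPeek and its method names are made up for this illustration; the ObjectNames java.lang:type=Memory and java.lang:type=Runtime are standard platform MBeans.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

// Hypothetical helper class, used only for this illustration.
class LocalJmxPeek {

    /** Reads the used heap size in bytes from the JVM's built-in Memory MBean. */
    static long usedHeapBytes() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName memory = new ObjectName("java.lang:type=Memory");
        // HeapMemoryUsage is exposed as CompositeData with init/used/committed/max fields
        CompositeData usage = (CompositeData) server.getAttribute(memory, "HeapMemoryUsage");
        return (Long) usage.get("used");
    }

    /** Reads the JVM name from the built-in Runtime MBean. */
    static String vmName() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName runtime = new ObjectName("java.lang:type=Runtime");
        return (String) server.getAttribute(runtime, "VmName");
    }

    public static void main(String[] args) throws Exception {
        System.out.println("VM: " + vmName());
        System.out.println("Used heap (bytes): " + usedHeapBytes());
    }
}
```

Kafka publishes its own metrics through the same mechanism, which is why the same getAttribute call appears again when connecting to the broker remotely.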

Enabling JMX in Kafka

Windows: edit kafka-server-start.bat and set the JMX port right after the heap size settings.

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem     http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

IF [%1] EQU [] (
    echo USAGE: %0 server.properties
    EXIT /B 1
)

SetLocal
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
    set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
    rem detect OS architecture
    wmic os get osarchitecture | find /i "32-bit" >nul 2>&1
    IF NOT ERRORLEVEL 1 (
        rem 32-bit OS
        set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
    ) ELSE (
        rem 64-bit OS
        set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
    )
    set JMX_PORT="9999"
)
"%~dp0kafka-run-class.bat" kafka.Kafka %*
EndLocal

Linux: edit kafka-server-start.sh and set the JMX port right after the heap size settings.

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
    echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
    exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

So how can an external tool read Kafka's monitoring metrics?

JConsole

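jconsole is a monitoring tool that ships with the JDK; it lives in the JDK's bin directory.

Launch it, then enter the host address and the JMX port.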
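The host and port entered here are shorthand for a full JMX service URL, which is the form a Java client has to supply. The sketch below shows how the two map onto each other; the class JmxUrlBuilder and its method are hypothetical names for this illustration, and localhost:9999 assumes the JMX_PORT configured above on a broker running on the same machine.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper class, used only for this illustration.
class JmxUrlBuilder {

    /** Builds the RMI-based JMX service URL for a given host and JMX port. */
    static JMXServiceURL forHostPort(String host, int port) throws MalformedURLException {
        // Conventional layout for the JVM's built-in RMI connector
        return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    public static void main(String[] args) throws Exception {
        // Assumption: broker on this machine with JMX_PORT=9999
        JMXServiceURL url = forHostPort("localhost", 9999);
        System.out.println(url);
    }
}
```

Constructing the URL only parses it; no connection is attempted until it is passed to JMXConnectorFactory.connect.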

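The first screen shows an overview of the process, other tabs give summary information, and the MBeans tab lists the MBeans, which matter most here.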

Connecting to a JMX application from Java

Using the Kafka broker configured above as the example:

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanOperationInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public static void jmx() throws Exception {
    // Host and port must match the broker's JMX_PORT setting
    JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
    // try-with-resources closes the connector even if a call below throws
    try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
        MBeanServerConnection connection = jmxc.getMBeanServerConnection();

        System.out.println("=========Domains=========");
        String[] domains = connection.getDomains();
        for (String d : domains) {
            System.out.println(d);
        }

        System.out.println("=========MBeans=========");
        System.out.println(connection.getMBeanCount());

        System.out.println("=========Invoke=========");
        ObjectName mBeanName = new ObjectName("kafka.log:type=Log,name=Size,topic=my-topic,partition=0");
        // Read an attribute value
        Object value = connection.getAttribute(mBeanName, "Value");
        System.out.println(value);
        // Invoke an operation on the MBean (no parameters, so no signature)
        Object invoke = connection.invoke(mBeanName, "objectName", null, null);
        System.out.println(invoke);

        System.out.println("=========MBean Info=========");
        mBeanName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
        MBeanInfo info = connection.getMBeanInfo(mBeanName);
        System.out.println("ClassName:" + info.getClassName());
        // List every attribute with its type and current value
        for (MBeanAttributeInfo attr : info.getAttributes()) {
            System.out.println("Attribute: " + attr.getName() + ", type: " + attr.getType()
                    + ", value: " + connection.getAttribute(mBeanName, attr.getName()));
        }
        // List the operations the MBean exposes
        for (MBeanOperationInfo op : info.getOperations()) {
            System.out.println("Operation: " + op.getName());
        }
    }
}

Output:

=========Domains=========
java.util.logging
kafka.utils
kafka.controller
java.nio
kafka.network
JMImplementation
kafka.log
kafka.coordinator.group
java.lang
com.sun.management
kafka.server
kafka.cluster
kafka
kafka.coordinator.transaction
=========MBeans=========
1098
=========Invoke=========
24997
kafka.log:type=Log,name=Size,topic=my-topic,partition=0
=========MBean Info=========
ClassName:com.yammer.metrics.reporting.JmxReporter$Meter
Attribute: Count, type: long, value: 0
Attribute: EventType, type: java.lang.String, value: bytes
Attribute: RateUnit, type: java.util.concurrent.TimeUnit, value: SECONDS
Attribute: MeanRate, type: double, value: 0.0
Attribute: OneMinuteRate, type: double, value: 0.0
Attribute: FiveMinuteRate, type: double, value: 0.0
Attribute: FifteenMinuteRate, type: double, value: 0.0
Operation: objectName

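Notes:

1. The Domains section lists every domain currently registered.

2. The MBeans section prints the total number of MBeans.

3. The monitoring values are what we actually need. The domain and key properties in each ObjectName correspond to the tree shown in JConsole's MBeans tab.

4. Every MBean exposes two kinds of members: attributes and operations (methods).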
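To make point 3 concrete, here is a small sketch that splits the ObjectName used in the example into its domain and key properties, which are the same pieces JConsole shows as tree nodes. The class name ObjectNameParts is made up for this illustration; it only parses the name string and does not need a running broker.

```java
import javax.management.ObjectName;

// Hypothetical helper class, used only for this illustration.
class ObjectNameParts {

    /** Returns the value of one key property (e.g. "topic") from an ObjectName string. */
    static String property(String objectName, String key) throws Exception {
        return new ObjectName(objectName).getKeyProperty(key);
    }

    public static void main(String[] args) throws Exception {
        String name = "kafka.log:type=Log,name=Size,topic=my-topic,partition=0";
        ObjectName on = new ObjectName(name);
        System.out.println("domain    = " + on.getDomain());      // kafka.log
        System.out.println("type      = " + property(name, "type"));
        System.out.println("name      = " + property(name, "name"));
        System.out.println("topic     = " + property(name, "topic"));
        System.out.println("partition = " + property(name, "partition"));
    }
}
```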

 

The remaining methods are left for you to explore.
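One method worth trying first is queryNames, which finds MBeans by wildcard pattern instead of requiring the exact ObjectName. The sketch below runs against the local JVM's own MBean server so that it works without a broker; the class name MBeanDiscovery is hypothetical. Against Kafka, the same method could presumably be given the remote MBeanServerConnection and a pattern such as kafka.server:type=BrokerTopicMetrics,* (an assumption based on the ObjectName used earlier).

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper class, used only for this illustration.
class MBeanDiscovery {

    /** Returns the canonical names of all MBeans matching the pattern, sorted. */
    static Set<String> list(MBeanServerConnection conn, String pattern) throws Exception {
        Set<String> names = new TreeSet<>();
        // A null query expression means "no extra filter beyond the name pattern"
        for (ObjectName on : conn.queryNames(new ObjectName(pattern), null)) {
            names.add(on.getCanonicalName());
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // The local platform MBeanServer is also an MBeanServerConnection
        for (String name : list(ManagementFactory.getPlatformMBeanServer(), "java.lang:*")) {
            System.out.println(name);
        }
    }
}
```

Taking MBeanServerConnection as the parameter type keeps the helper usable for both the local server and a remote connection obtained from a JMXConnector.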

 

