Monitoring Kafka consumption TPS with InfluxDB + Grafana


Because our supporting monitoring system is still incomplete, I wrote a simple page for monitoring Kafka message consumption TPS.

InfluxDB installation

  • Download and install

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.2.x86_64.rpm

sudo yum localinstall influxdb-1.6.2.x86_64.rpm

 

  • InfluxDB configuration

 

vi /etc/influxdb/influxdb.conf

 

 

[[graphite]]
enabled = true
database = "jmeter"
retention-policy = ""
bind-address = ":2003"
protocol = "tcp"
consistency-level = "one"
batch-size = 5000
batch-pending = 10
batch-timeout = "1s"
udp-read-buffer = 0
separator = "."
  • Start InfluxDB
influxd -config /etc/influxdb/influxdb.conf
  • InfluxDB operations
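Show all databases: show databases

Create a database: create database shhnwangjian

Create a user with admin privileges: CREATE USER "admin" WITH PASSWORD 'admin' WITH ALL PRIVILEGES

Drop a database: drop database shhnwangjian

Use a specific database: use shhnwangjian

Show all measurements (tables): SHOW MEASUREMENTS

Show all tag keys of a measurement: show tag keys from disk_free

Show all field keys of a measurement: show field keys from disk_free

Insert data: insert disk_free,hostname=server01 value=442221834240i 1435362189575692182

Drop a measurement: drop measurement disk_free

Insert data: insert disk_free,hostname=server01 value=100 1613791502000000000
The order is: the measurement name (created automatically if it does not exist), the tag keys and values, the field keys and values, and an optional nanosecond timestamp. Tags follow the measurement name after a comma; the field set and the timestamp are each preceded by a single space, so mind the spaces.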
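
To make the ordering above concrete, here is a minimal Java sketch that assembles one line-protocol point with a single integer field. The class name LineProtocol and the method point are hypothetical names chosen for illustration, and the sketch assumes that tag keys, tag values, and the measurement name contain no spaces, commas, or equals signs (these would need escaping in real use).

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, not part of InfluxDB or JMeter.
final class LineProtocol {
    private LineProtocol() {}

    /** Builds "measurement,tag=value field=value timestamp" for one integer field. */
    static String point(String measurement, Map<String, String> tags,
                        String fieldKey, long fieldValue, long timestampNanos) {
        // TreeMap sorts the tags by key so the output is deterministic.
        String tagPart = new TreeMap<>(tags).entrySet().stream()
                .map(e -> "," + e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining());
        // The trailing "i" marks the field as an integer; without it the value is a float.
        return measurement + tagPart + " " + fieldKey + "=" + fieldValue + "i " + timestampNanos;
    }

    public static void main(String[] args) {
        // Reproduces the first insert statement shown above (minus the "insert" keyword).
        System.out.println(point("disk_free", Map.of("hostname", "server01"),
                "value", 442221834240L, 1435362189575692182L));
    }
}
```

The resulting string is what follows the insert keyword in the influx shell, and it is also the request body expected by the HTTP write endpoint.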

 

Inserting data through the HTTP API:

curl -i -XPOST 'http://IP:PORT/write?db=jmeter' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
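
The same write can be issued from Java. The following is a rough sketch using the JDK's built-in java.net.http client (Java 11 or later); the class name InfluxWrite and its method names are made up for this example, and it assumes an InfluxDB 1.x instance without authentication, as in the curl command above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class InfluxWrite {
    private InfluxWrite() {}

    /** Builds the POST request for the /write endpoint without sending it. */
    static HttpRequest buildRequest(String hostAndPort, String database, String lineProtocol) {
        URI uri = URI.create("http://" + hostAndPort + "/write?db=" + database);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.ofString(lineProtocol, StandardCharsets.UTF_8))
                .build();
    }

    /** Sends one point and returns the HTTP status code (InfluxDB 1.x answers 204 on success). */
    static int send(String hostAndPort, String database, String lineProtocol) throws Exception {
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(buildRequest(hostAndPort, database, lineProtocol),
                      HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }
}
```

Calling InfluxWrite.send("IP:PORT", "jmeter", "cpu_load_short,host=server01,region=us-west value=0.64") with a real address should be roughly equivalent to the curl call, with the server assigning the timestamp because none is given.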

 

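JMeter configuration

  1. Create a test plan and add a Backend Listener
  2. Set the InfluxDB IP and port
  3. Run the test and wait a few seconds
  4. Check whether JMeter has logged any errors

Port notes:

  • Port 8086: the port Grafana uses to read data from the database
  • Port 2003: the port JMeter uses to send data to the database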

 

Grafana configuration

  • Download and install
wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x86_64.rpm
sudo yum localinstall grafana-4.2.0-1.x86_64.rpm
  • Start
service grafana-server start

  • Create an InfluxDB data source

  • Create a dashboard

 

 

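Query: SELECT mean("value") FROM "tps" WHERE $timeFilter GROUP BY time(1s), "host" fill(null)

$__interval: GROUP BY time(interval) aggregates the query results per interval. For example, with time(5m) the interval is 5m, so the data is aggregated into 5-minute buckets. The query above uses a fixed 1s interval; writing time($__interval) instead lets Grafana choose the interval from the dashboard's time range.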
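
As a rough illustration of what GROUP BY time(interval) combined with mean() does, the sketch below buckets timestamped values by interval and averages each bucket. It is only a conceptual model written for this article (the class TimeBuckets is hypothetical), not InfluxDB's implementation, and unlike fill(null) it simply omits buckets that contain no points.

```java
import java.util.Map;
import java.util.TreeMap;

// Conceptual model of GROUP BY time(interval) with mean(); not InfluxDB code.
final class TimeBuckets {
    private TimeBuckets() {}

    /**
     * points: rows of {timestampMs, value}.
     * Returns bucket start time (ms) -> mean of the values that fall in that bucket.
     */
    static Map<Long, Double> meanPerBucket(long[][] points, long intervalMs) {
        Map<Long, long[]> acc = new TreeMap<>(); // bucket start -> {sum, count}
        for (long[] p : points) {
            // Round the timestamp down to the start of its interval.
            long bucket = Math.floorDiv(p[0], intervalMs) * intervalMs;
            long[] sumCount = acc.computeIfAbsent(bucket, k -> new long[2]);
            sumCount[0] += p[1];
            sumCount[1]++;
        }
        Map<Long, Double> means = new TreeMap<>();
        acc.forEach((bucket, sc) -> means.put(bucket, (double) sc[0] / sc[1]));
        return means;
    }

    public static void main(String[] args) {
        long[][] points = {{0, 10}, {400, 20}, {1000, 30}, {1500, 50}};
        // Two samples fall in each 1-second bucket and are averaged.
        System.out.println(meanPerBucket(points, 1000)); // prints {0=15.0, 1000=40.0}
    }
}
```

With the 0.5 s sampling interval used later in this article, each 1-second bucket would typically hold two samples, which mean() then averages.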

  • Line chart configuration

 

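Use the Stack option to control whether the series are stacked (accumulated).

Writing data to the database

The consumption monitor is integrated into JMeter as a Java sampler (javasample). Start the consumption-monitoring thread group before starting the producer.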

import lombok.SneakyThrows;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;


import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author: weisy
 * @date: 2021/02/22
 * @description:
 */
public class KafkaMonitor extends AbstractJavaSamplerClient {
    private String kafkaIp;
    private String topic;
    private String consumer;
    private String influxdbIp;
    private String influxdbName;
    private String measurement;
    private int sampleInterval;
    private int loopTime;
    @Override
    public Arguments getDefaultParameters() {
        Arguments params = new Arguments();
        params.addArgument("kafkaIp", "IP:PORT","Kafka cluster address");
        params.addArgument("topic", "topic_name","topic");
        params.addArgument("consumer", "group_name","consumer group");
        params.addArgument("influxdbIp", "IP:PORT","InfluxDB address");
        params.addArgument("influxdbName", "jmeter","database name");
        params.addArgument("measurement", "tps","measurement name");
        params.addArgument("sampleInterval", "0.5","sampling interval (s)");
        params.addArgument("loopTime", "60","duration (s)");

        return params;
    }

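    /**
     * Runs once per thread before the test to do initialization work.
     * Reads the input parameters into fields; they could also be read in runTest below,
     * this is only to show what the method is for.
     * @param arg0
     */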
    @Override
    public void setupTest(JavaSamplerContext arg0) {
        kafkaIp = arg0.getParameter("kafkaIp");
        topic = arg0.getParameter("topic");
        consumer = arg0.getParameter("consumer");
        influxdbIp = arg0.getParameter("influxdbIp");
        influxdbName = arg0.getParameter("influxdbName");
        measurement = arg0.getParameter("measurement");
        try {
            // The default is "0.5", so parse as a double and convert seconds to milliseconds
            this.sampleInterval = (int) (Double.parseDouble(arg0.getParameter("sampleInterval")) * 1000);
        } catch (NumberFormatException e) {
            e.printStackTrace();
        }
        try {
            this.loopTime = Integer.parseInt(arg0.getParameter("loopTime"));
        } catch (NumberFormatException e) {
            e.printStackTrace();
        }

    }

    /**
     * The method that performs the actual work
     * @param arg0
     * @return
     */
    @SneakyThrows
    @Override
    public SampleResult runTest(JavaSamplerContext arg0) {

        SampleResult sr = new SampleResult();
        sr.sampleStart();
        long currentTime = System.currentTimeMillis();
        long endTime = currentTime + loopTime * 1000L;
        int tol = 0;   // total messages consumed during the run
        int num = 0;   // committed offset sum from the previous sample
        System.out.println("Start time: " + currentTime);
        System.out.println("End time: " + endTime);
        while (currentTime < endTime) {
            currentTime = System.currentTimeMillis();
            int offsets = getPartitionsForTopic(topic, kafkaIp, consumer);
            int tps = offsets - num;   // messages committed since the previous sample
            System.out.println("Sample time: " + currentTime + " consumed: " + tps);
            // Skip the first sample (no baseline yet) and samples with no progress
            if (num != 0 && tps != 0) {
                tol += tps;

                String querybody = measurement + ",host=server01,region=us-west value=" + tps;
                String url = "http://" + influxdbIp + "/write?db=" + influxdbName;
                // HttpClient4 is a helper class whose source is not included in this article
                HttpClient4.writeInfluxdb(url, querybody, null);
            }
            num = offsets;
            Thread.sleep(sampleInterval);
        }
        sr.sampleEnd();
        sr.setSuccessful(true);
        return sr;
    }

    /**
     * Called after the test finishes
     * @param arg0
     */
    @Override
    public void teardownTest(JavaSamplerContext arg0) {


    }
    private static Consumer<Long, String> createConsumer(String BOOTSTRAP_SERVERS,String Group) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, Group);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);

        return consumer;
    }

    public static int getPartitionsForTopic(String TOPIC,String BOOTSTRAP_SERVERS,String Group) {
        long committedTotal = 0;
        // try-with-resources closes the consumer; this method runs once per sample
        try (Consumer<Long, String> consumer = createConsumer(BOOTSTRAP_SERVERS, Group)) {
            for (PartitionInfo info : consumer.partitionsFor(TOPIC)) {
                org.apache.kafka.clients.consumer.OffsetAndMetadata committed =
                        consumer.committed(new TopicPartition(TOPIC, info.partition()));
                // committed() returns null when the group has not committed on this partition yet
                if (committed != null) {
                    committedTotal += committed.offset();   // sum of committed offsets across partitions
                }
            }
        }
        return (int) committedTotal;
    }

    /**
     * main method for checking that the program works; comment it out before packaging
     * @param args
     */
    public static void main(String[] args) {
        Arguments params = new Arguments();
        // Set the parameters
        params.addArgument("kafkaIp", "IP:PORT","Kafka cluster address");
        params.addArgument("topic", "topic_name","topic");
        params.addArgument("consumer", "group_name","consumer group");
        params.addArgument("influxdbIp", "IP:PORT","InfluxDB address");
        params.addArgument("influxdbName", "jmeter","database name");
        params.addArgument("measurement", "tps","measurement name");
        params.addArgument("sampleInterval", "0.5","sampling interval (s)");
        params.addArgument("loopTime", "60","duration (s)");

        JavaSamplerContext arg0 = new JavaSamplerContext(params);
        KafkaMonitor test = new KafkaMonitor();
        test.setupTest(arg0);
        test.runTest(arg0);
        test.teardownTest(arg0);
    }

}
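
One thing to keep in mind about the sampler above: the value it writes is the change in committed offsets per sampling interval, so with a 0.5 s interval each point is "messages per half second" rather than a true per-second rate. If a per-second figure is wanted, the delta can be divided by the elapsed time. The sketch below shows one way to do that; TpsCalculator is a hypothetical class that is not part of the original sampler, and it uses -1 as an assumed marker for "no baseline yet".

```java
// Hypothetical helper: converts successive committed-offset sums into messages per second.
final class TpsCalculator {
    private long lastOffset = -1;   // -1 means no sample has been taken yet
    private long lastTimeMs;

    /** Returns messages per second since the previous sample, or -1 for the first sample. */
    double sample(long committedOffsetSum, long nowMs) {
        double tps = -1;
        if (lastOffset >= 0 && nowMs > lastTimeMs) {
            // Scale the offset delta by the real elapsed time instead of assuming 1 second.
            tps = (committedOffsetSum - lastOffset) * 1000.0 / (nowMs - lastTimeMs);
        }
        lastOffset = committedOffsetSum;
        lastTimeMs = nowMs;
        return tps;
    }

    public static void main(String[] args) {
        TpsCalculator calc = new TpsCalculator();
        calc.sample(100, 0);                        // first sample only sets the baseline
        System.out.println(calc.sample(150, 500));  // 50 messages in 0.5 s, prints 100.0
    }
}
```

Using the measured elapsed time also absorbs the extra latency of each offset lookup, which would otherwise stretch the real interval beyond the configured sleep.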

 

Final result

 

 

I also wrote a simple monitoring page with Highcharts:

 

 

 

<html>
<head>
<meta charset="UTF-8" />
<title>Highcharts</title>
<script src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>
<script src="http://code.highcharts.com/highcharts.js"></script></head>
<body>
<div id="container" style="width: 1000px; height: 800px; margin: 0 auto"></div>
<script language="JavaScript">
$(document).ready(function() {  
   var chart = {
      type: 'spline',
      animation: Highcharts.svg, // don't animate in IE < IE 10.
      marginRight: 10,
      events: {
         load: function () {
            // set up the updating of the chart each second
            var series = this.series[0];
            setInterval(function () {
               var res = showAdress(5);
               if(res && res.series){
                  var x =  res.series[0].values[0][0];
                  var y =  res.series[0].values[0][1];
                  series.addPoint([x, y], true, true);
               }
               
               // series.addPoint([x, y], true, true);
            }, 1000);
         }
      }
   };
   var title = {
      text: 'Kafka consumption rate'   
   };   
   var xAxis = {
      type: 'datetime',
      tickPixelInterval: 150
   };
   var yAxis = {
      title: {
         text: 'TPS'
      },
      plotLines: [{
         value: 0,
         width: 1,
         color: '#808080'
      }]
   };
   var tooltip = {
      // set the display format for the x and y values
      formatter: function () {
      return '<b>' + this.series.name + '</b><br/>' +
         Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
         Highcharts.numberFormat(this.y, 2);
      }
   };
   var plotOptions = {
      area: {
         pointStart: 1940,
         marker: {
            enabled: false,
            symbol: 'circle',
            radius: 2,
            states: {
               hover: {
                 enabled: true
               }
            }
         }
      }
   };
   var legend = {
      enabled: false
   };
   var exporting = {
      enabled: false
   };
   var series= [{
      name: 'Consumption TPS',
      data: (function () {
         // load the last 60 seconds of data from InfluxDB
         var data = [],time = (new Date()).getTime(),i;
         var res = showAdress(60);
         if(res && res.series){
            var values = res.series[0].values;
            // iterate over the array, logging each value and its index
            values.forEach(function (element, index, array) {
               console.log(element, index)
               data.push({
                  x: element[0],
                  y: element[1]
               });
            })
           
         }

         console.log(res);
         
         return data;
      }())    
   }];  
   
   function showAdress(time)
 {
   var flag = false; // set to true on success so the check below can use it
   var datas;
    $.ajax
    ({
        url: "http://IP:PORT/query",
        dataType: "json",
      type: "get",
      async: false,
        data: {
            u:"admin",
            p:"admin",
            db:"jmeter",
            q:`SELECT mean("value") FROM "tps" WHERE time > now() - ${time}s GROUP BY time(1s) fill(null)`,
            // q:'SELECT mean("value") FROM "tps" WHERE time > 1614757792613ms and time < 1614757896397ms GROUP BY time(1s) fill(null)',
            epoch:"ms"
            
        },
        success:function(res){
         if(null != res){
            // datas = res.results[0].series[0].values;
            datas = res.results[0];

            flag = true;
         }
        
        },
        error:function(){
            alert('failed!');
        },
    });
    if(flag){
       return datas;
       } 
 }
      
   var json = {};   
   json.chart = chart; 
   json.title = title;     
   json.tooltip = tooltip;
   json.xAxis = xAxis;
   json.yAxis = yAxis; 
   json.legend = legend;  
   json.exporting = exporting;   
   json.series = series;
   json.plotOptions = plotOptions;
   
   
   Highcharts.setOptions({
      global: {
         useUTC: false
      }
   });
   $('#container').highcharts(json);
  
});
</script>
</body>
</html>
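
For reference, the GET request that the page sends to the query endpoint can also be built outside the browser. The sketch below assembles the same URL in Java; InfluxQueryUrl is a hypothetical helper, it omits the u and p credentials used in the page, and it assumes Java 10 or later for URLEncoder.encode with a Charset argument.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors the parameters of the ajax call in the page above.
final class InfluxQueryUrl {
    private InfluxQueryUrl() {}

    /** Builds the /query URL for the mean TPS over the last N seconds, in 1-second buckets. */
    static String build(String hostAndPort, String database, int lastSeconds) {
        String q = "SELECT mean(\"value\") FROM \"tps\" WHERE time > now() - " + lastSeconds
                + "s GROUP BY time(1s) fill(null)";
        // epoch=ms asks for millisecond timestamps, which is what Highcharts expects.
        return "http://" + hostAndPort + "/query?db=" + database
                + "&epoch=ms&q=" + URLEncoder.encode(q, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:8086", "jmeter", 60));
    }
}
```

As in the page's success handler, the points are then read from results[0].series[0].values in the JSON response, each entry being a [timestamp, mean] pair.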

 

