【Flink系列二】構建實時計算平台——特別篇,用InfluxDb收集Flink Metrics


Influxdb 快速入門

原文地址:https://www.cnblogs.com/slankka/p/13865338.html

從Docker啟動 Influxdb

docker pull influxdb:LATEST

docker run -d --name influxdb -p 8086:8086 \
      -v /opt/work/influxdb:/var/lib/influxdb \
      influxdb

進入Influxdb的Client

# docker exec -it influxdb influx
Connected to http://localhost:8086 version 1.8.3
InfluxDB shell version: 1.8.3

>create database flink # 創建Flink數據庫

>use flink #為FLink創建RETENTION_POLICY(1)

>CREATE RETENTION POLICY one_hour ON flink DURATION 1h REPLICATION 1 #為FLink創建RETENTION_POLICY(2)

配置Flink

Flink ifluxdb reporter

修改FLINK_CONF

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: <IP>
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: 
metrics.reporter.influxdb.password: 
metrics.reporter.influxdb.retentionPolicy: one_hour

以該配置啟動的Flink作業,Flink會自動將指標寫入Influxdb

【可選】安裝Chronograf可視化界面

docker pull chronograf:LATEST
docker run --name chronograf -d -p 8888:8888 -v /opt/work/chronograf:/var/lib/chronograf chronograf

執行SQL可以查到CheckpointExternalPath

SELECT * FROM "flink"."one_hour"."jobmanager_job_lastCheckpointExternalPath"

效果圖

Chronograf

優化

監控指標,一般使用Prometheus來做,而根據我的需求和實踐來看,Influxdb僅用來接收lastCheckpointExternalPath這個指標。
經過大約半年多的觀察,Influxdb 1.8,100個作業的情況下, 內存占用峰值會超過20GB,這個時候容器會自動重啟,客戶端無法上報。

因此需要對influxdb進行優化。這里記錄一種最簡單的優化,那就是直接減少指標數量:

package org.apache.flink.metrics.influxdb;

abstract class AbstractReporter<MetricInfo> implements MetricReporter {
	protected final Logger log = LoggerFactory.getLogger(getClass());

	protected final Map<Gauge<?>, MetricInfo> gauges = new HashMap<>();
	protected final Map<Counter, MetricInfo> counters = new HashMap<>();
	protected final Map<Histogram, MetricInfo> histograms = new HashMap<>();
	protected final Map<Meter, MetricInfo> meters = new HashMap<>();
	protected final MetricInfoProvider<MetricInfo> metricInfoProvider;

	protected AbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider) {
		this.metricInfoProvider = metricInfoProvider;
	}

	@Override
	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
		if (!metricName.equals("lastCheckpointExternalPath")) {
			return;
		}

經過驗證,Flink 使用此Reporter,僅上報這一個指標。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM