簡單介紹
flink內部實現了一套metric數據收集庫。 同時flink自身系統有一些固定的metric數據, 包括系統的一些指標,CPU,內存, IO 或者各個task運行的一些指標。具體包含那些指標可以查看官方文檔: flink-metric
同時我們也可以利用系統的metric庫在自己的代碼中進行打點收集metrics數據。此外, flink提供了外部接口,可以用來導出這些metrics數據.
flink-metric庫的使用
在官方的文檔中有介紹, 需要繼承Richfunction 才能獲得對應的metric對象, 用法如下:
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
flink-metrics導出到外部系統
在flink中, 提供了方便的metric數據導出的庫,通過實現自己的reporter,可以將metrics數據導出到不同的系統.
官方提供有多種reporter庫,JMX, Graphite, Slf4j... 等等. 同時,我們可以自定義實現metric庫,來導入到自己的系統.
自定義reporter類
實現MetricReporter類中的open,close, notifyOfAddedMetric, notifyOfRemovedMetric方法
實現Scheduled的report方法 ,在剛方法中實現寫入到其他系統的邏輯
實現CharacterFilter的 filterCharacters方法, 用於對scope進行過濾.
public class FalconReporter implements MetricReporter, CharacterFilter, Scheduled {
private static final Logger LOG =LoggerFactory.getLogger(FalconReporter.class);
private final Map<Gauge<?>, MetricTag> gauges = new ConcurrentHashMap<>();
private final Map<Counter, MetricTag> counters = new ConcurrentHashMap<>();
private final Map<Histogram, MetricTag> histograms = new ConcurrentHashMap<>();
private final Map<Meter, MetricTag> meters = new ConcurrentHashMap<>();
@Override
public String filterCharacters(String s) {
return s;
}
@Override
public void open(MetricConfig metricConfig) {
}
@Override
public void close() {
}
@Override
public void notifyOfAddedMetric(Metric metric, String s, MetricGroup metricGroup) {
}
@Override
public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
}
@Override
public void report() {
}
}
配置reporter
在flink-conf.yaml中配置即可,配置如下
metrics.reporters: slf4j, jmx
metrics.reporter.slf4j.class: org.apache.flink.metrics.falcon.FalconReporter
metrics.reporter.slf4j.interval: 60 SECONDS
metrics.reporters 用於配置類型名, 自定義即可
metrics.reporter.slf4j.class: 配置對應類型的reporter類
metrics.reporter.slf4j.interval: 60 SECONDS 消息上報的間隔
metrics.reporter.slf4j.* 可以自定義配置, 可以在open(MetricConfig metricConfig)
中的獲得對應的config