flink metric庫的使用和自定義metric-reporter


簡單介紹

flink內部實現了一套metric數據收集庫。 同時flink自身系統有一些固定的metric數據, 包括系統的一些指標,CPU,內存, IO 或者各個task運行的一些指標。具體包含那些指標可以查看官方文檔: flink-metric
同時我們也可以利用系統的metric庫在自己的代碼中進行打點收集metrics數據。此外, flink提供了外部接口,可以用來導出這些metrics數據.

flink-metric庫的使用

在官方的文檔中有介紹, 需要繼承Richfunction 才能獲得對應的metric對象, 用法如下:

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}

flink-metrics導出到外部系統

在flink中, 提供了方便的metric數據導出的庫,通過實現自己的reporter,可以將metrics數據導出到不同的系統.
官方提供有多種reporter庫,JMX, Graphite, Slf4j... 等等. 同時,我們可以自定義實現metric庫,來導入到自己的系統.

自定義reporter類

實現MetricReporter類中的open,close, notifyOfAddedMetric, notifyOfRemovedMetric方法
實現Scheduled的report方法 ,在剛方法中實現寫入到其他系統的邏輯
實現CharacterFilter的 filterCharacters方法, 用於對scope進行過濾.

public class FalconReporter implements MetricReporter, CharacterFilter, Scheduled {

  private static final Logger LOG =LoggerFactory.getLogger(FalconReporter.class);

  private final Map<Gauge<?>, MetricTag> gauges = new ConcurrentHashMap<>();
  private final Map<Counter, MetricTag> counters = new ConcurrentHashMap<>();
  private final Map<Histogram, MetricTag> histograms = new ConcurrentHashMap<>();
  private final Map<Meter, MetricTag> meters = new ConcurrentHashMap<>();

  @Override
  public String filterCharacters(String s) {
    return s;
  }

  @Override
  public void open(MetricConfig metricConfig) {
  }

  @Override
  public void close() {
  }

  @Override
  public void notifyOfAddedMetric(Metric metric, String s, MetricGroup metricGroup) {

  }

  @Override
  public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
  }

  @Override
  public void report() {

  }
}

配置reporter

在flink-conf.yaml中配置即可,配置如下

metrics.reporters: slf4j, jmx
metrics.reporter.slf4j.class: org.apache.flink.metrics.falcon.FalconReporter
metrics.reporter.slf4j.interval: 60 SECONDS

metrics.reporters 用於配置類型名, 自定義即可
metrics.reporter.slf4j.class: 配置對應類型的reporter類
metrics.reporter.slf4j.interval: 60 SECONDS  消息上報的間隔
metrics.reporter.slf4j.* 可以自定義配置, 可以在open(MetricConfig metricConfig) 中的獲得對應的config


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM