Using Flume's monitoring metrics for message processing


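Flume is used more and more widely, and in practice you will inevitably run into performance bottlenecks or lost messages. When that happens, the first instinct is to analyze the problem with the command-line tools bundled with Java and to use logs to locate it.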

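Flume maintains many built-in counters while it processes messages, and it can publish them through JMX, Ganglia, or JSON. When you need them, add the following options to the startup script (this example enables JSON reporting over HTTP):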

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

After starting the agent with these options, open http://188.1.186.XXX:34545/metrics to get the monitoring data as JSON.

On Linux, you can simply run curl -XGET '188.1.186.XXX:34545/metrics'
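The same request can also be made from Java, which is handy if you want to poll the endpoint from a small tool of your own. The following is a minimal sketch, assuming Java 11 or later for java.net.http; the class name FlumeMetricsFetcher and the default host and port are placeholders for illustration, not part of Flume.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper class, not part of Flume.
final class FlumeMetricsFetcher {

    // Builds the metrics URL of an agent started with -Dflume.monitoring.type=http.
    static URI metricsUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/metrics");
    }

    // Sends a GET request and returns the raw JSON body as a string.
    static String fetch(String host, int port) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(metricsUri(host, port))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws InterruptedException {
        // Placeholder defaults; pass the agent's real host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 34545;
        try {
            System.out.println(fetch(host, port));
        } catch (IOException e) {
            System.err.println("Could not read metrics: " + e);
        }
    }
}
```

Run it with the agent's host and port as the two arguments; with no arguments it tries localhost:34545.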

To make the response easier to read, you can paste it into a JSON file in Eclipse and press Ctrl+Shift+F to format it:

{
    "SINK.k1": {
        "ConnectionCreatedCount": "1",
        "ConnectionClosedCount": "0",
        "Type": "SINK",
        "BatchCompleteCount": "0",
        "BatchEmptyCount": "0",
        "EventDrainAttemptCount": "7908340",
        "StartTime": "1514878638909",
        "EventDrainSuccessCount": "7657343",
        "BatchUnderflowCount": "250997",
        "StopTime": "0",
        "ConnectionFailedCount": "0"
    },
    "CHANNEL.c1": {
        "ChannelCapacity": "1000000",
        "ChannelFillPercentage": "0.0",
        "Type": "CHANNEL",
        "ChannelSize": "0",
        "EventTakeSuccessCount": "7908340",
        "EventTakeAttemptCount": "7908466",
        "StartTime": "1514878638906",
        "EventPutAttemptCount": "7908340",
        "EventPutSuccessCount": "7908340",
        "StopTime": "0"
    },
    "SOURCE.r1": {
        "KafkaEventGetTimer": "6468875",
        "AppendBatchAcceptedCount": "0",
        "EventAcceptedCount": "7908340",
        "AppendReceivedCount": "0",
        "StartTime": "1514878643588",
        "AppendBatchReceivedCount": "0",
        "KafkaCommitTimer": "156254",
        "EventReceivedCount": "7908340",
        "Type": "SOURCE",
        "AppendAcceptedCount": "0",
        "OpenConnectionCount": "0",
        "KafkaEmptyCount": "0",
        "StopTime": "0"
    }
}
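If you would rather read the counters programmatically than format them by hand, the output above is regular enough to parse without a JSON library: one object per component, each holding only string values. The sketch below relies on exactly that shape and is not a general JSON parser; the class name FlumeMetricsParser is made up for illustration, and for anything more complex a real JSON library would be the safer choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class, not part of Flume.
final class FlumeMetricsParser {

    // Matches "COMPONENT": { ... } where the braces contain no nested objects.
    private static final Pattern COMPONENT =
            Pattern.compile("\"([^\"]+)\"\\s*:\\s*\\{([^{}]*)\\}");

    // Matches one "key": "value" pair inside a component.
    private static final Pattern ENTRY =
            Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

    // Returns component name -> (counter name -> counter value as a string).
    static Map<String, Map<String, String>> parse(String json) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        Matcher component = COMPONENT.matcher(json);
        while (component.find()) {
            Map<String, String> counters = new LinkedHashMap<>();
            Matcher entry = ENTRY.matcher(component.group(2));
            while (entry.find()) {
                counters.put(entry.group(1), entry.group(2));
            }
            result.put(component.group(1), counters);
        }
        return result;
    }

    public static void main(String[] args) {
        // A shortened copy of the sample output shown above.
        String json = "{\"SINK.k1\": {\"Type\": \"SINK\", \"EventDrainSuccessCount\": \"7657343\"},"
                + " \"CHANNEL.c1\": {\"Type\": \"CHANNEL\", \"ChannelSize\": \"0\"}}";
        Map<String, Map<String, String>> metrics = parse(json);
        System.out.println(metrics.get("SINK.k1").get("EventDrainSuccessCount"));
        System.out.println(metrics.get("CHANNEL.c1").get("ChannelSize"));
    }
}
```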

The metrics for the source, the channel, and the sink together show where the system's processing bottleneck is.
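One way to read these numbers is to compare each "attempt" or "received" counter with its "success" or "accepted" counterpart and to look at how full the channel is. The sketch below does this for the sample values above. The class FlumeBottleneckCheck and its helper methods are hypothetical, and the gaps are only a rough heuristic: what a nonzero gap means depends on how the particular source or sink updates its counters.

```java
import java.util.Map;

// Hypothetical helper class, not part of Flume.
final class FlumeBottleneckCheck {

    // Reads a counter as a long; Flume's JSON output reports every value as a string.
    static long counter(Map<String, String> metrics, String name) {
        String value = metrics.get(name);
        if (value == null) {
            throw new IllegalArgumentException("Missing counter: " + name);
        }
        return Long.parseLong(value);
    }

    // Difference between an "attempted" counter and its "succeeded" counterpart.
    static long gap(Map<String, String> metrics, String attempted, String succeeded) {
        return counter(metrics, attempted) - counter(metrics, succeeded);
    }

    // Channel occupancy in percent, computed from ChannelSize and ChannelCapacity.
    static double fillPercentage(Map<String, String> channel) {
        return 100.0 * counter(channel, "ChannelSize") / counter(channel, "ChannelCapacity");
    }

    public static void main(String[] args) {
        // Values copied from the sample output above.
        Map<String, String> source = Map.of(
                "EventReceivedCount", "7908340",
                "EventAcceptedCount", "7908340");
        Map<String, String> channel = Map.of(
                "ChannelSize", "0",
                "ChannelCapacity", "1000000",
                "EventPutAttemptCount", "7908340",
                "EventPutSuccessCount", "7908340");
        Map<String, String> sink = Map.of(
                "EventDrainAttemptCount", "7908340",
                "EventDrainSuccessCount", "7657343");

        // Events the source received but did not hand to the channel.
        System.out.println("source gap:   "
                + gap(source, "EventReceivedCount", "EventAcceptedCount"));
        // Puts into the channel that did not succeed.
        System.out.println("channel gap:  "
                + gap(channel, "EventPutAttemptCount", "EventPutSuccessCount"));
        // A channel that stays nearly full suggests the sink cannot keep up.
        System.out.println("channel fill: " + fillPercentage(channel) + "%");
        // Events the sink tried to deliver but has not counted as delivered: 250997 here.
        System.out.println("sink gap:     "
                + gap(sink, "EventDrainAttemptCount", "EventDrainSuccessCount"));
    }
}
```

For the sample above, the source and channel gaps are zero and the channel is empty, so the only difference shows up on the sink side.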

If you develop your own plugin, you can use the same counters to publish its statistics, for example:

public class ElasticSearchSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSink.class);
    private BulkProcessor bulkProcessor;
    // Flume's built-in counter group for sinks
    private SinkCounter sinkCounter;

    @Override
    public void configure(Context context) {
        ...
        buildIndexBuilder(context);
        buildSerializer(context);
        // Create the counter once; the sink's name identifies it in the metrics output (e.g. SINK.k1)
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        bulkProcessor = new BulkProcessorBuilder().buildBulkProcessor(context, client);
    }
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), Charsets.UTF_8);
                // Count every event taken from the channel as a drain attempt
                sinkCounter.incrementEventDrainAttemptCount();
                if (!Strings.isNullOrEmpty(body)) {
                    String index = indexBuilder.getIndex(event);
                    String type = indexBuilder.getType(event);
                    String id = indexBuilder.getId(event);
                    XContentBuilder xContentBuilder = serializer.serialize(event);
                    if (index != null && xContentBuilder != null) {
                        if (!StringUtil.isNullOrEmpty(id)) {
                            bulkProcessor.add(new IndexRequest(index, type, id)
                                    .source(xContentBuilder));
                            sinkCounter.incrementEventDrainSuccessCount();
...

    @Override
    public synchronized void start() {
        sinkCounter.start();
        sinkCounter.incrementConnectionCreatedCount();
        super.start();
    }

...

 

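Summary:

1. When you run into a Flume performance problem, add -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 to the startup script to enable metrics reporting.

2. Fetch the metrics from http://<hostname>:<port>/metrics

3. When developing a custom plugin, you can record its statistics with Flume's existing counters.

Note: for more details, see the official documentation: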

http://flume.apache.org/FlumeUserGuide.html#monitoring

 

 


 

