Flume is seeing more and more use, and sooner or later you will run into performance bottlenecks or lost messages. When that happens, the first instinct is usually to reach for the JDK's built-in diagnostic commands and to dig through logs to locate the problem.
Flume, however, already maintains a large set of counters while processing events, and it can publish them via JMX, Ganglia, or JSON over HTTP. To enable this, add the following options to the agent's startup script:
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
After restarting with these options, open http://188.1.186.XXX:34545/metrics to get the monitoring data as JSON.
On Linux you can simply run curl -XGET '188.1.186.XXX:34545/metrics'
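If you would rather pull the metrics from code, a minimal JDK-only sketch looks like the following (the host and port are placeholders for your own agent's monitoring endpoint; any HTTP client would do just as well):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class FlumeMetricsFetcher {
    public static void main(String[] args) throws Exception {
        // placeholder host/port: point this at your own agent's monitoring endpoint
        URL url = new URL("http://188.1.186.XXX:34545/metrics");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        StringBuilder json = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                json.append(line);
            }
        }
        System.out.println(json);  // the same JSON that curl returns
    }
}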
Whichever way you retrieve it, the response can be pasted into Eclipse, saved as a .json file, and formatted with Ctrl+Shift+F:
{
"SINK.k1": {
"ConnectionCreatedCount": "1",
"ConnectionClosedCount": "0",
"Type": "SINK",
"BatchCompleteCount": "0",
"BatchEmptyCount": "0",
"EventDrainAttemptCount": "7908340",
"StartTime": "1514878638909",
"EventDrainSuccessCount": "7657343",
"BatchUnderflowCount": "250997",
"StopTime": "0",
"ConnectionFailedCount": "0"
},
"CHANNEL.c1": {
"ChannelCapacity": "1000000",
"ChannelFillPercentage": "0.0",
"Type": "CHANNEL",
"ChannelSize": "0",
"EventTakeSuccessCount": "7908340",
"EventTakeAttemptCount": "7908466",
"StartTime": "1514878638906",
"EventPutAttemptCount": "7908340",
"EventPutSuccessCount": "7908340",
"StopTime": "0"
},
"SOURCE.r1": {
"KafkaEventGetTimer": "6468875",
"AppendBatchAcceptedCount": "0",
"EventAcceptedCount": "7908340",
"AppendReceivedCount": "0",
"StartTime": "1514878643588",
"AppendBatchReceivedCount": "0",
"KafkaCommitTimer": "156254",
"EventReceivedCount": "7908340",
"Type": "SOURCE",
"AppendAcceptedCount": "0",
"OpenConnectionCount": "0",
"KafkaEmptyCount": "0",
"StopTime": "0"
}
}
By comparing the source, channel, and sink counters you can see where the processing bottleneck lies.
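As a rough illustration (assuming the Jackson databind library is on the classpath, and using the component names SINK.k1 and CHANNEL.c1 from the sample above), the sketch below pulls out a few counters and prints the usual warning signs: events the sink attempted but has not successfully drained, and a channel that is filling up:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URL;

public class FlumeMetricsCheck {
    public static void main(String[] args) throws Exception {
        // placeholder URL: point this at your own agent's monitoring endpoint
        JsonNode metrics = new ObjectMapper()
                .readTree(new URL("http://188.1.186.XXX:34545/metrics"));

        JsonNode sink = metrics.path("SINK.k1");
        long attempted = sink.path("EventDrainAttemptCount").asLong();
        long drained = sink.path("EventDrainSuccessCount").asLong();
        // events the sink took from the channel but has not (yet) successfully delivered
        System.out.println("sink undrained events: " + (attempted - drained));

        JsonNode channel = metrics.path("CHANNEL.c1");
        double fillPct = channel.path("ChannelFillPercentage").asDouble();
        // a steadily rising fill percentage means events arrive faster than the sink drains them
        System.out.println("channel fill %: " + fillPct);
    }
}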
If you develop your own plugin, you can use these same counters to expose its statistics, for example:
public class ElasticSearchSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSink.class);
    private BulkProcessor bulkProcessor;
    private SinkCounter sinkCounter;

    @Override
    public void configure(Context context) {
        ...
        buildIndexBuilder(context);
        buildSerializer(context);
        // create the standard sink counter, keyed by this sink's component name
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        bulkProcessor = new BulkProcessorBuilder().buildBulkProcessor(context, client);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), Charsets.UTF_8);
                // one more event taken from the channel and attempted
                sinkCounter.incrementEventDrainAttemptCount();
                if (!Strings.isNullOrEmpty(body)) {
                    String index = indexBuilder.getIndex(event);
                    String type = indexBuilder.getType(event);
                    String id = indexBuilder.getId(event);
                    XContentBuilder xContentBuilder = serializer.serialize(event);
                    if (index != null && xContentBuilder != null) {
                        if (!StringUtil.isNullOrEmpty(id)) {
                            bulkProcessor.add(new IndexRequest(index, type, id)
                                    .source(xContentBuilder));
                            // counted as drained once handed to the bulk processor
                            sinkCounter.incrementEventDrainSuccessCount();
...
    @Override
    public synchronized void start() {
        sinkCounter.start();
        sinkCounter.incrementConnectionCreatedCount();
        super.start();
    }
...
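For completeness, the shutdown side can mirror start(). The stop() override below is a minimal sketch (the bulkProcessor.close() call is just this example's cleanup step), showing how ConnectionClosedCount and StopTime end up populated in the /metrics output:

    @Override
    public synchronized void stop() {
        // illustrative cleanup of this example's resources
        if (bulkProcessor != null) {
            bulkProcessor.close();
        }
        // record the shutdown so ConnectionClosedCount and StopTime show up in /metrics
        sinkCounter.incrementConnectionClosedCount();
        sinkCounter.stop();
        super.stop();
    }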
Summary:
1. When you hit a Flume performance problem, add -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 to the startup script to enable monitoring.
2. Fetch the metrics from http://<hostname>:<port>/metrics.
3. When developing custom plugins, reuse Flume's existing counters to record statistics.
Note: for more details, see the official documentation:
http://flume.apache.org/FlumeUserGuide.html#monitoring
-----------------------------------------
Problems are nothing to fear; what matters is having a clear approach to solving them!