普通的flume啟動命令
bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console
日志信息在終端輸出,只有去掉這個參數,日志才能在log4j和logback中輸出
-Dflume.root.logger=INFO,console
如果要加上http監控的話
bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
即加上參數,flume.monitoring.type=http 指定了Reporting的方式為http,flume.monitoring.port 指定了http服務的端口號
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
訪問
http://localhost:34545/metrics
參數說明:
(1)、SOURCE
SOURCE作為flume的數據源組件,所有收集日志的第一個到達的地方,它的監控信息非常重要。通過監控我們能夠得到的監控數據有這些:
KafkaEventGetTimer
AppendBatchAcceptedCount(追加到channel中的批數量) 速率
EventAcceptedCount(成功放入channel的event數量) 速率
AppendReceivedCount(source追加目前收到的數量) 速率
StartTime(組件開始時間)
AppendBatchReceivedCount(source端剛剛追加的批數量) 速率
KafkaCommitTimer
EventReceivedCount(source端成功收到的event數量) 速率
Type(組件類型)
AppendAcceptedCount(放入channel的event數量) 速率
OpenConnectionCount(打開的連接數)
KafkaEmptyCount
StopTime(組件停止時間)
當然這些只是flume監控源碼中已經自帶的監控元素,如果你需要其他的監控信息,例如ip、端口號等,有兩種方法,第一個,修改監控源碼,添加你需要的監控元素,這種方法只是在原有代碼基礎上,添加一些滿足自己需求的監控元素,比較簡單,但靈活性不足;第二個就是自定義監控組件,這種方法是在原有監控框架中,自己實現自己的監控組件,這樣可以達到完全滿足自己需求,且靈活性很高。至於這兩種方法如何操作,在后面Flume監控如何實現有討論到。
同理CHANNEL、SINK這兩個組件的監控也可以使用這兩種方法來添加自己想要的監控元素。
(2)、CHANNEL
CHANNEL是flume的一個通道組件,對數據有一個緩存的作用。能夠得到的數據:
ChannelCapacity(通道容量)
ChannelFillPercentage(通道使用比例)
Type(組件類型)
ChannelSize(目前在channel中的event數量)
EventTakeSuccessCount(從channel中成功取走的event數量) 速率
EventTakeAttemptCount(嘗試從channel中取走event的次數) 速率
StartTime(組件開始時間)
EventPutAttemptCount(嘗試放入將event放入channel的次數) 速率
EventPutSuccessCount(成功放入channel的event數量) 速率
StopTime(組件停止時間)
(3)、SINK
SINK是數據即將離開flume的最后一個組件,它從channel中取走數據,然后發送到緩存系統或者持久化數據庫。能得到數據:
ConnectionCreatedCount(創建連接數) 速率
BatchCompleteCount(完成的批數量) 速率
BatchEmptyCount(批量取空的數量,空的批量的數量,如果數量很大表示souce寫數據比sink清理數據慢速度慢很多
) 速率
EventDrainSuccessCount(成功發送event的數量) 速率
StartTime(組件開始時間)
BatchUnderflowCount(正處於批量處理的batch數)等。 速率
ConnectionFailedCount(連接失敗數) 速率
ConnectionClosedCount(關閉連接數量) 速率
Type(組件類型)
RollbackCount
EventDrainAttemptCount(嘗試提交的event數量) 速率
KafkaEventSendTimer
StopTime(組件停止時間)
在實際生產環境中,由於數據量比較大(Kafka中導入200M左右的數據),Flume有時候會遇到下面oom問題:
問題1
Exception in thread "PollableSourceRunner-KafkaSource-r1" java.lang.OutOfMemoryError: GC overhead limit exceeded
或者
Exception in thread "PollableSourceRunner-KafkaSource-r1" java.lang.OutOfMemoryError: Java heap space
這是由於flume啟動時的默認最大的堆內存大小是20M
解決方法:在flume的基礎配置文件conf下的flume-env.sh中添加
export JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
問題2
13:54:27.213 ERROR org.apache.flume.source.kafka.KafkaSource:317 - KafkaSource EXCEPTION, {} org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
flume的properties文件中添加
agent.channels.c1.capacity = 1000000 #改大一點 agent.channels.c1.keep-alive = 60