Flume使用小結


    本文介紹初次使用Flume傳輸數據到MongoDB的過程,內容涉及環境部署和注意事項。

1 環境搭建

    需要jdk、flume-ng、mongodb java driver、flume-ng-mongodb-sink
(1)jdk下載地址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
(2)flune-ng下載地址:http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
(3)mongodb java driver jar包下載地址:https://oss.sonatype.org/content/repositories/releases/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar
(4)flume-ng-mongodb-sink 源碼下載地址:https://github.com/leonlee/flume-ng-mongodb-sink
flume-ng-mongodb-sink 需要自己編譯jar包,從github上下載代碼,解壓之后執行mvn package,即可生成。需要先安裝maven用於編譯jar包,且機器需要能聯網。

2 簡單原理介紹

    這是一個關於池子的故事。有一個池子,它一頭進水,另一頭出水,進水口可以配置各種管子,出水口也可以配置各種管子,可以有多個進水口、多個出水口。水術語稱為Event,進水口術語稱為Source、出水口術語成為Sink、池子術語成為Channel,Source+Channel+Sink,術語稱為Agent。如果有需要,還可以把多個Agent連起來。
更多細節參考官方文檔:http://flume.apache.org/FlumeDeveloperGuide.html

3 Flume配置

(1)  env配置

      將mongo-java-driver和flume-ng-mongodb-sink兩個jar包放到flume\lib目錄下,並將路徑加入到flume-env.sh文件的FLUME_CLASSPATH變量中;
  JAVA_OPTS變量: 加上-Dflume.monitoring.type=http -Dflume.monitoring.port=xxxx,可以在[hostname:xxxx]/metrics 上看到監控信息;  -Xms指定JVM初始內存,-Xmx指定JVM最大內存
  FLUME_HOME變量: 設定FLUME根目錄
  JAVA_HOME變量:  設定JAVA根目錄

(2) log配置

      在調試時,將日志設置為debug並打到文件:flume.root.logger=DEBUG,LOGFILE

(3) 傳輸配置
        采用 Exec Source、file-channel、flume-ng-mongodb-sink。
    Source配置舉例:

my_agent.sources.my_source_1.channels = my_channel_1
my_agent.sources.my_source_1.type = exec
my_agent.sources.my_source_1.command = python  xxx.py
my_agent.sources.my_source_1.shell = /bin/bash -c
my_agent.sources.my_source_1.restartThrottle = 10000
my_agent.sources.my_source_1.restart = true
my_agent.sources.my_source_1.logStdErr = true
my_agent.sources.my_source_1.batchSize = 1000
my_agent.sources.my_source_1.interceptors = i1 i2 i3
my_agent.sources.my_source_1.interceptors.i1.type = static
my_agent.sources.my_source_1.interceptors.i1.key = db
my_agent.sources.my_source_1.interceptors.i1.value = cswuyg_test
my_agent.sources.my_source_1.interceptors.i2.type = static
my_agent.sources.my_source_1.interceptors.i2.key = collection
my_agent.sources.my_source_1.interceptors.i2.value = cswuyg_test
my_agent.sources.my_source_1.interceptors.i3.type = static
my_agent.sources.my_source_1.interceptors.i3.key = op
my_agent.sources.my_source_1.interceptors.i3.value = upsert

    字段說明:
    采用exec source,指定執行命令行為python  xxx.py,我在xxx.py代碼中處理日志,並按照跟flume-ng-mongodb-sink的約定,print出json格式的數據,如果update類操作必須帶着_id字段,print出來的日志被當作Event的Body,我再通過interceptors給它加上自定義Event Header;
static interceptors用於為Event Header添加信息,這里我為它加上了db=cswuyg_test、collection=cswuyg_test、op=upsert,這三個key是跟flume-ng-mongodb-sink 約定的,用於指定mongodb中的db、collection名以及操作類型為update。

    Channel配置舉例:

my_agent.channels.my_channel_1.type = file
my_agent.channels.my_channel_1.checkpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint
my_agent.channels.my_channel_1.useDualCheckpoints = true
my_agent.channels.my_channel_1.backupCheckpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint2
my_agent.channels.my_channel_1.dataDirs = /home/work/flume/file-channel/my_channel_1/data
my_agent.channels.my_channel_1.transactionCapacity = 10000
my_agent.channels.my_channel_1.checkpointInterval = 30000
my_agent.channels.my_channel_1.maxFileSize = 4292870142
my_agent.channels.my_channel_1.minimumRequiredSpace = 524288000
my_agent.channels.my_channel_1.capacity = 100000

    字段說明:

    要注意的參數是capacity,它指定了池子里可以存放的Event數量,需要根據日志量設置一個合適的值,如果你也采用file-channel,而且磁盤充足,那可以盡可能的設置得大些。
    dataDirs指定池子存放的位置,如果可以,選擇IO不是那么高的磁盤,可以使用逗號分隔使用多個磁盤目錄。

    sink配置舉例:

my_agent.sinks.my_mongo_1.type = org.riderzen.flume.sink.MongoSink
my_agent.sinks.my_mongo_1.host = xxxhost
my_agent.sinks.my_mongo_1.port = yyyport
my_agent.sinks.my_mongo_1.model = dynamic
my_agent.sinks.my_mongo_1.batch = 10
my_agent.sinks.my_mongo_1.channel = my_channel_1
my_agent.sinks.my_mongo_1.timestampField = _S

    字段說明:

 model選擇dynamic,表示mongodb的db、collection名字采用Event Header中指定的名字。timestampField 字段用於將json串中指定鍵的值轉換為datetime格式存進mongodb,flume-ng-mongodb-sink不支持嵌套key指定(如:_S.y),但可以自己通過修改sink的代碼來實現。

    agent配置舉例:

my_agent.channels = my_channel_1
my_agent.sources = my_source_1
my_agent.sinks = my_mongo_1

(4) 啟動

    可以寫一個control.sh 腳本來控制flume的啟動、關閉、重啟。
    啟動demo:
./bin/flume-ng agent --conf ./conf/ --conf-file ./conf/flume.conf -n agent1 > ./start.log 2>&1 &


    從此以后,日志數據就從日志文件,通過xxx.py讀取,進入到flie-channel,再被flume-ng-mongodb-sink讀走,進入到目的地MongoDB Cluster。
搭好基本功能之后,以后需要做的就是調整xxx.py、增強flume-ng-mongodb-sink。

4 其它

 1、監控:官方推薦的監控是ganglia:http://sourceforge.net/projects/ganglia/,有圖像界面。

 2、版本變更:flume 從1.X開始已經不再使用ZooKeeper,在數據可靠性上,提供了E2E(end-to-end)的支持,去掉了重構之前的DFO(store on failure)、BE(best effort)。E2E指的是:在刪除channel中的event時,保證event已經傳遞到了下一個agent或者終點,不過,這里沒有提到數據在進入到channel之前如何保證不丟失,像Exec Source這種數據導入channel的方式,需要使用者自己保證。

 3、關閉插件:使用Exec Source時,flume重啟不會關閉掉舊插件進程,需要自己關閉。

 4、Exec Source不能保證數據不丟失,因為這種方式只是把水灌到池子里,不管池子是什么狀況, 參見https://flume.apache.org/FlumeUserGuide.html#exec-source 的 Warning 部分。但是,Spooling directory source 也不一定是個好方法,監控目錄,但是注意不能修改文件的名字,不能出現同名覆蓋文件,不要出現只有一半內容的文件。傳輸完成之后,文件會被重命名為xx.COMPLETED,需要有定時清理腳本把這些文件清理掉。重啟會導致出現重復event,因為那些被傳輸到一半的文件沒有被設置為完成狀態。

 5、傳輸瓶頸:使用flume+mongodb來安全傳輸大量數據(每秒萬條級別的日志不算大數據量,每天幾百G的也不算),瓶頸會出現在MongoDB上,特別是Update類型的數據傳輸。

 6、需要修改當前的flume-ng-mongodb-sink 插件:(1)讓update支持 $setOnInsert;(2)解決update的 $set、$inc為空時,引發exception的bug;(3)解決批量插入時,因其中一條日志有duplicate exception而導致同批插入的后續日志全部被丟棄的bug。

 7、flume跟fluentd很類似,但來自hadoop生態的flume更熱門,所以我選擇flume。

 8、批量部署:先把jdk、flume打包成tar,然后借助python 的 paramiko庫,將tar包發到各台機器上,解壓、運行。

 

本文所在:http://www.cnblogs.com/cswuyg/p/4498804.html 
參考:

1、http://flume.apache.org/FlumeDeveloperGuide.html

2、《Apache Flume: Distributed Log Collection for Hadoop》 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM