1. Flume簡介
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。
當前Flume有兩個版本Flume 0.9X版本的統稱Flume-og,Flume1.X版本的統稱Flume-ng。由於Flume-ng經過重大重構,與Flume-og有很大不同,使用時請注意區分。
這篇文章介紹的是Flume 1.7版本,flume v1.7新增了tailDir數據源。
1.1 系統要求
Flume1.7運行系統要求:jdk1.7,linux
由於taildir的實現是基於jdk1.7的,所以要求jdk版本在1.7以上。
Flume也可以運行的windows上。但是在啟動及管理比較繁瑣。在官方的文檔介紹中啟動命令等都是linux基礎上。另外部分flume組件的運行只有linux系統支持,比如taildir source中對文件按照inode來唯一標識,然而windows系統中文件沒有inode的概念。所以本篇也是基於linux系統。
1.2 資料整理
在搜索引擎中輸入flume將會得到很多資料。官方文檔如下。查看官方資料對於學習新事物非常重要。
Flume介紹:http://flume.apache.org/
可以在這個網站下載flume。不過關於flume其他的原理或入門例子等,建議查看flume用戶手冊。
Flume用戶手冊:http://flume.apache.org/FlumeUserGuide.html
Flume開發者手冊:http://flume.apache.org/FlumeDeveloperGuide.html
Flume github源碼:https://github.com/apache/flume
1.3 flume 原理介紹
圖 1 flume agent 組成結構
一個flume由三個部分組成:source,channel,sink。根據官方的介紹原文,我整理如下:
- Source:A source consumes events delivered to it by external source.
- Channel: when a source receive an event, it stores it into one or more channels.The channel is a passive store that keeps the event until it’s consumed by a flume sink
- Sink: The sink remove the event from the channel and puts it into an exteral repository like HDFS.
- The source and sink within the given agent run asynchronously with the events staged in the channel.
1.4 flume agent 示例
- 配置文件
下載好flume解壓后,在conf文件夾下存放着配置文件模板,可以復制一份重命名后在此基礎上進行修改。
# example.conf: A single-node Flume configuration # 指定flume組件的名稱,agent名為a1,source為r1,sink為k1,channel為c1 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 配置sink,logger表示接受到的event將直接展示到console,這個類型經常在調試時使用 a1.sinks.k1.type = logger #配置 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 給source及sink指定channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 啟動
使用flume-ng shell腳本進行啟動,如下:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template -Dflume.root.logger=INFO,console
啟動命令由4部分組成:
-n $agent_name:這里指定啟動的agent 名,按照配置文件中的命名這里應該替換成a1
-c conf: 指定配置文件目錄,可以是相對路徑或絕對路徑
-f conf/flume-conf.properties.template :指定具體配置文件名
-Dflume.root.logger=INFO,console:將flume運行日志展示到console台,這個是可選的,但是一般都需要加上,便於查看flume運行情況。
- 運行結果
在另外一個終端,使用telnet命令發送Hello world!
因為根據配置文件我們指定了netcat類型的source是監聽在本機的44444端口上。
$ telnet localhost 44444 Trying 127.0.0.1... Connected to localhost.localdomain (127.0.0.1). Escape character is '^]'. Hello world! <ENTER> OK
將在flume運行的控制台查看到sink已經將接受到的event打印到控制台。
12/06/19 15:32:19 INFO source.NetcatSource: Source starting 12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
至此,一個完整的flume運行過程完成。
2. Flume Source
Flume不僅提供了豐富的source類型,可以直接使用,目前已經覆蓋了很多應用場景。同時也支持自定義source。
在這里簡單介紹下exec,spooldir,taildir三種source。其他類型及具體詳情請查看官方文檔Flume Source章節。
2.1 Exec
使用exec作為數據源,需要指定執行的shell命令。經常使用到的命令tail -F [file],來讀取新增到日志文件的內容。
缺點:數據可能丟失,官方推薦spooldir作為數據源。
2.2 Spooldir
Spooldir將從指定的文件夾中讀取文件,並且是按行讀取文件中的內容。如果指定的文件夾中出現新文件,也將會被識別並讀取。Spooldir將讀取完的文件進行重命名(默認添加.COMPLETE)或永久刪除。
優點:Spooldir不會出現丟失數據的情況,即使flume重啟或停止。
缺點:1. 放置在spooldir目錄中的文件不允許進行修改,否則flume會報錯並停止工作
2. 在spooldir目錄中的文件名不可重復使用,否則flume會報錯並停止工作
2.3 Taildir
Taildir可以說是exec和spooldir兩種source的優點集合。在車載OBD的日志服務功能就是使用此作為數據源。
注意:taildir目前不支持windows系統。
查看源碼可以看到在ReliableTaildirEventReader.java實現代碼中獲取文件的inode,其中“unix”表明僅在linux系統生效:
276 private long getInode(File file) throws IOException { 277 long inode = (long) Files.getAttribute(file.toPath(), "unix:ino"); 278 return inode; 279 }
Taildir數據源將會監控指定目錄下所有文件,實時獲取新附加到各個文件末尾的內容。它將定時保存各個文件最后讀取位置記錄到一個json格式的文件。Flume重新啟動后將按照此json文件保存的位置開始讀取。
如果需要監控多個文件源,並且對各個不同讀取到的數據文件進行區別處理,可以使用提供的headerkey。
配置文件舉例:
# Describe/configure source1 agent1.sources.s1.type = TAILDIR agent1.sources.s1.positionFile = ./bin/taildir_position.json agent1.sources.s1.filegroups = f1 f2 agent1.sources.s1.filegroups.f1 = /home/neoway/apache-flume-1.7.0-bin/log1/.* agent1.sources.s1.headers.f1.componentName = mqtt agent1.sources.s1.filegroups.f2 = /home/neoway/apache-flume-1.7.0-bin/log2/.* agent1.sources.s1.headers.f2.componentName = mybatis agent1.sources.s1.fileHeader = true #agent1.sources.s1.channels = c1 agent1.sources.s1.channels = c1
讀取到event:
2017-08-24 09:48:06:INFO SinkRunner-PollingRunner-DefaultSinkProcessor org.apache.flume.sink.LoggerSink - Event: { headers:{ componentName = mqtt, file=/home/neoway/apache-flume-1.7.0-bin/log/mylineDeserializer.log} body: 32 30 31 37 2D 30 38 2D 32 33 54 31 34 3A 33 30 2017-08-23T14:30 }
2017-08-24 09:48:06:INFO SinkRunner-PollingRunner-DefaultSinkProcessor org.apache.flume.sink.LoggerSink - Event: { headers:{ componentName = mqtt, file=/home/neoway/apache-flume-1.7.0-bin/log/mylineDeserializer.log} body: 32 30 31 37 2D 30 38 2D 32 33 54 31 34 3A 33 30 2017-08-23T14:30 }
可以看到在讀取到的event中與header部分,在sink部分處理時,可以獲取envent的header,從而判斷出屬於哪個文件源並依此做對應處理。
3. Flume Sink
Flume提供了很多類型的sink,詳情可參考flume用戶手冊的flume sink章節。
在車載的日志服務的需求是將讀取到的內容保存到mysql數據庫中。這里需要使用自定義sink。我參考了這篇文章:http://blog.csdn.net/poisions/article/details/51695372
- 自定義mysqlSink類,繼承 AbstractSink 並實現 Configurable 。重寫start()方法,stop()方法,process()方法
- 將編譯好的jar包及連接mysql的驅動jar包存放到flume的lib目錄下
- 在配置文件中配置sink,為自定義mysqlsink的包路徑。
agent1.sinks.k1.type = org.flume.mysql.sink.MysqlSink agent1.sinks.k1.hostname = 192.168.10.136 agent1.sinks.k1.port=3306 agent1.sinks.k1.databaseName=carcloud agent1.sinks.k1.recordTableName=log_record agent1.sinks.k1.configTableName=log_config agent1.sinks.k1.projectName= carcloud #the string that joint all componentNames by ',' and each componentName come from filegroups's fileHeader; agent1.sinks.k1.componentNames = mqtt,mybatis agent1.sinks.k1.user=root agent1.sinks.k1.password=123 agent1.sinks.k1.channel = c1
增加說明:
在實際應用中,我參考的http://blog.csdn.net/poisions/article/details/51695372示例不能滿足商用需求。比如使用原生的jdbc連接mysql會出現wait_timeout的情況,報錯 No operations allowed after connection closed。這個問題我最后放棄原生jdbc,使用spring的JdbcTemplate代替完成。瞬間感覺代碼健壯了不少。原諒我之前懶。
同時這個文章中bathsize=100,每100個event才提交一次,非常容易出現錯誤:The channel is full or unexpected failure導致flume停止工作。我修改為不使用bathsize,每次處理一個event並提交。
4. Flume探索路上遇到的問題
4.1 在windows系統運行flume
在除接觸flume時,一直在windows上嘗試啟動flume,碰到很多問題。慢慢查多資料發現flume設計的命令都是linux的,從而轉戰到linux系統。這也是對linux系統不熟悉造成的坑。
4.2 安裝路徑有空格
在linux安裝路徑上的目錄有空格,也會出現問題。在文件及文件夾命名時使用空格是個壞習慣,可以使用‘-’代替空格。
4.3 Taildir重復讀取
在taildir測試的時候,遇到了往taildir監控的文件中追加內容時,總是會從頭讀取文件的內容,而不是僅讀取新添加的這一行內容。
測試環境是這樣的:
- 使用 sed命令往目標文件追加內容
- 查詢數據庫,數據庫表中增加了目標文件所有行內容,而非僅僅是上一步sed的行內容。
一度懷疑taildir是否能讀取追加的內容。並且檢查了所有的配置,均無效。
查詢資料也完全沒有提到過使用taildir會重復讀取的問題。
最后將源碼拷貝下來,自定義為myTaildirSource。並且在運行的關鍵部分打印日志,順便了解下tailDir的運行過程。
根據日志發現每次往目標文件中sed內容后,taildir顯示目標文件的inode發送了變化,從而被識別為新文件,難怪會從頭讀取。
接下來查閱資料關於linux系統的inode機制,什么情況下會導致inode發送變化。根據查閱的資料inode僅在重命名后,或者刪除后再次新建一個同名的文件時 inode發送變化。
最后無意去查看了車載項目產生的日志文件的inode,在往日志文件中追加內容時inode不會發送變化。至此問題解決。
附:使用ls –i 可查看文件的inode
4.4 Flume后台啟動
前面介紹的啟動命令是在前台直接運行的,這時不能關閉這個界面,否則flume也被停止。
在后台啟動flume命令:
./bin/flume-ng agent --conf ../conf --conf-file ../conf/x1_dir_to_db_flume.conf --name a1 -Dflume.root.logger=INFO,console > x1nohup.out 2>&1 &
在原來的啟動命令上增加> x1nohup.out 2>&1 &即可實現后台啟動。並且flume的運行日志都將打印到x1nohup.out文件中。
為了方便減少每次都復制啟動代碼。可以將啟動代碼前添加
#!/bin/sh
,然后將此內容保存在新建的文件例如start.sh中。注意start.sh的路徑,要放在./bin對應的路徑上。最后chmod +x start.sh 給這個文件增加執行權限,即可通過執行start.sh來啟動flume。