flume使用詳解


1.    Flume簡介

Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。

當前Flume有兩個版本Flume 0.9X版本的統稱Flume-og,Flume1.X版本的統稱Flume-ng。由於Flume-ng經過重大重構,與Flume-og有很大不同,使用時請注意區分。

這篇文章介紹的是Flume 1.7版本,flume v1.7新增了tailDir數據源。

1.1 系統要求

Flume1.7運行系統要求:jdk1.7,linux

由於taildir的實現是基於jdk1.7的,所以要求jdk版本在1.7以上。

Flume也可以運行的windows上。但是在啟動及管理比較繁瑣。在官方的文檔介紹中啟動命令等都是linux基礎上。另外部分flume組件的運行只有linux系統支持,比如taildir source中對文件按照inode來唯一標識,然而windows系統中文件沒有inode的概念。所以本篇也是基於linux系統。

1.2  資料整理

在搜索引擎中輸入flume將會得到很多資料。官方文檔如下。查看官方資料對於學習新事物非常重要。

Flume介紹:http://flume.apache.org/

可以在這個網站下載flume。不過關於flume其他的原理或入門例子等,建議查看flume用戶手冊

 

Flume用戶手冊:http://flume.apache.org/FlumeUserGuide.html

Flume開發者手冊:http://flume.apache.org/FlumeDeveloperGuide.html

Flume github源碼:https://github.com/apache/flume

1.3 flume 原理介紹

                       

圖 1 flume agent 組成結構

 

一個flume由三個部分組成:source,channel,sink。根據官方的介紹原文,我整理如下:

  1. Source:A source consumes events delivered to it by external source.
  2. Channel: when a source receive an event, it stores it into one or more channels.The channel is a passive store that keeps the event until it’s consumed by a flume sink
  3. Sink: The sink remove the event from the channel and puts it into an exteral repository like HDFS.
  4. The source and sink within the given agent run asynchronously with the events staged in the channel.

1.4 flume agent 示例

  1. 配置文件

下載好flume解壓后,在conf文件夾下存放着配置文件模板,可以復制一份重命名后在此基礎上進行修改。

# example.conf: A single-node Flume configuration
 

# 指定flume組件的名稱,agent名為a1,source為r1,sink為k1,channel為c1

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# 配置source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444


# 配置sink,logger表示接受到的event將直接展示到console,這個類型經常在調試時使用

a1.sinks.k1.type = logger


#配置 channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# 給source及sink指定channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

  1. 啟動

使用flume-ng shell腳本進行啟動,如下:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template -Dflume.root.logger=INFO,console

啟動命令由4部分組成:

-n $agent_name:這里指定啟動的agent 名,按照配置文件中的命名這里應該替換成a1

-c conf: 指定配置文件目錄,可以是相對路徑或絕對路徑

-f conf/flume-conf.properties.template :指定具體配置文件名

-Dflume.root.logger=INFO,console:將flume運行日志展示到console台,這個是可選的,但是一般都需要加上,便於查看flume運行情況。

 

  1. 運行結果

在另外一個終端,使用telnet命令發送Hello world!

因為根據配置文件我們指定了netcat類型的source是監聽在本機的44444端口上。

$ telnet localhost 44444

Trying 127.0.0.1...

Connected to localhost.localdomain (127.0.0.1).

Escape character is '^]'.

Hello world! <ENTER>

OK

 

將在flume運行的控制台查看到sink已經將接受到的event打印到控制台。

12/06/19 15:32:19 INFO source.NetcatSource: Source starting

12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

至此,一個完整的flume運行過程完成。

 

2.    Flume Source

Flume不僅提供了豐富的source類型,可以直接使用,目前已經覆蓋了很多應用場景。同時也支持自定義source。

在這里簡單介紹下exec,spooldir,taildir三種source。其他類型及具體詳情請查看官方文檔Flume Source章節

2.1  Exec

使用exec作為數據源,需要指定執行的shell命令。經常使用到的命令tail -F [file],來讀取新增到日志文件的內容。

缺點:數據可能丟失,官方推薦spooldir作為數據源。

 

2.2  Spooldir

Spooldir將從指定的文件夾中讀取文件,並且是按行讀取文件中的內容。如果指定的文件夾中出現新文件,也將會被識別並讀取。Spooldir將讀取完的文件進行重命名(默認添加.COMPLETE)或永久刪除。

優點:Spooldir不會出現丟失數據的情況,即使flume重啟或停止。

缺點:1. 放置在spooldir目錄中的文件不允許進行修改,否則flume會報錯並停止工作

                2. 在spooldir目錄中的文件名不可重復使用,否則flume會報錯並停止工作

2.3  Taildir

Taildir可以說是exec和spooldir兩種source的優點集合。在車載OBD的日志服務功能就是使用此作為數據源。

注意:taildir目前不支持windows系統。

查看源碼可以看到在ReliableTaildirEventReader.java實現代碼中獲取文件的inode,其中“unix”表明僅在linux系統生效:

276   private long getInode(File file) throws IOException { 
277     long inode = (long) Files.getAttribute(file.toPath(), "unix:ino"); 
278     return inode; 
279   } 

 

Taildir數據源將會監控指定目錄下所有文件,實時獲取新附加到各個文件末尾的內容。它將定時保存各個文件最后讀取位置記錄到一個json格式的文件。Flume重新啟動后將按照此json文件保存的位置開始讀取。

如果需要監控多個文件源,並且對各個不同讀取到的數據文件進行區別處理,可以使用提供的headerkey。

配置文件舉例:

# Describe/configure source1

agent1.sources.s1.type = TAILDIR

agent1.sources.s1.positionFile = ./bin/taildir_position.json

agent1.sources.s1.filegroups = f1 f2

agent1.sources.s1.filegroups.f1 = /home/neoway/apache-flume-1.7.0-bin/log1/.*

agent1.sources.s1.headers.f1.componentName = mqtt

agent1.sources.s1.filegroups.f2 = /home/neoway/apache-flume-1.7.0-bin/log2/.*

agent1.sources.s1.headers.f2.componentName = mybatis

agent1.sources.s1.fileHeader = true

#agent1.sources.s1.channels = c1

agent1.sources.s1.channels = c1

 

讀取到event:

2017-08-24 09:48:06:INFO SinkRunner-PollingRunner-DefaultSinkProcessor org.apache.flume.sink.LoggerSink - Event: { headers:{ componentName = mqtt, file=/home/neoway/apache-flume-1.7.0-bin/log/mylineDeserializer.log} body: 32 30 31 37 2D 30 38 2D 32 33 54 31 34 3A 33 30 2017-08-23T14:30 }

2017-08-24 09:48:06:INFO SinkRunner-PollingRunner-DefaultSinkProcessor org.apache.flume.sink.LoggerSink - Event: { headers:{ componentName = mqtt, file=/home/neoway/apache-flume-1.7.0-bin/log/mylineDeserializer.log} body: 32 30 31 37 2D 30 38 2D 32 33 54 31 34 3A 33 30 2017-08-23T14:30 }

 

可以看到在讀取到的event中與header部分,在sink部分處理時,可以獲取envent的header,從而判斷出屬於哪個文件源並依此做對應處理。

 

3.    Flume Sink

Flume提供了很多類型的sink,詳情可參考flume用戶手冊的flume sink章節

在車載的日志服務的需求是將讀取到的內容保存到mysql數據庫中。這里需要使用自定義sink。我參考了這篇文章:http://blog.csdn.net/poisions/article/details/51695372

  1. 自定義mysqlSink類,繼承 AbstractSink 並實現 Configurable 。重寫start()方法,stop()方法,process()方法
  2. 將編譯好的jar包及連接mysql的驅動jar包存放到flume的lib目錄下
  3. 在配置文件中配置sink,為自定義mysqlsink的包路徑。
agent1.sinks.k1.type = org.flume.mysql.sink.MysqlSink

agent1.sinks.k1.hostname = 192.168.10.136

agent1.sinks.k1.port=3306

agent1.sinks.k1.databaseName=carcloud

agent1.sinks.k1.recordTableName=log_record

agent1.sinks.k1.configTableName=log_config

agent1.sinks.k1.projectName= carcloud

#the string that joint all componentNames by ',' and each componentName come from filegroups's fileHeader;

agent1.sinks.k1.componentNames = mqtt,mybatis

agent1.sinks.k1.user=root

agent1.sinks.k1.password=123

agent1.sinks.k1.channel = c1

 

增加說明:

在實際應用中,我參考的http://blog.csdn.net/poisions/article/details/51695372示例不能滿足商用需求。比如使用原生的jdbc連接mysql會出現wait_timeout的情況,報錯 No operations allowed after connection closed。這個問題我最后放棄原生jdbc,使用spring的JdbcTemplate代替完成。瞬間感覺代碼健壯了不少。原諒我之前懶。

同時這個文章中bathsize=100,每100個event才提交一次,非常容易出現錯誤:The channel is full or unexpected failure導致flume停止工作。我修改為不使用bathsize,每次處理一個event並提交。

4. Flume探索路上遇到的問題

4.1  在windows系統運行flume

在除接觸flume時,一直在windows上嘗試啟動flume,碰到很多問題。慢慢查多資料發現flume設計的命令都是linux的,從而轉戰到linux系統。這也是對linux系統不熟悉造成的坑。

 

4.2  安裝路徑有空格

在linux安裝路徑上的目錄有空格,也會出現問題。在文件及文件夾命名時使用空格是個壞習慣,可以使用‘-’代替空格。

 

4.3  Taildir重復讀取

在taildir測試的時候,遇到了往taildir監控的文件中追加內容時,總是會從頭讀取文件的內容,而不是僅讀取新添加的這一行內容。

測試環境是這樣的:

  1. 使用 sed命令往目標文件追加內容
  2. 查詢數據庫,數據庫表中增加了目標文件所有行內容,而非僅僅是上一步sed的行內容。

一度懷疑taildir是否能讀取追加的內容。並且檢查了所有的配置,均無效。

查詢資料也完全沒有提到過使用taildir會重復讀取的問題。

最后將源碼拷貝下來,自定義為myTaildirSource。並且在運行的關鍵部分打印日志,順便了解下tailDir的運行過程。

根據日志發現每次往目標文件中sed內容后,taildir顯示目標文件的inode發送了變化,從而被識別為新文件,難怪會從頭讀取。

接下來查閱資料關於linux系統的inode機制,什么情況下會導致inode發送變化。根據查閱的資料inode僅在重命名后,或者刪除后再次新建一個同名的文件時 inode發送變化。

最后無意去查看了車載項目產生的日志文件的inode,在往日志文件中追加內容時inode不會發送變化。至此問題解決。

附:使用ls –i 可查看文件的inode

 

4.4  Flume后台啟動

前面介紹的啟動命令是在前台直接運行的,這時不能關閉這個界面,否則flume也被停止。

在后台啟動flume命令:

./bin/flume-ng agent --conf ../conf --conf-file ../conf/x1_dir_to_db_flume.conf --name a1 -Dflume.root.logger=INFO,console > x1nohup.out 2>&1 &

在原來的啟動命令上增加> x1nohup.out 2>&1 &即可實現后台啟動。並且flume的運行日志都將打印到x1nohup.out文件中。

為了方便減少每次都復制啟動代碼。可以將啟動代碼前添加

#!/bin/sh

,然后將此內容保存在新建的文件例如start.sh中。注意start.sh的路徑,要放在./bin對應的路徑上。最后chmod +x start.sh 給這個文件增加執行權限,即可通過執行start.sh來啟動flume。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM