1.flume概念
flume是分布式的,可靠的,高可用的,用於對不同來源的大量的日志數據進行有效收集、聚集和移動,並以集中式的數據存儲的系統。
flume目前是apache的一個頂級項目。
flume需要java運行環境,要求java1.6以上,推薦java1.7.
將下載好的flume安裝包解壓到指定目錄即可。
2.flume中的重要模型
2.1.1.flume Event:
flume 事件,被定義為一個具有有效荷載的字節數據流和可選的字符串屬性集。
2.1.2.flume Agent:
flume 代理,是一個進程承載從外部源事件流到下一個目的地的過程。包含source channel 和 sink。
2.1.3.Source
數據源,消耗外部傳遞給他的事件,外部源將數據按照flume Source 能識別的格式將Flume 事件發送給flume Source。
2.1.4.Channel
數據通道,是一個被動的存儲,用來保持事件,直到由一個flume Sink消耗。
2.1.5.Sink
數據匯聚點,代表外部數據存放位置。發送flume event到指定的外部目標。
2.2.
flume
流動模型

2.3.
flume
的特點
2.3.1.
復雜流動性
Flume
允許用戶進行多級流動到最終目的地,也允許扇出流(一到多)、扇入流
(
多到一
)
的、故障轉移和失敗處理。
2.3.2.
可靠性
事務性的數據傳遞,保證了數據的可靠性。
2.3.3.
可恢復性
通道可以以內存或文件的方式實現,內存更快,但是不可恢復,而文件雖然比較慢但提供了可恢復性。
入門案例
1.首先編寫一個配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#example.conf:單節點Flume配置 #命名Agent a1的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置Source a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 #描述Sink a1.sinks.k1.type = logger #描述內存Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #為Channle綁定Source和Sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
2.通過flume的工具啟動agent
1 | $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console |
3、發送數據
在
windows中通過
telnet命令連接
flume所在機器的
44444端口發送數據。
4.
Source
詳解
現在介紹幾種比較重要的Source
4.1.
Avro Source
監聽
AVRO
端口來接受來自外部
AVRO
客戶端的事件流。利用
Avro Source
可以實現多級流動、扇出流、扇入流等效果。另外也可以接受通過
flume
提供的
Avro
客戶端發送的日志信息。
4.1.1.
Avro Source
屬性說明
!channels
–
!type
–
類型名稱,
"AVRO"
!bind
–
需要監聽的主機名或
IP
!port
–
要監聽的端口
threads
–
工作線程最大線程數
selector.type
selector.*
interceptors
–
空格分隔的攔截器列表
interceptors.*
compression-type none
壓縮類型,可以是“
none
”或“
default
”,這個值必須和
AvroSource
的壓縮格式匹配
sslfalse
是否啟用
ssl
加密,如果啟用還需要配置一個“
keystore
”和一個“
keystore-password
”。
keystore
–
為
SSL
提供的
java
密鑰文件所在路徑。
keystore-password
–
為
SSL
提供的
java
密鑰文件
密碼。
keystore-typeJKS
密鑰庫類型可以是“
JKS
”或“
PKCS12
”。
exclude-protocolsSSLv3
空格分隔開的列表,用來指定在
SSL / TLS
協議中排除。
SSLv3
將總是被排除除了所指定的協議。
ipFilter false
如果需要為
netty
開啟
ip
過濾,將此項設置為
true
ipFilterRules
–
定義
netty
的
ip
過濾設置表達式規則
案例:
編寫配置文件 修改上面給出的配置文件,除了Source部分配置不同,其余部分都一樣。不同的地方如下:
1 2 3 4 |
#描述/配置Source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 |
啟動
flume
:
./flume-ng agent --conf ../conf --conf-file ../conf/template2.conf --name a1 -Dflume.root.logger=INFO,console
通過
flume
提供的
avro
客戶端向指定機器指定端口發送日志信息:
./flume-ng avro-client --conf ../conf --host 0.0.0.0 --port 44444 --filename ../mydata/log1.txt
會發現確實收集到日志
4.2.
Spooling Directory Source
這個
Source
允許你將將要收集的數據放置到
"
自動搜集
"
目錄中。這個
Source
將監視該目錄,並將解析新文件的出現。事件處理邏輯是可插拔的,當一個文件被完全讀入通道,它會被重命名或可選的直接刪除。
要注意的是,放置到自動搜集目錄下的文件不能修改,如果修改,則
flume
會報錯。另外,也不能產生重名的文件,如果有重名的文件被放置進來,則
flume
會報錯。
屬性說明:(由於比較長 這里只給出了必須給出的屬性,全部屬性請參考官方文檔):
!channels
–
!type
–
類型,需要指定為
"spooldir"
!spoolDir
–
讀取文件的路徑,即
"
搜集目錄
"
fileSuffix.COMPLETED
對處理完成的文件追加的后綴
案例:
編寫配置文件 修改上面給出的配置文件,除了Source部分配置不同,其余部分都一樣。不同的地方如下:
1 2 3 |
#描述/配置Source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir=/home/park/work/apache-flume-1.6.0-bin/mydata |
啟動
flume
:
./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console
向指定目錄中傳輸文件,發現
flume收集到了該文件,將文件中的每一行都作為日志來處理
4.3.
NetCat Source
一個
NetCat Source
用來監聽一個指定端口,並將接收到的數據的每一行轉換為一個事件。
4.3.1.
NetCat Source
屬性說明
!
channels
–
!
type
–
類型名稱,需要被設置為
"netcat"
!
bind
–
指定要綁定到的
ip
或主機名。
!
port
–
指定要綁定到的端口號
max-line-length 512
單行最大字節數
案例:上面完整的例子即是
4.4.
HTTP Source
HTTP Source
接受
HTTP
的
GET
和
POST
請求作為
Flume
的事件
,
其中
GET
方式應該只用於試驗。
該
Source
需要提供一個可插拔的
"
處理器
"
來將請求轉換為事件對象,這個處理器必須實現
HTTPSourceHandler
接口,該處理器接受一個
HttpServletRequest
對象,並返回一個
Flume Envent
對象集合。
從一個
HTTP
請求中得到的事件將在一個事務中提交到通道中。因此允許像文件通道那樣對通道提高效率。
如果處理器拋出一個異常,
Source
將會返回一個
400
的
HTTP
狀態碼。
如果通道已滿,無法再將
Event
加入
Channel
,則
Source
返回
503
的
HTTP
狀態碼,表示暫時不可用。
4.4.1.
HTTP Source
屬性說明
!
type
類型,必須為
"HTTP"
!
port
–
監聽的端口
bind 0.0.0.0
監聽的主機名或
ip
handler org.apache.flume.source.http.JSONHandler
處理器類,需要實現
HTTPSourceHandler
接口
handler.*
–
處理器的配置參數
selector.type
selector.*
interceptors
–
interceptors.*
enableSSL false
是否開啟
SSL,
如果需要設置為
true
。注意,
HTTP
不支持
SSLv3
。
excludeProtocols SSLv3
空格分隔的要排除的
SSL/TLS
協議。
SSLv3
總是被排除的。
keystore
密鑰庫文件所在位置。
keystorePassword Keystore
密鑰庫密碼
案例:
編寫配置文件 修改上面給出的配置文件,除了Source部分配置不同,其余部分都一樣。不同的地方如下:
1 2 3 |
#描述/配置Source a1.sources.r1.type = http a1.sources.r1.port = 66666 |
啟動
flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template6.conf --name a1 -Dflume.root.logger=INFO,console
通過命令發送
HTTP
請求到指定端口:
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://0.0.0.0:6666