Disclaimer: this is a selective translation of the official documentation; corrections are welcome: https://flume.apache.org/FlumeUserGuide.html
Terminology
Component | Description |
---|---|
Agent | A Flume JVM instance |
Client | The message producer |
Source | Receives data from the client and passes it to a channel |
Sink | Takes data from a channel and sends it to the destination |
Channel | The bridge between source and sink, similar to a queue |
Events | Can be log records, avro objects, etc. |
Startup command:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
Example configuration file:
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start it:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
For production deployments that pass --conf=conf_dir, the conf_dir directory should contain these two files:
- flume-env.sh
- log4j.properties
To log the effective configuration for easier debugging, pass this parameter to the JVM:
-Dorg.apache.flume.log.printconfig=true
Alternatively, set JAVA_OPTS=-Dorg.apache.flume.log.printconfig=true in flume-env.sh. The log level should be set to debug or trace, for example:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
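A minimal flume-env.sh achieving the same effect might look like this (the heap sizes are illustrative assumptions, not recommendations):

# flume-env.sh: JVM options picked up by the flume-ng launcher
export JAVA_OPTS="-Xms512m -Xmx1g -Dorg.apache.flume.log.printconfig=true -Dflume.root.logger=DEBUG,console"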
Flume can also read its configuration from ZooKeeper (experimental).
Third-party plugins
Either add the plugin jars to FLUME_CLASSPATH in flume-env.sh, or drop them into the plugins.d directory, as sketched below.
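Each plugin directory under plugins.d may contain lib (the plugin's jar), libext (its dependency jars) and native (native libraries such as .so files). A hypothetical custom source would be laid out like this:

plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-1/native/gettext.so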
In a multi-agent chain, the hop between agents must use the avro type: an avro sink on the upstream agent paired with an avro source on the downstream agent.
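A minimal sketch of such a hop, assuming two agents named agent1 and agent2, a channel named ch1 on each, and an arbitrary port 4141 (the hostname is a placeholder):

# on the upstream agent: avro sink pointing at the downstream agent
agent1.sinks.avro-sink.type = avro
agent1.sinks.avro-sink.channel = ch1
agent1.sinks.avro-sink.hostname = agent2.example.com
agent1.sinks.avro-sink.port = 4141

# on the downstream agent: matching avro source
agent2.sources.avro-src.type = avro
agent2.sources.avro-src.channels = ch1
agent2.sources.avro-src.bind = 0.0.0.0
agent2.sources.avro-src.port = 4141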
Flume also supports fan-out, i.e. one event can be copied and delivered to multiple channels (and hence multiple sinks).
Configuration file
Note: a source can have multiple channels, but a sink can only have one channel, as shown below:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
Fan-out supports two modes: replicating and multiplexing.
- Replicating: the event is sent to every channel
- Multiplexing: the event is sent only to selected channels
To use multiplexing you have to define a selector explicitly; the default is replicating:

<Agent>.sources.<Source1>.selector.type = replicating
Multiplexing is defined like this:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
# default is used when no mapping matches
<Agent>.sources.<Source1>.selector.default = <Channel2>
A concrete example:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

There is also an optional setting, for example:

agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
Only after all required channels have consumed the event does the selector try to write it to the optional channels; a failed write to an optional channel is ignored and not retried.
If a channel is mapped to a header value as both required and optional, it is treated as required, and a failure in any required channel fails the entire set of channels for that value. For CA above, for example, a failure in any of its required channels means the write for CA has failed.
If a header value has no required channels mapped, the event is written to the default channels, plus an attempted write to the optional channels for that value; specifying optional channels does not remove the default channels from the picture. If no default channels are specified either, the event is written only to the optional channels, and failures there are again simply ignored.
Flume sources
- Avro source
The configuration parameters are listed below; properties in bold are required.
Property Name | Default | Description |
---|---|---|
**channels** | – | |
**type** | – | The component type name, needs to be avro |
**bind** | – | hostname or IP address to listen on |
**port** | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | | |
selector.* | | |
interceptors | – | Space-separated list of interceptors |
interceptors.* | | |
compression-type | none | This can be "none" or "deflate". The compression-type must match the compression-type of the matching AvroSource |
ssl | false | Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password". |
keystore | – | This is the path to a Java keystore file. Required for SSL. |
keystore-password | – | The password for the Java keystore. Required for SSL. |
keystore-type | JKS | The type of the Java keystore. This can be "JKS" or "PKCS12". |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
ipFilter | false | Set this to true to enable ipFiltering for netty |
ipFilterRules | – | Define N netty ipFilter pattern rules with this config. |
Example ipFilterRules:

ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
allows the local machine and denies every other IP, while

ipFilterRules=deny:name:localhost,allow:ip:*
denies the local machine and allows everyone else.
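Putting the table together, a sketch of an avro source with SSL and IP filtering enabled (the keystore path and password are placeholders):

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.ssl = true
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystore-password = changeit
a1.sources.r1.ipFilter = true
# allow local clients only
a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*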
- Thrift source
The Thrift source can be started in secure mode by configuring kerberos; two parameters are required: agent-principal and agent-keytab (see the sketch after the next item).
- Exec source (not recommended)
Runs a unix command and ingests its standard output (stderr is discarded unless logStdErr=true is configured).
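A sketch of a kerberos-enabled Thrift source (the principal and keytab path are placeholders):

a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/_HOST@EXAMPLE.COM
a1.sources.r1.agent-keytab = /etc/security/keytabs/flume.keytab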
… There are many more sources; see the official guide if you are interested.
Flume sinks
Many sinks are supported as well; here is just the list (an HDFS sink sketch follows it):
- HDFS
- Hive
- Logger (logs at INFO level; mostly used for testing and debugging)
- Avro
- Thrift (also supports kerberos)
- IRC
- File Roll
- Null (discards every event it receives)
- HBase
- AsyncHBase (asynchronous mode)
- MorphlineSolr
- ElasticSearch
- Kite Dataset
- Kafka
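As promised above, a small HDFS sink sketch, close to the one in the official guide (the path pattern relies on a timestamp header, e.g. one added by the Timestamp interceptor described later):

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# escape sequences like %y-%m-%d are expanded from the event's timestamp header
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# round the timestamp down to 10-minute buckets
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute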
Flume channel
A channel stores events on the agent: the source puts events into it and the sink consumes them.
The supported channels are:
- Memory
- JDBC (persists events to a database; embedded Derby is supported out of the box)
- Kafka
- File
- Spillable memory
Roughly, this channel consists of an in-memory queue plus a file channel: events go into the queue first and spill over to the file channel once the queue fills up. The idea is to get the high throughput of the in-memory queue under normal load, while the file channel absorbs sudden traffic bursts, and if the agent dies the spilled data can still be recovered from disk.
The key point: it is experimental and not recommended for production use (a configuration sketch follows below).
- Pseudo Transaction (for unit tests only; not recommended for production)
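A spillable memory channel sketch in line with the official example (the capacities and directories are illustrative):

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
# events kept in the in-memory queue
a1.channels.c1.memoryCapacity = 10000
# events allowed to spill over to the embedded file channel
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data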
Flume channel selectors
As mentioned earlier, the default selector is replicating, not multiplexing.
Here is a multiplexing configuration:
a1.sources = r1
a1.channels = c1 c2 c3 c4
# note: multiplexing has to be set explicitly
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
Flume sink processors
Sinks can be grouped to provide load balancing or failover.
Property Name | Default | Description |
---|---|---|
**sinks** | – | Space-separated list of sinks that are participating in the group |
**processor.type** | default | The component type name, needs to be default, failover or load_balance |
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
There are three processor types: default, failover, and load_balance.
- default: the plain source-channel-sink setups configured earlier in this document; nothing extra needs to be configured.
- failover
Assigns a priority to each sink to guarantee that at least one sink is available; a larger number means a higher priority.
All sinks in the group are kept in a pool. A sink that fails to deliver an event is temporarily removed from the pool and cooled down (30s by default), and subsequent events go to the sink with the next-highest priority. If no priorities are set, the order in the configuration file is used (see the sketch after this list).
- load_balance
There are two load-balancing strategies: round_robin (the default) and random.
If the selected sink fails to deliver, the processor simply moves on to the next sink; unless backoff is enabled, a failing sink is not removed from the pool of available sinks. If every sink in the group fails, the failure is propagated to the caller.
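The failover sketch referenced above, with illustrative priorities and a 10s cap on the cooldown:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# maximum cooldown for a failed sink, in milliseconds
a1.sinkgroups.g1.processor.maxpenalty = 10000

For load balancing, backoff and the strategy are set on the processor as well:

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random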
Event serializer
Both the file_roll sink and the hdfs sink support event serializers.
- Body text serializer
Writes the event body as plain text; the event headers are ignored (a sketch follows this list)
- "Flume event" Avro event serializer
- Avro event serializer
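The sketch referenced above: a file_roll sink using the body text serializer (the directory is a placeholder):

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
# do not append a newline after each event body
a1.sinks.k1.sink.serializer.appendNewline = false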
Flume interceptor
Flume can use interceptors to modify or drop events in flight. Interceptors can be chained; multiple interceptor names are separated by spaces:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
- Timestamp: adds a timestamp key to the header, recording when the event was processed
- Host: adds a host key to the header with the agent's IP address
- Static: adds a user-defined key to the header; only one key per interceptor, so use several interceptors to add several keys (see the sketch after this list)
- UUID: generates a 128-bit value
- Morphline
- Search and replace: based on Java regular expressions (Java matcher.replaceAll)
- Regex filter
- Regex extractor: puts regex group matches into the header
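The static interceptor sketch referenced above, with an illustrative key/value pair:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
# every event gets the header datacenter=NEW_YORK
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK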
Flume properties
flume.called.from.service: Flume re-reads the configuration file every 30s.
If you set this parameter (-Dflume.called.from.service) and the configuration file does not exist the first time the agent tries to read it, the agent does not raise an error; it simply waits and tries the load again after the next 30s interval.
If you did not set the parameter, the agent exits immediately when the file is missing on the first load.
When a 30s reload comes around and it is not the first load of the file, a missing file does not make the agent exit; it assumes the configuration is unchanged and keeps running with the old one.
JSON Reporting
Flume can start a built-in HTTP server that reports various metrics as JSON:
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
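The metrics are then served under the /metrics path; the output looks roughly like this (the channel name and counts are illustrative):

$ curl http://localhost:34545/metrics
{"CHANNEL.c1":{"EventPutSuccessCount":"468085","EventPutAttemptCount":"468086","Type":"CHANNEL","StopTime":"0", ...}}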