Apache Flume入門指南[翻譯自官方文檔]


聲明: 根據官方文檔選擇性的翻譯了下,不對請指正 https://flume.apache.org/FlumeUserGuide.html

術語介紹
組件 說明
Agent 一個flume的jvm實例
Client 消息生產者
Source 從client接收數據,傳遞給channel
Sink 從channel接收數據發送到目的端
Channel 是source和sink的橋梁,類似隊列
Events 可以是日志記錄、 avro 對象等
啟動方法:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
 
配置文件示例:
# example.conf: A single-node Flume configuration
 
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
 
# Describe the sink
a1.sinks.k1.type = logger
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
啟動:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

 

生產環境配置 --conf=conf_dir,那么conf_dir目錄中應該有下面兩個文件
  • flume-env.sh
  • log4j.properties
 
記錄日志方便debug, 給java傳參數
-Dorg.apache.flume.log.printconfig=true
或者在flume-env.sh里設置JAVA_OPTS=-Dorg.apache.flume.log.printconfig=true  日志級別應設置 debug或trace
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
 
flume也支持配置放在zookeeper中(實驗性質)
 
第三方插件
flume-env.sh   配置FLUME_CLASSPATH  或者 放在plugins.d
 
多agent模式之間必須使用avro類型
flume還支持多路復用,即一份數據可以復制多份傳給多個sink

配置文件
注意: 一個source可以有多個channle, 但是一個sink只能有一個channle,如下所示
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
 
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
 
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

  

有兩種模式可以支持扇出:  復制技術和 多路技術
  • 復制: event發送給所有的channel
  • 多路: 選擇性的發送給channel
 
如果要使用多路技術,需要單獨定義一個selector
<Agent>.sources.<Source1>.selector.type = replicating  這是默認的replicating
 
下面是多路的定義方法:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
 
<Agent>.sources.<Source1>.selector.default = <Channel2>  定義一個默認值 如果什么都不匹配則使用這個default
 
來個具體例子:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
 
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
 
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
 
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
 
還有一個optional的例子
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2

  

當所有的channel都消費完成,selector會嘗試往optional也寫一份, optional channel寫入失敗會忽略錯誤不會重試
 
如何header配置的channel既是required又是optional的 那么這個channel被認為是required, 任何一個channel失敗都會導致整個channel集合失敗,就像上面的CA,任何一個channel失敗都認為 CA失敗了
如果一個header沒有設置任何channel,那么event會寫到default channel,也會往optional寫一份. 既是指定了寫入optional channel但仍然會往default寫. 如果連default也沒指定那么event只能寫optional channel,寫入失敗也只是簡單的忽略.

Flume source
  1. avro source
    下面是配置參數,粗體是必選
Property Name Default Description
channels  
type The component type name, needs to be avro
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space-separated list of interceptors
interceptors.*    
compression-type none This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource
ssl false Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”.
keystore This is the path to a Java keystore file. Required for SSL.
keystore-password The password for the Java keystore. Required for SSL.
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
ipFilter false Set this to true to enable ipFiltering for netty
ipFilterRules Define N netty ipFilter pattern rules with this config.
 
ipFilterRules例子如下:
allow:name:localhost,deny:ip:
ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*    允許本機,禁止其他ip allow:name:localhost,deny:ip:
deny:name:localhost,allow:ip:   禁止本機,允許其他
 
  1. Thrift source
    Thrift source通過kerberos可以配置加密模式
    需要設置這兩個參數: agent-principal   agent-keytab
  2. Exec source ( 不推薦使用)
    運行unix命令輸出到標准輸出(如果沒有配置logStdErr=true那么錯誤會被拋棄)

… 太多source了  有興趣的自己看吧

Flume sinks
支持的sink也很多這里只列舉一下吧
    • HDFS
    • HIVE
    • Logger 日志級別是INFO, 這個sink大都用來測試和調試
    • Avro
    • Thrift 也支持kerberos加密
    • IRC
    • File Roll
    • Null  丟棄收到的消息
    • HBase
    • AsyncHBase  異步模式
    • MorphlineSolr
    • ElasticSearch
    • Kite Dataset
    • Kafka

Flume channel
channel就是在agent端存儲event的, source發送過來event,sink去消費
支持的channel如下
    • memory
    • JDBC  用戶持久化存儲到數據庫, 支持內置的Derby
    • Kafka
    • File
    • Spillable memory
      大體解釋下這個channel, 有一個內存的隊列和一個文件的channle組成,數據先在queue中存,存不下了就往file channle寫,這個channel的想法是在正常情況下的高吞吐就是內存的隊列, file channel就是應對突然來了很大的數據量或者agent突然掛了,那么數據還能從file恢復
      重點來了: 實驗性質,不推薦在生產環境使用
    • Pseudo Transaction  用來做單元測試,不推薦生產環境

Flume channle selector
前面說過了,默認的selector是復制模式,而不是多路模式
來一個多路模式的配置文件
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing  這里注意
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

  


Flume sink processors
可以給sink分組,實現負載均衡或者容錯
Property Name Default Description
sinks Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be default, failover or load_balance
Example for agent named a1:
 
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
 
有三種類型的processor: default, failover, load_balance
 
    1. 這篇文檔中配置的source-channel-sink就是default模式,什么都不用配置
    2. failover
      給多個sink配置優先級,保證至少有一個sink是可用的,優先級的數字越大優先級越高
      這些sink被放到一個池子pool里,發送event失敗的sink會被暫時移除pool,並被冷卻30s(默認),后續的event由次高優先級的sink處理,如果沒有設置優先級那么按照配置文件中的次序
    3. load balance
      有兩個負載均衡的策略: 輪訓(round_robin默認), 隨機(random)
      如果有一個sink發送失敗,processor會選擇下一個sink,如果你沒有設置backoff,它也不會被從可用sink中移除,如果所有的sink都失敗了,那么這個信息就傳遞給調用者

Event serializer
file_roll和hdfs sink都支持eventSerializer
    1. Body text serializer
      就是普通的文本,event的header會被忽略
    2. "Flume event" Avro Event serializer
    3. Avro event serializer

Flume interceptor
flume可以使用攔截器修改和丟棄event,支持鏈式攔截器,多個攔截器用空格分隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1 
  1. Timestamp   在header中添加一個timestamp的key 顯示處理event的時間
  2. Host  在header中添加host     保存agent的ip地址
  3. Static 用戶可以自定義一個key,一次只能添加一個key,如果要添加多個可以使用多個interceptor攔截器
  4. UUID  生成一個128位的值
  5. Morphline
  6. search and replace  基於java的正則表達式   java matcher.replaceAll
  7. regex filter
  8. regex extractor 把正則分組中的匹配放到header中

flume properties
flume.called.from.service 每30s flume會重新載入配置文件.
如果你這是了這個參數 -Dflume.called.from.service
當agent第一次讀取配置文件並且這個文件不存在時,恰巧你設置了上面的參數,那么agent不會報錯,並會繼續等待30s重載配置文件
如果你沒設置上面的參數,那么agent立即退出
當agent到了30s重載配置文件, 而這不是第一次載入這個文件,那么agent不會退出,而是認為配置文件沒有變化繼續使用舊的配置

Json Reporting
flume可以啟動一個內置的http server以json的形式報告各種指標
$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM