介紹:
Flume由Cloudera公司開發,是一個分布式、高可靠、高可用的海量日志采集、聚 合、傳輸的系統。
簡單的說,Flume是實時采集日志的數據采集引擎。
重要組件:Source、Channel、Sink
- Agent本質上是一個 JVM 進程,該JVM進程控制Event數據流從外部日志生產者 那里傳輸到目的地(或者是下一個Agent)。一個完整的Agent中包含了三個組 件Source、Channel和Sink,Source是指數據的來源和方式,Channel是一個數 據的緩沖池,Sink定義了數據輸出的方式和目的地。
- Source是負責接收數據到Flume Agent的組件。Source組件可以處理各種類 型、各種格式的日志數據,包括avro、exec、spooldir、netcat等。
- Channel是位於Source和Sink之間的緩沖區。Channel允許Source和Sink運作 在不同的速率上。Channel是線程安全的,可以同時處理多個Source的寫入操作 及多個Sink的讀取操作。
- Memory Channel是內存中的隊列。Memory Channel在允許數據丟失的情 景下適用。如果不允許數據丟失,應該避免使用Memory Channel,因為程 序死亡、機器宕機或者重啟都可能會導致數據丟失;
- File Channel將所有事件寫到磁盤。因此在程序關閉或機器宕機的情況下不 會丟失數據;
- Sink不斷地輪詢Channel中的事件且批量地移除它們,並將這些事件批量寫入到 存儲或索引系統、或者被發送到另一個Flume Agent。
- Sink是完全事務性的。在從Channel批量刪除數據之前,每個Sink用Channel啟 動一個事務。批量事件一旦成功寫出到存儲系統或下一個Flume Agent,Sink就 利用Channel提交事務。事務一旦被提交,該Channel從自己的內部緩沖區刪除 事件。
- Event是Flume定義的一個數據流傳輸的最小單位。
flume 流程:
- 1. Source接收事件,交給其Channel處理器處理事件
- 2. 處理器通過攔截器Interceptor,對事件一些處理,比如壓縮解碼,正則攔截,時 間戳攔截,分類等
- 3. 經過攔截器處理過的事件再傳給Channel選擇器,將事件寫入相應的Channel。 Channel Selector有兩種:
- Replicating Channel Selector(默認),會將source過來的Event發往所有 Channel(比較常用的場景是,用多個Channel實現冗余副本,保證可用性)
- Multiplexing Channel Selector,根據配置分發event。此selector會根據 event中某個header對應的value來將event發往不同的channel,一般配合攔截器使用
- 4. 最后由Sink處理器處理各個Channel的事件
常用source:
- avro source:監聽 Avro 端口來接收外部 avro 客戶端的事件流。avro-source 接收到的是經過avro序列化后的數據,然后反序列化數據繼續傳輸。如果是avro source的話,源數據必須是經過avro序列化后的數據。利用 Avro source可以實現多 級流動、扇出流、扇入流等效果。接收通過flume提供的avro客戶端發送的日 志信 息。
- Taildir Source(1.7):監控指定的多個文件,一旦文件內有新寫入的數據, 就會將其寫入到指定的sink內,本來源可靠性高,不會丟失數據。其不會對於跟蹤的 文件有任何處理,不會重命名也不會刪除,不會做任何修改。目前不支持Windows 系統,不支持讀取二進制文件,支持一行一行的讀取文本文件。
- netcat source:一個NetCat Source用來監聽一個指定端口,並接收監聽到的 數據。
常用channel:
- (1)memory channel:緩存到內存中(最常用)
- (2)file channel:緩存到文件中
- (3)JDBC channel:通過JDBC緩存到關系型數據庫中
- (4)kafka channel:緩存到kafka中
常用sink:
- (1)logger sink:將信息顯示在標准輸出上,主要用於測試
- (2)avro sink:Flume events發送到sink,轉換為Avro events,並發送到配置好 的hostname/port。從配置好的channel按照配置好的批量大小批量獲取events
- (3)null sink:將接收到events全部丟棄
- (4)HDFS sink:將 events 寫進HDFS。支持創建文本和序列文件,支持兩種文件 類型壓縮。文件可以基於數據的經過時間、大小、事件的數量周期性地滾動
- (5)Hive sink:該sink streams 將包含分割文本或者JSON數據的events直接傳送 到Hive表或分區中。使用Hive 事務寫events。當一系列events提交到Hive時,它們 馬上可以被Hive查詢到
- (6)HBase sink:保存到HBase中
- (7)kafka sink:保存到kafka中
flume 文檔鏈接: Flume 1.9用戶手冊中文版 — 可能是目前翻譯最完整的版本了 (liyifeng.org)
案例:netcat source,memory,logger sink
# a1是agent的名稱。source、channel、sink的名稱分別為:r1 c1 k1 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # source a1.sources.r1.type = netcat a1.sources.r1.bind = linux123 a1.sources.r1.port = 8888 # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # sink a1.sinks.k1.type = logger # source、channel、sink之間的關系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
啟動:
$FLUME_HOME/bin/flume-ng agent --name a1 \ --conf-file $FLUME_HOME/conf/flume-netcat-logger.conf \ -Dflume.root.logger=INFO,console
- name。定義agent的名字,要與參數文件一致
- conf-file。指定參數文件位置
- -D表示flume運行時動態修改 flume.root.logger 參數屬性值,並將控制台日志 打印級別設置為INFO級別。日志級別包括:log、info、warn、error
案例:taildir,memory,hdfs/file_roll
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 將數據流復制給所有channel a1.sources.r1.selector.type = replicating # source a1.sources.r1.type = taildir # 記錄每個文件最新消費位置 a1.sources.r1.positionFile = /root/flume/taildir_position.json a1.sources.r1.filegroups = f1 # 備注:.*log 是正則表達式;這里寫成 *.log 是錯誤的 a1.sources.r1.filegroups.f1 = /tmp/root/.*log # sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = linux123 a1.sinks.k1.port = 9091 a1.sinks.k2.type = avro a1.sinks.k2.hostname = linux123 a1.sinks.k2.port = 9092 # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 500 a1.channels.c2.type = memory a1.channels.c2.capacity = 10000 a1.channels.c2.transactionCapacity = 500 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = linux123 a2.sources.r1.port = 9091 # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 10000 a2.channels.c1.transactionCapacity = 500
# Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://linux121:8020/flume2/%Y%m%d/%H
# 上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2- # 是否使用本地時間戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true # 500個Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 500 # 設置文件類型,可支持壓縮 a2.sinks.k1.hdfs.fileType = DataStream # 60秒生成一個新的文件 a2.sinks.k1.hdfs.rollInterval = 60 a2.sinks.k1.hdfs.rollSize = 0 a2.sinks.k1.hdfs.rollCount = 0 a2.sinks.k1.hdfs.minBlockReplicas = 1 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = linux123 a3.sources.r1.port = 9092 # Describe the sink a3.sinks.k1.type = file_roll # 目錄需要提前創建好 a3.sinks.k1.sink.directory = /root/flume/output # Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 10000 a3.channels.c2.transactionCapacity = 500 # Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
分別啟動:
$FLUME_HOME/bin/flume-ng agent --name a3 \ --conf-file ~/conf/flume-avro-file.conf \ -Dflume.root.logger=INFO,console & $FLUME_HOME/bin/flume-ng agent --name a2 \ --conf-file ~/conf/flume-avro-hdfs.conf \ -Dflume.root.logger=INFO,console & $FLUME_HOME/bin/flume-ng agent --name a1 \ --conf-file ~/conf/flume-taildir-avro.conf \ -Dflume.root.logger=INFO,console &
hdfs sink 參數說明:
一般使用 HDFS Sink 都會采用滾動生成文件的方式,滾動生成文件的策略有:
- 基於時間
- hdfs.rollInterval
- 缺省值:30,單位秒
- 0禁用
- 基於文件大小
- hdfs.rollSize
- 缺省值:1024字節
- 0禁用
- 基於event數量
- hdfs.rollCount
- 10
- 0禁用
- 基於文件空閑時間
- hdfs.idleTimeout
- 缺省值:0。禁用
- 基於HDFS文件副本數
- hdfs.minBlockReplicas
- 默認:與HDFS的副本數一致
- 要將該參數設置為1;否則HFDS文件所在塊的復制會引起文件滾動
其他重要配置:
- hdfs.useLocalTimeStamp
- 使用本地時間,而不是event header的時間戳
- 默認值:false
- hdfs.round
- 時間戳是否四舍五入
- 默認值false
- 如果為true,會影響所有的時間,除了t%
- hdfs.roundValue
- 四舍五入的最高倍數(單位配置在hdfs.roundUnit),但是要小於當前時間
- 默認值:1
- hdfs.roundUnit
- 可選值為:second、minute、hour
- 默認值:second
如果要避免hdfs sink產生小文件,參數配置參考如下:
a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.useLocalTimeStamp=true a1.sinks.k1.hdfs.path=hdfs://linux121:9000/flume/events/%Y/%m/ %d/%H/%M a1.sinks.k1.hdfs.minBlockReplicas=1 a1.sinks.k1.hdfs.rollInterval=3600 a1.sinks.k1.hdfs.rollSize=0 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=0
攔截器:
時間戳攔截器:具體參考官方文檔
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = linux123 a1.sources.r1.port = 8888 # 這部分是新增 時間攔截器的 內容 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp # 是否保留Event header中已經存在的同名時間戳,缺省值false a1.sources.r1.interceptors.i1.preserveExisting= false # 這部分是新增 時間攔截器的 內容 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 500 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
host 攔截器:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = linux123 a1.sources.r1.port = 8888 # 這部分是新增 時間攔截器 的內容 a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = timestamp a1.sources.r1.interceptors.i1.preserveExisting= false # 這部分是新增 主機名攔截器 的內容 a1.sources.r1.interceptors.i2.type = host # 如果header中已經存在同名的屬性是否保留 a1.sources.r1.interceptors.i2.preserveExisting= false # true:使用IP地址;false:使用hostname a1.sources.r1.interceptors.i2.useIP = false # 這部分是新增 主機名攔截器 的內容 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 500 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
channel 選擇器 :
replication:
a1.sources = r1 a1.channels = c1 c2 c3 a1.sources.r1.selector.type = replicating a1.sources.r1.channels = c1 c2 c3 a1.sources.r1.selector.optional = c3
multiplexing:
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state #以每個Event的 header中的state這個屬性的值作為選擇channel的依據 a1.sources.r1.selector.mapping.CZ = c1 #如果state=CZ,則選 擇c1這個channel a1.sources.r1.selector.mapping.US = c2 c3 #如果state=US,則選 擇c2 和 c3 這兩個channel a1.sources.r1.selector.default = c4 #默認使用c4這個channel
Sink組邏輯處理器:
負載均衡:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
故障轉移:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 1000
flume 事務:
put 事務流程:
- 把一批event寫入putList(臨時緩沖區)
- 檢查channel 是否有足夠的空間保存putList的所有event
- 如果可以保存,doCommit,如果不能保存 doRollback
take事務流程:
- 把一批event寫入takeList
- 把takeList的所有event 發送給下一級flume 或者其他存儲系統
- 如果全部發送成功,doCommit ,如果發送失敗, doRollback
flume 是否丟數據:
- source: tailDir source 不丟數據,其他可能丟
- channel: file channel 不丟數據,memory channel 可能丟數據
- sink :不會丟數據,但可能重復(take事務失敗重新發送)