實時采集日志的數據采集引擎 flume


介紹:

  Flume由Cloudera公司開發,是一個分布式、高可靠、高可用的海量日志采集、聚 合、傳輸的系統。

  簡單的說,Flume是實時采集日志的數據采集引擎

  重要組件:Source、Channel、Sink

 

 

  • Agent本質上是一個 JVM 進程,該JVM進程控制Event數據流從外部日志生產者 那里傳輸到目的地(或者是下一個Agent)。一個完整的Agent中包含了三個組 件Source、Channel和Sink,Source是指數據的來源和方式,Channel是一個數 據的緩沖池,Sink定義了數據輸出的方式和目的地。
  • Source是負責接收數據到Flume Agent的組件。Source組件可以處理各種類 型、各種格式的日志數據,包括avro、exec、spooldir、netcat等。
  • Channel是位於Source和Sink之間的緩沖區。Channel允許Source和Sink運作 在不同的速率上。Channel是線程安全的,可以同時處理多個Source的寫入操作 及多個Sink的讀取操作。
    • Memory Channel是內存中的隊列。Memory Channel在允許數據丟失的情 景下適用。如果不允許數據丟失,應該避免使用Memory Channel,因為程 序死亡、機器宕機或者重啟都可能會導致數據丟失;
    • File Channel將所有事件寫到磁盤。因此在程序關閉或機器宕機的情況下不 會丟失數據;
  • Sink不斷地輪詢Channel中的事件且批量地移除它們,並將這些事件批量寫入到 存儲或索引系統、或者被發送到另一個Flume Agent。
    • Sink是完全事務性的。在從Channel批量刪除數據之前,每個Sink用Channel啟 動一個事務。批量事件一旦成功寫出到存儲系統或下一個Flume Agent,Sink就 利用Channel提交事務。事務一旦被提交,該Channel從自己的內部緩沖區刪除 事件。
  • Event是Flume定義的一個數據流傳輸的最小單位。

 

 

 flume 流程:

  • 1. Source接收事件,交給其Channel處理器處理事件
  • 2. 處理器通過攔截器Interceptor,對事件一些處理,比如壓縮解碼,正則攔截,時 間戳攔截,分類等
  • 3. 經過攔截器處理過的事件再傳給Channel選擇器,將事件寫入相應的Channel。 Channel Selector有兩種:
    •   Replicating Channel Selector(默認),會將source過來的Event發往所有 Channel(比較常用的場景是,用多個Channel實現冗余副本,保證可用性)
    •   Multiplexing Channel Selector,根據配置分發event。此selector會根據 event中某個header對應的value來將event發往不同的channel,一般配合攔截器使用
  • 4. 最后由Sink處理器處理各個Channel的事件

常用source:

  •   avro source:監聽 Avro 端口來接收外部 avro 客戶端的事件流。avro-source 接收到的是經過avro序列化后的數據,然后反序列化數據繼續傳輸。如果是avro source的話,源數據必須是經過avro序列化后的數據。利用 Avro source可以實現多 級流動、扇出流、扇入流等效果。接收通過flume提供的avro客戶端發送的日 志信 息。
  • Taildir Source(1.7):監控指定的多個文件,一旦文件內有新寫入的數據, 就會將其寫入到指定的sink內,本來源可靠性高,不會丟失數據。其不會對於跟蹤的 文件有任何處理,不會重命名也不會刪除,不會做任何修改。目前不支持Windows 系統,不支持讀取二進制文件,支持一行一行的讀取文本文件。
  • netcat source:一個NetCat Source用來監聽一個指定端口,並接收監聽到的 數據。

 

常用channel:

  • (1)memory channel:緩存到內存中(最常用)
  • (2)file channel:緩存到文件中
  • (3)JDBC channel:通過JDBC緩存到關系型數據庫中
  • (4)kafka channel:緩存到kafka中

 

常用sink:

  • (1)logger sink:將信息顯示在標准輸出上,主要用於測試
  • (2)avro sink:Flume events發送到sink,轉換為Avro events,並發送到配置好 的hostname/port。從配置好的channel按照配置好的批量大小批量獲取events
  • (3)null sink:將接收到events全部丟棄
  • (4)HDFS sink:將 events 寫進HDFS。支持創建文本和序列文件,支持兩種文件 類型壓縮。文件可以基於數據的經過時間、大小、事件的數量周期性地滾動
  • (5)Hive sink:該sink streams 將包含分割文本或者JSON數據的events直接傳送 到Hive表或分區中。使用Hive 事務寫events。當一系列events提交到Hive時,它們 馬上可以被Hive查詢到
  • (6)HBase sink:保存到HBase中
  • (7)kafka sink:保存到kafka中

 

flume 文檔鏈接: Flume 1.9用戶手冊中文版 — 可能是目前翻譯最完整的版本了 (liyifeng.org)

 

案例:netcat source,memory,logger sink

# a1是agent的名稱。source、channel、sink的名稱分別為:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux123
a1.sources.r1.port = 8888
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger
# source、channel、sink之間的關系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

啟動:

$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file $FLUME_HOME/conf/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console
  • name。定義agent的名字,要與參數文件一致
  • conf-file。指定參數文件位置
  • -D表示flume運行時動態修改 flume.root.logger 參數屬性值,並將控制台日志 打印級別設置為INFO級別。日志級別包括:log、info、warn、error

 

案例:taildir,memory,hdfs/file_roll

 

 

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 將數據流復制給所有channel
a1.sources.r1.selector.type = replicating
# source
a1.sources.r1.type = taildir
# 記錄每個文件最新消費位置
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
# 備注:.*log 是正則表達式;這里寫成 *.log 是錯誤的
a1.sources.r1.filegroups.f1 = /tmp/root/.*log
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux123
a1.sinks.k1.port = 9091

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux123
a1.sinks.k2.port = 9092
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = linux123
a2.sources.r1.port = 9091
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500
# Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://linux121:8020/flume2/%Y%m%d/%H
# 上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2- # 是否使用本地時間戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true # 500個Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 500 # 設置文件類型,可支持壓縮 a2.sinks.k1.hdfs.fileType = DataStream # 60秒生成一個新的文件 a2.sinks.k1.hdfs.rollInterval = 60 a2.sinks.k1.hdfs.rollSize = 0 a2.sinks.k1.hdfs.rollCount = 0 a2.sinks.k1.hdfs.minBlockReplicas = 1 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux123
a3.sources.r1.port = 9092
# Describe the sink
a3.sinks.k1.type = file_roll
# 目錄需要提前創建好
a3.sinks.k1.sink.directory = /root/flume/output
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

分別啟動:

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file ~/conf/flume-avro-file.conf \
-Dflume.root.logger=INFO,console &
$FLUME_HOME/bin/flume-ng agent --name a2 \
--conf-file ~/conf/flume-avro-hdfs.conf \
-Dflume.root.logger=INFO,console &
$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file ~/conf/flume-taildir-avro.conf \
-Dflume.root.logger=INFO,console &

 

 hdfs sink 參數說明:

  一般使用 HDFS Sink 都會采用滾動生成文件的方式,滾動生成文件的策略有:

  • 基於時間
    •   hdfs.rollInterval
    • 缺省值:30,單位秒
    • 0禁用
  • 基於文件大小
    •    hdfs.rollSize
    • 缺省值:1024字節
    • 0禁用
  • 基於event數量
    •   hdfs.rollCount
    • 10
    • 0禁用
  • 基於文件空閑時間
    •   hdfs.idleTimeout
    • 缺省值:0。禁用
  • 基於HDFS文件副本數
    •   hdfs.minBlockReplicas
    • 默認:與HDFS的副本數一致
    • 要將該參數設置為1;否則HFDS文件所在塊的復制會引起文件滾動

  其他重要配置:

  • hdfs.useLocalTimeStamp
    •   使用本地時間,而不是event header的時間戳
    • 默認值:false
  • hdfs.round
    • 時間戳是否四舍五入
    •   默認值false
    • 如果為true,會影響所有的時間,除了t%
  • hdfs.roundValue
    •    四舍五入的最高倍數(單位配置在hdfs.roundUnit),但是要小於當前時間
    • 默認值:1
  • hdfs.roundUnit
    •   可選值為:second、minute、hour
    • 默認值:second

如果要避免hdfs  sink產生小文件,參數配置參考如下:

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://linux121:9000/flume/events/%Y/%m/
%d/%H/%M
a1.sinks.k1.hdfs.minBlockReplicas=1
a1.sinks.k1.hdfs.rollInterval=3600
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0

 

攔截器:

  時間戳攔截器:具體參考官方文檔

 

 

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux123
a1.sources.r1.port = 8888
# 這部分是新增 時間攔截器的 內容
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# 是否保留Event header中已經存在的同名時間戳,缺省值false
a1.sources.r1.interceptors.i1.preserveExisting= false
# 這部分是新增 時間攔截器的 內容
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

host 攔截器:

  

 

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux123
a1.sources.r1.port = 8888
# 這部分是新增 時間攔截器 的內容
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting= false

# 這部分是新增 主機名攔截器 的內容
a1.sources.r1.interceptors.i2.type = host
# 如果header中已經存在同名的屬性是否保留
a1.sources.r1.interceptors.i2.preserveExisting= false
# true:使用IP地址;false:使用hostname
a1.sources.r1.interceptors.i2.useIP = false
# 這部分是新增 主機名攔截器 的內容
# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

channel 選擇器 :

  replication:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

  multiplexing:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state #以每個Event的
header中的state這個屬性的值作為選擇channel的依據
a1.sources.r1.selector.mapping.CZ = c1 #如果state=CZ,則選
擇c1這個channel
a1.sources.r1.selector.mapping.US = c2 c3 #如果state=US,則選
擇c2 和 c3 這兩個channel
a1.sources.r1.selector.default = c4 #默認使用c4這個channel

 

 Sink組邏輯處理器:

  負載均衡:

 

 

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

故障轉移:

  

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 1000

 

flume 事務:

 

 

  put 事務流程:

  •   把一批event寫入putList(臨時緩沖區)
  • 檢查channel 是否有足夠的空間保存putList的所有event
  • 如果可以保存,doCommit,如果不能保存 doRollback

  take事務流程:

  •   把一批event寫入takeList
  •        把takeList的所有event 發送給下一級flume 或者其他存儲系統
  • 如果全部發送成功,doCommit ,如果發送失敗, doRollback

 

flume 是否丟數據:

  •   source: tailDir source 不丟數據,其他可能丟
  •   channel: file channel 不丟數據,memory channel 可能丟數據
  •       sink :不會丟數據,但可能重復(take事務失敗重新發送)

 

  

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM