Flume (Part 1)


Definition of Flume

  • Flume is a distributed, highly reliable, highly available system for collecting, aggregating, and moving large volumes of log data from many different sources into a central store (HDFS). In short, it is a tool for gathering and consolidating logs.
  • Logstash and FileBeat are the log-ingestion tools of the ES stack, and they are very similar to Flume; FileBeat is the lightweight one and Logstash the heavyweight one. If your project already runs on the ES stack, Logstash can replace Flume entirely.

Versions

 

  • NG: the 1.x releases (N = New)
  • OG: the 0.9.x releases; you can ignore these (O = Old)
  • Since I am running CDH 5.7.0, I picked the flume-ng-1.6.0-cdh5.7.0 build. Note that this 1.6 differs from the community 1.6 release.

 

  • Flume's strengths:

  1. It can ingest data at high throughput, and the collected data can be stored on HDFS in whatever file format and compression codec you want.
  2. Its transaction mechanism guarantees that no data is lost during collection.
  3. Some Sources guarantee that if Flume crashes, a restart resumes collection from the last recorded position, achieving true zero data loss.

 

  • Flume's components

  • Flume has 3 core components:
  1. Source (ingests data from the origin): Flume ships with a wide variety of Sources and also lets you write custom ones.
  2. Channel (buffers and aggregates data in transit): the main ones are the memory channel and the file channel (the most common in production). In production the channel backlog must be monitored, so that a dead sink does not fill the channel to bursting.
  3. Sink (moves data to the destination): e.g. HDFS, Kafka, a DB, or a custom sink.
  • Flume's architecture

  • Flume agent topologies:
  • Single agent:

  • Chained agents, in series (a minimal Avro hop linking two agents is sketched after this list):

  • Fan-in agents (the most common setup in production):

  • Multi-sink agent (also very common):
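In the chained and fan-in topologies, agents are linked by pointing an Avro sink on the upstream agent at an Avro source on the downstream agent. A minimal sketch of one hop (the agent names, host, and port here are made up for illustration):

# upstream agent: forwards its events over Avro to the aggregator host
collector.sinks.avro-sink.type = avro
collector.sinks.avro-sink.hostname = hadoop002
collector.sinks.avro-sink.port = 4545

# downstream agent: receives Avro events from one or more collectors
aggregator.sources.avro-source.type = avro
aggregator.sources.avro-source.bind = 0.0.0.0
aggregator.sources.avro-source.port = 4545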

 

 

 

 Flume deployment

  • Open the download site http://archive.cloudera.com/cdh5/cdh/5/

  • Upload and extract

[hadoop@hadoop001 app]$ rz 

flume-ng-1.6.0-cdh5.7.0.tar.gz

[hadoop@hadoop001 app]$ tar -xzvf flume-ng-1.6.0-cdh5.7.0.tar.gz

  

# Edit the config file and add JAVA_HOME

[hadoop@hadoop001 app]$ cd ~/app/apache-flume-1.6.0-cdh5.7.0-bin
[hadoop@hadoop001 apache-flume-1.6.0-cdh5.7.0-bin]$ cp ~/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/flume-env.sh.template ~/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/flume-env.sh
[hadoop@hadoop001 apache-flume-1.6.0-cdh5.7.0-bin]$ vim ~/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_45


# Add environment variables

[hadoop@hadoop001 bin]$ vim ~/.bash_profile
export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=$FLUME_HOME/bin:$PATH
[hadoop@hadoop001 bin]$ source ~/.bash_profile
[hadoop@hadoop001 bin]$ which flume-ng
~/app/apache-flume-1.6.0-cdh5.7.0-bin/bin/flume-ng
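To double-check the install, flume-ng can also report its version; the banner below is what I would expect for this build, but treat it as illustrative:

[hadoop@hadoop001 bin]$ flume-ng version
Flume 1.6.0-cdh5.7.0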

 

  • Agent configuration example

  1. Using Flume really comes down to configuring a Source, a Channel, and a Sink.
  2. Agent = Source + Channel + Sink; an agent is, in effect, a Flume configuration file.
  3. One configuration file can define multiple agents.
  4. Event: the smallest unit of data Flume transfers. One Event is one record, made up of a header and a body: the header holds metadata as key-value pairs, the body holds the payload as a byte array.
  • Flume configuration file

[hadoop@hadoop001 conf]$ vim  /home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/example.conf

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • NetCat source: in the official documentation, the properties shown in bold are the required ones.

 

  • memory channel: capacity => the maximum number of events the channel can hold; in production set it to at least 100,000.
  • transactionCapacity => the maximum number of events committed per transaction; this must also be raised in production.

  • logger: a sink that simply writes events to the console.
  • Note 1: one source can be bound to multiple channels, but a sink can be bound to only one channel, as the sketch below shows.
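A minimal wiring sketch of that rule (the agent and component names are made up for illustration):

# one source may fan out to several channels ...
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.r1.channels = c1 c2
# ... but each sink drains exactly one channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2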

 

  • Starting the agent and testing

  • Start:
# The last line prints INFO logs to the console so the output is easy to watch; it can be removed
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console


Test with telnet:

[hadoop@hadoop001 ~]$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
OK
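On the agent side, the logger sink should print the received event to the console, roughly like this (the exact hex-dump formatting varies a little between versions):

Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }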

 

Using Exec Source to collect file data into HDFS

The production architecture is: log data => Flume => HDFS. Here we use the simple Exec Source, tailing the data file with tail -F.

# example.conf: A single-node Flume configuration

# Name the components on this agent
exec-hdfs-agent.sources = exec-source
exec-hdfs-agent.sinks = hdfs-sink
exec-hdfs-agent.channels = memory-channel

# Describe/configure the source
exec-hdfs-agent.sources.exec-source.type = exec
exec-hdfs-agent.sources.exec-source.command = tail -F /home/hadoop/data/access_10000.log
exec-hdfs-agent.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-hdfs-agent.sinks.hdfs-sink.type = hdfs
exec-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/exec
exec-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = DataStream 
exec-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text

# Use a channel which buffers events in memory
exec-hdfs-agent.channels.memory-channel.type = memory
exec-hdfs-agent.channels.memory-channel.capacity = 1000
exec-hdfs-agent.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-hdfs-agent.sources.exec-source.channels = memory-channel
exec-hdfs-agent.sinks.hdfs-sink.channel = memory-channel
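Assuming the config above is saved as $FLUME_HOME/conf/exec-hdfs.conf (the file name is my own choice), the agent is started the same way as before:

flume-ng agent \
--name exec-hdfs-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-hdfs.conf \
-Dflume.root.logger=INFO,console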

  

 

  • When writing an HDFS file, Flume first creates it with a .tmp suffix; once the file is finished, the .tmp suffix is dropped.
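A listing taken while the agent is still writing shows the in-progress file; the default file prefix is FlumeData, and the size, date, and timestamp below are illustrative:

[hadoop@hadoop001 ~]$ hdfs dfs -ls /flume/exec
-rw-r--r--   1 hadoop supergroup      12345 2018-06-11 18:59 /flume/exec/FlumeData.1528712345678.tmp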

 

  • Drawbacks:
  1. This tail-based approach does get log data into HDFS, but what if the tail -F process dies? Data is still lost. That rules it out for production: no high availability.
  2. The flow above also does nothing about the flood of small files it creates, so it is not highly reliable either.
  3. tail can only watch a single file, while production usually needs to watch a whole directory. It does not meet the requirements.

 

  • Using Spooling Directory Source to collect directory data into HDFS

  • Files written to HDFS should ideally be around 100 MB, slightly below the block size (128 MB).
  • Rolling is usually controlled with rollInterval (time) and rollSize (size); whichever fires first rolls a new HDFS file. Count-based rolling should be switched off (rollCount = 0).
  • rollSize measures the data before compression, so if the HDFS files are compressed, rollSize must be raised accordingly.
  • Once a file in the spool directory has been fully shipped to HDFS, it is marked with a .COMPLETED suffix (see the example after the config below).
  • If a file that has already been collected is modified afterwards, the source errors out and stops; likewise, dropping in a new file with the same name as an already-collected one errors out and stops.
  • Data written to HDFS can be partitioned by time; note that a time bucket that receives no data simply gets no directory.
  • Generated file names default to prefix + timestamp; both are configurable.
# example.conf: A single-node Flume configuration

# Name the components on this agent
spool-hdfs-agent.sources = spool-source
spool-hdfs-agent.sinks = hdfs-sink
spool-hdfs-agent.channels = memory-channel

# Describe/configure the source
spool-hdfs-agent.sources.spool-source.type = spooldir
spool-hdfs-agent.sources.spool-source.spoolDir = /home/hadoop/data/flume/spool/input

# Describe the sink
spool-hdfs-agent.sinks.hdfs-sink.type = hdfs
spool-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/spool/%Y%m%d%H%M
spool-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
spool-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream 
spool-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
spool-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = gzip
spool-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = wsk
spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30
spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 100000000
spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 0

# Use a channel which buffers events in memory
spool-hdfs-agent.channels.memory-channel.type = memory
spool-hdfs-agent.channels.memory-channel.capacity = 1000
spool-hdfs-agent.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
spool-hdfs-agent.sources.spool-source.channels = memory-channel
spool-hdfs-agent.sinks.hdfs-sink.channel = memory-channel
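For example, dropping a file into the spool directory (the file name is hypothetical) and listing the directory again after ingestion shows the marker:

[hadoop@hadoop001 ~]$ cp access_10000.log /home/hadoop/data/flume/spool/input/
# once Flume has fully consumed the file:
[hadoop@hadoop001 ~]$ ls /home/hadoop/data/flume/spool/input/
access_10000.log.COMPLETED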

 

  

The Spooling Directory Source configuration above fixes the small-files problem and can watch many files at once, but the following problems remain.

  • Problem 1: it can watch one directory, but not files nested in subdirectories.
  • Problem 2: if Flume dies mid-collection, there is no guarantee that a restart resumes from the line it had reached in each file.

Because of these two problems, this approach is also unacceptable in production.

 

  • (The production version) Using Taildir Source to collect directory data into HDFS

  1. Taildir Source is new in Apache Flume 1.7, but CDH Flume 1.6 has it integrated.
  2. Taildir Source is a reliable source: it periodically writes the file offsets to a JSON file and persists it to disk. On restart, Flume reads the JSON file, recovers the offsets, and resumes from the previous position, guaranteeing zero data loss (a sample position file is shown after the config below).
  3. Taildir Source can watch multiple directories and files at the same time, even while the files are being written to.
  4. Taildir Source still cannot collect from nested subdirectories; that requires patching the source code.
  5. To watch every file under a directory with Taildir Source, be sure to use the .* regex.

 

# example.conf: A single-node Flume configuration

# Name the components on this agent
taildir-hdfs-agent.sources = taildir-source
taildir-hdfs-agent.sinks = hdfs-sink
taildir-hdfs-agent.channels = memory-channel

# Describe/configure the source
taildir-hdfs-agent.sources.taildir-source.type = TAILDIR
taildir-hdfs-agent.sources.taildir-source.filegroups = f1
taildir-hdfs-agent.sources.taildir-source.filegroups.f1 = /home/hadoop/data/flume/taildir/input/.*
taildir-hdfs-agent.sources.taildir-source.positionFile = /home/hadoop/data/flume/taildir/taildir_position/taildir_position.json

# Describe the sink
taildir-hdfs-agent.sinks.hdfs-sink.type = hdfs
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/taildir/%Y%m%d%H%M
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream 
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = gzip
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = wsk
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 100000000
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 0

# Use a channel which buffers events in memory
taildir-hdfs-agent.channels.memory-channel.type = memory
taildir-hdfs-agent.channels.memory-channel.capacity = 1000
taildir-hdfs-agent.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
taildir-hdfs-agent.sources.taildir-source.channels = memory-channel
taildir-hdfs-agent.sinks.hdfs-sink.channel = memory-channel
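After the agent has run for a bit, the position file holds one JSON entry per tailed file; the inode, offset, and file name below are illustrative:

[hadoop@hadoop001 ~]$ cat /home/hadoop/data/flume/taildir/taildir_position/taildir_position.json
[{"inode":2496292,"pos":12,"file":"/home/hadoop/data/flume/taildir/input/a.log"}]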

  



