Definition of Flume
- Flume is a distributed, highly reliable, and highly available system for collecting, aggregating, and moving large volumes of log data from many different sources into a central store (HDFS). In short, it is a tool for collecting and aggregating logs.
- Logstash and Filebeat are the log-ingestion tools of the ES (Elasticsearch) stack and are very similar to Flume: Filebeat is the lightweight shipper, Logstash the heavyweight one. If your project is already on the ES stack, Logstash can fully replace Flume.
Versions
- NG: the 1.x releases (N = New)
- OG: the 0.9.x releases, which you can ignore (O = Old)
- Since I am using CDH 5.7.0, I chose the flume-ng-1.6.0-cdh5.7.0 release. Note that this 1.6 differs from the community 1.6.
Advantages of Flume:
- It can ingest data at high speed, and the collected data can be stored on HDFS in the file format and compression codec you want
- Its transaction mechanism guarantees that no data is lost while it is being collected
- Some sources guarantee that after Flume crashes and restarts, collection resumes from the last position, achieving true zero data loss
Components of Flume
- Flume has three major components
- Source (collects data from the source side): Flume ships with a wide variety of sources and also supports custom sources
- Channel (buffers and aggregates data in transit): the main ones are the memory channel and the file channel (the most common in production). In production the channel must be monitored, otherwise a dead sink can fill it up (see the monitoring sketch after this list)
- Sink (moves data to the destination): e.g. HDFS, Kafka, a database, or a custom sink
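A minimal way to watch channel usage is Flume's built-in HTTP JSON reporting, enabled with -D flags when starting the agent. The agent name, config file, and port below are placeholders for illustration:

# Expose agent metrics (including channel fill percentage) as JSON on http://<host>:34545/metrics
# Agent name, config path, and port are placeholders -- adjust to your setup
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34545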
Flume architecture
- Flume agent topologies
- Single agent
- Agents chained in series
- Agents in parallel (the most common setup in production)
- Multi-sink agent (also very common; a fan-out sketch follows this list)
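As a hedged sketch of the multi-sink (fan-out) topology, one source can replicate its events into two channels, each drained by its own sink. All names and paths below are placeholders, not part of the original setup:

# Hypothetical fan-out agent: one source replicated into two channels, each with its own sink
fanout-agent.sources = r1
fanout-agent.channels = c1 c2
fanout-agent.sinks = hdfs-sink logger-sink

# Replicating selector copies every event to both channels
fanout-agent.sources.r1.type = netcat
fanout-agent.sources.r1.bind = localhost
fanout-agent.sources.r1.port = 44444
fanout-agent.sources.r1.selector.type = replicating
fanout-agent.sources.r1.channels = c1 c2

fanout-agent.channels.c1.type = memory
fanout-agent.channels.c2.type = memory

# Sink 1: write to HDFS (path is illustrative)
fanout-agent.sinks.hdfs-sink.type = hdfs
fanout-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/fanout
fanout-agent.sinks.hdfs-sink.channel = c1

# Sink 2: print to the console/log
fanout-agent.sinks.logger-sink.type = logger
fanout-agent.sinks.logger-sink.channel = c2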
Flume deployment
- Open the download page http://archive.cloudera.com/cdh5/cdh/5/
Upload and extract
[hadoop@hadoop001 app]$ rz flume-ng-1.6.0-cdh5.7.0.tar.gz
[hadoop@hadoop001 app]$ tar -xzvf flume-ng-1.6.0-cdh5.7.0.tar.gz
# Edit the configuration file and add JAVA_HOME
[hadoop@hadoop001 app]$ cd ~/app/apache-flume-1.6.0-cdh5.7.0-bin
[hadoop@hadoop001 apache-flume-1.6.0-cdh5.7.0-bin]$ cp ~/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/flume-env.sh.template ~/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/flume-env.sh
[hadoop@hadoop001 apache-flume-1.6.0-cdh5.7.0-bin]$ vim ~/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_45
# Add environment variables (append the two export lines to ~/.bash_profile, then reload it)
export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=$FLUME_HOME/bin:$PATH
[hadoop@hadoop001 bin]$ source ~/.bash_profile
[hadoop@hadoop001 bin]$ which flume-ng
~/app/apache-flume-1.6.0-cdh5.7.0-bin/bin/flume-ng
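As an optional extra sanity check (not in the original steps), flume-ng can also print the version bundled with this install:

# Optional: verify the installed Flume version
[hadoop@hadoop001 bin]$ flume-ng version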
Agent configuration and usage example
- Using Flume is really just a matter of configuring Source, Channel, and Sink
- Agent = Source + Channel + Sink; an agent is essentially a Flume configuration file
- One configuration file can define multiple agents
- Event: the smallest unit of data Flume transfers. One event is one record, made up of a header and a body: the header stores metadata as key-value pairs, the body stores the payload as a byte array (a small header-metadata sketch follows this list)
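To make the header/body split concrete, here is a hedged sketch using Flume's built-in timestamp and static interceptors to put key-value metadata into each event's header. The agent/source names and the key/value are placeholders, not part of the original configuration:

# Hypothetical source 'r1' on agent 'a1'; interceptors add header key-value pairs to every event
a1.sources.r1.interceptors = i1 i2
# i1 writes the event's timestamp into the 'timestamp' header (used later by %Y%m%d-style HDFS paths)
a1.sources.r1.interceptors.i1.type = timestamp
# i2 adds a fixed key-value pair to every event's header
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = datacenter
a1.sources.r1.interceptors.i2.value = dc01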
Flume configuration file
[hadoop@hadoop001 conf]$ vim /home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/example.conf

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- NetCat source: the settings shown in bold in the official docs are required; for netcat these are type, bind, and port
- Memory channel: capacity => the maximum number of events the channel can hold; in production set it to at least 100,000
- transactionCapacity => the maximum number of events per transaction before a commit is forced; this must also be increased in production (a tuned sketch follows this list)
- logger: a sink that simply writes events to the console/log
- Note 1: one source can feed multiple channels, but one sink can only read from a single channel
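A hedged example of a more production-like memory channel sizing; the exact numbers are illustrative assumptions, not values taken from this setup:

# Illustrative production-style sizing for memory channel 'c1' on agent 'a1'
a1.channels.c1.type = memory
# Maximum number of events buffered in the channel
a1.channels.c1.capacity = 100000
# Maximum number of events handed over per transaction
a1.channels.c1.transactionCapacity = 10000
# Optional cap on the total bytes of event bodies held in memory
a1.channels.c1.byteCapacity = 134217728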
Starting the agent and testing
- Start the agent
# The last line prints INFO logs to the console for easier observation; it can be removed
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console

# Test with telnet:
[hadoop@hadoop001 ~]$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
OK
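In practice the agent is usually not left in a foreground terminal; a minimal sketch of running the same agent in the background (the log file path is an assumption):

# Run the agent in the background and capture its output; the log path is a placeholder
nohup flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console > ~/flume-example.log 2>&1 &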
Using an Exec source to collect file data into HDFS
The production architecture is: log data => Flume => HDFS. Here we use a simple Exec source that runs tail -F on the data file to collect it.
# example.conf: A single-node Flume configuration

# Name the components on this agent
exec-hdfs-agent.sources = exec-source
exec-hdfs-agent.sinks = hdfs-sink
exec-hdfs-agent.channels = memory-channel

# Describe/configure the source
exec-hdfs-agent.sources.exec-source.type = exec
exec-hdfs-agent.sources.exec-source.command = tail -F /home/hadoop/data/access_10000.log
exec-hdfs-agent.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-hdfs-agent.sinks.hdfs-sink.type = hdfs
exec-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/exec
exec-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = DataStream
exec-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text

# Use a channel which buffers events in memory
exec-hdfs-agent.channels.memory-channel.type = memory
exec-hdfs-agent.channels.memory-channel.capacity = 1000
exec-hdfs-agent.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-hdfs-agent.sources.exec-source.channels = memory-channel
exec-hdfs-agent.sinks.hdfs-sink.channel = memory-channel
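Assuming the configuration above is saved as $FLUME_HOME/conf/exec-hdfs.conf (the file name is my assumption), the agent would be started like this:

# Start the exec-to-HDFS agent; the config file name is a placeholder
flume-ng agent \
--name exec-hdfs-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-hdfs.conf \
-Dflume.root.logger=INFO,console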
- While a file is being written to HDFS it is first created with a .tmp suffix; the .tmp is removed once the file is closed
- Drawbacks:
- Although this tail approach does get the log data into HDFS, if the tail -F process dies, data is still lost. There is no high availability, so it is not viable in production.
- It also does nothing about the problem of generating lots of small files, so it is not highly reliable either
- tail can only monitor a single file, whereas production usually requires monitoring a whole directory, so it does not meet the requirement
Using a Spooling Directory source to collect a directory's data into HDFS
- Files written to HDFS are ideally around 100 MB, slightly below the block size (128 MB)
- rollInterval (time) and rollSize (size) are normally used to control when files are rolled; whichever triggers first creates a new HDFS file. Rolling by event count (rollCount) is disabled (set to 0).
- The size rollSize controls is the pre-compression size, so if the HDFS files are compressed, rollSize needs to be increased accordingly
- Once a file in the spooled directory has been collected into HDFS, it is marked with a .COMPLETED suffix
- With the Spooling Directory source, modifying a file that has already been collected causes an error and stops the agent; so does dropping in a file whose name matches one that has already been collected
- Data written to HDFS can be partitioned by time; note that no directory is created for a time bucket that contains no data
- Generated file names default to prefix + timestamp; this can be changed
# example.conf: A single-node Flume configuration

# Name the components on this agent
spool-hdfs-agent.sources = spool-source
spool-hdfs-agent.sinks = hdfs-sink
spool-hdfs-agent.channels = memory-channel

# Describe/configure the source
spool-hdfs-agent.sources.spool-source.type = spooldir
spool-hdfs-agent.sources.spool-source.spoolDir = /home/hadoop/data/flume/spool/input

# Describe the sink
spool-hdfs-agent.sinks.hdfs-sink.type = hdfs
spool-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/spool/%Y%m%d%H%M
spool-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
spool-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream
spool-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
spool-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = gzip
spool-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = wsk
spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30
spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 100000000
spool-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 0

# Use a channel which buffers events in memory
spool-hdfs-agent.channels.memory-channel.type = memory
spool-hdfs-agent.channels.memory-channel.capacity = 1000
spool-hdfs-agent.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
spool-hdfs-agent.sources.spool-source.channels = memory-channel
spool-hdfs-agent.sinks.hdfs-sink.channel = memory-channel
Although the Spooling Directory source configuration above solves the small-file problem and can watch multiple files, it still has the following issues:
- Issue 1: it can monitor a directory, but not files inside its subdirectories (no recursion)
- Issue 2: if Flume dies during collection, there is no guarantee that on restart it resumes from the line of the file it had reached
Given these two issues, this approach is also unacceptable for production.
(Production version) Using a Taildir source to collect directory data into HDFS
- The Taildir source was introduced in Apache Flume 1.7, but CDH Flume 1.6 has it integrated
- The Taildir source is a reliable source: it continuously writes the file offsets it has read to a JSON file on disk. On restart, Flume reads that JSON file, recovers the offsets, and resumes from where it left off, guaranteeing zero data loss
- The Taildir source can monitor multiple directories and files at the same time, even while the files are being written to (see the multi-filegroup sketch after the config below)
- The Taildir source still cannot collect data from files in subdirectories recursively; that requires modifying the source code
- To monitor all files in a directory with the Taildir source, you must use the .* regex
# example.conf: A single-node Flume configuration

# Name the components on this agent
taildir-hdfs-agent.sources = taildir-source
taildir-hdfs-agent.sinks = hdfs-sink
taildir-hdfs-agent.channels = memory-channel

# Describe/configure the source
taildir-hdfs-agent.sources.taildir-source.type = TAILDIR
taildir-hdfs-agent.sources.taildir-source.filegroups = f1
taildir-hdfs-agent.sources.taildir-source.filegroups.f1 = /home/hadoop/data/flume/taildir/input/.*
taildir-hdfs-agent.sources.taildir-source.positionFile = /home/hadoop/data/flume/taildir/taildir_position/taildir_position.json

# Describe the sink
taildir-hdfs-agent.sinks.hdfs-sink.type = hdfs
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/taildir/%Y%m%d%H%M
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.codeC = gzip
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = wsk
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 100000000
taildir-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 0

# Use a channel which buffers events in memory
taildir-hdfs-agent.channels.memory-channel.type = memory
taildir-hdfs-agent.channels.memory-channel.capacity = 1000
taildir-hdfs-agent.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
taildir-hdfs-agent.sources.taildir-source.channels = memory-channel
taildir-hdfs-agent.sinks.hdfs-sink.channel = memory-channel
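As a hedged sketch of monitoring several locations at once, the Taildir source accepts multiple file groups. The second group's path, its header key, and its value below are placeholders I introduced for illustration:

# Hypothetical variant: one Taildir source watching two file groups
taildir-hdfs-agent.sources.taildir-source.type = TAILDIR
taildir-hdfs-agent.sources.taildir-source.filegroups = f1 f2
# f1: every file in the original input directory (note the .* regex)
taildir-hdfs-agent.sources.taildir-source.filegroups.f1 = /home/hadoop/data/flume/taildir/input/.*
# f2: a single log file in another directory (path is a placeholder)
taildir-hdfs-agent.sources.taildir-source.filegroups.f2 = /home/hadoop/logs/app/app.log
# Optional: tag f2's events with a header so they can be told apart downstream
taildir-hdfs-agent.sources.taildir-source.headers.f2.logtype = app
taildir-hdfs-agent.sources.taildir-source.positionFile = /home/hadoop/data/flume/taildir/taildir_position/taildir_position.json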