Flume official documentation (usage)
Usage: bin/flume-ng <command> [options]...
commands:
help display this help text
agent run a Flume agent
global options:
--conf,-c <conf> use configs in <conf> directory
-Dproperty=value sets a Java system property value
agent options:
--name,-n <name> the name of this agent (required)
--conf-file,-f <file> specify a config file (required if -z missing)
eg:
bin/flume-ng agent --conf conf --name agent-test --conf-file test.conf -Dflume.root.logger=DEBUG,console
bin/flume-ng agent -c conf -n agent-test -f test.conf -Dflume.root.logger=DEBUG,console
The simplest possible example
1. Edit the example conf (available both on the official site and in the conf directory)
# example.conf: A single-node Flume configuration
# 1. Define the names of the three components
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2. Configure the source (where the source listens for data)
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = cen-ubuntu
a1.sources.r1.port = 44444
# 3. Configure the sink (here it simply logs the events)
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 1024
# 4. Configure the channel (use memory as the buffer)
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 5. Bind the three components together
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Install netcat (a network tool that can transfer files and messages) to send and receive test data
$ sudo apt-get install netcat
3. Run the Flume agent to capture data in real time (listening on the port)
bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf -Dflume.root.logger=DEBUG,console
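For longer-running tests, the same command can be kept running in the background; a minimal sketch using nohup (the output file name is just an assumption):
nohup bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf -Dflume.root.logger=INFO,console > flume-a1.out 2>&1 &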
4. From a shell, check whether the port was opened successfully
netstat -tnlp
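To check only the port configured above (44444 in this example), the output can be filtered:
netstat -tnlp | grep 44444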
5. Send data to the port via telnet
telnet cen-ubuntu 44444
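If telnet is not available, the netcat installed in step 2 can send data to the same port as well, for example:
echo "nihao" | nc cen-ubuntu 44444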
6. If Flume logs the received data, the setup works:
Event: { headers:{} body: 6E 69 68 61 6F 20 08 0D nihao .. }
Various Sources
Exec Source: reads the output of a shell command
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
Spooling Directory Source: watches a directory for newly added files
Kafka Source
Syslog Sources: collect system logs
HTTP Source: accepts events sent to it over HTTP (a sketch follows this list)
NetCat Source
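As a rough sketch of the HTTP Source mentioned above (the agent name a1, the bind address and port 5140 are assumptions), the default JSON handler accepts a JSON array of events, which can be posted with curl:
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
# send one event to the running source:
curl -X POST -d '[{"headers": {}, "body": "hello flume"}]' http://cen-ubuntu:5140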
Various Channels
Memory Channel
Kafka Channel
File Channel: stores events in files on disk
Various Sinks
HDFS Sink
Hive Sink
HBase Sinks (HBaseSink; AsyncHBaseSink)
MorphlineSolrSink: an ETL tool (Extract, Transform, Load)
ElasticSearchSink: writes to Elasticsearch, a Lucene-based search server
Case 1:
Collect Hive's runtime log into the HDFS filesystem
Analysis: Using an Exec source to watch a file gives good real-time behaviour but poor reliability: if the command is interrupted, data may be lost or re-read, so data safety cannot be guaranteed and it should not be used in production. A file channel is also safer than a memory channel.
- Source: Exec Source
tail -F /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
- Channel: Memory Channel
- Sink: HDFS Sink
/user/cen/flume/hive-log
1. Write the agent configuration
# example.conf: A single-node Flume configuration
# 1. Define the names of the three components
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# 2. Configure the source (where the source reads data from)
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
# 3. Configure the sink (write the events to HDFS)
# Describe the sink
a2.sinks.k2.type = hdfs
# Non-HA NameNode: specify the host directly (see Note 1 and Note 2)
a2.sinks.k2.hdfs.path = hdfs://cen-ubuntu:8020/user/cen/flume/hive-log
# File name prefix
a2.sinks.k2.hdfs.filePrefix = events-
# File type (DataStream = uncompressed text data)
a2.sinks.k2.hdfs.fileType = DataStream
# Write format (plain text)
a2.sinks.k2.hdfs.writeFormat = Text
# Number of events written to the file before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# File rolling parameters (used together with the next items): rollInterval = 0 and rollCount = 0 disable time- and count-based rolling, so files roll by size (rollSize, in bytes) only
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 1024
a2.sinks.k2.hdfs.rollCount = 0
# See http://doc.okbase.net/chiweitree/archive/126197.html
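# If this is left unset, HDFS block replication activity can make the sink consider the file under-replicated and roll it early, producing many small files; setting it to 1 is a common workaround (an assumption based on the linked article)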
a2.sinks.k2.hdfs.minBlockReplicas=1
# 4. Configure the channel (use memory as the buffer)
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# 5. Bind the three components together
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
2. Add the required jar dependencies (use find /dir/dir -name 'filename' to locate them easily; see the sketch after this list)
commons-configuration-1.6.jar
hadoop-common-2.5.0-cdh5.3.6.jar
hadoop-auth-2.5.0-cdh5.3.6.jar
hadoop-hdfs-2.5.0-cdh5.3.6.jar
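A hedged sketch of locating the jars and copying them into Flume's lib directory (the search root and the source path are assumptions for this setup):
find /opt/cdh5.3.6 -name 'hadoop-common-*.jar'
# copy each jar that was found into Flume's lib/ directory, e.g.:
cp /path/to/hadoop-common-2.5.0-cdh5.3.6.jar /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/lib/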
3. Run
bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume-tail.conf -Dflume.root.logger=DEBUG,console
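Once some Hive log lines have arrived, the output can be checked on HDFS (assuming the Hadoop client is on the PATH):
hdfs dfs -ls /user/cen/flume/hive-log
hdfs dfs -cat /user/cen/flume/hive-log/events-*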
Case 2:
Collect Hive's log directory into the HDFS filesystem
- Source: Spooling Directory Source
/opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/
- Channel: File Channel
- Sink: HDFS Sink
/user/cen/flume/hive-log
Analysis: The Spooling Directory Source collects log data by watching a directory for newly added files. In production it is usually combined with log4j; after a log file has been fully transferred, it is renamed with a .COMPLETED suffix appended.
1. Write the agent configuration
# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/datas/flume/
a3.sources.r3.ignorePattern = ^.*\.log$
# Suffix appended to files after they have been read
a3.sources.r3.fileSuffix = .deleteable
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://cen-ubuntu:8020/user/cen/flume/spool-file-hdfs/%Y%m%d
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.filePrefix = events-
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10
# Use a channel which buffers events in file
a3.channels.c3.type = file
# Directories where the file channel stores its checkpoint and data files (optional)
a3.channels.c3.checkpointDir = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/data/filechanel/cheakpoint
a3.channels.c3.dataDirs = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/data/filechanel/data
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2. Run
bin/flume-ng agent --conf conf --name a3 --conf-file conf/spooling-file-hdfs.conf -Dflume.root.logger=DEBUG,console
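To test, drop a file that does not end in .log (those are ignored) into the spooled directory and then check HDFS; the file name below is only an example:
cp /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log /opt/datas/flume/hive-test-1.txt
hdfs dfs -ls /user/cen/flume/spool-file-hdfs/$(date +%Y%m%d)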
3. Results
- Files that have been read are branded with the .deleteable suffix
- Files ending in .log are not read
- The ingested files duly appear in HDFS, stored in folders by date
Note 1: HDFS HA configuration
1. Add the configuration files hdfs-site.xml and core-site.xml to the conf directory
2. Change the HDFS path
# If the NameNode is HA, use the nameservice instead of host:port
# a2.sinks.k2.hdfs.path = hdfs://ns1/user/cen/flume/hive-log
Note 2: In particular, directory paths can be created by rule (e.g. the date pattern %Y%m%d); see the official documentation for details
# As the official documentation notes, time-related escape sequences require a timestamp header in the events; alternatively, the following parameter makes the sink use the local server time
hdfs.useLocalTimeStamp = true
Note 3: Using an options file
/bin/sqoop --options-file /opt/datas/filename
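An options file lists one option or value per line (lines starting with # are comments); a minimal sketch of what /opt/datas/filename might contain, where the connection string, user and table are placeholders:
import
--connect
jdbc:mysql://cen-ubuntu:3306/testdb
--username
root
--table
example_table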