Flume Usage (Case Studies)


Flume Official Documentation

Usage: bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent

global options:
  --conf,-c <conf>          use configs in <conf> directory
  -Dproperty=value          sets a Java system property value

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)

Examples:
bin/flume-ng agent --conf conf --name agent-test --conf-file test.conf -Dflume.root.logger=DEBUG,console
bin/flume-ng agent -c conf -n agent-test -f test.conf -Dflume.root.logger=DEBUG,console

The simplest possible example

1. Edit the sample conf (available on the official website and in the conf directory)

# example.conf: A single-node Flume configuration

# 1. Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2. Describe/configure the source (where data is received from)
a1.sources.r1.type = netcat
a1.sources.r1.bind = cen-ubuntu
a1.sources.r1.port = 44444

# 3. Describe the sink (a logger sink that simply prints events)
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 1024

# 4. Configure the channel (buffers events in memory between source and sink)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 5. Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Install netcat (a network tool for transferring files and messages) to send and receive data

$ sudo apt-get install netcat

3. Run the Flume agent to capture data in real time (monitoring the port)

bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf -Dflume.root.logger=DEBUG,console

4. Check from the shell whether the port was opened successfully

netstat -tnlp
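
A hedged way to narrow the check to the NetCat source port configured above (44444); the second command assumes your netcat build supports the -z (scan-only) flag:

# Show only the listener on the NetCat source port (44444)
netstat -tnlp | grep 44444

# Or probe the port with netcat itself (-z: scan without sending data)
nc -z cen-ubuntu 44444 && echo "port 44444 is open"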

5. Send data to the port with telnet

telnet cen-ubuntu 44444

6. If Flume receives the data, the setup is working

Event: { headers:{} body: 6E 69 68 61 6F 20 08 0D                         nihao .. }

The Various Sources

Exec Source: runs a command and reads its output

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure

Spooling Directory Source: watches a directory for newly added files

Kafka Source

Syslog Sources: collect system logs

HTTP Source: accepts events sent over HTTP (see the sketch after this list)

NetCat Source
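
As an illustration of the catalogue above, a minimal sketch of an HTTP Source; the agent, channel, and port names (a1, c1, 5140) are assumptions, not part of the case studies below:

# Minimal HTTP Source sketch (names and port are illustrative)
a1.sources = r1
a1.channels = c1

# The HTTP source accepts events posted over HTTP; the default handler accepts JSON-formatted events
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

a1.channels.c1.type = memory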

The Various Channels

Memory Channel

Kafka Channel

File Channel: stores events on disk

The Various Sinks

HDFS Sink

Hive Sink

HBase Sinks (HBaseSink; AsyncHBaseSink)

MorphlineSolrSink: an ETL (Extract, Transform, Load) tool that writes to Solr

ElasticSearchSink: writes to Elasticsearch, a Lucene-based search server



Case 1:

Collect Hive's runtime log into the HDFS file system

Analysis: using an Exec source to monitor a file gives good real-time behaviour but poor reliability. If the command is interrupted, data may be lost or re-read, so data safety cannot be guaranteed and this approach should not be used in production. A file-backed channel is safer than a memory channel.

  • Source: Exec Source
    tail -f /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
  • Channel: Memory Channel
  • Sink: HDFS Sink
    /user/cen/flume/hive-log

1. Write the agent configuration

# example.conf: A single-node Flume configuration

# 1. Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# 2. Describe/configure the source (tail Hive's log file)
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log

# 3. Describe the sink (write events to HDFS)
a2.sinks.k2.type = hdfs
# Non-HA NameNode: point at the host directly (see Note 1 and Note 2)
a2.sinks.k2.hdfs.path = hdfs://cen-ubuntu:8020/user/cen/flume/hive-log
# File name prefix
a2.sinks.k2.hdfs.filePrefix = events-
# File format (uncompressed text data)
a2.sinks.k2.hdfs.fileType = DataStream 
# Serialization format
a2.sinks.k2.hdfs.writeFormat = Text
# Number of events written per batch
a2.sinks.k2.hdfs.batchSize = 100
# File-rolling parameters: roll by size only (interval and event count disabled, rollSize in bytes)
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 1024
a2.sinks.k2.hdfs.rollCount = 0
# Reference: http://doc.okbase.net/chiweitree/archive/126197.html
a2.sinks.k2.hdfs.minBlockReplicas = 1

# 4. Configure the channel (buffers events in memory)
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# 5. Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

2. Add the required jar dependencies (they can be located easily with find /dir/dir -name 'filename'; a sketch follows the list)

commons-configuration-1.6.jar
hadoop-common-2.5.0-cdh5.3.6.jar
hadoop-auth-2.5.0-cdh5.3.6.jar
hadoop-hdfs-2.5.0-cdh5.3.6.jar
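
A hedged sketch for locating the jars and dropping them into Flume's lib directory; the CDH install root is an assumption based on the paths used elsewhere in this post:

# Locate the required jars under the (assumed) CDH install root
find /opt/cdh5.3.6 -name 'commons-configuration-*.jar'
find /opt/cdh5.3.6 -name 'hadoop-common-*.jar'
find /opt/cdh5.3.6 -name 'hadoop-auth-*.jar'
find /opt/cdh5.3.6 -name 'hadoop-hdfs-*.jar'

# Copy each jar found above into Flume's lib directory so the HDFS sink can load it
cp <path-to-jar> /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/lib/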

3. Run

bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume-tail.conf -Dflume.root.logger=DEBUG,console
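
To confirm that events are being written, you can list the target directory (assuming the hdfs command is configured on the same machine):

# Files should carry the events- prefix and roll at roughly 1 KB (rollSize = 1024)
hdfs dfs -ls /user/cen/flume/hive-log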


Case 2:

  • Collect the files from Hive's log directory into the HDFS file system

  • Source: Spooling Directory Source
    /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/logs/
  • Channel: File Channel
  • Sink: HDFS Sink
    /user/cen/flume/hive-log
    Analysis: the Spooling Directory Source collects logs by watching a directory for newly added files. In production it is typically used together with log4j; once a file has been fully transferred, its name is changed by appending a suffix such as .COMPLETED.

1. Write the agent configuration

# example.conf: A single-node Flume configuration

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/datas/flume/
# Ignore files ending in .log
a3.sources.r3.ignorePattern = ^.*\.log$
# Suffix appended to files once they have been consumed
a3.sources.r3.fileSuffix = .deleteable

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://cen-ubuntu:8020/user/cen/flume/spool-file-hdfs/%Y%m%d
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.filePrefix = events-
a3.sinks.k3.hdfs.fileType = DataStream 
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10

# Use a channel which buffers events in file
a3.channels.c3.type = file
# Checkpoint and data directories for the file channel (optional)
a3.channels.c3.checkpointDir = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/data/filechanel/cheakpoint
a3.channels.c3.dataDirs = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6/data/filechanel/data

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

2. Run

bin/flume-ng agent --conf conf --name a3 --conf-file conf/spooling-file-hdfs.conf -Dflume.root.logger=DEBUG,console
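
To exercise the agent before checking the results, copy a file into the spooled directory; the test file name below is hypothetical:

# Create a test file outside the spool directory, then move it in (files must not change once spooled)
echo "hello flume" > /opt/datas/test-spool.txt
mv /opt/datas/test-spool.txt /opt/datas/flume/

# A moment later the file should have been renamed with the .deleteable suffix
ls /opt/datas/flume/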

3. Results

  • Files that have been read get branded with the .deleteable suffix
  • Files ending in .log are not read
  • The consumed files duly appear in the HDFS file system, stored in per-date directories

Note 1: HDFS HA configuration

1. Copy the configuration files hdfs-site.xml and core-site.xml into the conf directory
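
A hedged sketch of this step; the Hadoop configuration path is an assumption based on a typical CDH layout and should be adjusted to your environment:

# Copy the cluster's HDFS client configuration into Flume's conf directory
cp /opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6/etc/hadoop/hdfs-site.xml conf/
cp /opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6/etc/hadoop/core-site.xml conf/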

2. Change the HDFS path in the sink

# If the NameNode is HA, point the sink at the nameservice instead of a single host
# a2.sinks.k2.hdfs.path = hdfs://ns1/user/cen/flume/hive-log

Note 2: In particular, escape sequences (such as %Y%m%d for time-based bucketing) can be used in the path to create directory hierarchies; see the official documentation for details.

# As the official documentation explains, the time-related escape sequences require a timestamp field in the event headers; to use the agent's local time instead, add the following parameter
hdfs.useLocalTimeStamp = true
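
For example, reusing sink k2 from Case 1 and bucketing the output by day (mirroring the Case 2 configuration):

# Bucket output into one directory per day using the agent's local clock
a2.sinks.k2.hdfs.path = hdfs://cen-ubuntu:8020/user/cen/flume/hive-log/%Y%m%d
a2.sinks.k2.hdfs.useLocalTimeStamp = true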

Note 3: Using an options file

/bin/sqoop --options-file /opt/datas/filename
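
A hedged sketch of what such a Sqoop options file might contain; Sqoop reads one argument per line and treats lines starting with # as comments (the connection details below are hypothetical):

# Hypothetical Sqoop options file: one argument per line
import
--connect
jdbc:mysql://cen-ubuntu:3306/testdb
--username
root
--table
my_table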

