Table of Contents
Preface
What is Flume?
Apache Flume is a distributed, reliable and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.
The use of Apache Flume is not limited to log data aggregation. Since data sources are customizable, Flume can be used to transport massive quantities of event data, including but not limited to network traffic data, data generated by social media, email messages, and pretty much any data source possible.
Features of Flume
Flume is a distributed, reliable and highly available system for collecting, aggregating and transporting massive amounts of log data. It supports customizing all kinds of data senders within a logging system in order to collect data, and it can also perform simple processing on the data and write it to a variety of data receivers (such as text files, HDFS, HBase, etc.).
Flume's data flow is carried by events from beginning to end. An Event is Flume's basic unit of data: it carries the log data (as a byte array) together with header information. Events are generated from data outside the Agent; when a Source captures an event it applies a specific format and then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds events until a Sink has finished processing them. The Sink is responsible for persisting the log data or pushing the event on to another Source.
Reliability of Flume
When a node fails, logs can be delivered to other nodes without being lost. Flume's events are staged in a Channel on each Agent and then delivered to the next Agent in the flow or to a terminal repository (such as HDFS).
Events are removed from a Channel only after they have been stored in the Channel of the next Agent or in the terminal repository.
This is how Flume's single-hop message-delivery semantics provide end-to-end reliability of the flow, ensuring that events are delivered reliably from point to point. In the case of a multi-hop flow, the Sink of the previous hop and the Source of the next hop each run their own transactions to make sure the data is safely stored in the Channel of the next hop.
Recoverability of Flume
Recoverability also relies on the Channel. The FileChannel is recommended: events are persisted to the local filesystem (at the cost of lower performance). The memory channel simply stores events in an in-memory queue, which is faster, but any events still sitting in the memory channel when the Agent process dies cannot be recovered.
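As a minimal sketch (the channel name and directories below are assumptions for illustration, not taken from this article), a durable file channel can be configured like this:
a1.channels = c1
a1.channels.c1.type = file
## Directory where the file channel keeps its checkpoint (assumed path)
a1.channels.c1.checkpointDir = /home/work/_data/flume/checkpoint
## Comma-separated list of directories where the file channel stores its data files (assumed path)
a1.channels.c1.dataDirs = /home/work/_data/flume/data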
Core concepts of Flume
An Agent runs Flume inside a JVM. Each machine runs one Agent, but a single Agent can contain multiple Sources and Sinks. A Client produces data and runs in its own thread. A Source collects data from the Client and passes it to a Channel. A Sink collects data from the Channel and runs in its own thread. A Channel connects Sources and Sinks and behaves somewhat like a queue. Events can be log records, Avro objects, and so on.
Flume's smallest independent unit of operation is the Agent; one Agent is one JVM. A single Agent is made up of three major components: Source, Sink and Channel.
Every Flume component has a type property so that Flume knows what kind of object it needs to create. Each source, sink and channel type has its own set of properties that make it work as intended.
It is worth noting that Flume ships with a large number of built-in Source, Channel and Sink types, and that different types of Sources, Channels and Sinks can be combined freely. How they are combined is driven by the user's configuration file, which makes this very flexible. For example, a Channel can stage events in memory or persist them to the local disk, and a Sink can write logs to HDFS, HBase, or even another Source. Flume also lets users build multi-level flows, meaning multiple agents can work together.
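As a minimal sketch of such a multi-level flow (the agent names and port 4545 are assumptions for illustration), the Avro sink of one agent can point at the Avro source of the next agent:
# First agent: forward events to the next hop over Avro RPC
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = c1
agent1.sinks.k1.port = 4545
# Second agent: receive events from the previous hop
agent2.sources.r1.type = avro
agent2.sources.r1.bind = 0.0.0.0
agent2.sources.r1.port = 4545
Example 8 below builds a complete flow of this kind with the replicating channel selector.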

Where is Flume's official website?
Flume's official website is http://flume.apache.org/.
Where to download Flume and how to install it?
The runtime environment for this article is based on the article <Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA) high-availability cluster setup>; the installation of the axel tool is also covered in that article.
[root@c0 _src]# pwd
/home/work/_src
[root@c0 _src]# axel -n 10 -o /home/work/_src/flume.tar.gz http://mirror.bit.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
[root@c0 _src]# tar -xzvf flume.tar.gz
[root@c0 _src]# mv apache-flume-1.9.0-bin /home/work/_app/
Set the environment variables
echo "" >> /etc/bashrc
echo "# Flume 1.9.0" >> /etc/bashrc
echo "export FLUME_HOME=/home/work/_app/apache-flume-1.9.0-bin/" >> /etc/bashrc
echo "export PATH=\$PATH:\$FLUME_HOME/bin" >> /etc/bashrc
source /etc/bashrc
Verify that the installation succeeded
[root@c0 _src]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
Flume examples
Example 1: Avro
The Avro source listens on an Avro port and receives events from an external Avro client stream. An Avro client can send a given file to Flume; the Avro source uses the Avro RPC mechanism.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/avro.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/avro.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source's channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
## Hostname or IP address to listen on
a1.sources.r1.bind = 0.0.0.0
## Port number to listen on
a1.sources.r1.port = 4141
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# c1 is the channel name; set the channel type to memory. Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
a1.channels.c1.type = memory
## Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
## Maximum number of events the channel will take from a source or give to a sink per transaction
a1.channels.c1.transactionCapacity = 100
## Percentage of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in the event headers.
a1.channels.c1.byteCapacityBufferPercentage = 20
## Maximum total bytes of memory allowed as the sum of all events in this channel. The implementation only counts the Event body, which is also why the byteCapacityBufferPercentage parameter is provided. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM and they happen to hold the same physical events (for example when using a replicating channel selector from a single source), those event sizes may be counted twice for channel byteCapacity purposes.
a1.channels.c1.byteCapacity = 800000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 00:49:38,082 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 4141 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@365969a6 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2019-03-11 00:49:38,086 INFO node.Application: Starting Channel c1
2019-03-11 00:49:38,086 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
2019-03-11 00:49:38,134 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2019-03-11 00:49:38,134 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 00:49:38,587 INFO node.Application: Starting Sink k1
2019-03-11 00:49:38,588 INFO node.Application: Starting Source r1
2019-03-11 00:49:38,588 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }...
2019-03-11 00:49:38,862 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 00:49:38,862 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 00:49:38,867 INFO source.AvroSource: Avro source r1 started.
Create the file to send
[root@c0 ~]# echo "hello mshk.top" > $FLUME_HOME/log.00
Send the file with avro-client
[root@c0 ~]# flume-ng avro-client -c . -H c0 -p 4141 -F $FLUME_HOME/log.00
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/_app/apache-flume-1.9.0-bin/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/_app/hadoop-3.1.2/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-03-11 00:50:46,621 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
The avro-client included in the Flume distribution can send a given file to a Flume Avro source using the Avro RPC mechanism.
On the c0 console you can see the following output; note the last line:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 00:49:38,082 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 4141 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@365969a6 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2019-03-11 00:49:38,086 INFO node.Application: Starting Channel c1
2019-03-11 00:49:38,086 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
2019-03-11 00:49:38,134 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2019-03-11 00:49:38,134 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 00:49:38,587 INFO node.Application: Starting Sink k1
2019-03-11 00:49:38,588 INFO node.Application: Starting Source r1
2019-03-11 00:49:38,588 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }...
2019-03-11 00:49:38,862 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 00:49:38,862 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 00:49:38,867 INFO source.AvroSource: Avro source r1 started.
2019-03-11 00:55:22,708 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 => /10.0.0.100:4141] OPEN
2019-03-11 00:55:22,710 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 => /10.0.0.100:4141] BOUND: /10.0.0.100:4141
2019-03-11 00:55:22,710 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 => /10.0.0.100:4141] CONNECTED: /10.0.0.100:58786
2019-03-11 00:55:22,934 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 :> /10.0.0.100:4141] DISCONNECTED
2019-03-11 00:55:22,934 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 :> /10.0.0.100:4141] UNBOUND
2019-03-11 00:55:22,934 INFO ipc.NettyServer: [id: 0x20d51aed, /10.0.0.100:58786 :> /10.0.0.100:4141] CLOSED
2019-03-11 00:55:22,934 INFO ipc.NettyServer: Connection to /10.0.0.100:58786 disconnected.
2019-03-11 00:55:26,880 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 hello mshk.top }
Example 2: Spool
The spooling-directory source watches the configured directory for new files and reads the data out of those files. Two things to note:
1) Files copied into the spool directory must not be opened for editing afterwards.
2) The spool directory must not contain subdirectories.
Unlike the Exec source, the spooling-directory source is reliable and will not miss data, even if Flume is restarted or killed.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/spool.conf, edit it and save it with the following content:
[root@c0 ~]# mkdir -p $FLUME_HOME/logs
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/spool.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source's channel
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
## Directory to read files from
a1.sources.r1.spoolDir = /home/work/_app/apache-flume-1.9.0-bin/logs
## Whether to add a header storing the absolute path of the file
a1.sources.r1.fileHeader = true
## Character set used by the deserializer when treating the input file as text
a1.sources.r1.inputCharset = UTF-8
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# c1 is the channel name; set the channel type to memory. Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 01:58:04,582 INFO node.Application: Starting Sink k1
2019-03-11 01:58:04,587 INFO node.Application: Starting Source r1
2019-03-11 01:58:04,588 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/work/_app/apache-flume-1.9.0-bin/logs
2019-03-11 01:58:04,619 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:58:04,619 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Open a new window and run a command that adds a file to the /home/work/_app/apache-flume-1.9.0-bin/logs directory
[root@c0 ~]# echo "spool test1" > /home/work/_app/apache-flume-1.9.0-bin/logs/spool_text.log
On the c0 console you can see the following output:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 01:58:04,582 INFO node.Application: Starting Sink k1
2019-03-11 01:58:04,587 INFO node.Application: Starting Source r1
2019-03-11 01:58:04,588 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/work/_app/apache-flume-1.9.0-bin/logs
2019-03-11 01:58:04,619 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:58:04,619 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 02:00:17,055 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2019-03-11 02:00:17,055 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/work/_app/apache-flume-1.9.0-bin/logs/spool_text.log to /home/work/_app/apache-flume-1.9.0-bin/logs/spool_text.log.COMPLETED
2019-03-11 02:00:18,617 INFO sink.LoggerSink: Event: { headers:{file=/home/work/_app/apache-flume-1.9.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 }
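If you need more control over what happens to ingested files, the spooling-directory source has a few optional properties; a small sketch (the ignore pattern below is only an example):
## Suffix appended to fully ingested files (default .COMPLETED)
a1.sources.r1.fileSuffix = .COMPLETED
## Whether to delete completed files instead of renaming them: never (default) or immediate
a1.sources.r1.deletePolicy = never
## Regular expression specifying which files to ignore, e.g. temporary files (example pattern)
a1.sources.r1.ignorePattern = ^.*\.tmp$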
Example 3: Exec
The Exec source runs a given Unix command on startup and expects that process to continuously produce data on standard output (stderr is simply discarded, unless the logStdErr property is set to true).
If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] produce the desired results, while date probably will not: the first two commands produce streams of data, whereas the latter produces a single event and exits.
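If you do want the command to be re-run after it exits, the Exec source offers a few related properties; a minimal sketch:
## Log the command's stderr instead of discarding it (default false)
a1.sources.r1.logStdErr = true
## Restart the command if it exits (default false)
a1.sources.r1.restart = true
## Milliseconds to wait before restarting the command
a1.sources.r1.restartThrottle = 10000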
In the example below, the Exec source runs a given command and takes its output as the source. If you use the tail command, you must make the file large enough to actually see output.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/exec_tail.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/exec_tail.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source's channel
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
## Command to execute
a1.sources.r1.command = tail -F /home/work/_app/apache-flume-1.9.0-bin/logs/log_exec_tail
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# c1 is the channel name; set the channel type to memory. Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
a1.channels.c1.type = memory
## Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
## Maximum number of events the channel will take from a source or give to a sink per transaction
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 03:38:47,979 INFO source.ExecSource: Exec source starting with command: tail -F /home/work/_app/apache-flume-1.9.0-bin/logs/log_exec_tail
2019-03-11 03:38:47,980 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 03:38:47,980 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Generate enough content in the file
[root@c0 ~]# for i in {1..100};do echo "exec tail$i" >> /home/work/_app/apache-flume-1.9.0-bin/logs/log_exec_tail;echo $i;sleep 0.1;done
On the c0 console you can see the following output:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 03:38:47,979 INFO source.ExecSource: Exec source starting with command: tail -F /home/work/_app/apache-flume-1.9.0-bin/logs/log_exec_tail
2019-03-11 03:38:47,980 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 03:38:47,980 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 03:48:30,118 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31 exec tail1 }
2019-03-11 03:48:30,118 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 32 exec tail2 }
2019-03-11 03:48:30,118 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 33 exec tail3 }
2019-03-11 03:48:30,119 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 34 exec tail4 }
...
2019-03-11 03:48:40,135 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 38 exec tail98 }
2019-03-11 03:48:40,135 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 39 exec tail99 }
2019-03-11 03:48:40,135 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31 30 30 exec tail100 }
Example 4: Syslogtcp
The syslogtcp source listens on a TCP port and uses it as the data source.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/syslog_tcp.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/syslog_tcp.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source's channel
a1.sources.r1.type = syslogtcp
## Port to bind and listen on
a1.sources.r1.port = 5140
## Hostname or IP address to bind to
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# c1 is the channel name; set the channel type to memory. Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
a1.channels.c1.type = memory
## Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
## Maximum number of events the channel will take from a source or give to a sink per transaction
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:45:11,383 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 04:45:11,403 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:45:11,403 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Generate a test syslog message
[root@c0 ~]# echo "hello idoall.org syslog" | nc localhost 5140
If the nc command is not available, it can be installed with yum install nmap-ncat.x86_64 -y.
On the c0 console you can see the following output:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:45:11,383 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 04:45:11,403 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:45:11,403 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 04:48:39,679 WARN source.SyslogUtils: Event created from Invalid Syslog data.
2019-03-11 04:48:39,688 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
Example 5: JSONHandler
The JSONHandler can handle events expressed in JSON format and supports the UTF-8, UTF-16 and UTF-32 character sets.
The handler accepts an array of events (even a single event must be sent inside an array) and converts them into Flume events according to the encoding specified in the request. If no encoding is specified, UTF-8 is assumed.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/post_json.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/post_json.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source's channel
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# c1 is the channel name; set the channel type to memory. Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
a1.channels.c1.type = memory
## Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
## Maximum number of events the channel will take from a source or give to a sink per transaction
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:54:09,997 INFO server.Server: Started @1582ms
2019-03-11 04:54:09,998 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:54:09,998 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Generate a JSON-format POST request
[root@c0 ~]# curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "mshk.top body"}]' http://localhost:8888
On the c0 console you can see the following output:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:54:09,997 INFO server.Server: Started @1582ms
2019-03-11 04:54:09,998 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:54:09,998 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 04:55:14,560 INFO sink.LoggerSink: Event: { headers:{a=a1, b=b1} body: 6D 73 68 6B 2E 74 6F 70 5F 62 6F 64 79 mshk.top body }
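To send several events in one request, the JSON array simply contains more elements; one common way to state the encoding explicitly is the Content-Type header. A sketch (the header names a/b are arbitrary examples):
[root@c0 ~]# curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '[{ "headers" :{"a" : "a1"},"body" : "mshk.top event1"},{ "headers" :{"b" : "b1"},"body" : "mshk.top event2"}]' http://localhost:8888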
Example 6: Hadoop sink
This sink writes events to the Hadoop Distributed File System (HDFS).
It currently supports creating text and sequence files, and both file types can be compressed.
Files can be rolled periodically (closing the current file and creating a new one) based on elapsed time, data size, or number of events.
It can also bucket/partition the data by attributes such as the timestamp or the machine the event originated from. The HDFS directory path may contain formatting escape sequences that the HDFS sink replaces in order to generate the directory/file name used to store the events.
The supported escape sequences are:
| Alias | Description |
|---|---|
| %{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
| %t | Unix time in milliseconds |
| %a | locale’s short weekday name (Mon, Tue, ...) |
| %A | locale’s full weekday name (Monday, Tuesday, ...) |
| %b | locale’s short month name (Jan, Feb, ...) |
| %B | locale’s long month name (January, February, ...) |
| %c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
| %d | day of month (01) |
| %e | day of month without padding (1) |
| %D | date; same as %m/%d/%y |
| %H | hour (00..23) |
| %I | hour (01..12) |
| %j | day of year (001..366) |
| %k | hour ( 0..23) |
| %m | month (01..12) |
| %n | month without padding (1..12) |
| %M | minute (00..59) |
| %p | locale’s equivalent of am or pm |
| %s | seconds since 1970-01-01 00:00:00 UTC |
| %S | second (00..59) |
| %y | last two digits of year (00..99) |
| %Y | year (2010) |
| %z | +hhmm numeric timezone (for example, -0400) |
| %[localhost] | Substitute the hostname of the host where the agent is running |
| %[IP] | Substitute the IP address of the host where the agent is running |
| %[FQDN] | Substitute the canonical hostname of the host where the agent is running |
The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's ability to obtain the hostname, which may fail in some networking environments.
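For example, a path using these escapes could partition events by day and hour (a sketch only; the events/ directory layout is an assumption):
a1.sinks.k1.hdfs.path = hdfs://c1:8020/flume/events/%Y-%m-%d/%H
## With hdfs.useLocalTimeStamp = true, an event received at 2019-03-11 05:12 would be written under
## hdfs://c1:8020/flume/events/2019-03-11/05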
Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster.
For the Hadoop installation and deployment, please refer to the article <Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA) high-availability cluster setup>.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/hdfs_sink.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/hdfs_sink.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source's channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# k1 is the sink name; set the sink type
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://c1:8020/flume/syslogtcp
## Whether to use the local time (instead of the timestamp from the event header) when replacing escape sequences
a1.sinks.k1.hdfs.useLocalTimeStamp = true
## filePrefix: prefix for the file name
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H
## fileSuffix: suffix for the file name
a1.sinks.k1.hdfs.fileSuffix = .log
## minBlockReplicas: minimum number of replicas per HDFS block. If unspecified, it comes from the default Hadoop configuration on the classpath.
a1.sinks.k1.hdfs.minBlockReplicas = 1
## DataStream does not compress the output file; the default is SequenceFile
a1.sinks.k1.hdfs.fileType = DataStream
## writeFormat: format for sequence file records. Set it to Text before creating data files with Flume, otherwise Apache Impala or Apache Hive cannot read those files.
a1.sinks.k1.hdfs.writeFormat = Text
## rollInterval: number of seconds to wait before rolling the temporary file into a finished file (0 = never roll based on time)
a1.sinks.k1.hdfs.rollInterval = 300
## rollSize: file size that triggers a roll, in bytes (0 = never roll based on file size)
a1.sinks.k1.hdfs.rollSize = 0
## rollCount: number of events written to the file before it is rolled (0 = never roll based on number of events)
a1.sinks.k1.hdfs.rollCount = 0
## idleTimeout: timeout after which inactive files are closed (0 = disable automatic closing of idle files)
a1.sinks.k1.hdfs.idleTimeout = 0
## Number of events written to the file before it is flushed to HDFS
a1.sinks.k1.hdfs.batchSize = 0
## round: whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t)
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute
# c1 is the channel name; set the channel type to memory. Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
a1.channels.c1.type = memory
## Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
## Maximum number of events the channel will take from a source or give to a sink per transaction
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:59:23,046 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 04:59:23,485 INFO node.Application: Starting Sink k1
2019-03-11 04:59:23,486 INFO node.Application: Starting Source r1
2019-03-11 04:59:23,488 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2019-03-11 04:59:23,488 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
2019-03-11 04:59:23,583 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 04:59:23,601 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:59:23,601 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Generate a test syslog message
[root@c0 ~]# echo "hello mshk.top flume hadoop testing one" | nc localhost 5140
On the c0 console you can see the following output:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 04:59:23,046 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
2019-03-11 04:59:23,485 INFO node.Application: Starting Sink k1
2019-03-11 04:59:23,486 INFO node.Application: Starting Source r1
2019-03-11 04:59:23,488 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2019-03-11 04:59:23,488 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
2019-03-11 04:59:23,583 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 04:59:23,601 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 04:59:23,601 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 05:12:08,792 WARN source.SyslogUtils: Event created from Invalid Syslog data.
2019-03-11 05:12:08,792 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
2019-03-11 05:12:08,813 INFO hdfs.BucketWriter: Creating hdfs://c1:8020/flume/syslogtcp/Syslog.1552252328793.tmp
2019-03-11 05:12:38,853 INFO hdfs.HDFSEventSink: Writer callback called.
2019-03-11 05:12:38,853 INFO hdfs.BucketWriter: Closing hdfs://c1:8020/flume/syslogtcp/Syslog.1552252328793.tmp
2019-03-11 05:12:38,885 INFO hdfs.BucketWriter: Renaming hdfs://c1:8020/flume/syslogtcp/Syslog.1552252328793.tmp to hdfs://c1:8020/flume/syslogtcp/Syslog.1552252328793
Open another window on c0 and check in Hadoop whether the file has been created
[root@c0 ~]# hadoop fs -ls /flume/syslogtcp
Found 1 items
-rw-r--r-- 3 root supergroup 177 2019-03-11 00:32 /flume/syslogtcp/Syslog.1552251858905
[root@c0 ~]# hadoop fs -cat /flume/syslogtcp/Syslog.*
hello mshk.top flume hadoop testing one
Example 7: File Roll Sink
This sink stores events on the local filesystem.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/file_roll.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/file_roll.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source's channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# k1 is the sink name; set the sink type
a1.sinks.k1.type = file_roll
## Directory where files will be stored
a1.sinks.k1.sink.directory = /home/work/_app/apache-flume-1.9.0-bin/logs
# c1 is the channel name; set the channel type to memory. Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
a1.channels.c1.type = memory
## Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
## Maximum number of events the channel will take from a source or give to a sink per transaction
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 05:18:30,099 INFO source.SyslogTcpSource: Syslog TCP Source starting...
2019-03-11 05:18:30,126 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 05:18:30,127 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Generate test log messages
[root@c0 ~]# echo "hello mshk.top syslog" | nc localhost 5555
[root@c0 ~]# echo "hello mshk.top syslog 2" | nc localhost 5555
Check whether files are being created under /home/work/_app/apache-flume-1.9.0-bin/logs; by default a new file is started every 30 seconds
[root@c0 ~]# ll /home/work/_app/apache-flume-1.9.0-bin/logs
total 12
-rw-r--r--. 1 root root 0 Mar 11 05:18 1552252709477-1
-rw-r--r--. 1 root root 0 Mar 11 05:19 1552252709477-2
-rw-r--r--. 1 root root 0 Mar 11 05:19 1552252709477-3
-rw-r--r--. 1 root root 0 Mar 11 05:20 1552252709477-4
-rw-r--r--. 1 root root 0 Mar 11 05:20 1552252709477-5
-rw-r--r--. 1 root root 0 Mar 11 05:21 1552252709477-6
-rw-r--r--. 1 root root 46 Mar 11 05:21 1552252709477-7
-rw-r--r--. 1 root root 0 Mar 11 05:22 1552252709477-8
-rw-r--r--. 1 root root 0 Mar 11 05:22 1552252709477-9
-rw-r--r--. 1 root root 1192 Mar 11 03:48 log_exec_tail
-rw-r--r--. 1 root root 12 Mar 11 02:00 spool_text.log.COMPLETED
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/logs/1552252709477*
hello mshk.top syslog
hello mshk.top syslog 2
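The 30-second interval comes from the file_roll sink's sink.rollInterval property, which defaults to 30; as a sketch, rolling can be slowed down or disabled like this:
## Roll the output file every 300 seconds instead of every 30 (0 disables time-based rolling)
a1.sinks.k1.sink.rollInterval = 300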
Example 8: Replicating Channel Selector
Flume supports fanning a flow out from one source to multiple channels. In the replicating case, the events of the flow are sent to all of the configured channels.
This time we need two machines, c0 and c1.
On c0, create the file /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c0 c1
# r1 is the source name; set the source's channels
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
## The channel names are c0 and c1
a1.sources.r1.channels = c0 c1
a1.sources.r1.selector.type = replicating
# k1 is the sink name; set the sink type
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c0
a1.sinks.k1.hostname = c0
a1.sinks.k1.port = 5555
# k2 is the sink name; set the sink type
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c1
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
On c0, create the file /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector_avro.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector_avro.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c0
# r1 is the source name; set the source's channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c0
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# c0 is the channel name; set the channel type to memory. Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure.
a1.channels.c0.type = memory
## Maximum number of events stored in the channel
a1.channels.c0.capacity = 1000
## Maximum number of events the channel will take from a source or give to a sink per transaction
a1.channels.c0.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c0
a1.sinks.k1.channel = c0
On c0, copy the two configuration files over to c1
[root@c0 ~]# scp -r /home/work/_app/apache-flume-1.9.0-bin/conf/replicating_Channel_Selector* c1:/home/work/_app/apache-flume-1.9.0-bin/conf/
replicating_Channel_Selector_avro.conf 100% 485 832.0KB/s 00:00
replicating_Channel_Selector.conf 100% 723 1.5MB/s 00:00
Open four windows and start two Flume agents on each of c0 and c1
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 05:34:22,172 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 05:34:22,175 INFO source.AvroSource: Avro source r1 started.
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:03:02,811 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 01:03:02,814 INFO source.AvroSource: Avro source r1 started.
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 05:36:41,054 INFO sink.AbstractRpcSink: Rpc sink k1 started.
2019-03-11 05:36:41,056 INFO sink.AbstractRpcSink: Rpc sink k2 started.
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:06:04,019 INFO sink.AbstractRpcSink: Rpc sink k1 started.
2019-03-11 01:06:04,019 INFO sink.AbstractRpcSink: Rpc sink k2 started.
Then generate test syslog messages on c1 and c0 respectively
[root@c1 ~]# echo "hello mshk.top" | nc localhost 5140
[root@c0 ~]# echo "hello mshk.top1" | nc localhost 5140
In the sink windows on c0 and c1 you can see the following output, which shows the events were replicated to both channels:
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 11:47:45,208 INFO ipc.NettyServer: [id: 0xcdc38369, /10.0.0.101:54420 => /10.0.0.100:5555] BOUND: /10.0.0.100:5555
2019-03-11 11:47:45,208 INFO ipc.NettyServer: [id: 0xcdc38369, /10.0.0.101:54420 => /10.0.0.100:5555] CONNECTED: /10.0.0.101:54420
2019-03-11 11:48:28,714 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 hello mshk.top }
2019-03-11 11:48:51,429 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 31 hello mshk.top1 }
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 06:08:35,762 INFO ipc.NettyServer: [id: 0xcbc7fefa, /10.0.0.101:57248 => /10.0.0.101:5555] OPEN
2019-03-11 06:08:35,763 INFO ipc.NettyServer: [id: 0xcbc7fefa, /10.0.0.101:57248 => /10.0.0.101:5555] BOUND: /10.0.0.101:5555
2019-03-11 06:08:35,763 INFO ipc.NettyServer: [id: 0xcbc7fefa, /10.0.0.101:57248 => /10.0.0.101:5555] CONNECTED: /10.0.0.101:57248
2019-03-11 06:09:21,731 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 hello mshk.top }
2019-03-11 06:09:43,734 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 31 hello mshk.top1 }
Example 9: Multiplexing Channel Selector
In the multiplexing case, an event is delivered to a subset of the available channels when the event's attributes match preconfigured values.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c0 c1
# r1 is the source name; set the source's channels. This source accepts Flume events via HTTP POST and GET
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c0 c1
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# The mappings allow the channels for each value to overlap. The default mapping can contain any number of channels.
a1.sources.r1.selector.mapping.baidu = c0
a1.sources.r1.selector.mapping.ali = c1
a1.sources.r1.selector.default = c0
# k1 is the sink name; set the sink type
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c0
a1.sinks.k1.hostname = c0
a1.sinks.k1.port = 5555
# k2 is the sink name; set the sink type
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c1
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
On c0, create the file /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector_avro.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector_avro.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c0
# r1 is the source name; set the source's channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c0
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c0
a1.sinks.k1.channel = c0
Copy the two configuration files over to c1
[root@c0 ~]# scp -r /home/work/_app/apache-flume-1.9.0-bin/conf/Multiplexing_Channel_Selector* c1:/home/work/_app/apache-flume-1.9.0-bin/conf/
Multiplexing_Channel_Selector_avro.conf 100% 485 639.8KB/s 00:00
Multiplexing_Channel_Selector.conf 100% 963 1.4MB/s 00:00
Open four windows and start two Flume agents on each of c0 and c1
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 06:05:23,297 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
2019-03-11 06:05:23,297 WARN node.AbstractConfigurationProvider: No configuration found for this host:a1
2019-03-11 06:05:23,308 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:34:05,370 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
2019-03-11 01:34:05,377 WARN node.AbstractConfigurationProvider: No configuration found for this host:a1
2019-03-11 01:34:05,383 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 06:20:59,177 INFO server.Server: Started @1519ms
2019-03-11 06:20:59,178 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 06:20:59,178 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:50:05,998 INFO server.Server: Started @1315ms
2019-03-11 01:50:05,998 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:50:05,998 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Then, on c0, generate test log messages
[root@c0 ~]# curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "mshk.top_TEST1"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "mshk.top_TEST2"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "mshk.top_TEST3"}]' http://localhost:5140
In the sink window on c0 you can see the following output:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 06:05:23,297 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
2019-03-11 06:05:23,297 WARN node.AbstractConfigurationProvider: No configuration found for this host:a1
2019-03-11 06:05:23,308 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }
2019-03-11 06:22:58,825 INFO sink.LoggerSink: Event: { headers:{type=baidu} body: 6D 73 68 6B 2E 74 6F 70 5F 54 45 53 54 31 mshk.top_TEST1 }
2019-03-11 06:22:58,825 INFO sink.LoggerSink: Event: { headers:{type=qq} body: 6D 73 68 6B 2E 74 6F 70 5F 54 45 53 54 33 mshk.top_TEST3 }
In the sink window on c1 you can see the following output:
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:34:05,370 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: []
2019-03-11 01:34:05,377 WARN node.AbstractConfigurationProvider: No configuration found for this host:a1
2019-03-11 01:34:05,383 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }
2019-03-11 01:50:56,054 INFO sink.LoggerSink: Event: { headers:{type=ali} body: 6D 73 68 6B 2E 74 6F 70 5F 54 45 53 54 32 mshk.top_TEST2 }
As you can see, events are routed to different channels according to the value of the header.
Example 10: Flume Sink Processors
Sink groups allow users to group multiple sinks within one agent. A sink processor can provide load balancing across the sinks in the group, as well as failover within the group, switching to another sink when one sink in the group fails.
- Default Sink Processor
The default sink processor accepts only a single sink. You are not required to use a processor for a single sink; the plain source-channel-sink pattern is enough.
- Failover Sink Processor
The failover sink processor maintains a prioritized list of sinks, guaranteeing that as long as one sink is available, events will be processed.
The failover mechanism places failed sinks into a cooldown pool and assigns them a cooldown period that keeps growing if retries keep failing. Once a sink successfully sends an event, it is restored to the pool of live sinks. Within this live pool every sink has an associated priority value; the larger the value, the higher the priority. When a sink fails while sending an event, the remaining sink with the highest priority is the next one to try sending the event.
With failover, events keep going to one particular sink; when that sink becomes unavailable, they are automatically sent to the next sink.
Next we continue the experiment. Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c0 c1
# This is the key to configuring failover: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priority: the larger the number, the higher the priority; each sink must have a different priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Maximum backoff time for a failed sink, set here to 10 seconds; adjust it faster or slower to suit your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000
# r1 is the source name; set the source's channels
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c0 c1
a1.sources.r1.selector.type = replicating
# k1 is the sink name; set the sink type
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c0
a1.sinks.k1.hostname = c0
a1.sinks.k1.port = 5555
# k2 is the sink name; set the sink type
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c1
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors_avro.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors_avro.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c0
# r1 is the source name; set the source's channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c0
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c0
a1.sinks.k1.channel = c0
Copy the two configuration files over to c1
[root@c0 ~]# scp -r /home/work/_app/apache-flume-1.9.0-bin/conf/Flume_Sink_Processors* c1:/home/work/_app/apache-flume-1.9.0-bin/conf/
Flume_Sink_Processors_avro.conf 100% 485 585.8KB/s 00:00
Flume_Sink_Processors.conf 100% 1175 1.6MB/s 00:00
Open four windows and start two Flume agents on each of c0 and c1
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 06:29:23,481 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 06:29:23,483 INFO source.AvroSource: Avro source r1 started.
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:57:31,987 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:57:31,987 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 01:57:31,989 INFO source.AvroSource: Avro source r1 started.
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 06:31:44,971 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: c1, port: 5555
2019-03-11 06:31:44,971 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2019-03-11 06:31:44,971 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2019-03-11 06:31:44,985 INFO sink.AbstractRpcSink: Rpc sink k2 started.
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:59:51,704 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: c1, port: 5555
2019-03-11 01:59:51,704 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2019-03-11 01:59:51,718 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2019-03-11 01:59:51,737 INFO sink.AbstractRpcSink: Rpc sink k2 started.
Then, on c0, generate a test log message
[root@c0 ~]# echo "mshk.top test1 failover" | nc localhost 5140
Because c1 has the higher priority, the following output appears in the sink window on c1, while c0 shows nothing:
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 01:57:31,987 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 01:57:31,987 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 01:57:31,989 INFO source.AvroSource: Avro source r1 started.
2019-03-11 02:02:11,750 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 31 20 66 mshk.top test1 failover }
Now stop the sink on machine c1 (Ctrl+C) and send test data again
[root@c0 ~]# echo "mshk.top test2 failover" | nc localhost 5140
In the sink window on c0 you can now see that the test data just sent has been read:
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 06:29:23,481 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 06:29:23,483 INFO source.AvroSource: Avro source r1 started.
2019-03-11 07:09:58,232 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 32 20 66 mshk.top test2 f }
Now start the sink again in the c1 sink window:
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Send two more batches of test data:
[root@c0 ~]# echo "mshk.top test3 failover" | nc localhost 5140 && echo "mshk.top test4 failover" | nc localhost 5140
In the sink window on c1 we can see the following output; because of the priorities, log messages land on c1 again:
2019-03-11 02:39:56,644 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 33 20 66 mshk.top test3 f }
2019-03-11 02:39:56,644 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 34 20 66 mshk.top test4 f }
Example 11: Load balancing Sink Processor
The load-balancing sink processor balances the load across multiple sinks. Data is distributed over multiple active sinks, and the processor keeps an indexed list with information about these sinks. It implements two distribution mechanisms, round-robin selection and random selection; round-robin is the default and can be changed in the configuration. You can also implement a custom distribution mechanism by extending AbstractSinkSelector.
The selector picks sinks according to the configured selection mechanism. When a sink fails, the processor selects the next available sink using the same mechanism. There is no blacklist in this mode; instead every available sink is actively tried. If all sinks fail, the failures are passed up to the sink runner.
If backoff is set to true, the processor blacklists sinks that fail and assigns them a period during which they will not be selected to receive data. Once that period expires, if the sink is still unresponsive or slow to respond, the blacklist period is increased exponentially, to avoid getting stuck waiting on an unresponsive sink. If backoff is set to false, then under round-robin the failed data is simply passed to the next sink in sequence, and the distribution becomes unbalanced.
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# This is the key to configuring load balancing: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# Whether failed sinks should be backed off exponentially
a1.sinkgroups.g1.processor.backoff = true
# Selection mechanism: must be round_robin, random, or the FQCN of a custom class that extends AbstractSinkSelector
a1.sinkgroups.g1.processor.selector = round_robin
# r1 is the source name; set the source's channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# k1 is the sink name; set the sink type
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = c0
a1.sinks.k1.port = 5555
# k2 is the sink name; set the sink type
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c1
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Create the file /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors_avro.conf, edit it and save it with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors_avro.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c0
# r1 is the source name; set the source type and its channel
a1.sources.r1.type = avro
a1.sources.r1.channels = c0
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c0.type = memory
a1.channels.c0.capacity = 1000
a1.channels.c0.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c0
a1.sinks.k1.channel = c0
Copy both configuration files to c1:
[root@c0 ~]# scp -r /home/work/_app/apache-flume-1.9.0-bin/conf/Load_balancing_Sink_Processors* c1:/home/work/_app/apache-flume-1.9.0-bin/conf/
Load_balancing_Sink_Processors_avr.conf 100% 485 678.9KB/s 00:00
Load_balancing_Sink_Processors.conf 100% 802 1.0MB/s 00:00
Open four terminal windows and start two Flume agents on each of c0 and c1:
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 07:18:38,157 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
2019-03-11 07:18:38,428 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 07:18:38,429 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 07:18:38,431 INFO source.AvroSource: Avro source r1 started.
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 02:46:45,515 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
2019-03-11 02:46:45,843 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 02:46:45,843 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 02:46:45,845 INFO source.AvroSource: Avro source r1 started.
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 07:24:27,506 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k2 started
2019-03-11 07:24:27,506 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: c1, port: 5555
2019-03-11 07:24:27,506 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2019-03-11 07:24:27,507 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2019-03-11 07:24:27,515 INFO sink.AbstractRpcSink: Rpc sink k2 started.
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 02:52:32,325 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k2 started
2019-03-11 02:52:32,325 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: c1, port: 5555
2019-03-11 02:52:32,325 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2019-03-11 02:52:32,326 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2019-03-11 02:52:32,341 INFO sink.AbstractRpcSink: Rpc sink k2 started.
Then, on c0, generate test log entries one line at a time; if they are sent too quickly they tend to land on the same machine (a scripted variant with a pause between lines is sketched after these commands):
[root@c0 ~]# echo "mshk.top test1" | nc localhost 5140
[root@c0 ~]# echo "mshk.top test2" | nc localhost 5140
[root@c0 ~]# echo "mshk.top test3" | nc localhost 5140
[root@c0 ~]# echo "mshk.top test4" | nc localhost 5140
In the sink window on c0, you can see the following:
# c0
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 07:18:38,157 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
2019-03-11 07:18:38,428 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 07:18:38,429 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 07:18:38,431 INFO source.AvroSource: Avro source r1 started.
2019-03-11 02:55:16,074 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 31 mshk.top test1 }
2019-03-11 02:55:22,020 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 33 mshk.top test3 }
In the sink window on c1, you can see the following:
# c1
[root@c1 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
...
2019-03-11 02:46:45,515 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
2019-03-11 02:46:45,843 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 02:46:45,843 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 02:46:45,845 INFO source.AvroSource: Avro source r1 started.
2019-03-11 07:27:16,039 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 32 mshk.top test2 }
2019-03-11 07:27:25,042 INFO sink.LoggerSink: Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 6D 73 68 6B 2E 74 6F 70 20 74 65 73 74 34 mshk.top test4 }
This shows that round-robin distribution is working.
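If you would rather have events spread randomly instead of in strict rotation, the selector line is, per the values listed in the configuration comment above, the only thing that needs to change:
a1.sinkgroups.g1.processor.selector = random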
Case 12: Taildir Source
Taildir Source was introduced in Flume 1.7.0. It tails every file whose path matches the configured regular expressions and picks up new lines appended to those files in near real time.
If new lines are still being written, the source retries reading them until the write completes. Taildir Source periodically writes the last read position of every file, in JSON format, to a position file at a configurable location. If Flume is stopped or dies for any reason, it resumes reading from the positions recorded in that file. The position file can also be used to start reading each file from an arbitrary offset. When no position file exists at the given path, Taildir Source starts reading each file from the first line by default.
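If you do not want existing file content replayed on a first start, Taildir Source also has an option to skip straight to the end of files that are not yet recorded in the position file; a hedged example, assuming the skipToEnd property behaves as described in the Flume 1.9 documentation:
a1.sources.r1.skipToEnd = true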
Create and save /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_source.conf with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_source.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source type and its channel
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
## JSON file recording the inode, absolute path and last read position of each tailed file
a1.sources.r1.positionFile = /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json
## Space-separated list of file groups; each file group indicates a set of files to be tailed
a1.sources.r1.filegroups = f1 f2
## Absolute path of the file group
a1.sources.r1.filegroups.f1 = /home/work/_app/apache-flume-1.9.0-bin/logs/taildir_example.log
a1.sources.r1.headers.f1.headerKey1 = value1
## Absolute path of the file group (the file name part may be a regular expression)
a1.sources.r1.filegroups.f2 = /home/work/_app/apache-flume-1.9.0-bin/logs/.*mshk.top.log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
## Whether to add a header storing the absolute path of the file
a1.sources.r1.fileHeader = true
# Controls the number of consecutive batches read from the same file. If the source is tailing multiple files and one of them is written at a very high rate, the busy file could be read in an endless loop and starve the others; lower this value in that case.
a1.sources.r1.maxBatchCount = 1000
# k1 is the sink name; set the sink type
a1.sinks.k1.type = logger
# c1 is the channel name; the channel type is memory. Events are kept in an in-memory queue with a configurable maximum size. It suits flows that need high throughput and can tolerate losing staged data if the agent fails.
a1.channels.c1.type = memory
## Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
## Maximum number of events the channel takes from a source or gives to a sink per transaction
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/taildir_source.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 12:34:24,138 INFO taildir.ReliableTaildirEventReader: headerTable: {f1={headerKey1=value1}, f2={headerKey1=value2, headerKey2=value2-2}}
2019-03-11 12:34:24,143 INFO taildir.ReliableTaildirEventReader: Updating position from position file: /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json
2019-03-11 12:34:24,144 INFO taildir.ReliableTaildirEventReader: File not found: /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json, not updating position
2019-03-11 12:34:24,146 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 12:34:24,146 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
Create the monitored files:
[root@c0 ~]# echo "hello mshk.top" > $FLUME_HOME/logs/taildir_example.log
[root@c0 ~]# echo "hello mshk.top1" > $FLUME_HOME/logs/abc.mshk.top.log.1
In the console on c0, you can see the following:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/taildir_source.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-11 12:34:24,138 INFO taildir.ReliableTaildirEventReader: headerTable: {f1={headerKey1=value1}, f2={headerKey1=value2, headerKey2=value2-2}}
2019-03-11 12:34:24,143 INFO taildir.ReliableTaildirEventReader: Updating position from position file: /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json
2019-03-11 12:34:24,144 INFO taildir.ReliableTaildirEventReader: File not found: /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json, not updating position
2019-03-11 12:34:24,146 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2019-03-11 12:34:24,146 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
2019-03-11 12:37:29,191 INFO taildir.ReliableTaildirEventReader: Opening file: /home/work/_app/apache-flume-1.9.0-bin/logs/taildir_example.log, inode: 1613028097, pos: 0
2019-03-11 12:37:34,156 INFO sink.LoggerSink: Event: { headers:{headerKey1=value1, file=/home/work/_app/apache-flume-1.9.0-bin/logs/taildir_example.log} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 hello mshk.top }
2019-03-11 12:37:39,198 INFO taildir.ReliableTaildirEventReader: Opening file: /home/work/_app/apache-flume-1.9.0-bin/logs/abc.mshk.top.log.1, inode: 1613028098, pos: 0
2019-03-11 12:37:39,199 INFO sink.LoggerSink: Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/work/_app/apache-flume-1.9.0-bin/logs/abc.mshk.top.log.1} body: 68 65 6C 6C 6F 20 6D 73 68 6B 2E 74 6F 70 31 hello mshk.top1 }
Looking again at /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json, you can see the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json
[{"inode":1613028097,"pos":15,"file":"/home/work/_app/apache-flume-1.9.0-bin/logs/taildir_example.log"},{"inode":1613028098,"pos":16,"file":"/home/work/_app/apache-flume-1.9.0-bin/logs/abc.mshk.top.log.1"}]
As you can see, taildir_position.json records the metadata for each file's consumed position, and the file is updated after every read.
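A small, hedged aside: because the read offsets live only in this JSON file, one way to replay the tailed files from the beginning is to stop the agent and remove the position file (at the path configured above) before restarting:
[root@c0 ~]# rm -f /home/work/_app/apache-flume-1.9.0-bin/conf/taildir_position.json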
Case 13: HBase
The HBase configuration is taken from the first hbase-site.xml found on the classpath. A class implementing HbaseEventSerializer, specified in the configuration, converts events into HBase puts and increments, which are then written to HBase.
If HBase fails to write some of the events, the sink replays all events in that transaction.
Flume ships with two serializers. SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body to HBase as-is and can optionally increment a column. RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) splits the event body using a given regular expression and writes each part to a different column.
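As a rough, hedged illustration (the regular expression and column names below are invented for this example and are not used in the configuration that follows), RegexHbaseEventSerializer is normally given a regex plus a matching comma-separated list of column names; with its defaults, a single capture group written to one column named payload, the whole event body lands in a single column, which is consistent with the scan output shown further down:
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
## hypothetical split of the body into two columns within the configured column family
a1.sinks.k1.serializer.regex = ([^ ]+) (.*)
a1.sinks.k1.serializer.colNames = host,msg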
Before testing, start HBase as described in <Hadoop 3.1.2(HA)+Zookeeper3.4.13+Hbase1.4.9(HA)+Hive2.3.4+Spark2.4.0(HA)高可用集群搭建>.
Use the following commands to create the flume2hbase_mshk_top table in HBase:
[root@c0 ~]# hbase shell
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
Version 1.4.9, rd625b212e46d01cb17db9ac2e9e927fdb201afa1, Wed Dec 5 11:54:10 PST 2018
hbase(main):001:0> list
TABLE
mysql2hase_mshk
1 row(s) in 0.1820 seconds
=> ["mysql2hase_mshk"]
hbase(main):002:0> version
1.4.9, rd625b212e46d01cb17db9ac2e9e927fdb201afa1, Wed Dec 5 11:54:10 PST 2018
hbase(main):003:0> create 'flume2hbase_mshk_top','uid','name'
0 row(s) in 1.3600 seconds
=> Hbase::Table - flume2hbase_mshk_top
hbase(main):004:0> scan 'flume2hbase_mshk_top'
ROW COLUMN+CELL
0 row(s) in 0.0330 seconds
hbase(main):005:0> quit
Create and save /home/work/_app/apache-flume-1.9.0-bin/conf/hbase_simple.conf with the following content:
[root@c0 ~]# cat /home/work/_app/apache-flume-1.9.0-bin/conf/hbase_simple.conf
# a1 is the agent name; list the agent's sources, sinks and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# r1 is the source name; set the source type and its channel
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# k1 is the sink name; set the sink type
a1.sinks.k1.type = hbase
a1.sinks.k1.table = flume2hbase_mshk_top
a1.sinks.k1.columnFamily = name
a1.sinks.k1.column = mshk
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent named a1:
[root@c0 ~]# flume-ng agent -c . -f $FLUME_HOME/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/home/work/_app/hadoop-3.1.2/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/work/_app/hbase-1.4.9/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/work/_app/hive-2.3.4) for Hive access
...
2019-03-19 14:22:25,605 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
2019-03-19 14:22:25,605 INFO node.AbstractConfigurationProvider: Creating channels
2019-03-19 14:22:25,610 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
2019-03-19 14:22:25,614 INFO node.AbstractConfigurationProvider: Created channel c1
2019-03-19 14:22:25,614 INFO source.DefaultSourceFactory: Creating instance of source r1, type syslogtcp
2019-03-19 14:22:25,626 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: hbase
2019-03-19 14:22:25,840 INFO hbase.HBaseSink: The write to WAL option is set to: true
2019-03-19 14:22:25,842 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
2019-03-19 14:22:25,847 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@311a0c0d counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
...
0x3000003d05c0004, negotiated timeout = 4000
2019-03-19 14:22:27,936 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2019-03-19 14:22:27,936 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
Generate a test log entry:
[root@c0 ~]# echo "hello mshk.top from flume" | nc localhost 5140
Now log into HBase, and you can see that the new data has been inserted:
[root@c0 ~]# hbase shell
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
Version 1.4.9, rd625b212e46d01cb17db9ac2e9e927fdb201afa1, Wed Dec 5 11:54:10 PST 2018
hbase(main):001:0> list
TABLE
flume2hbase_mshk_top
mysql2hase_mshk
2 row(s) in 0.2010 seconds
=> ["flume2hbase_mshk_top", "mysql2hase_mshk"]
hbase(main):002:0> scan 'flume2hbase_mshk_top'
ROW COLUMN+CELL
1552977018028-PiTWUgkag4-0 column=name:payload, timestamp=1552977021290, value=hello mshk.top from flume
1 row(s) in 0.1230 seconds
hbase(main):003:0> quit
Common questions
How to run Flume as a daemon
Run the following command:
[root@c0 ~]# nohup flume-ng agent -c . -f $FLUME_HOME/conf/nginx_logs.conf -n a1 -Dflume.root.logger=INFO,console &
[1] 10276
[root@c0 ~]# nohup: ignoring input and appending output to ‘nohup.out’
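The stock flume-ng script does not appear to provide a stop command, so a hedged way to stop a backgrounded agent is simply to kill its JVM process, either with the PID printed above or by searching for the configuration file name:
[root@c0 ~]# kill 10276
[root@c0 ~]# ps -ef | grep nginx_logs.conf | grep -v grep | awk '{print $2}' | xargs kill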
After working through all of these Flume examples you will find that Flume is genuinely powerful, and its components can be combined in many ways to do the work you need. As the saying goes, the master opens the door but you must walk through it yourself: go and put Flume to work on your own products and business.
I hope this article was helpful to you. Thank you for your support and for reading my blog.
Author: 迦壹
Original post: Flume1.9.0的安裝、部署、簡單應用(含分布式、與Hadoop3.1.2、Hbase1.4.9的案例)
Reprint notice: you may reprint this article, but you must indicate the original source and author with a hyperlink and keep the copyright notice. Thank you!
If you found this article helpful, you can donate through the addresses below. Thank you!

Bitcoin address: 1KdgydfKMcFVpicj5w4vyn3T88dwjBst6Y
Ethereum address: 0xbB0a92d634D7b9Ac69079ed0e521CC2e0a97c420
