Flume 開發環境搭建與編寫 Hello World 示例


2019-05-10

關鍵字:Flume 環境搭建、Flume 如何安裝、Flume 示例程序

這是一篇很直的文章,一切圍繞着快速構建開發環境為目的,不講原理不講人生也不講理想,只求用最短的時間讓您的 Flume 跑起來。


1、下載與安裝

 

首先,下載沒跑的,去官網下載,這里貼出傳送門: Flume 官方下載

 

其次,關於選擇 Flume 版本的問題,對於學習,選擇最新穩定版是肯定沒錯的。如果是用生生產環境,不敢要求最新,但至少得是穩定版本。官方網站中直接給出來的通常都只是最新版本而已。如果您要自行選擇版本,可以去這里看看: Flume 歷史版本下載 。這里不僅可以拿到大多數歷史版本,甚至於還貼心地將穩定版本都打包到 stable 目錄下了。

 

在選擇好您需要的版本后下載下來,直接解壓就可以了,Apache 的軟件這點真心方便,解壓即用圈粉無數。

tar -zxvf apache-flume-1.9.0-bin.tar.gz 

 

解壓完成后可以看到如下目錄結構

至此,下載與安裝就算是完成了,接下來看看如何配置我們的 Flume 使用環境。

 

2、配置

 

Flume 的配置文件都放在 conf 目錄下。默認情況下 conf 目錄內會有以下幾個文件

在這里,我們僅需要關注 flume-conf.properties.template 即可。不過在使用它之前,我們首先要將它的 .template 后綴去掉。

cp flume-conf.properties.template flume-conf.properties

這里強烈建議您使用 cp 而不是 mv ,留有一份默認配置的副本總是好的,省得以后需要恢復默認配置時還得重新解壓安裝包。

 

下面我們來看看 Flume 默認的配置信息

 1 agent.sources = seqGenSrc
 2 agent.channels = memoryChannel
 3 agent.sinks = loggerSink
 4 
 5 # For each one of the sources, the type is defined  6 agent.sources.seqGenSrc.type = seq
 7 
 8 # The channel can be defined as follows.  9 agent.sources.seqGenSrc.channels = memoryChannel
10 
11 # Each sink's type must be defined
12 agent.sinks.loggerSink.type = logger
13 
14 #Specify the channel the sink should use
15 agent.sinks.loggerSink.channel = memoryChannel
16 
17 # Each channel's type is defined.
18 agent.channels.memoryChannel.type = memory
19 
20 # Other config values specific to each type of channel(sink or source)
21 # can be defined as well
22 # In this case, it specifies the capacity of the memory channel
23 agent.channels.memoryChannel.capacity = 100

 

首先,我們注意到每一條配置項的第 1 個單詞都是 agent ,這個 agent 並不是固定不變的,它代表的就是某個 Flume 代理的名稱,即所有以該單詞開始的配置項,都會被應用到以該單詞為名稱的 Flume 代理上。而我們在啟動 Flume 實例時,會通過 -n 參數來指定我們要啟動的代理名稱。比如在這里,我們的代理名稱就是 agent ,那我們如果想要應用這份配置表,則啟動時就應該以下面這條啟動命令

flume-ng agent ... -n agent ...

 

其次我們來關注下前面 3 條配置項。

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

這 3 條分別用於配置 源、通道與接收器 的名稱。這僅僅是起個我們能認得的名字而已,所以可以隨意填寫配置值,但是這里起的名字在后面每一條配置項中都會用到。

 

然后是下一條配置項

agent.sources.seqGenSrc.type = seq

這里就看到了,我們前面定義源名稱時所起的源名字在這里就用到了。整條配置項的意思是這條要配置的是名稱為 agent 的 Flume 代理的 seqGenSrc 源的類型(type),然后它的值是 seq ,seq 表示 “序列發生器” 。序列發生器是什么東東呢?說白了就是自動產生從 0 到 Long.MAX_VALUE 之間的數字的東西,一般用於測試使用。

 

這里就有一個很有意思的地方:我怎么知道 seq 就代表序列發生器呢?那除了序列發生器,還有什么源類型可以填呢?

 

答案就是:看官網文檔!這里貼出一個官網文檔的鏈接: Flume 1.9 官網文檔 。進去后關注下左側邊欄,找到如下圖所示的標簽

在這個 Flume Sources 標簽下列出的子標簽,就是 Flume 目前支持的源類型。上面的 seq 值也在這里可以找到

您在后續的使用過程中需要哪種類型的源又不知道該填什么值的時候,來這查查就好了。Flume 所支持的源類型已經能夠全面覆蓋大數據開發領域了。

 

當然,不同類型的源的附屬配置項也不同,具體的還得參考官網上的文檔。

 

再下面兩條配置項的都差不多的了,想知道 Flume 支持哪些類型,直接上去查就好了

agent.sources.seqGenSrc.channels = memoryChannel
agent.sinks.loggerSink.type = logger

 

再接下來,配置的是源和接收器的通道。

agent.sinks.loggerSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory

關於源、接收器與通道之間的關系,有需要的同學請參考筆者的另一篇博客 Flume 架構解析

 

最后一條配置項配置的是和通道相關的額外配置。這個配置並不是必配項,關鍵得看我們所選擇的通道類型。

agent.channels.memoryChannel.capacity = 100

 

以上借用 Flume 默認的配置項來簡單講解了一下 Flume 配置。下面我們來做個 “Hello World” 實例實操一下。

 

3、Hello World 示例

 

不要誤會,筆者這里說的 “Hello World” 示例並不是真的要輸出  Hello World 。只是說平時我們在學習一門新的編程語言時,幾乎都是以輸出一個 “Hello World” 字符串來作為第一支程序,算是入門的意思。那這里我們一起來做一個 Flume 入門級程序,因此筆者將它稱為 “Hello World” 示例。

 

關於我們的這支 “Hello World” 程序,我們以 netcat 作為數據輸入源,控制台作為數據輸出端。在我們有了上一節的 Flume 配置基礎知識以后,筆者在這就直接貼上配置項信息就好了,筆者可以少啰嗦幾句,大家理解起來也可以更直接一點

agent.sources = mnetcat
agent.channels = memoryChannel
agent.sinks = mconsolesink

agent.sources.mnetcat.type = netcat
agent.sources.mnetcat.bind = localhost
agent.sources.mnetcat.port = 7777

agent.sinks.mconsolesink.type = logger
agent.sinks.mconsolesink.maxBytesToLog = 128

agent.sources.mnetcat.channels = memoryChannel
agent.sinks.mconsolesink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100

 

接下來,我們來運行我們的 Flume 代理。通過以下命令來運行

./bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console

筆者是在 Flume 安裝目錄下執行的這條命令。順利的話,我們可以在控制台最后看到以下提示信息

(conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink mconsolesink
(conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source mnetcat
(lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
(lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:7777]

 

此時查看網絡信息也能看到當前機器上 7777 端口已經處於監聽狀態了。

netstat -ntlp
tcp6       0      0 127.0.0.1:7777          :::*                    LISTEN      43017/java 

 

接下來,我們再開啟另一個窗口,通過 nc 命令來向 7777 端口發送消息。

nc localhost 7777

 

我們鍵入 “Hello World” 字樣,然后回車

hello world
OK

 

順利的話,就可以在 Flume 運行的窗口上看到如下打印信息了

Event: { headers:{} body: }
Event: { headers:{} body: }
Event: { headers:{} body: }
Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }

還真是輸出一個 hello world 字符串的示例呢!

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM