本文為轉載篇!原文:
https://www.cnblogs.com/zhangyinhua/p/7803486.html
https://www.cnblogs.com/ciade/p/5495218.html
原理
一、Flume簡介
flume 作為 cloudera 開發的實時日志收集系統,受到了業界的認可與廣泛應用。Flume 初始的發行版本目前被統稱為 Flume OG(original generation),屬於 cloudera。
但隨着 FLume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標准等缺點暴露出來,尤其是在 Flume OG 的最后一個發行版本 0.9.4. 中,日
志傳輸不穩定的現象尤為嚴重,為了解決這些問題,2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了里程碑式的改動:重構核心組件、核心配置以
及代碼架構,重構后的版本統稱為 Flume NG(next generation);改動的另一原因是將 Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。
備注:Flume參考資料
官方網站: http://flume.apache.org/
用戶文檔: http://flume.apache.org/FlumeUserGuide.html
開發文檔: http://flume.apache.org/FlumeDeveloperGuide.html
二、Flume特點
flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,
並寫到各種數據接受方(比如文本、HDFS、Hbase等)的能力 。
flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日志數據(字節數組形式)並且攜帶有頭信息,這些Event由Agent外部的Source生成,當
Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。
Sink負責持久化日志或者把事件推向另一個Source。
1)flume的可靠性
當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數據agent首先將
event寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,可以重新發送。),Store on failure(這也是scribe采用的策略,當數據接收方crash時,將
數據寫到本地,待恢復后,繼續發送),Besteffort(數據發送到接收方后,不會進行確認)。
2)flume的可恢復性
還是靠Channel。推薦使用FileChannel,事件持久化在本地文件系統里(性能較差)。
三、Flume的一些核心概念
Client:Client生產數據,運行在一個獨立的線程。
Event: 一個數據單元,消息頭和消息體組成。(Events可以是日志記錄、 avro 對象等。)
Flow: Event從源點到達目的點的遷移的抽象。
Agent: 一個獨立的Flume進程,包含組件Source、 Channel、 Sink。(Agent使用JVM 運行Flume。每台機器運行一個agent,但是可以在一個agent中包含
多個sources和sinks。)
Source: 數據收集組件。(source從Client收集數據,傳遞給Channel)
Channel: 中轉Event的一個臨時存儲,保存由Source組件傳遞過來的Event。(Channel連接 sources 和 sinks ,這個有點像一個隊列。)
Sink: 從Channel中讀取並移除Event, 將Event傳遞到FlowPipeline中的下一個Agent(如果有的話)(Sink從Channel收集數據,運行在一個獨立線程。)
3.1、Agent結構
Flume 運行的核心是 Agent。Flume以agent為最小的獨立運行單位。一個agent就是一個JVM。它是一個完整的數據收集工具,含有三個核心組件,分別是
source、 channel、 sink。通過這些組件, Event 可以從一個地方流向另一個地方,如下圖所示。
3.2、source
Source是數據的收集端,負責將數據捕獲后進行特殊的格式化,將數據封裝到事件(event) 里,然后將事件推入Channel中。 Flume提供了很多內置的
Source, 支持 Avro, log4j, syslog 和 http post(body為json格式)。可以讓應用程序同已有的Source直接打交道,如AvroSource,
SyslogTcpSource。 如果內置的Source無法滿足需要, Flume還支持自定義Source。
source類型:
3.3、Channel
Channel是連接Source和Sink的組件,大家可以將它看做一個數據的緩沖區(數據隊列),它可以將事件暫存到內存中也可以持久化到本地磁盤上, 直
到Sink處理完該事件。介紹兩個較為常用的Channel, MemoryChannel和FileChannel。
Channel類型:
3.4、Sink
Sink從Channel中取出事件,然后將數據發到別處,可以向文件系統、數據庫、 hadoop存數據, 也可以是其他agent的Source。在日志數據較少時,可
以將數據存儲在文件系統中,並且設定一定的時間間隔保存數據。
Sink類型:
四、Flume攔截器、數據流以及可靠性
4.1、Flume攔截器
當我們需要對數據進行過濾時,除了我們在Source、 Channel和Sink進行代碼修改之外, Flume為我們提供了攔截器,攔截器也是chain形式的。
攔截器的位置在Source和Channel之間,當我們為Source指定攔截器后,我們在攔截器中會得到event,根據需求我們可以對event進行保留還是
拋棄,拋棄的數據不會進入Channel中。
4.2、Flume數據流
1)Flume 的核心是把數據從數據源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地后,
刪除自己緩存的數據。
2) Flume 傳輸的數據的基本單位是 Event,如果是文本文件,通常是一行記錄,這也是事務的基本單位。 Event 從 Source,流向 Channel,再到 Sink,
本身為一個 byte 數組,並可攜帶 headers 信息。 Event 代表着一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。
值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。組合方式基於用戶設置的配置文件,非常靈活。
比如:Channel可以把事件暫存在內存里,也可以持久化到本地硬盤上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。Flume支持用戶建立多級流,
也就是說,多個agent可以協同工作,並且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,這也正是Flume強大之處。如下圖所示:
4.3、Flume可靠性
Flume 使用事務性的方式保證傳送Event整個過程的可靠性。 Sink 必須在Event 被存入 Channel 后,或者,已經被傳達到下一站agent里,又或者,
已經被存入外部數據目的地之后,才能把 Event 從 Channel 中 remove 掉。這樣數據流里的 event 無論是在一個 agent 里還是多個 agent 之間流轉,
都能保證可靠,因為以上的事務保證了 event 會被成功存儲起來。比如 Flume支持在本地保存一份文件 channel 作為備份,而memory channel 將
event存在內存 queue 里,速度快,但丟失的話無法恢復。
五、Flume使用場景
Flume在英文中的意思是水道, 但Flume更像可以隨意組裝的消防水管,下面根據官方文檔,展示幾種Flow。
5.1、多個agent順序連接
可以將多個Agent順序連接起來,將最初的數據源經過收集,存儲到最終的存儲系統中。這是最簡單的情況,一般情況下,應該控制這種順序連接的
Agent 的數量,因為數據流經的路徑變長了,如果不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。
5.2、多個Agent的數據匯聚到同一個Agent
這種情況應用的場景比較多,比如要收集Web網站的用戶行為日志, Web網站為了可用性使用的負載集群模式,每個節點都產生用戶行為日志,可以為
每 個節點都配置一個Agent來單獨收集日志數據,然后多個Agent將數據最終匯聚到一個用來存儲數據存儲系統,如HDFS上。
5.3、多級流
Flume還支持多級流,什么多級流?結合在雲開發中的應用來舉個例子,當syslog, java, nginx、 tomcat等混合在一起的日志流開始流入一個agent
后,可以agent中將混雜的日志流分開,然后給每種日志建立一個自己的傳輸通道。
5.4、load balance功能
上圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink組件上,而每個Sink組件分別連接到一個獨立的Agent上 。
六、Flume核心組件
Flume主要由3個重要的組件構成:
1)Source: 完成對日志數據的收集,分成transtion 和 event 打入到channel之中
Flume提供了各種source的實現,包括Avro Source、 Exce Source、 Spooling
Directory Source、 NetCat Source、 Syslog Source、 Syslog TCP Source、
Syslog UDP Source、 HTTP Source、 HDFS Source, etc。
2)Channel: Flume Channel主要提供一個隊列的功能,對source提供中的數據進行簡單的緩存。
Flume對於Channel, 則提供了Memory Channel、 JDBC Chanel、 File Channel,etc
3)Sink: Flume Sink取出Channel中的數據,進行相應的存儲文件系統,數據庫,或者提交到遠程服務器。
包括HDFS sink、 Logger sink、 Avro sink、 File Roll sink、 Null sink、 HBasesink, etc。
6.1、Source
Spool Source 如何使用?
在實際使用的過程中,可以結合log4j使用,使用log4j的時候,將log4j的文件分割機制設為1分鍾一次,將文件拷貝到spool的監控目錄。
log4j有一個TimeRolling的插件,可以把log4j分割的文件到spool目錄。基本實現了實時的監控。 Flume在傳完文件之后,將會修 改文
件的后綴,變為.COMPLETED(后綴也可以在配置文件中靈活指定)
Exec Source 和Spool Source 比較
1) ExecSource可以實現對日志的實時收集,但是存在Flume不運行或者指令執行出錯時,將無法收集到日志數據,無法何證日志數據
的完整性。
2) SpoolSource雖然無法實現實時的收集數據,但是可以使用以分鍾的方式分割文件,趨近於實時。
3)總結:如果應用無法實現以分鍾切割日志文件的話,可以兩種 收集方式結合使用。
6.2、Channel
1)MemoryChannel可以實現高速的吞吐, 但是無法保證數據完整性
2)MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。
FileChannel保證數據的完整性與一致性。在具體配置不現的FileChannel時,建議FileChannel設置的目錄和程序日志文件保存的目錄
設成不同的磁盤,以便提高效率。
6.3、Sink
Flume Sink在設置存儲數據時,可以向文件系統中,數據庫中, hadoop中儲數據,在日志數據較少時,可以將數據存儲在文件系中,並
且設定一定的時間間隔保存數據。在日志數據較多時,可以將相應的日志數據存儲到Hadoop中,便於日后進行相應的數據分析。
使用案例
一、安裝
1.下載安裝包
2.配置環境變量
3.修改配置文件(案例給出)
4.啟動服務(案例給出)
5.驗證 flume-ng -version
二、flume的案例
案例1:Avro 可以發送一個給定的文件給Flume,Avro 源使用AVRO RPC機制
(a)創建agent配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/avro.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141 # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(b)啟動服務 flume agent a1
flume-ng agent -c .-f /home/hadoop/flume-1.5.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
(c)創建指定文件
echo "hello world" > /home/hadoop/flume-1.5.0-bin/log.00
(d)使用avro-client發送文件
flume-ng avro-client -c . -H m1 -p 4141 -F /home/hadoop/flume-1.5.0-bin/log.00
(f)在m1的控制台,可以看到以下信息,注意最后一行: hello world
案例2:Spool 監測配置的目錄下新增的文件,並將文件中的數據讀取出來
需要注意兩點:
1) 拷貝到spool目錄下的文件不可以再打開編輯。
2) spool目錄下不可包含相應的子目錄
(a)創建agent配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/spool.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= spooldir a1.sources.r1.channels = c1 a1.sources.r1.spoolDir = /home/hadoop/flume-1.5.0-bin/logs a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(b)啟動服務flume agent a1
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
(c)追加文件到/home/hadoop/flume-1.5.0-bin/logs目錄
echo "spool test1" > /home/hadoop/flume-1.5.0-bin/logs/spool_text.log
(d)在m1的控制台,可以看到以下相關信息:
Event: { headers:{file=/home/hadoop/flume-1.5.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 }
案例3:Exec 執行一個給定的命令獲得輸出的源,如果要使用tail命令,必選使得file足夠大才能看到輸出內容
(a)創建agent配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= exec a1.sources.r1.channels = c1 a1.sources.r1.command= tail-F /home/hadoop/flume-1.5.0-bin/log_exec_tail # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(b)啟動服務flume agent a1
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
(c)生成足夠多的內容在文件里
for i in {1..100};do echo "exec tail$i" >> /home/hadoop/flume-1.5.0-bin/log_exec_tail;echo $i;sleep 0.1;done
(e)在m1的控制台,可以看到以下信息:
Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74 exec tail test } Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74 exec tail test }
案例4:Syslogtcp 監聽TCP的端口做為數據源
(a)創建agent配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.host = localhost a1.sources.r1.channels = c1 # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(b)啟動flume agent a1
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
(c)測試產生syslog
echo "hello idoall.org syslog" | nc localhost 5140
(d)在m1的控制台,可以看到以下信息:
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
案例5:JSONHandler
(a)創建agent配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/post_json.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= org.apache.flume.source.http.HTTPSource a1.sources.r1.port = 8888 a1.sources.r1.channels = c1 # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(b)啟動flume agent a1
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
(c)生成JSON 格式的POST request
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888
(d)在m1的控制台,可以看到以下信息:
Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79 idoall.org_body }
案例6:Hadoop sink
(a)創建agent配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.host = localhost a1.sources.r1.channels = c1 # Describe the sink a1.sinks.k1.type= hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = hdfs://m1:9000/user/flume/syslogtcp a1.sinks.k1.hdfs.filePrefix = Syslog a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(b)啟動flume agent a1
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
(c)測試產生syslog
echo "hello idoall flume -> hadoop testing one" | nc localhost 5140
(d) 在m1上再打開一個窗口,去hadoop上檢查文件是否生成
hadoop fs -ls /user/flume/syslogtcp hadoop fs -cat /user/flume/syslogtcp/Syslog.1407644509504
案例7:File Roll Sink
(a)創建agent配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= syslogtcp a1.sources.r1.port = 5555 a1.sources.r1.host = localhost a1.sources.r1.channels = c1 # Describe the sink a1.sinks.k1.type= file_roll a1.sinks.k1.sink.directory = /home/hadoop/flume-1.5.0-bin/logs # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(b)啟動flume agent a1
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console
(c)測試產生log
echo "hello idoall.org syslog" | nc localhost 5555 echo "hello idoall.org syslog 2" | nc localhost 5555
(d)查看/home/hadoop/flume-1.5.0-bin/logs下是否生成文件,默認每30秒生成一個新文件
ll /home/hadoop/flume-1.5.0-bin/logs cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-1 cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-2 hello idoall.org syslog hello idoall.org syslog 2
案例8:Replicating Channel Selector Flume支持Fan out流從一個源到多個通道
有兩種模式的Fan out,分別是復制和復用。在復制的情況下,流的事件被發送到所有的配置通道。在復用的情況下,事件被發送到可用的渠道中的一個子集。Fan out流需要指定源和Fan out通道的規則。這次我們需要用到m1,m2兩台機器
(a)在m1創建replicating_Channel_Selector配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type= syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.host = localhost a1.sources.r1.channels = c1 c2 a1.sources.r1.selector.type= replicating # Describe the sink a1.sinks.k1.type= avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname= m1 a1.sinks.k1.port = 5555 a1.sinks.k2.type= avro a1.sinks.k2.channel = c2 a1.sinks.k2.hostname= m2 a1.sinks.k2.port = 5555 # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type= memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100
(b)在m1創建replicating_Channel_Selector_avro配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 5555 # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(c)在m1上將2個配置文件復制到m2上一份
scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf
(d)打開4個窗口,在m1和m2上同時啟動兩個flume agent
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
(e)然后在m1或m2的任意一台機器上,測試產生syslog
echo "hello idoall.org syslog" | nc localhost 5140
(f)在m1和m2的sink窗口,分別可以看到以下信息,這說明信息得到了同步:
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
案例9:Multiplexing Channel Selector
(a)在m1創建Multiplexing_Channel_Selector配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type= org.apache.flume.source.http.HTTPSource a1.sources.r1.port = 5140 a1.sources.r1.channels = c1 c2 a1.sources.r1.selector.type= multiplexing a1.sources.r1.selector.header = type #映射允許每個值通道可以重疊。默認值可以包含任意數量的通道。 a1.sources.r1.selector.mapping.baidu = c1 a1.sources.r1.selector.mapping.ali = c2 a1.sources.r1.selector.default = c1 # Describe the sink a1.sinks.k1.type= avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname= m1 a1.sinks.k1.port = 5555 a1.sinks.k2.type= avro a1.sinks.k2.channel = c2 a1.sinks.k2.hostname= m2 a1.sinks.k2.port = 5555 # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type= memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100
(b)在m1創建Multiplexing_Channel_Selector_avro配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 5555 # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(c)將2個配置文件復制到m2上一份
scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf
(d)打開4個窗口,在m1和m2上同時啟動兩個flume agent
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
(e)然后在m1或m2的任意一台機器上,測試產生syslog
curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]' http://localhost:5140
(f)在m1的sink窗口,可以看到以下信息:
Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31} Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33}
(g)在m2的sink窗口,可以看到以下信息:
Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32}
可以看到,根據header中不同的條件分布到不同的channel上
案例10:Flume Sink Processors failover的機器是一直發送給其中一個sink,當這個sink不可用的時候,自動發送到下一個sink
(a)在m1創建Flume_Sink_Processors配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 #這個是配置failover的關鍵,需要有一個sink group a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 #處理的類型是failover a1.sinkgroups.g1.processor.type= failover #優先級,數字越大優先級越高,每個sink的優先級必須不相同 a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 #設置為10秒,當然可以根據你的實際狀況更改成更快或者很慢 a1.sinkgroups.g1.processor.maxpenalty = 10000 # Describe/configure the source a1.sources.r1.type= syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.channels = c1 c2 a1.sources.r1.selector.type= replicating # Describe the sink a1.sinks.k1.type= avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname= m1 a1.sinks.k1.port = 5555 a1.sinks.k2.type= avro a1.sinks.k2.channel = c2 a1.sinks.k2.hostname= m2 a1.sinks.k2.port = 5555 # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type= memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100
(b)在m1創建Flume_Sink_Processors_avro配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf a1.sources = r1 a1.sinks = k1 a1.channels = c # Describe/configure the source a1.sources.r1.type= avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 5555 # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(c)將2個配置文件復制到m2上一份
scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf
(d)打開4個窗口,在m1和m2上同時啟動兩個flume agent
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
(e)然后在m1或m2的任意一台機器上,測試產生log
echo "idoall.org test1 failover" | nc localhost 5140
(f)因為m2的優先級高,所以在m2的sink窗口,可以看到以下信息,而m1沒有:
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
(g)這時我們停止掉m2機器上的sink(ctrl+c),再次輸出測試數據:
echo "idoall.org test2 failover" | nc localhost 5140
(h)可以在m1的sink窗口,看到讀取到了剛才發送的兩條測試數據:
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 } Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
(i)我們再在m2的sink窗口中,啟動sink:
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
(j)輸入兩批測試數據:
echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140
(k)在m2的sink窗口,我們可以看到以下信息,因為優先級的關系,log消息會再次落到m2上:
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 } Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
案例11:Load balancing Sink Processor load balance type和failover不同的地方是,load balance有兩個配置,一個是輪詢,一個是隨機
兩種情況下如果被選擇的sink不可用,就會自動嘗試發送到下一個可用的sink上面。
(a)在m1創建Load_balancing_Sink_Processors配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 #這個是配置Load balancing的關鍵,需要有一個sink group a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type= load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = round_robin # Describe/configure the source a1.sources.r1.type= syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.channels = c1 # Describe the sink a1.sinks.k1.type= avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname= m1 a1.sinks.k1.port = 5555 a1.sinks.k2.type= avro a1.sinks.k2.channel = c1 a1.sinks.k2.hostname= m2 a1.sinks.k2.port = 5555 # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
(b)在m1創建Load_balancing_Sink_Processors_avro配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 5555 # Describe the sink a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(c)將2個配置文件復制到m2上一份
scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf
(d)打開4個窗口,在m1和m2上同時啟動兩個flume agent
flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
(e)然后在m1或m2的任意一台機器上,測試產生log,一行一行輸入,輸入太快,容易落到一台機器上
echo "idoall.org test1" | nc localhost 5140 echo "idoall.org test2" | nc localhost 5140 echo "idoall.org test3" | nc localhost 5140 echo "idoall.org test4" | nc localhost 5140
(f)在m1的sink窗口,可以看到以下信息:
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 } Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
(g)在m2的sink窗口,可以看到以下信息:
Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 } Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
說明輪詢模式起到了作用。
案例12:Hbase sink
(a)在測試之前,請先將hbase啟動
(b)然后將以下文件復制到flume中:
cp/home/hadoop/hbase-0.96.2-hadoop2/lib/protobuf-java-2.5.0.jar /home/hadoop/flume-1.5.0-bin/lib cp/home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-client-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib cp/home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-common-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib cp/home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-protocol-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib cp/home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-server-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib cp/home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop2-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib cp/home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib cp/home/hadoop/hbase-0.96.2-hadoop2/lib/htrace-core-2.04.jar /home/hadoop/flume-1.5.0-bin/lib
(c)確保test_idoall_org表在hbase中已經存在。
(d)在m1創建hbase_simple配置文件
vi /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type= syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.host = localhost a1.sources.r1.channels = c1 # Describe the sink a1.sinks.k1.type= logger a1.sinks.k1.type= hbase a1.sinks.k1.table = test_idoall_org a1.sinks.k1.columnFamily = name a1.sinks.k1.column = idoall a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer a1.sinks.k1.channel = memoryChannel # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(e)啟動flume agent
flume-ngagent -c . –f /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console
(f)測試產生syslog
echo "hello idoall.org from flume" | nc localhost 5140
(g)這時登錄到hbase中,可以發現新數據已經插入
hbase shell hbase(main):001:0> list TABLE hbase2hive_idoall hive2hbase_idoall test_idoall_org => ["hbase2hive_idoall","hive2hbase_idoall","test_idoall_org"] hbase(main):002:0> scan "test_idoall_org" hbase(main):004:0> quit
如有侵權,請告知!