Flume usage examples


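Features of Flume:

Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data. It lets you plug custom data senders into a logging system to collect data, and it can also perform simple processing on that data and write it to a variety of data receivers (such as text files, HDFS, and HBase).

A Flume data flow is carried end to end by events (Event). An event is Flume's basic unit of data: it carries the log data (as a byte array) together with header information. A Source inside the Agent creates events from data delivered by something outside the Agent. When the Source captures an event, it applies source-specific formatting and then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds an event until a Sink has finished processing it. The Sink is responsible for persisting the log or forwarding the event to another Source.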
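To make the Source, Channel, Sink hand-off concrete, here is a minimal Python sketch. It is only a toy model under our own assumptions: Event, BoundedChannel, source_push, and sink_drain are hypothetical names, not Flume's Java classes, and real Flume channels additionally wrap every put and take in a transaction.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Event:
    """Toy stand-in for a Flume event: a byte-array body plus string headers."""
    body: bytes
    headers: dict = field(default_factory=dict)


class BoundedChannel:
    """Toy bounded FIFO buffer playing the role of a channel (not Flume's class)."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._queue = deque()

    def put(self, event: Event) -> None:
        # A real channel refuses the put when it is full; here we simply raise.
        if len(self._queue) >= self.capacity:
            raise OverflowError("channel is full")
        self._queue.append(event)

    def take(self):
        # Returns None when empty, like a sink polling an idle channel.
        return self._queue.popleft() if self._queue else None


def source_push(raw_lines, channels, headers=None):
    """Source role: wrap each external line in an Event and push it to every channel."""
    for line in raw_lines:
        event = Event(body=line.encode("utf-8"), headers=dict(headers or {}))
        for channel in channels:
            channel.put(event)


def sink_drain(channel):
    """Sink role: take events until the channel is empty and 'persist' them (here: collect)."""
    delivered = []
    event = channel.take()
    while event is not None:
        delivered.append(event.body.decode("utf-8"))
        event = channel.take()
    return delivered


if __name__ == "__main__":
    c1 = BoundedChannel(capacity=1000)
    source_push(["hello world"], [c1])
    print(sink_drain(c1))  # prints ['hello world']
```

The channel is what decouples the two sides: the source can keep accepting data while the sink is slow, up to the channel's capacity.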

 

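Reliability of Flume:

When a node fails, logs can be delivered to other nodes without being lost. Flume provides three levels of reliability guarantees, from strongest to weakest: end-to-end (the receiving agent first writes the event to disk and deletes it only after the data has been delivered successfully; if delivery fails, the data can be resent), Store on failure (the strategy also used by Scribe: when the data receiver crashes, the data is written locally and sending resumes once the receiver recovers), and Best effort (data sent to the receiver is not acknowledged).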

 

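Recoverability of Flume:

Recoverability, again, relies on the Channel. FileChannel is recommended: it persists events on the local file system (at the cost of lower performance).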

 

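Some core Flume concepts:

An Agent runs Flume in a JVM. Typically each machine runs one agent, but a single agent can contain multiple sources and sinks.

A Client produces data and runs in its own thread.

A Source collects data from the Client and passes it to a Channel.

A Sink collects data from a Channel and runs in its own thread.

A Channel connects sources and sinks; it works somewhat like a queue.

Events can be log records, Avro objects, and so on.

The agent is Flume's smallest independently running unit, and one agent is one JVM. A single agent is built from three main components, Source, Sink, and Channel, as shown in the figure below: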

 

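Notably, Flume ships with a large number of built-in Source, Channel, and Sink types, and different types can be combined freely. How they are combined is driven by a user-supplied configuration file, which makes the system very flexible. For example, a Channel can hold events in memory or persist them to local disk, and a Sink can write logs to HDFS, HBase, or even another Source. Flume also lets users build multi-hop flows, meaning several agents can work together, and it supports fan-in, fan-out, contextual routing, and backup routes. This is where Flume really shines. See the figure below: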
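Since each agent is defined entirely by such a configuration file, a small checker can catch wiring mistakes (for instance a sink bound to a channel name that was never declared) before you start the agent. The sketch below is a hypothetical helper, not part of Flume: it only understands plain key = value lines, whereas Java properties files also accept ':' and whitespace as separators.

```python
def parse_flume_properties(text: str) -> dict:
    """Parse 'key = value' lines; a later duplicate key overwrites an earlier one."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if not sep:
            continue  # simplified: ignore lines without '='
        props[key.strip()] = value.strip()
    return props


def check_wiring(props: dict, agent: str) -> list:
    """Return problems for sources/sinks that point at undeclared channels."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    # Sources use the plural 'channels' key and may list several channels.
    for source in props.get(f"{agent}.sources", "").split():
        for ch in props.get(f"{agent}.sources.{source}.channels", "").split():
            if ch not in channels:
                problems.append(f"source {source} -> undeclared channel {ch}")
    # Sinks use the singular 'channel' key and read from exactly one channel.
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch is None:
            problems.append(f"sink {sink} has no channel")
        elif ch not in channels:
            problems.append(f"sink {sink} -> undeclared channel {ch}")
    return problems
```

Run it on a config file's text with the agent name you pass to -n (a1 in every example below); an empty list means every source and sink is bound to a declared channel.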

 

  

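II. How to install
    1. Download the installation package
    2. Configure the environment variables
    3. Edit the configuration file (covered in the examples)
    4. Start the service (covered in the examples)
    5. Verify the installation: flume-ng version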

III. Flume examples
Example 1: Avro. The Avro client can send a given file to Flume; the Avro source uses the Avro RPC mechanism.

 

(a) Create the agent configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/avro.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start the Flume agent a1

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

(c) Create the file to send

 

echo "hello world" > /home/hadoop/flume-1.5.0-bin/log.00

  

(d) Send the file with avro-client

 

flume-ng avro-client -c . -H m1 -p 4141 -F /home/hadoop/flume-1.5.0-bin/log.00

      

(e) In the console on m1 you should see output like the following; note the last line: hello world

 

 

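Example 2: Spool. The spooling directory source watches the configured directory for new files and reads the data out of them. Two things to note:

1) A file copied into the spool directory must not be opened and edited afterwards.
2) The spool directory must not contain subdirectories.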
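One way to honor rule 1) is to finish writing a file somewhere else and then move it into the spool directory with a single rename. A minimal Python sketch, assuming a hypothetical staging directory that sits outside the spool directory but on the same file system; drop_into_spool is our own helper name. The existence check reflects that the spooling source also objects to a file name being reused, though it only catches files that have not yet been renamed with the .COMPLETED suffix.

```python
import os
import shutil
import tempfile


def drop_into_spool(src_path: str, spool_dir: str, staging_dir: str) -> str:
    """Publish a finished copy of src_path into spool_dir with one atomic rename.

    staging_dir must sit OUTSIDE spool_dir (the spool directory may not contain
    subdirectories) but on the same file system, because os.replace() is an
    atomic rename there and fails across file systems.
    """
    final_path = os.path.join(spool_dir, os.path.basename(src_path))
    if os.path.exists(final_path):
        # Refuse to reuse a name that is still waiting in the spool directory.
        raise FileExistsError(final_path)
    # 1) Write the complete copy where the source is not watching.
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir)
    os.close(fd)
    shutil.copyfile(src_path, tmp_path)
    # 2) Publish it in one step; never open or edit final_path after this.
    os.replace(tmp_path, final_path)
    return final_path
```

Because the source only ever sees the file after the rename, it never reads a half-written file.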
 

(a) Create the agent configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/spool.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= spooldir

a1.sources.r1.channels = c1

a1.sources.r1.spoolDir = /home/hadoop/flume-1.5.0-bin/logs

a1.sources.r1.fileHeader = true

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

      

(b) Start the Flume agent a1

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console

(c) Add a file to the /home/hadoop/flume-1.5.0-bin/logs directory

 

echo "spool test1" > /home/hadoop/flume-1.5.0-bin/logs/spool_text.log

(d) In the console on m1 you should see output like the following:

 Event: { headers:{file=/home/hadoop/flume-1.5.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31        spool test1 }

 

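Example 3: Exec. The exec source runs a given command and uses its output as the data source. If you use the tail command, the file must receive enough data before any output shows up.

(a) Create the agent configuration file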

 

vi /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= exec

a1.sources.r1.channels = c1

a1.sources.r1.command = tail -F /home/hadoop/flume-1.5.0-bin/log_exec_tail

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start the Flume agent a1

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console

(c) Write enough content into the file

 

for i in {1..100};do echo "exec tail$i" >> /home/hadoop/flume-1.5.0-bin/log_exec_tail;echo $i;sleep 0.1;done

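(d) In the console on m1 you should see output like the following (the sample body reads "exec tail test"; with the loop above, the bodies would read exec tail1, exec tail2, and so on):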

 

Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74 exec tail test }

Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 74 65 73 74 exec tail test }

 

Example 4: Syslogtcp. Listen on a TCP port and use it as the data source
(a) Create the agent configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

   

(b) Start the Flume agent a1

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console

(c) Generate a test syslog message

 

echo "hello idoall.org syslog" | nc localhost 5140
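The nc command above sends a bare line with no syslog <PRI> prefix, which is why the output below shows Severity=0, Facility=0 and flume.syslog.status=Invalid. The Python sketch below prepends an RFC 3164 PRI field (PRI = facility * 8 + severity) before sending over TCP. syslog_line and send_syslog_tcp are hypothetical helpers; a complete RFC 3164 message also carries a timestamp and hostname after the PRI, which this sketch omits, so we don't claim exactly which status header Flume assigns to it.

```python
import socket

# RFC 3164 severity codes; facility 1 means "user-level messages".
SEVERITY = {"emerg": 0, "alert": 1, "crit": 2, "err": 3,
            "warning": 4, "notice": 5, "info": 6, "debug": 7}


def syslog_line(message: str, facility: int = 1, severity: str = "notice") -> bytes:
    """Prefix message with an RFC 3164 <PRI> field, where PRI = facility * 8 + severity."""
    if not 0 <= facility <= 23:
        raise ValueError("facility must be between 0 and 23")
    pri = facility * 8 + SEVERITY[severity]
    return f"<{pri}>{message}\n".encode("utf-8")


def send_syslog_tcp(line: bytes, host: str = "localhost", port: int = 5140) -> None:
    """Send one newline-terminated line to the syslogtcp source (port 5140 in this config)."""
    with socket.create_connection((host, port)) as sock:
        sock.sendall(line)
```

With the agent running, send_syslog_tcp(syslog_line("hello idoall.org syslog")) sends "<13>hello idoall.org syslog", that is, facility 1 and severity 5 (notice).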

(d) In the console on m1 you should see output like the following:

 

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
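Note that the body in the output stops at "hello idoall.org" although "hello idoall.org syslog" was sent: by default the logger sink prints only the first 16 bytes of an event body, as a hex column followed by the text. The Python sketch below approximates that rendering and decodes a hex column back to text; render_like_logger and decode_hex_dump are hypothetical helpers, and Flume's real output may differ in spacing.

```python
def render_like_logger(body: bytes, max_bytes: int = 16) -> str:
    """Approximate the logger sink's body display: hex bytes, then text, both truncated."""
    shown = body[:max_bytes]
    hex_part = " ".join(f"{b:02X}" for b in shown)
    text_part = shown.decode("utf-8", errors="replace")
    return f"{hex_part} {text_part}"


def decode_hex_dump(hex_bytes: str) -> str:
    """Turn the hex column of a logged event back into text."""
    return bytes.fromhex(hex_bytes).decode("utf-8")
```

For example, decode_hex_dump("73 70 6F 6F 6C 20 74 65 73 74 31") returns "spool test1", the body from the spool example.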

  

案例5:JSONHandler

(a) Create the agent configuration file

 

 

vi /home/hadoop/flume-1.5.0-bin/conf/post_json.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 8888

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start the Flume agent a1

 

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

(c) Send a JSON-formatted POST request

 

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888
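The HTTP source's JSONHandler expects a JSON array of objects, each with a headers map and a body string, which is exactly what the curl command above posts. A Python sketch that builds and sends the same payload with only the standard library; build_json_events and post_events are hypothetical helpers, and the default URL assumes the port 8888 configured above.

```python
import json
import urllib.request


def build_json_events(events):
    """Encode (headers, body) pairs as a JSON array of {"headers": ..., "body": ...}."""
    payload = [{"headers": dict(headers), "body": body} for headers, body in events]
    return json.dumps(payload).encode("utf-8")


def post_events(data: bytes, url: str = "http://localhost:8888") -> int:
    """POST the payload to the HTTP source and return the HTTP status code."""
    req = urllib.request.Request(url, data=data, method="POST",
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

With the agent running, post_events(build_json_events([({"a": "a1", "b": "b1"}, "idoall.org_body")])) sends the same single event as the curl command.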

(d) In the console on m1 you should see output like the following:

 

Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79  idoall.org_body }

    

Example 6: Hadoop sink
(a) Create the agent configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://m1:9000/user/flume/syslogtcp

a1.sinks.k1.hdfs.filePrefix = Syslog

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
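The three hdfs.round* settings round the event timestamp down (here to a 10-minute boundary), but they only affect time-based escape sequences in hdfs.path. The path in this configuration contains none, so the rounding has no visible effect here. The Python sketch below illustrates what rounding would do for a hypothetical path using %Y-%m-%d/%H%M escapes; round_down and bucket_path are our own helpers, not Flume code.

```python
from datetime import datetime


def round_down(ts: datetime, value: int = 10, unit: str = "minute") -> datetime:
    """Round ts down to a multiple of `value` units (like roundValue/roundUnit)."""
    if unit == "second":
        return ts.replace(second=ts.second - ts.second % value, microsecond=0)
    if unit == "minute":
        return ts.replace(minute=ts.minute - ts.minute % value, second=0, microsecond=0)
    if unit == "hour":
        return ts.replace(hour=ts.hour - ts.hour % value, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported unit: {unit}")


def bucket_path(ts: datetime) -> str:
    """Hypothetical hdfs.path /user/flume/syslogtcp/%Y-%m-%d/%H%M after rounding."""
    return round_down(ts).strftime("/user/flume/syslogtcp/%Y-%m-%d/%H%M")
```

With such a path, every event stamped between 12:30:00 and 12:39:59 would land in the same .../1230 directory.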

(b) Start the Flume agent a1

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console

(c) Generate a test syslog message

 

echo "hello idoall flume -> hadoop testing one" | nc localhost 5140

(d) Open another terminal on m1 and check on Hadoop whether the file has been created

 

hadoop fs -ls /user/flume/syslogtcp

hadoop fs -cat /user/flume/syslogtcp/Syslog.1407644509504

    

Example 7: File Roll Sink
(a) Create the agent configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5555

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= file_roll

a1.sinks.k1.sink.directory = /home/hadoop/flume-1.5.0-bin/logs

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(b) Start the Flume agent a1

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console

(c) Generate test log messages

 

echo "hello idoall.org syslog" | nc localhost 5555

echo "hello idoall.org syslog 2" | nc localhost 5555

(d) Check whether files are created under /home/hadoop/flume-1.5.0-bin/logs; by default a new file is started every 30 seconds

 

ls -l /home/hadoop/flume-1.5.0-bin/logs

cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-1

cat /home/hadoop/flume-1.5.0-bin/logs/1407646164782-2

hello idoall.org syslog

hello idoall.org syslog 2
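The file names in the listing above (1407646164782-1, 1407646164782-2) look like a millisecond timestamp followed by a counter. A toy Python model assuming that naming scheme, which we inferred from the sample names rather than from Flume's source: the timestamp is fixed when the sink starts and the counter advances on every roll. RollingNamer is a hypothetical name.

```python
import itertools
import time


class RollingNamer:
    """Toy model of file_roll naming: '<series start, epoch ms>-<counter>'."""

    def __init__(self, start_ms=None):
        # The series timestamp is fixed once, when the sink starts.
        self.series_ms = start_ms if start_ms is not None else int(time.time() * 1000)
        self._counter = itertools.count(1)
        self.current = self._next_name()

    def _next_name(self) -> str:
        return f"{self.series_ms}-{next(self._counter)}"

    def roll(self) -> str:
        """Call once per roll interval (30 s by default): close the file, open the next name."""
        self.current = self._next_name()
        return self.current
```

RollingNamer(start_ms=1407646164782) starts writing 1407646164782-1 and switches to 1407646164782-2 on the first roll.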

 

 

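Example 8: Replicating Channel Selector. Flume supports fanning out a flow from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In the replicating case, every event in the flow is sent to all configured channels. In the multiplexing case, an event is sent only to a subset of the eligible channels. A fan-out flow requires you to specify the source and the fan-out rules for its channels. This example uses two machines, m1 and m2.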
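The difference between the two selector modes fits in a few lines. This Python sketch is a toy model: replicating_select and multiplexing_select are hypothetical names, and Flume's real selectors also deal with channel transactions. The mapping in the usage block mirrors the selector.mapping.* and selector.default settings used in the multiplexing example below.

```python
def replicating_select(headers: dict, channels: list) -> list:
    """Replicating: every event is copied to all configured channels."""
    return list(channels)


def multiplexing_select(headers: dict, header: str, mapping: dict, default: list) -> list:
    """Multiplexing: route by one header's value; unmapped or missing values use default."""
    value = headers.get(header)
    return list(mapping.get(value, default))


if __name__ == "__main__":
    mapping = {"baidu": ["c1"], "ali": ["c2"]}  # selector.mapping.<value> = <channels>
    for t in ("baidu", "ali", "qq"):
        # "qq" has no mapping, so it falls back to the default channel c1.
        print(t, multiplexing_select({"type": t}, "type", mapping, ["c1"]))
```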
(a) On m1, create the replicating_Channel_Selector configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf

 

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= replicating

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

(b) On m1, create the replicating_Channel_Selector_avro configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(c) On m1, copy both configuration files to m2

 

scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf

scp -r /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf

(d) Open four terminal windows and start the two Flume agents on each of m1 and m2

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

(e) Then, on either m1 or m2, generate a test syslog message

 

echo "hello idoall.org syslog" | nc localhost 5140

(f) In the sink windows on both m1 and m2 you should see the following, which shows that the event was replicated to both:

 

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }

 

Example 9: Multiplexing Channel Selector
(a) On m1, create the Multiplexing_Channel_Selector configuration file

 

 

vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf

 

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= multiplexing

a1.sources.r1.selector.header = type

# Each header value maps to channels, and the mappings may overlap. The default may list any number of channels.

a1.sources.r1.selector.mapping.baidu = c1

a1.sources.r1.selector.mapping.ali = c2

a1.sources.r1.selector.default = c1

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

(b) On m1, create the Multiplexing_Channel_Selector_avro configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(c) Copy both configuration files to m2

 

scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf

scp -r /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

(d) Open four terminal windows and start the two Flume agents on each of m1 and m2

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

(e) Then, on either m1 or m2, send test events to the HTTP source

 

curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]' http://localhost:5140 &&

curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]' http://localhost:5140 &&

curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]' http://localhost:5140

(f) In the sink window on m1 you should see:

 

Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31} 

Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33}

(g) In the sink window on m2 you should see:

 

Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32}

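As you can see, events are routed to different channels according to the value of a header. The "qq" event matches no mapping, so it goes to the default channel c1 and shows up on m1.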

 

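Example 10: Flume Sink Processors (failover). With the failover processor, events are always sent to one sink; when that sink becomes unavailable, they are automatically sent to the next sink.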
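A toy Python model of the failover processor described above: the highest-priority sink that is not currently penalized gets each event, and a sink that fails is benched for a backoff period capped at maxpenalty. The doubling backoff starting at one second is our assumption about the penalty schedule, and FailoverProcessor and deliver are hypothetical names.

```python
class FailoverProcessor:
    """Toy failover sink processor with priority ordering and capped backoff."""

    def __init__(self, priorities: dict, max_penalty_ms: int = 30000):
        self.priorities = priorities        # sink name -> priority (higher wins)
        self.max_penalty_ms = max_penalty_ms
        self.failures = {}                  # sink name -> consecutive failures
        self.benched_until = {}             # sink name -> time (ms) it may be retried

    def _candidates(self, now_ms: int) -> list:
        live = [s for s in self.priorities if self.benched_until.get(s, 0) <= now_ms]
        return sorted(live, key=lambda s: self.priorities[s], reverse=True)

    def process(self, event, deliver, now_ms: int) -> str:
        """Try live sinks in priority order; deliver(sink, event) returns True on success."""
        for sink in self._candidates(now_ms):
            if deliver(sink, event):
                self.failures.pop(sink, None)  # a success clears the failure streak
                return sink
            n = self.failures.get(sink, 0) + 1
            self.failures[sink] = n
            # Assumed schedule: 1 s, 2 s, 4 s, ... capped at max_penalty_ms.
            penalty = min(1000 * 2 ** (n - 1), self.max_penalty_ms)
            self.benched_until[sink] = now_ms + penalty
        raise RuntimeError("all sinks failed")
```

With priorities k1=5 and k2=10 this follows the steps below: events go to k2, move to k1 while k2 is down, and return to k2 once its penalty has expired.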

(a) On m1, create the Flume_Sink_Processors configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf

 

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# The key to configuring failover: a sink group is required

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

# The processor type is failover

a1.sinkgroups.g1.processor.type= failover

# Priority: a larger number means a higher priority; each sink must have a distinct priority

a1.sinkgroups.g1.processor.priority.k1 = 5

a1.sinkgroups.g1.processor.priority.k2 = 10

# Maximum backoff for a failed sink, set here to 10 seconds; make it shorter or longer to suit your environment

a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type= replicating

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type= memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

(b) On m1, create the Flume_Sink_Processors_avro configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

  

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(c) Copy both configuration files to m2

 

scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf

 

scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf

(d) Open four terminal windows and start the two Flume agents on each of m1 and m2

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

(e) Then, on either m1 or m2, generate a test log message

 

echo "idoall.org test1 failover" | nc localhost 5140

(f) Because k2 (the sink pointing at m2) has the higher priority, the following appears in the sink window on m2 but not on m1:

 

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }

(g) Now stop the sink agent on m2 (Ctrl+C) and send more test data:

 

echo "idoall.org test2 failover" | nc localhost 5140

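(h) In the sink window on m1 you can see both test messages sent so far. test1 appears here too because each sink reads from its own channel (the replicating selector copies every event into both c1 and c2), and while k2 was active nothing drained c1; once k1 takes over, it delivers that backlog as well. If both sinks in the group shared a single channel, test1 would not be delivered a second time.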

 

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }

(i) Start the sink agent again in the sink window on m2:

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

(j) Send two more batches of test data:

 

echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140

(k) In the sink window on m2 you can see the following; because of the priorities, log messages land on m2 again:

 

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }

 

 

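Example 11: Load balancing Sink Processor. The load_balance type differs from failover in that it offers two selection modes, round robin and random. In either mode, if the selected sink is unavailable, the processor automatically tries the next available sink.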
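A toy Python model of selector = round_robin with backoff = true: each event starts from the next sink in rotation, skips sinks that are still backing off, and benches a sink after a failure. RoundRobinBalancer, deliver, and the fixed 1000 ms backoff are assumptions for illustration, not Flume's implementation.

```python
import itertools


class RoundRobinBalancer:
    """Toy load_balance processor: round-robin start position plus failure backoff."""

    def __init__(self, sinks, backoff_ms: int = 1000):
        self.sinks = list(sinks)
        self.backoff_ms = backoff_ms
        self._start = itertools.cycle(range(len(self.sinks)))
        self.benched_until = {}  # sink name -> time (ms) it may be retried

    def process(self, event, deliver, now_ms: int) -> str:
        """deliver(sink, event) returns True on success; returns the sink that took it."""
        start = next(self._start)
        # Try every sink once, beginning at the current round-robin position.
        for i in range(len(self.sinks)):
            sink = self.sinks[(start + i) % len(self.sinks)]
            if self.benched_until.get(sink, 0) > now_ms:
                continue  # still backing off after an earlier failure
            if deliver(sink, event):
                return sink
            self.benched_until[sink] = now_ms + self.backoff_ms
        raise RuntimeError("no sink accepted the event")
```

With both sinks healthy, four events alternate k1, k2, k1, k2, which matches the alternating pattern in the sample output below.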

(a) On m1, create the Load_balancing_Sink_Processors configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf

  

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1

# The key to configuring load balancing: a sink group is required

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type= load_balance

a1.sinkgroups.g1.processor.backoff = true

a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type= avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname= m1

a1.sinks.k1.port = 5555

a1.sinks.k2.type= avro

a1.sinks.k2.channel = c1

a1.sinks.k2.hostname= m2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

(b) On m1, create the Load_balancing_Sink_Processors_avro configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

  

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type= logger

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(c) Copy both configuration files to m2

 

scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf

 

scp -r /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf root@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

(d) Open four terminal windows and start the two Flume agents on each of m1 and m2

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

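(e) Then, on either m1 or m2, generate test log messages. Enter them one line at a time: if you send them too quickly, they tend to land on the same machine, because a sink can take several queued events from the channel in one call.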

 

echo "idoall.org test1" | nc localhost 5140

echo "idoall.org test2" | nc localhost 5140

echo "idoall.org test3" | nc localhost 5140

echo "idoall.org test4" | nc localhost 5140

(f) In the sink window on m1 you should see:

 

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }

(g) In the sink window on m2 you should see:

 

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }

Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }

This shows that round-robin mode is working.

 

 

Example 12: HBase sink
(a) Start HBase before testing

(b) Then copy the following files into Flume's lib directory:

 

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/protobuf-java-2.5.0.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-client-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-common-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-protocol-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-server-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop2-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/hbase-hadoop-compat-0.96.2-hadoop2.jar /home/hadoop/flume-1.5.0-bin/lib

cp /home/hadoop/hbase-0.96.2-hadoop2/lib/htrace-core-2.04.jar /home/hadoop/flume-1.5.0-bin/lib

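(c) Make sure the test_idoall_org table already exists in HBase (for example, create it in the HBase shell with: create 'test_idoall_org', 'name').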

(d) On m1, create the hbase_simple configuration file

 

vi /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf

  

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type= syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type = hbase

a1.sinks.k1.table = test_idoall_org

a1.sinks.k1.columnFamily = name

a1.sinks.k1.column = idoall

a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(e) Start the Flume agent

 

flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console

(f) Generate a test syslog message

 

echo "hello idoall.org from flume" | nc localhost 5140

(g) Now log in to HBase; you should find that the new data has been inserted

 

hbase shell

  

hbase(main):001:0> list

TABLE
hbase2hive_idoall
hive2hbase_idoall
test_idoall_org

=> ["hbase2hive_idoall","hive2hbase_idoall","test_idoall_org"]

 

hbase(main):002:0> scan "test_idoall_org"

  

hbase(main):004:0> quit

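After working through all of these Flume examples, you will find that Flume is very powerful and that its components can be combined in many ways to get your work done. As the saying goes, the master can only lead you to the door; the rest of the journey is up to you. Think about how to apply Flume to your own product and business, and go try it out.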

 


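Disclaimer

Articles reposted on this site are for personal study and reference only; this site assumes no legal liability for copyright. If any content infringes your rights, please contact this site at yoyou2525@163.com to have it removed.

Guangdong ICP filing No. 18138465   © 2018-2025 CODEPRJ.COM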