Flume的可靠性保證:故障轉移、負載均衡


Sink groups允許組織多個sink到一個實體上。 Sink processors能夠提供在組內所有Sink之間實現負載均衡的能力,而且在失敗的情況下能夠進行故障轉移從一個Sink到另一個Sink。

下面是官方配置:

從參數類型上可以看出有3種Processors類型:default, failover(故障轉移)和 load_balance(負載均衡),當然,官網上說目前自定義processors 還不支持。

下面是官網例子

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

一、Default Sink Processor

DefaultSink Processor 接收單一的Sink,不強制用戶為Sink創建Processor

二、Failover Sink Processor(故障轉移)

FailoverSink Processor會通過配置維護了一個優先級列表。保證每一個有效的事件都會被處理。

故障轉移的工作原理是將連續失敗sink分配到一個池中,在那里被分配一個冷凍期,在這個冷凍期里,這個sink不會做任何事。一旦sink成功發送一個event,sink將被還原到live 池中。

在這配置中,要設置sinkgroups processor為failover,需要為所有的sink分配優先級,所有的優先級數字必須是唯一的,這個得格外注意。此外,failover time的上限可以通過maxpenalty 屬性來進行設置。

下面是官網配置:

下面是官網例子

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

這里首先要申明一個sinkgroups,然后再設置2個sink ,k1與k2,其中2個優先級是5和10,而processor的maxpenalty被設置為10秒,默認是30秒。‘

下面是測試例子

 1 #配置文件:failover_sink_case13.conf
 2 #Name the components on this agent
 3 a1.sources= r1
 4 a1.sinks= k1 k2
 5 a1.channels= c1 c2
 6 
 7 a1.sinkgroups= g1
 8 a1.sinkgroups.g1.sinks= k1 k2
 9 a1.sinkgroups.g1.processor.type= failover
10 a1.sinkgroups.g1.processor.priority.k1= 5
11 a1.sinkgroups.g1.processor.priority.k2= 10
12 a1.sinkgroups.g1.processor.maxpenalty= 10000
13 
14 #Describe/configure the source
15 a1.sources.r1.type= syslogtcp
16 a1.sources.r1.port= 50000
17 a1.sources.r1.host= 192.168.233.128
18 a1.sources.r1.channels= c1 c2
19 
20 #Describe the sink
21 a1.sinks.k1.type= avro
22 a1.sinks.k1.channel= c1
23 a1.sinks.k1.hostname= 192.168.233.129
24 a1.sinks.k1.port= 50000
25 
26 a1.sinks.k2.type= avro
27 a1.sinks.k2.channel= c2
28 a1.sinks.k2.hostname= 192.168.233.130
29 a1.sinks.k2.port= 50000
30 # Usea channel which buffers events in memory
31 a1.channels.c1.type= memory
32 a1.channels.c1.capacity= 1000
33 a1.channels.c1.transactionCapacity= 100

 

這里設置了2個channels與2個sinks ,關於故障轉移的設置直接復制官網的例子。我們還要配置2個sinks對於的代理。

下面是第一個接受復制事件代理配置

 1 #配置文件:replicate_sink1_case11.conf
 2 # Name the components on this agent
 3 a2.sources = r1
 4 a2.sinks = k1
 5 a2.channels = c1
 6 
 7 # Describe/configure the source
 8 a2.sources.r1.type = avro
 9 a2.sources.r1.channels = c1
10 a2.sources.r1.bind = 192.168.233.129
11 a2.sources.r1.port = 50000
12 
13 # Describe the sink
14 a2.sinks.k1.type = logger
15 a2.sinks.k1.channel = c1
16 
17 # Use a channel which buffers events inmemory
18 a2.channels.c1.type = memory
19 a2.channels.c1.capacity = 1000
20 a2.channels.c1.transactionCapacity = 100

 

下面是第二個接受復制事件代理配置:

 1 #配置文件:replicate_sink2_case11.conf
 2 # Name the components on this agent
 3 a3.sources = r1
 4 a3.sinks = k1
 5 a3.channels = c1
 6 
 7 # Describe/configure the source
 8 a3.sources.r1.type = avro
 9 a3.sources.r1.channels = c1
10 a3.sources.r1.bind = 192.168.233.130
11 a3.sources.r1.port = 50000
12 
13 # Describe the sink
14 a3.sinks.k1.type = logger
15 a3.sinks.k1.channel = c1
16 
17 # Use a channel which buffers events inmemory
18 a3.channels.c1.type = memory
19 a3.channels.c1.capacity = 1000
20 a3.channels.c1.transactionCapacity = 100

 

敲命令

首先先啟動2個接受復制事件代理,如果先啟動源發送的代理,會報他找不到sinks的綁定,因為2個接事件的代理還未起來。

flume-ng agent -c conf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console

在啟動源發送的代理

flume-ng agent -c conf -f conf/failover_sink_case13.conf -n a1 -Dflume.root.logger=INFO,console

啟動成功后

打開另一個終端輸入,往偵聽端口送數據

echo "hello failoversink" | nc 192.168.233.128 50000

在啟動源發送的代理終端查看console輸出

因為k1的優先級是5,K2是10因此當K2正常運行的時候,是發送到K2的

然后我們中斷K2的代理進程。

再嘗試往偵聽端口送數據

echo "hello close k2"| nc 192.168.233.128 50000

我們發現源代理發生事件到K2失敗,然后他將K2放入到failover list(故障列表)

因為K1還是正常運行的,因此這個時候他會接收到數據。

然后我們再打開K2的大理進程,我們繼續往偵聽端口送數據

echo " hello open k2 again" | nc192.168.233.128 50000

三、Load balancing SinkProcessor(負載均衡)

負載均衡片處理器提供在多個Sink之間負載平衡的能力。實現支持通過round_robin(輪詢)或者random(隨機)參數來實現負載分發

默認情況下使用round_robin,但可以通過配置覆蓋這個默認值。還可以通過集成AbstractSinkSelector類來實現用戶自己的選擇機制。

當被調用的時候,這選擇器通過配置的選擇規則選擇下一個sink來調用。

下面是官網配置

下面是官網的例子

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

下面是測試例子

 1 #配置文件:load_sink_case14.conf
 2 # Name the components on this agent
 3 a1.sources = r1
 4 a1.sinks = k1 k2
 5 a1.channels = c1
 6 
 7 a1.sinkgroups = g1
 8 a1.sinkgroups.g1.sinks = k1 k2
 9 a1.sinkgroups.g1.processor.type =load_balance
10 a1.sinkgroups.g1.processor.backoff = true
11 a1.sinkgroups.g1.processor.selector =round_robin
12 
13 # Describe/configure the source
14 a1.sources.r1.type = syslogtcp
15 a1.sources.r1.port = 50000
16 a1.sources.r1.host = 192.168.233.128
17 a1.sources.r1.channels = c1
18 
19 # Describe the sink
20 a1.sinks.k1.type = avro
21 a1.sinks.k1.channel = c1
22 a1.sinks.k1.hostname = 192.168.233.129
23 a1.sinks.k1.port = 50000
24 
25 a1.sinks.k2.type = avro
26 a1.sinks.k2.channel = c1
27 a1.sinks.k2.hostname = 192.168.233.130
28 a1.sinks.k2.port = 50000
29 # Use a channel which buffers events inmemory
30 a1.channels.c1.type = memory
31 a1.channels.c1.capacity = 1000
32 a1.channels.c1.transactionCapacity = 100

 

這里要說明的是,因此測試的是負載均衡的例子,因此這邊使用一個channel來作為數據傳輸通道。這里sinks的對應的接收數據的代理配置,我們沿用故障轉移的接收代理配置。

敲命令

首先先啟動2個接受復制事件代理,如果先啟動源發送的代理,會報他找不到sinks的綁定,因為2個接事件的代理還未起來。

flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1
-Dflume.root.logger=INFO,console

flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1
-Dflume.root.logger=INFO,console

在啟動源發送的代理

flume-ng agent -cconf -f conf/load_sink_case14.conf -n a1 -Dflume.root.logger=INFO,console

啟動成功后

打開另一個終端輸入,往偵聽端口送數據

echo "loadbanlancetest1" | nc 192.168.233.128 50000
echo "loadbantest2" | nc 192.168.233.128 50000
echo "loadban test3"| nc 192.168.233.128 50000
echo "loadbantest4" | nc 192.168.233.128 50000
echo "loadbantest5" | nc 192.168.233.128 50000

在啟動源發送的代理終端查看console輸出

其中K1收到3條數據

其中K1收到2條數據

因為我們負載均衡選擇的類型是輪詢,因此可以看出flume 讓代理每次向一個sink發送2次事件數據后就換另一個sinks 發送。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM