1.介紹一下flume的channel
channel被設計為event中轉臨時緩沖區,存儲source收集並且沒有被sink讀取的event,平衡source收集和sink讀取的速度,可以將其視為flume內部的消息隊列。channel線程安全並且具有事務性,支持source寫失敗寫,sink讀失敗重復讀的操作。常見的類型包括Memory Channel,File Channel,Kafka Channel
2.Memory Channel與File Channel的優缺點
Memory Channel讀寫速度快,但是存儲數據量小。Flume進程掛掉、服務器停機或者重啟都會導致數據丟失。在資源充足,不關心數據丟失的場景下可以使用。
File Channel存儲容量大,無數據丟失的風險。讀寫速度慢,但可以通過配置多磁盤文件路徑,通過磁盤並行寫入提高File Channel性能。Flume將Event順序寫入到File Channel文件的末尾,可以通過配置maxFileSize參數配置數據文件大小,當文件大小達到這個值,創建新的文件,並將該文件設置為只讀,直到Flume把該文件讀取完成,刪除該文件。
3.Kafka Channel的優點有哪些
Memory Channel有很大程度丟失數據的風險,File Channel雖然無數據丟失風險,但如果緩存下來的消息來沒來得及寫入Sink,Agent就出現故障,File Channel中的消息一樣不能被繼續使用。Kafka的容錯能力解決了這一點。
Flume一旦配置了Kafka為Channel,則不再需要配置Sink組件,減少了Flume啟動的進程數,降低了服務器內存、磁盤等資源的使用率。
4.Flume的攔截器是什么
Source在將Event寫入到Channel之前可以使用攔截器對Event進行各種形式的處理,Source和Channel之間可以設置多個攔截器,不同的攔截器可以設置不同的規則對Event進行處理
5.Flume的選擇器是什么
Source發送的Event通過Channel選擇器來選擇以哪種方式寫入到Channel中,Flume提供了三種類型的選擇器,復制選擇器、復用選擇器以及自定義選擇器
1)復制選擇器:一個Source以復制的方式將一個Event寫入到多個Channel中,不同的Sink可以從不同的Channel中獲取到相同的Event。
如果Source沒有指定Channel選擇器,則該SOurce使用復制Channel選擇器,復制選擇器有一個配置參數optional,該參數指定的所有channel是可選的,當時間寫入到這些channel時有失敗發生,則忽略這些失敗,否則拋出異常,要求Source重試。
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
2)復用選擇器:需要和攔截器配合使用,根據Event的頭信息的不同寫入到不同的Channel中。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.optional.US = c4
a1.sources.r1.selector.default = c4
3)自定義選擇器:自定義選擇器需要實現ChannelSelector接口,或者繼承AbstractChannelSelector類。
6.了解Flume的負載均衡和故障轉移嗎
設置sink組,同一個sink組內有多個sink,不同sink之間可以配置成負載均衡或故障轉移
7.Flume的事務機制
flume基於事務傳輸event(批量傳輸),使用兩個獨立的事務分別處理source到channel和channel到sink,失敗時會將所有數據回滾進行重試。該事務遵循“最少一次”語義,因此數據不會丟失,但有可能重復。
source-channel之間的重復可以靠TailDir Source自帶的斷點續傳功能解決
put事務:
1)doPut:將批數據先寫入到臨時緩沖區putLIst(putList就是一個臨時的緩沖區)
2)doCommit:檢查channel內存隊列是否足夠合並
3)doRollback:channel內存隊列空間不足,回滾,等待內存通道的容量滿足合並
channel-sink之間的重復,可以延長等待時間,或者設置UUID攔截器,然后再redis里維護一個布隆表來使下游實時應用去重。
take事務:
1)doTake:將數據取到臨時緩沖區takeList
2)將數據發送到下一個節點
3)doCommit:如果數據全部發送成功,則清除臨時緩沖區takeList
4)doRollback:數據發送過程中如果出現異常,rollback將臨時緩沖區takeList中的數據歸還給channel內存隊列
TairDir Source配置
# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1
# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1
######## source相關配置 ########
# source類型
agent.sources.s1.type = TAILDIR
# 元數據位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 監控的目錄
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true
######## channel相關配置 ########
# channel類型
agent.channels.c1.type = file
# 數據存放路徑
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 檢查點路徑
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多緩存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐給sink多少
agent.channels.c1.transactionCapacity = 100
######## sink相關配置 ########
# sink類型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 壓縮
agent.sinks.r1.kafka.producer.compression.type = snappy
8.Flume參數調優
Source:
1)增加Source個數,可以增大Source讀取數據的能力。
2)batchSize參數決定Source一次批量運輸到Channel的event條數,適當調大這個參數可以提高Source搬運Event到Channel時的性能。
Channel:
1)使用File Channel時dataDirs配置多個不同盤下的目錄可以提高性能
2)Capacity參數決定Channel可容納最大的Event條數。transactionCapacty參數決定每次Source往Channel里面寫的最大event條數和每次sink從channel里面讀的最大event條數,transactionCapacty需要大於Source和Sink的batchSize參數
Sink:
1)適當增加Sink的個數可以增加Sink消費event的能力,但過多的sink會占用系統資源,造成不必要的浪費
2)batchSize參數決定Sink批量從Channel讀取的event條數,適當調大這個參數可以提高Sink從Channel搬運Event的性能。
