本文系微博運維數據平台(DIP)在Flume方面的優化擴展經驗總結,在使用Flume FileChannel的場景下將吞吐率由10M/s~20M/s提升至80M/s~90M/s,分為四個部分進行介紹:
- 應用場景
- Flume實例架構
- Flume調試及優化擴展
- Flume Todo
- 生產環境部署
1. 應用場景
我們的應用場景是一個典型的實時數據傳輸(接收)過程,架構圖如下:

包括三個組件:
(1)ServiceServer ScribeClient:業務產生的日志以“Log”的形式寫入業務部署服務器的本地磁盤,然后通過ScribeClient傳輸至我們的Flume集群;
(2)Flume:使用多個Flume實例構建Flume集群,通過動態域名、VIP對外提供服務;其中,每一個Flume實例使用ScribeSource接收ServcieServer ScribeClient傳輸過來的日志數據,然后使用FileChannel將ScribeSource接收過來的數據以“事務”的形式持久化至本地磁盤,最近通過KafkaSink將FileChannle中的數據輸出至Kafka集群;
(3)Kakfa:Kafka集群接收Flume集群傳輸過來的日志數據,用於后續的實時計算;
可以看出,以上整個過程就是日志實時寫入Kafka集群的過程,有幾點需要特殊說明:
(1)既然是實時數據傳輸,為什么不直接通過Kafka Producer API(或基於此實現的開源組件)將日志數據直接寫入Kafka集群,而是使用Scribe間接傳輸數據?
假設我們有一個Web服務,需要將Web的訪問日志實時寫入Kafka集群,這個可以通過Log4j擴展實現(不確定是否已有開源組件支持),這種方式數據實時性較強,但是Kafka集群運行過程中一旦出現異常(如:網絡流量波動)會直接影響該Web服務的運行狀態,進而影響線上業務,因此不能使用這種直接傳輸的方式;
Scribe可以在數據接收服務(這里特指Flume集群,也可以是Kafka)出現異常或不可用的情況下,暫時將數據緩存至本地磁盤,待數據接收服務恢復之后,繼續數據傳輸;雖然數據傳輸的實時性有所損耗,但整個數據傳輸過程更加可靠,而且避免了數據傳輸對線上服務的影響,因此使用這種間接傳輸的方式。
(2)Flume為什么使用FileChannel,而不使用吞吐率更高的MemoryChannel?
MemoryChannel使用內存存儲事務,吞吐率極高,但基於內存的事務實現模式在Flume部署服務器宕機或Flume實例異常終止的情況下,所有存儲在內存中的日志數據將全部丟失;另外,內存空間受限於RAM和JVM的約束,數據傳輸量波動(如數據量猛增)的情況下可能會引發異常;
FileChannel使用基於本地磁盤的事務實現模式,即使出現Flume部署服務器宕機或Flume實例異常終止的情況,因為接收到的日志數據都以事務的形式持久化至本地磁盤,可以在Flume實例恢復正常之后繼續數據傳輸,不會有數據丟失的情況;而且本地磁盤相對於內存而言,存儲空間比較富余,數據可靠性較強,因此使用FileChannel。
2. Flume實例架構
在我們的應用場景中,對於單獨一個Flume實例而言,架構如下:

宏觀上看,Flume實例內部僅有三個組件:ScribeSource、FileChannel、KafkaSink,實際上內部的結構還是比較復雜的,如下圖所示:

這里先介紹兩個比較重要的實例:
Receiver:Receiver是一個線程,對於Flume ScribeSource而言可以設置多個Receiver線程(通過指定ScribeSource workerThreads數值實現),它不斷地將Flume ScribeSource接收到的數據以“事務”的形式寫入FileChannel;
PollingRunner:PollingRunner也是一個線程,它不斷地將FileChannel中的數據以“事務”的形式讀取出來並寫入Kafka;
對應的Flume配置文件:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = kafka_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = file
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDirs = /data0/flume/data
# define kafka sink
myagent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink.topic = mytopic
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.requiredAcks = 1
myagent.sinks.kafka_sink.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.kafka_sink.channel = file_channel
3. Flume調試及優化擴展
為了方便Flume的調試,我們自己開發模擬了一個Scrbie Client Simulator實例,可以兼容Scribe通信協議,以每秒大約90M/s的速率輸出數據至Flume(這里特指單實例Flume),其中模擬的日志數據來源於我們平台常見的業務數據,后續的討論均建立在這個Scribe Client Simulator實例的基礎上。
3.1 ScribeSource
ScribeSource中有一個非常重要的配置屬性“workerThreads”,如上所述,它的值被設定為5,那么這個值是如何得出的呢,它又會產生什么樣的作用?
ScribeSource中的每一個WorkerThread就是一個Receiver實例,即“workerThreads”的值決定着ScribeSource中有幾個Receiver實例,有多少個Receiver實例直接影響着ScribeSource接收數據的速率,調試過程如下:
(1)為了避免Channel自身的性能瓶頸對ScribeSource的影響,我們這里使用吞吐率極高的MemoryChannel;
(2)為了避免Sink自身的性能瓶頸對ScribeSource、MemoryChannel的影響,我們這里使用NullSink,它會將消費到的數據直接丟棄;
經過上述兩步,我們可以認為Flume ScribeSource的調試過程中完全可以忽略MemoryChannel、NullSink的影響。
(3)啟動Scrbie Client Simulator實例,使它不斷地往我們的Flume實例寫入數據,觀察Flume實例部署機器的網絡寫入流量情況,進而調整“workerThreads”值的大小(建議數值從1開始,逐漸增大),使該機器的網絡寫入流量達到業務需求;
根據我們場景的具體情況,經過上述三步的測試,最終將ScribeSource workerThreads的值選定為5,吞吐率大致為80~90M/s,這是我們認為的一個理想峰值。
Flume配置如下:
myagent.sources = scribe_source
myagent.channels = memory_channel
myagent.sinks = null_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define memory channel
myagent.channels.memory_channel.type = memory
myagent.channels.memory_channel.capacity = 10000
myagent.channels.memory_channel.transactionCapacity = 10000
myagent.channels.memory_channel.byteCapacityBufferPercentage = 20
myagent.channels.memory_channel.byteCapacity = 800000
# define null sink
myagent.sinks.null_sink.type = null
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = memory_channel
myagent.sinks.null_sink.channel = memory_channel
3.2 FileChannel
3.2.1 默認FileChannel
經過3.1的測試之后,我們可以認為Flume ScribeSource不存在接收數據的性能瓶頸,接下來開始調試FileChannel,關於使用FileChannel的原因可以參考1.(2)。
在3.1Flume配置的基礎之上,修改為FileChannel,其余配置保持不變,如下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = null_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = file
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDirs = /data0/flume/data
# define null sink
myagent.sinks.null_sink.type = null
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = memory_channel
myagent.sinks.null_sink.channel = memory_channel
再次重復3.1的調試過程,使用Scrbie Client Simulator實例進行數據寫入測試時, 我們發現Flume實例部署機器的網絡寫入流量下降很多,大約只有10M/s~20M/s。可以看出,在吞吐率方面,FileChannel與MemoryChannel之間有很大的差距。我們來分析一下具體的原因。
根據2.中的Flume實例架構圖,我們可以大致得出ScribeSource中的某一個Receiver與FileChannel的交互流程,如下圖:

Receiver的工作實際是一個將數據循環寫入FileChannel的過程,每一次的循環可以理解為一個指處理(批量寫入),每一次的批處理都需要經過以下幾個步驟:
(1)獲取FileChannel的事務——getTransaction;
(2)打開事務——begin;
(3)批量寫入數據——put;
(4)提交或回滾事務——commit or rollback;
(5)關閉事務——close;
經過對Flume FileChannel相關源碼的分析,導致FileChannel吞吐率下降的主要原因集中於事務的提交過程——commit,有以下兩點:
(1)鎖競爭,從上圖中可以看出,Receiver的每一次批量寫入過程中都會涉及到事務提交(不考慮異常回滾的情況),事務提交的內部過程涉及到讀鎖或寫鎖的“加鎖”操作,多個Receiver(WorkerThread線程)共存的情況下鎖競爭的情況就會比較嚴重;
(2)Writer sync,FileChanel是基於本地磁盤實現的事務模式,每一次事務的提交都會伴隨着一次“sync”,眾所周知,“sync”是一種系統性能開銷比較大的操作;
綜合上述兩點,我們可以得出,多個Receiver的存在導致FileChannel存在資源競爭的問題(多個Receiver之間無法安全的共享一個FileChannel的事務),因為需要加鎖,必然帶來相互之間鎖的競爭;某一個Receiver獲得鎖之后,又需要進行系統性能開銷比較大的“sync”操作,且耗時相對較長,這就意味着該Receiver從獲取鎖到釋放鎖的過程會花費比較長的時間,在這段時間內該Receiver獨占FileChannel,其它Receiver只能處於阻塞狀態,直至可以獲取到鎖;基於上述兩個原因,導致FileChannel在多Receiver的環境下吞吐率嚴重下降。
3.2.2 擴展FileChannel
FileChannel的實現過程是比較復雜的,直接優化FileChannel的代碼不太現實,那么是否可以通過多個FileChannel的方式來解決吞吐率嚴重下降的問題呢?如果FileChannel的數目大於或等於ScribeSource Receiver的數目,ScribeSource Receiver使用“哈希”(Hash)的方式來選取FileChannel,就可以避免ScribeSource Receiver之間相互競爭FileChannel資源,如下圖所示:

雖然對於某一個FileChannel來說,與它交互的Receiver依然要經過獲取鎖——sync——釋放鎖的過程,但多個Receiver之間是並行的,總體上吞吐率得到提升。
那么如何實現這個方案呢?這里我們需要用到Flume提供的“Custom Channel Selector”機制,即實現我們自己的“Channel Selector”,代碼如下:

這里有兩個關鍵點:
(1)隨機code的生成,目前代碼實現提供兩種選擇:event.getBody().hashCode()或者System.currentTimeMillis();
(2)根據隨機code的值對FileChannel的數目取余(哈希),從而選取出一個FileChannel並返回;
那么如何使用上述方案及自己的擴展呢?Flume配置文件如下:
myagent.sources = scribe_source
myagent.channels = file_channel file_channel2 file_channel3
myagent.sinks = kafka_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
myagent.sources.scribe_source.selector.type = com.weibo.dip.flume.extension.channel.selector.HashChannelSelector
# define file channel
myagent.channels.file_channel.type = file
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDirs = /data0/flume/data
# define file channel2
myagent.channels.file_channel2.type = file
myagent.channels.file_channel2.checkpointDir = /data0/flume/checkpoint2
myagent.channels.file_channel2.dataDirs = /data0/flume/data2
# define file channel3
myagent.channels.file_channel3.type = file
myagent.channels.file_channel3.checkpointDir = /data0/flume/checkpoint3
myagent.channels.file_channel3.dataDirs = /data0/flume/data3
# define kafka sink
myagent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink.topic = mytopic
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.requiredAcks = 1
myagent.sinks.kafka_sink.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel file_channel2 file_channel3
myagent.sinks.kafka_sink.channel = file_channel
配置中需要顯示指定使用我們自己擴展的“Channel Selector”:myagent.sources.scribe_source.selector.type = com.weibo.dip.flume.extension.channel.selector.HashChannelSelector;然后指定三個FileChannel,並分別關聯至ScribeSource和NullSink。
經過我們的測試大概需要10-12個FileChannel(注意:這里的數值考慮了后續的KafkaSink,詳情見后),吞吐率即可達到80~90M/s。
FileChannel的吞吐率雖然得到提升,但是這么多的FileChannel使用上述逐個配置FileChannel的方式是極其不方便維護的,應該只使用一個“FileChannel”,如下圖:

我們應該利用Flume提供的“Custom Channel”機制,自己擴展一個“FileChannel”,取名為MultithreadingFileChannel,使其內部包含多個FileChannel,從而達到簡化配置的目的,核心源碼如下:

MultithreadingFileChannel不再需要“Channel Selector”的參與,自身內部封裝了FileChannel之間的“哈希”處理邏輯,具體體現在創建事務(createTransaction)的過程中。
使用MultithreadingFileChannel的Flume配置如下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = null_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = com.weibo.dip.flume.extension.channel.MultithreadingFileChannel
amyagent.channels.file_channel.channels = 12
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDir = /data0/flume/data
# define null sink
myagent.sinks.null_sink.type = null
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.null_sink.channel = file_channel
3.2.3 KafkaSink
經過3.1、3.2的調試過程之后,我們可以認為Flume ScribeSource、MultithreadingFileChannel不存在性能瓶頸,接下來開始調試KafkaSink。
我們將3.2.2中的NullSink替換為KafkaSink,如下圖:

Flume配置文件如下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = kafka_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = com.weibo.dip.flume.extension.channel.MultithreadingFileChannel
amyagent.channels.file_channel.channels = 12
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDir = /data0/flume/data
# define kafka sink
myagent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink.topic = mytopic
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.requiredAcks = 1
myagent.sinks.kafka_sink.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.null_sink.channel = file_channel
啟動調試過程后我們發現,Flume實例部署機器的網絡寫入流量大約為90M/s左右,而Kakfa實例(單機版)的網絡寫入流量(即Flume實例部署機器的網絡寫出流量)僅為60M左右。熟悉Kafka的同學可能知道,這並不是因為Kafka單機版實例導致的,究其原因,主要有以下幾點:
(1)KafkaSink是一個單實例,它從MultithreadingFileChannel中讀取數據時也需要事務的參與(實際上它是與MultithreadingFileChannel中的某一個FileChannel建立事務);雖然ScribeSource與MultithreadingFileChannel FileChannels之間、MultithreadingFileChannel FileChannels與KafkaSink之間使用了“Channels Hash”機制,但不能完全排除“碰撞”發生的可能性;一旦“碰撞”發生,則表示“碰撞”發生期間,KafkaSink從MultithreadingFileChannel中讀取不到任何數據;這也是為什么MultithreadingFileChannel中的FileChannels數目需要明顯大於ScribeSource Receiver數目的原因;
(2)KafkaSink Producer也是一個單實例,也就是說只有一個Producer在寫出數據,對吞吐率也會帶來一定的影響;
參考3.2.2的方案,我們嘗試使用多個KafkaSink實例來解決這個問題,如下圖:

Flume配置文件如下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = kafka_sink kafka_sink2 kafka_sink3
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = com.weibo.dip.flume.extension.channel.MultithreadingFileChannel
amyagent.channels.file_channel.channels = 12
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDir = /data0/flume/data
# define kafka sink
myagent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink.topic = mytopic
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.requiredAcks = 1
myagent.sinks.kafka_sink.batchSize = 1000
# define kafka sink2
myagent.sinks.kafka_sink2.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink2.topic = mytopic
myagent.sinks.kafka_sink2.brokerList = kafkahost:9092
myagent.sinks.kafka_sink2.requiredAcks = 1
myagent.sinks.kafka_sink2.batchSize = 1000
# define kafka sink3
myagent.sinks.kafka_sink3.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.kafka_sink3.topic = mytopic
myagent.sinks.kafka_sink3.brokerList = kafkahost:9092
myagent.sinks.kafka_sink3.requiredAcks = 1
myagent.sinks.kafka_sink3.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.kafka_sink.channel = file_channel
myagent.sinks.kafka_sink2.channel = file_channel
myagent.sinks.kafka_sink3.channel = file_channel
經過我們的測試大概需要8-10個KafkaSink,吞吐率即可達到80~90M/s,這么多的KafkaSink使用上述逐個配置KafkaSink的方式是極其不方便維護的,應該只使用一個“KafkaSink”,如下圖:

MultithreadingKafkaSink與MultithreadingFileChannel不同,內部並不會包含多個KafkaSink,而是包含多個ChannelConsumer;每一個ChannelConsumer都從MultithreadingFileChannel讀取數據並通過自身內部的Kafka Producer實例(也就是說,每一個ChannelConsumer實例都包含一個Kafka Producer實例)將數據寫入Kakfa。
Flume配置文件如下:
myagent.sources = scribe_source
myagent.channels = file_channel
myagent.sinks = kafka_sink
# define scribe source
myagent.sources.scribe_source.type = org.apache.flume.source.scribe.ScribeSource
myagent.sources.scribe_source.port = 1466
myagent.sources.scribe_source.workerThreads = 5
# define file channel
myagent.channels.file_channel.type = com.weibo.dip.flume.extension.channel.MultithreadingFileChannel
amyagent.channels.file_channel.channels = 12
myagent.channels.file_channel.checkpointDir = /data0/flume/checkpoint
myagent.channels.file_channel.dataDir = /data0/flume/data
# define kafka sink
myagent.sinks.kafka_sink.type = com.weibo.dip.flume.extension.sink.MultithreadingKafkaSink
myagent.sinks.kafka_sink.topicHeaderName = category
myagent.sinks.kafka_sink.consumers = 10
myagent.sinks.kafka_sink.brokerList = kafkahost:9092
myagent.sinks.kafka_sink.batchSize = 1000
# Bind the source and sink to the channel
myagent.sources.scribe_source.channels = file_channel
myagent.sinks.null_sink.channel = file_channel
綜上所述,通過我們的優化擴展和相應的參數調優,我們將ScribeClient、Flume(使用FileChannel)、Kafka之間的數據傳輸速率提升至80~90M/s。
4. Flume Todo
雖然ScribeClient、Flume、Kafka之間的數據傳輸速率經過我們的擴展優化之后達到我們的預設值,但擴展過程中引入的ScribeSource.Receivers、MultithreadingFileChannel、MultithreadingKafkaSink是否也會對Flume實例或Flume實例部署服務器帶來一些問題,這里僅僅闡述一些可能出現的問題。
(1)ScribeSource.Receivers多個線程實例、MultithreadingKafkaSink.ChannelConsumers多個線程實例是否會導致Flume實例或Flume實例部署服務器CPU使用率或負載過高?
(2)MultithreadingFileChannel多個FileChannel的使用,是否會導致Flume實例部署服務器帶來過多的磁盤開銷?
5. 生產環境部署

(1)假設Flume集群的域名為flume.dip.weibo.com,端口為1466,ScribeClient通過該域名和端口發送數據;
(2)flume.dip.weibo.com指向若干個動態域名,這些動態域名依據不同的機房進行划分,如flume.cluster.dip.weibo.com、flume.cluster2.dip.weibo.com、flume.cluster3.dip.weibo.com;動態域名在這里的作用:不同的機房的ScribeClient在向flume.dip.weibo.com寫入數據時,網絡層面會自動根據ScribeClient所在的機房將數據導入至該機房對應的Flume動態域名,即:機房內數據傳輸;
(3)每一個動態域名被映射至一個VIP;
(4)每一個VIP被映射至多個Flume實例;(3)和(4)的作用體現在Flume故障轉換和負載均衡。
備注:調試過程中我們發現,數據吞吐率達到80~90M/s時,JVM大致需要15G MEM。