最近在做一個分布式調用鏈跟蹤系統,
在兩個地方采用了flume (我使用的flume版本是1.5.0-cdh5.4.4),一個是宿主系統 ,用flume agent進行日志搜集。 一個是從kafka拉日志分析后寫入hbase.
后面這個flume(從kafka拉日志分析后寫入flume)用了3台 , 系統上線以后 ,線上拋了一個這樣的異常:
Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doPut(MemoryChannel.java:84)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
從異常信息直觀理解是MemoryChannel的事務的Put隊列滿了,為什么會這樣呢?
我們先從Flume的體系結構說起,Flume是apache一個負責日志采集和傳輸的開源工具,它的特點是能夠很靈活的通過配置實現不同數據存儲系統之間的數據交換 。
它有三個最主要的組件:
Source : 負責從數據源獲取數據,包含兩種類型的Source . EventDrivenSource 和 PollableSource , 前者指的是事件驅動型數據源,故名思議,就是需要外部系統主動送數據 ,比如AvroSource ,ThriftSource ; 而PollableSource 指的是需要Source主動從數據源拉取數據 ,比如KafkaSource 。Source 獲取到數據以后向Channel 寫入Event 。
Sink : 負責從Channel拉取Event , 寫入下游存儲或者對接其他Agent.
Channel:用於實現Source和Sink之間的數據緩沖, 主要有文件通道和內存通道兩類。
Flume的架構圖如下:
而我的flume 配置如下:
a1.sources = kafkasource
a1.sinks = hdfssink hbasesink
a1.channels = hdfschannel hbasechannel
a1.sources.kafkasource.channels = hdfschannel hbasechannel
a1.sinks.hdfssink.channel = hdfschannel
a1.sinks.hbasesink.channel = hbasechannel
a1.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafkasource.zookeeperConnect = zk1:2181,zk2:2181,zk3:2181
a1.sources.kafkasource.topic = nagual_topic
a1.sources.kafkasource.groupId = flume
a1.sources.kafkasource.kafka.consumer.timeout.ms = 500
a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.hdfs.path = hdfs://namenode:8020/flume/kafka_events/%y-%m-%d/%H%M
a1.sinks.hdfssink.hdfs.filePrefix = events-prefix
a1.sinks.hdfssink.hdfs.round = true
a1.sinks.hdfssink.hdfs.roundValue = 10
a1.sinks.hdfssink.hdfs.roundUnit = minute
a1.sinks.hdfssink.hdfs.fileType = SequenceFile
a1.sinks.hdfssink.hdfs.writeFormat = Writable
a1.sinks.hdfssink.hdfs.rollInterval = 60
a1.sinks.hdfssink.hdfs.rollCount = -1
a1.sinks.hdfssink.hdfs.rollSize = -1
a1.sinks.hbasesink.type = hbase
a1.sinks.hbasesink.table = htable_nagual_tracelog
a1.sinks.hbasesink.index_table = htable_nagual_tracelog_index
a1.sinks.hbasesink.serializer =NagualTraceLogEventSerializer
a1.sinks.hbasesink.columnFamily = rpcid
a1.sinks.hbasesink.zookeeperQuorum = zk1:2181,zk2:2181,zk3:2181
a1.channels.hdfschannel.type = memory
a1.channels.hdfschannel.capacity= 10000
a1.channels.hdfschannel.byteCapacityBufferPercentage = 20
a1.channels.hdfschannel.byteCapacity = 536870912
我的flume agent從kafka拉取日志以后,轉換成hbase 的row put操作,中間采用了memchannel ,為什么會出現之前提到的異常呢? 通過通讀一遍源碼, 基本找到了問題所在:
我們把源碼拆解成以下幾個主要步驟來分析:
1、flume 的啟動:
如上圖所示, 整個flume 啟動的主要流程是這樣的:
FLUME_HOME/bin目錄中的flume-ng啟動腳本啟動Application , Application創建一個PollingPropertiesFileConfigurationProvider, 這個Provider的作用是啟動 一個配置文件的監控線程FileWatcherRunnable ,定時監控配置文件的變更,
一旦配置文件變更,則重新得到SinkRunner, SourceRunner以及channel的配置, 包裝成MaterialedConfiguration,通過google guava的eventbus 推送配置變更給Application ,Application啟動一個LifeCycleSupervisor,由它來負責監控
SourceRunner ,SinkRunner,Channel的運行情況 。 SourceRunner ,SinkRunner ,Channel都繼承或實現了LifeCycleAware接口,LifeCycleSupervisor通過定時檢查這些組件的期望狀態是否和當前狀態一致, 如果不一致則調用期望狀態對應的方法,
(具體代碼可以參考LifeCycleSupervisor的內部類MonitorRunnable) . 按照 Channel 、 SinkRunner 、 SourceRunner(可以理解為先接水管,再接水盆,再接水龍頭)順序進行啟動,每個組件的啟動都做什么呢?
以我的flume配置文件來說明:
(1) MemChannel
我們首先先介紹一下MemChannel的幾個配置參數:
capacity :控制了MemChannel中一個LinkedBlockingDeque<Event> (我們后面簡稱為MemDeque)的最大event個數。
transactionCapacity: 控制了一個MemChannel的事務(MemoryTransaction)中putList 和takeList兩個 LinkedBlockingDeque 的最大長度 。
byteCapacityBufferPercentage: 控制了MemDeque中event header 的占比, 默認是20%
byteCapacity:控制了MemDeque的最大字節數, 默認值是應用分配到的最大堆內存(Xmx參數指定)的 80% (我們稱之為x ),這個值x乘以 1 - byteCapacityBufferPercentage * 0.01 就得到了MemDeque中event body的最大字節數。如何利用這個參數來進行流控, 我們后面還有詳細說明。
keepalive : 控制了一個 MemChannel 事務從MemDeque中讀寫操作的最大阻塞時間 , 單位:秒。
啟動以后, 建立了一個LinkedBlockingDeque ,這是一個雙端隊列,可以進行雙向讀寫,並且用capacity 參數控制了它的最大長度, 另外還創建了幾個信號量Semaphore ,
queueRemaining: 標識MemDeque的初始容量,
queueStored : 標識MemDeque寫入的 event數量,也就是待處理的event 數量。
bytesRemaing:標識MemDeque 中還能寫入多少字節的flume event body;
(2) SinkRunner:
它的機制是啟動一個所謂的PollingRunner 線程 ,通過輪詢操作,調用一個 SinkProcessor來進行實際的輪詢處理, 而這個SinkProcessor則調用 Sink的process 方法進行event處理, 在輪詢的處理上,有一個所謂的 補償機制( backoff) ,就是當sink獲取不到 event 的時候, PollingRunner 線程需要等待一段backoff時間,等channel中的數據得到了補償再來進行pollling 操作。而hbasesink 在啟動的時候,則把hbase 操作相關的配置:htable, columnfamily ,hbase zk集群的配置信息准備好了。也就是說SinkRunner采用的方式是Pull .
(3) SourceRunner:
SourceRunner包含兩類, 一類是對應EventDrivenSource 的 EventDrivenSourceRunner , 一個是對應PollableSource的PollableSourceRunner , 簡單的說,前者是push ,后者是pull 。
EventDrivenSource 有代表性的是thrift source , 在本地啟動java nio server以后, 從外部接收event ,交給ThriftSource 內部的ThriftSourceHandler進行處理。
而后者PollableSourceRunner ,則通過啟動一個PollingRunner線程 ,類似SinkRunner中的輪詢處理策略 ,啟動Source , 在Source內部, 使用ChannelProcessor處理events , ChannelProcessor內部會走一組過濾器構建的過濾器鏈 ,然后通過通道選擇器ChannelSelector選擇好通道以后 ,啟動事務 ,把一批event 寫入Channel .
我們用下面這張示意圖來說明各組件啟動以后的內部運作原理:
整個圖有些復雜, 說明如下:
PollingSourceRunner通過線程啟動定時任務 ,間隔一段時間調用kafkasource 從kafka broker 拉取日志,拉完以后,進入一個ChannelProcessor,這個通道處理器先通過一個過濾器鏈對event進行過濾 ,過濾以后,通過一個ChannelSelector通道選擇器,選擇evnet要投遞的Channel , 然后啟動Channel 下的一個事務 (注意,這個事務是用ThreadLocal維持的,也就是說一個線程對應了一個事務) , 事務啟動以后,批量向事務MemoryTransaction的一個putList的尾部寫入,putlist是一個LinkedBlockingDeque .
事務提交的時候, 把putlist中的event批量移除, 轉移到MemoryChannel的一個LinkedBlockingDeque 里面來.
而SinkRunner則啟動PollingRunner , 也通過定時啟動任務,調用SinkProcessor,最后調用HbaseSink的process方法,這個方法也負責啟動一個事務 ,批量從MemoryChannel的LinkedBlockingDeque中拉取event , 寫入takelist ,批量做完hbase 的put操作以后,做memoryTransaction的事務提交操作。事務提交的處理邏輯前面描述過。
而負責進行通道過載保護,正是在MemoryTransaction事務的提交時刻做的 ,這個過載保護的代碼可以參考MemoryChannel的MemoryTransaction 內部 類的doCommit方法, 它的思路是這樣的:
比較事務提交的時候,takelist和putlist的大小,如果takelist的長度比 putlist 長度要小, 則認為sink的消費能力(takelist長度標識)要比source的生產能力(putlist)要弱, 此時,需要通過一個bytesRemaining的 Semaphore來決定是否允許把putlist中的event轉移到MemoryChannel的linkedBlockingDeque來, 如果允許 ,則操作 , 操作以后,putlist 和takelist 都被清理掉了。 bytesRemaining信號量(標示還有多少flume event body存儲空間)和queueStored(標示有多少個event能被消費)信號量都被釋放了 。
綜上所述, Flume 的memorychannel采用了兩個雙端隊列putlist和takelist ,分別表示source 的生產能力 和sink 的消費能力,source 和sink啟動一個事務 ,source 寫putlist ,提交事務以后 ,把putlist批量移動到另一個deque .
而sink 則負責從MemoryChannel的Deque 取event, 寫入takelist(只做流控用) , 最后sink的事務提交以后,也把putlist 的event批量移動到deque 。 等於在一個事務里面用putlist 做了寫入緩沖,用takelist做了流控, memorychannel中的 deque是多個事務共享的存儲。
至此, 我們對memorychannel 的細節已經弄清楚了,回過頭來看之前出現的那個異常 , 就能知道為什么了?
首先, 我們的transactionCapacity參數沒有配置,那默認就是100 ,也就是putlist和takelist 的長度只有100 ,即寫入緩沖容量只有100個event .而MemoryChannel的Deque我們配置了10000 ,允許 flume event body的最大字節數我們配置了
536870912 * (1 - 20 * 0.01) = 400M左右 , 問題並不是出在了memorychannel的雙端隊列容量不夠用,而是下游的hbase sink因為有一個批量處理的默認值是100 ,而在這默認的100次處理中 ,每一次處理都涉及到了對象的avro反序列化 , 100次批量寫
入hbase 以后才會清理MemoryTransaction的 putlist,而這個時候上游kafka source 再有數據寫入putlist 就出現了前文描述的那個異常。
解決辦法:從異常提示的幾個思路,我們一一做個思考:
, consider committing more frequently, increasing capacity or increasing thread count
1)更頻繁的提交事務: 如果采用這種思路的話,比如只降低hbase sink 的批處理數量,而上游的kafka source的生產能力保持不變,可以預見的是會造成MemoryChannel中Deque堆積的event數量會越來越多(因為更頻繁的把event 從 putlist轉移到了 Memory Deque) . 這種方法只是把問題從putlist 轉移到了另一個Deque 。(要MemoryChannel的Deque更大了)。
2) 增加transactionCapacity: 即增加每一個事務的寫緩沖能力(putlist長度增加) ,但是調節到多少呢?如果上游的壓力陡增 ,還是會出現這個問題 。這種方法只能暫時緩解,不能徹底解決問題。
3) 增加線程數量: 這里我想flume作者的思路是把sink改為多線程增加消費能力來解決。 這個我認為才是解決問題的根本,增加下游的處理能力 。
那如何增加下游的處理能力呢,除了做flume本身的scaleout ,減少單台flume的壓力外。 還有幾種方法供我們思考:
A:把hbase sink擴展為多線程 , 每一個線程一個event隊列。ChannelProcessor在投遞的時候輪詢投遞到多個隊列 。------考慮用Akka ?
B: 使用Disruptor , ChannelProcessor作為生產者,SinkProcessor作為消費者 。
C: 直接換成 storm ,利用storm集群的實時處理能力?
用了兩天的時間,采用了storm ,的確極大提高了吞吐能力(20多萬條消息大概在兩分多鍾處理完,QPS 達到了1600。 后面單獨再寫文章來說明storm目前在使用過程中踩到的坑 。