flume提供了一個度量框架,可以通過http的方式進行展現,當啟動agent的時候通過傳遞參數 -Dflume.monitoring.type=http參數給flume agent:
1 |
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 |
這樣flume會在5653端口上啟動一個HTTP服務器,訪問如下地址,將返回JSON格式的flume相關指標參數:
1 |
demo: |
Flume也可發送度量信息給Ganglia,用來監控Flume。在任何時候只能啟用一個Ganglia或HTTP監控。Flume默認一分鍾一次周期性的向Ganglia報告度量:
1 |
demo: |
日志相關
1 |
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console |
自定義Flume組件
Flume本身可插拔的架構設計,使得開發自定義插件變得很容易。Flume本身提供了非常豐富的source、channel、sink以及攔截器等插件可供選擇,基本可以滿足生產需要。具體可以參考Flume用戶文檔.
plugins.d目錄
plugins.d
是flume事先約定的存放自定義組件的目錄。flume在啟動的時候會自動將該目錄下的文件添加到classpath下,當然你也可以在flume-ng 啟動時通過指定--classpath,-C <cp>
參數將自己的文件手動添加到classpath下。
相關目錄說明:
1 |
plugins.d/xxx/lib - 插件jar |
攔截器
攔截器(Interceptor)是簡單插件式組件,設置在Source和Source寫入數據的Channel之間。Source接收到的事件在寫入對應的Channel之前,攔截器都可以轉換或刪除這些事件。每個攔截器實例只處理同一個Source接收的事件。攔截器可以基於任意標准刪除或轉換事件,但是攔截器必須返回盡可能多(盡可能少)的事件,如同原始傳遞過來的事件.因為攔截器必須在事件寫入Channel之前完成操作,只有當攔截器已成功轉換事件后,RPC Source(和任何其他可能產生超時的Source)才會響應發送事件的客戶端或Sink。因此盡量不要在攔截器中做大量耗時的處理操作。如果不得已這么處理了,那么需要相應的調整超時時間屬性。Flume自身提供了多種類型的攔截器,比如:時間戳攔截器、主機攔截器、正則過濾攔截器等等。更多內容可以參考Flume Interceptors
攔截器一般用於分析事件以及在需要的時候丟棄事件。編寫攔截器時,實現者只需要寫以一個實現Interceptor接口的類,同時實現Interceptor$Builder接口的Builer類。所有的Builder類必須有一個公共無參的構造方法,Flume使用該方法來進行實例化。可以使用傳遞到Builder類的Context實例配置攔截器。所有需要的參數都要傳遞到Context實例。下面是時間戳攔截器的實現:
1 |
public class TimestampInterceptor implements Interceptor { |
注:
- intercept()的兩個方法必須是線程安全的,因為如果source運行在多線程情況下,這些方法可能會被多個線程調用。
-
自定義攔截器的配置方式,interceptors type配置的是
XXXInterceptor$Builder
:1
2
3#自定義攔截器 --producer agent名稱 --src-1 source名稱 —-i1 攔截器名稱
producer.sources.src-1.interceptors = i1
producer.sources.src-1.interceptors.i1.type = com.networkbench.browser.flume.interceptor.MyBrowserInterceptor$Builder -
將自定義代碼打包放置到前面的
plugins.d/ext-interceptors(可以自己命名)/lib
目錄下,啟動flume時會自動加載該jar到classpath
解析器
Source使用嵌入式的反序列化器讀取監控目錄下的文件(這里以Spooling Directory Source為例),默認的反序列化器是LineDeserializer。該反序列化器會按行讀取文件中的內容,封裝成一個Event消息。默認一次讀取的最大長度是2048個字符,你可以通過如下配置參數設置改值:
1 |
# --producer agent名稱 --src-1 source名稱 |
因此在使用LineDeserializer時對源文件內容有個粗略的估計,否則,當某行的內容超出最大長度時。該行內容會被截取成兩個部分,封裝成兩個Event發送到channel中。這樣,在某些場景下該行消息相當於非法消息了。如,某個文件按行記錄一個http請求的所有內容,而事先我們無法預知一行http請求的最大長度(當然理論上你可以將maxLineLength設置成一個較大的值,解決該問題)。但是這里要說的是另外一種解決方案,很簡單,參考LineDeserializer實現一個不限制最大長度的解析器(flume之所以這么設計是出於什么角度考慮?)。反序列化器的定義和前面的攔截器基本相同:
1 |
public class LineDeserializer implements EventDeserializer { |
接下來的步驟和攔截器一致
1 |
#自定義解析器 |
source
Flume提供了豐富的source類型,如Avro Source、Exec Source、Spooling Directory Source ….
這里要說的是實際使用過程中遇到的一個問題。還是前面記錄http請求內容的場景,為了及時分析http請求的數據,我們將記錄http請求的原始文件按照分鍾進行切割,然后移動到spooling directory監控目錄(如/tmp-logs)下。但是由於一些原因,會出現監控目錄下文件重名的情況.
1 |
/tmp-logs/access_2015_10_01_16_30.log.COMPLETED #flume處理完的文件會自動進行重命名.COMPLETED |
這種情況下后進來的access_2015_10_01_16_30.log,在flume讀取完成后會對其進行重命名,但是該文件名已經被占用了,flume就會拋出如下的異常信息,停止處理該監控目錄下的其他文件。
1 |
25 九月 2015 16:48:59,228 INFO [pool-22-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348) - Preparing to move file /opt/nginx/tmp_logs/access-2015-09-25-13-51.log to /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED |
跟蹤拋出異常的源碼,SpoolDirectorySource會啟動一個線程輪詢監控目錄下的目標文件,當讀取完該文件(readEvents)之后會對該文件進行重名(rollCurrentFile),當重命名失敗時會拋出IllegalStateException,被SpoolDirectoryRunnable catch重新拋出RuntimeException,導致當前線程退出,從源碼看SpoolDirectoryRunnable是單線程執行的,因此線程結束后,監控目錄下其他文件不再被處理:
1 |
# SpoolDirectorySource 啟動SpoolDirectoryRunnable |
現在基本清楚了異常棧的調用邏輯,那么和前面自定義解析器一樣,我們可以重寫ReliableSpoolingFileEventReader以及SpoolDirectorySource的相關實現,也就是自定義一個spooling source,在rollCurrentFile()重命名失敗時,做些處理措施,比如將該文件重新命名為access_2015_10_01_16_30.log(2).COMPLETED(此時文件內容已經讀取完畢了)繼續處理(注意要是.COMPLETED結尾,不然flume會再次讀取該文件)。
改寫完成之后,就和前面自定義解析器的處理步驟一樣了,打包放在plugins.d目錄下,配置:
1 |
producer.sources.src-1.type = com.networkbench.flume.source.SpoolDirectoryExtSource |
總結
基本上flume的各種組件都可以自定義開發,本人使用flume時間也沒多久,截止到目前為止遇到問題還有以下幾個:
消息重發
這個坑其實是自己挖的,當時想當然的理解flume的配置參數#producer.sinks.sink-1.requiredAcks = 1(默認是1),我設置成了10,當時使用的kafka sink,由於某個kafka節點出現了問題(還沒有仔細驗證,是否kafka正常時也會出現該問題?),導致flume一直重發某個時間點的數據,而最新的數據一直被阻塞(可能是被緩存在了channel中)。導致后台接收的一直是某個時間點的消息。后台想到自己改動的這個參數,改回1之后就正常了。下面是官方文檔對該參數的說明:
requiredAcks 1 (默認值) How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.
channel溢出
chanel溢出是因為前面的消息重發導致的,當時使用的channle是File Channel,其中有幾個配置項值得注意:
配置項 | 默認值 | 說明 |
---|---|---|
transactionCapacity | 10000 | 單個事務中可以寫入或讀取的事務的最大數量 |
maxFileSize | 2146435071 | 每個數據文件的最大大小(字節),一旦文件達到這個大小(或一旦寫入下個文件達到這個大小),該文件保存關閉並在那個目錄下創建一個新的數據文件。如果此值設置為高於默認值,仍以默認值為准 |
minimumRequiredSpace | 524288000 | channel繼續操作時每個卷所需的最少空間(字節),如果任何一個掛載數據目錄的卷只有這么多空間剩余,channel將停止操作來防止損壞和避免不完整的數據被寫入 |
capacity | 1000000 | channel可以保存的提交事件的最大數量 |
keep-alive | 3 | 每次寫入或讀取應該等待完成的最大的時間周期(秒) |
前面的channel溢出推測就是由capacity的達到了限制造成的。