Flume簡介
Apache Flume是一個分布式、可靠、高可用的日志收集系統,支持各種各樣的數據來源,如http,log文件,jms,監聽端口數據等等,能將這些數據源的海量日志數據進行高效收集、聚合、移動,最后存儲到指定存儲系統中,如kafka、分布式文件系統、Solr搜索服務器等;
Apache Flume主要有以下幾大模塊組成:
- 數據源采集(Source)
- 數據攔截(Interceptor)
- 通道選擇器(Channel Selector)
- 數據通道(Channel)
- Sink處理器(Sink Processor)
- Sink(Sink)
- 事件序列化(Serialization)
模塊組成圖如下所示:
下面將對各個模塊做個簡單的介紹,在這之前,有必要先了解一下什么是事件?
在Flume中,所謂的事件指的是Flume數據流中的數據單位,包含header和body,用於存儲日志數據,其中header是一個map結構,我們可以往header存放一些信息,如時間戳,appid等,以便后續對事件進行處理,body存放的是收集的日志內容字節流,結構如下圖所示:
數據源采集(Source)
先看下source模塊在流程圖中所處的位置,這里以最簡單的架構圖來作為示例,如下圖所示:
Flume source主要功能是消費傳遞給它的事件;
Flume內置了各種類型的Source,用於處理各種類型的事件,如下所示,理論上Flume支持所有類型的事件,因為Flume支持自定義Source:
- Avro Source:支持Avro協議(實際上是Avro RPC)
- Thrift Source:支持Thrift協議
- Exec Source:基於Unix的command在標准輸出上生產數據
- JMS Source:從JMS系統中讀取數據
- Spooling Directory Source:監控指定目錄內數據變更
- Twitter 1% firehose Source:通過API持續下載Twitter數據,試驗性質
- Netcat Source:監控某個端口,將流經端口的每一個文本行數據作為Event輸入
- Sequence Generator Source:序列生成器數據源,生產序列數據
- Syslog Sources:讀取syslog數據,產生Event,支持UDP和TCP兩種協議
- HTTP Source:基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式(實際上支持任何形式,因為handle可以自定義)
- Legacy Sources:兼容老的Flume OG中Source(0.9.x版本)
這里列舉幾個比較常用的source,
如Exec Source,通過它我們可以監聽一個日志文件的變化,如下配置,
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /var/log/secure a1.sources.r1.channels = c1
Avro Source,通過它,我們可以將兩個Flume Agent關聯起來(因為agent的source和sink都支持Avro),正是這個特性,大大提高了flume的靈活性,可用性...
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141
HTTP Source,通過它,可以接收http請求上報的數據,如下是配置示例,監聽5140端口的http請求,這里的handle是可以自定義的,也就是說我們可以接收任何類型的上報數據,如json格式、xml等等。
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = http a1.sources.r1.port = 5140 a1.sources.r1.channels = c1 a1.sources.r1.handler = org.example.rest.RestHandler a1.sources.r1.handler.nickname = random props
數據攔截(Interceptor)
先看下interceptor模塊在流程圖中所處的位置,如下圖所示:
攔截器主要的功能是對事件進行過濾,修改;
Flume內置支持的攔截器如下(主要兩類:過濾和修改):
- Timestamp Interceptor:在事件頭中插入以毫秒為單位的時間戳,如果在之前已經有這個時間戳,則保留原有的時間戳。
- Host Interceptor:
- Static Interceptor
- UUID Interceptor
- Morphline Interceptor
- Search and Replace Interceptor
- Regex Filtering Interceptor
- Regex Extractor Interceptor
當然,flume是支持自定義攔截器的,如下是一個簡單的配置示例:
#攔截器 a1.sources.r1.interceptors = i1 #a1.sources.r1.interceptors.i1.type = org.apache.flume.sw.interceptor.SignCheckInterceptor$Builder a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.RegexFilteringInterceptor$Builder a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d) a1.sources.r1.interceptors.i1.serializers = s1 s2 s3 a1.sources.r1.interceptors.i1.serializers.s1.name = one a1.sources.r1.interceptors.i1.serializers.s2.name = two a1.sources.r1.interceptors.i1.serializers.s3.name = three
通道選擇器(Channel Selector)
先看下interceptor模塊在流程圖中所處的位置,如下圖所示:
通道選擇器的主要功能是對事件流進行復制和分流;
Flume內置了兩種類型的通道選擇器:
- 復制(Replicating Channel Selector),使用該選擇器,我們可以同時讓同一事件傳遞到多個channel中,最后流入多個sink;
- 分流(Multiplexing Channel Selector),使用該選擇器,我們可以讓特定的事件流入到特定的channel中,如不同項目產生的日志事件,交由不同的sink處理;
如下是一個分流的配置示例:
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
當然,通道選擇器是支持自定義的,我們可以自己實現通道選擇器,並做如下配置:
a1.sources = r1 a1.channels = c1 a1.sources.r1.selector.type = org.example.MyChannelSelector
數據通道(Channel)
先看下channel模塊在流程圖中所處的位置,如下圖所示:
通道Channel的主要功能是緩存日志事件;
Flume內置的Channel如下:
- Memory Channel:內存通道
- JDBC Channel:存儲在持久化存儲中,當前Flume Channel內置支持Derby
- File Channel:存儲在磁盤文件中
- Spillable Memory Channel:存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件(當前試驗性的,不建議生產環境使用)
- Pseudo Transaction Channel:測試用途
同樣,Flume支持自定義通道;
如下是一個內存通道的配置示例:
a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000
Sink處理器
先看下Sink處理器在流程圖中所處的位置,如下圖所示:
Sink處理器的主要功能是讓一組sink groups支持負載均衡和災難轉移功能,我覺得跟通道選擇器有點類似通過自定義的方式,我覺得是可以實現通道選擇器的功能的;
Flume內置的sink處理器如下:
- load_balance:負載均衡
- failover:主備(災難轉移)
同樣的,也支持自定義sink處理器;
如下是一個負載均衡的例子,使用隨機選擇算法:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random
Sink(Sink)
先看下Sink模塊在流程圖中所處的位置,如下圖所示:
Sink的主要功能是將事件輸出到下一個agent的source或其它存儲系統如,分布式文件系統、kafka、本地文件系統、日志等;
Flume內置的sink如下:
- HDFS Sink:數據寫入HDFS
- Logger Sink:數據寫入日志文件
- Avro Sink:數據被轉換成Avro Event,然后發送到配置的RPC端口上
- Thrift Sink:數據被轉換成Thrift Event,然后發送到配置的RPC端口上
- IRC Sink:數據在IRC上進行回放
- File Roll Sink:存儲數據到本地文件系統
- Null Sink:丟棄到所有數據
- HBase Sink:數據寫入HBase數據庫
- Morphline Solr Sink:數據發送到Solr搜索服務器(集群)
- ElasticSearch Sink:數據發送到Elastic Search搜索服務器(集群)
- Kite Dataset Sink:寫數據到Kite Dataset,試驗性質的
當然,flume也是支持自定義的;
我們舉個本地文件系統的例子,配置如下即可:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume
事件序列化(Serialization)
序列化在流程圖中所處的位置與Sink一樣,這里就不畫了,簡單地說,Sink負責將事件輸出到外部,那么以何種形式輸出(直接文本形式還是其它形式),需要包含哪些東西(body還是header還是其它內容...),就是由事件序列化來完成的;
Flume內置的事件序列化如下:
- Body Text Serializer:看名字就知道,直接將事件的body作為文本形式輸出,事件header將被忽略
- Avro Event Serializer:Avro序列化,包含事件全部信息
Flume同樣支持自定義事件序列化,需要實現EventSerializer接口;
下面舉個Body Text Serializer的配置示例:
a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume a1.sinks.k1.sink.serializer = text a1.sinks.k1.sink.serializer.appendNewline = false
結語
上面對flume各個模塊,或者說組件,做了一個簡短的介紹,基本知道了Flume是個怎么回事,接下來將對各個組件做個介紹,並開發各個組件的自定義實現。