Apache Flume 1.7.0 各個模塊簡介


Flume簡介

Apache Flume是一個分布式、可靠、高可用的日志收集系統,支持各種各樣的數據來源,如http,log文件,jms,監聽端口數據等等,能將這些數據源的海量日志數據進行高效收集、聚合、移動,最后存儲到指定存儲系統中,如kafka、分布式文件系統、Solr搜索服務器等;

Apache Flume主要有以下幾大模塊組成:

  1. 數據源采集(Source)
  2. 數據攔截(Interceptor)
  3. 通道選擇器(Channel Selector)
  4. 數據通道(Channel)
  5. Sink處理器(Sink Processor)
  6. Sink(Sink)
  7. 事件序列化(Serialization)

模塊組成圖如下所示:

下面將對各個模塊做個簡單的介紹,在這之前,有必要先了解一下什么是事件?

在Flume中,所謂的事件指的是Flume數據流中的數據單位,包含header和body,用於存儲日志數據,其中header是一個map結構,我們可以往header存放一些信息,如時間戳,appid等,以便后續對事件進行處理,body存放的是收集的日志內容字節流,結構如下圖所示:

數據源采集(Source)

 先看下source模塊在流程圖中所處的位置,這里以最簡單的架構圖來作為示例,如下圖所示:

Flume source主要功能是消費傳遞給它的事件;

Flume內置了各種類型的Source,用於處理各種類型的事件,如下所示,理論上Flume支持所有類型的事件,因為Flume支持自定義Source:

  1. Avro Source:支持Avro協議(實際上是Avro RPC)
  2. Thrift Source:支持Thrift協議
  3. Exec Source:基於Unix的command在標准輸出上生產數據
  4. JMS Source:從JMS系統中讀取數據
  5. Spooling Directory Source:監控指定目錄內數據變更
  6. Twitter 1% firehose Source:通過API持續下載Twitter數據,試驗性質
  7. Netcat Source:監控某個端口,將流經端口的每一個文本行數據作為Event輸入
  8. Sequence Generator Source:序列生成器數據源,生產序列數據
  9. Syslog Sources:讀取syslog數據,產生Event,支持UDP和TCP兩種協議
  10. HTTP Source:基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式(實際上支持任何形式,因為handle可以自定義)
  11. Legacy Sources:兼容老的Flume OG中Source(0.9.x版本)

這里列舉幾個比較常用的source,

如Exec Source,通過它我們可以監聽一個日志文件的變化,如下配置,

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

Avro Source,通過它,我們可以將兩個Flume Agent關聯起來(因為agent的source和sink都支持Avro),正是這個特性,大大提高了flume的靈活性,可用性...

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

HTTP Source,通過它,可以接收http請求上報的數據,如下是配置示例,監聽5140端口的http請求,這里的handle是可以自定義的,也就是說我們可以接收任何類型的上報數據,如json格式、xml等等。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props

數據攔截(Interceptor)

先看下interceptor模塊在流程圖中所處的位置,如下圖所示:

攔截器主要的功能是對事件進行過濾,修改;

Flume內置支持的攔截器如下(主要兩類:過濾和修改):

  1. Timestamp Interceptor:在事件頭中插入以毫秒為單位的時間戳,如果在之前已經有這個時間戳,則保留原有的時間戳。
  2. Host Interceptor:
  3. Static Interceptor
  4. UUID Interceptor
  5. Morphline Interceptor
  6. Search and Replace Interceptor
  7. Regex Filtering Interceptor
  8. Regex Extractor Interceptor

當然,flume是支持自定義攔截器的,如下是一個簡單的配置示例:

#攔截器
a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = org.apache.flume.sw.interceptor.SignCheckInterceptor$Builder
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

 通道選擇器(Channel Selector)

先看下interceptor模塊在流程圖中所處的位置,如下圖所示:

通道選擇器的主要功能是對事件流進行復制和分流;

Flume內置了兩種類型的通道選擇器:

  1. 復制(Replicating Channel Selector),使用該選擇器,我們可以同時讓同一事件傳遞到多個channel中,最后流入多個sink;
  2. 分流(Multiplexing Channel Selector),使用該選擇器,我們可以讓特定的事件流入到特定的channel中,如不同項目產生的日志事件,交由不同的sink處理;

如下是一個分流的配置示例:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

當然,通道選擇器是支持自定義的,我們可以自己實現通道選擇器,並做如下配置:

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

數據通道(Channel)

先看下channel模塊在流程圖中所處的位置,如下圖所示:

通道Channel的主要功能是緩存日志事件;

Flume內置的Channel如下:

  1. Memory Channel:內存通道
  2. JDBC Channel:存儲在持久化存儲中,當前Flume Channel內置支持Derby
  3. File Channel:存儲在磁盤文件中
  4. Spillable Memory Channel:存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件(當前試驗性的,不建議生產環境使用)
  5. Pseudo Transaction Channel:測試用途

 同樣,Flume支持自定義通道;

如下是一個內存通道的配置示例:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

Sink處理器

 先看下Sink處理器在流程圖中所處的位置,如下圖所示:

Sink處理器的主要功能是讓一組sink groups支持負載均衡和災難轉移功能,我覺得跟通道選擇器有點類似通過自定義的方式,我覺得是可以實現通道選擇器的功能的;

Flume內置的sink處理器如下:

  1. load_balance:負載均衡
  2. failover:主備(災難轉移)

同樣的,也支持自定義sink處理器;

如下是一個負載均衡的例子,使用隨機選擇算法:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

Sink(Sink)

 先看下Sink模塊在流程圖中所處的位置,如下圖所示:

Sink的主要功能是將事件輸出到下一個agent的source或其它存儲系統如,分布式文件系統、kafka、本地文件系統、日志等;

Flume內置的sink如下:

  1. HDFS Sink:數據寫入HDFS
  2. Logger Sink:數據寫入日志文件
  3. Avro Sink:數據被轉換成Avro Event,然后發送到配置的RPC端口上
  4. Thrift Sink:數據被轉換成Thrift Event,然后發送到配置的RPC端口上
  5. IRC Sink:數據在IRC上進行回放
  6. File Roll Sink:存儲數據到本地文件系統
  7. Null Sink:丟棄到所有數據
  8. HBase Sink:數據寫入HBase數據庫
  9. Morphline Solr Sink:數據發送到Solr搜索服務器(集群)
  10. ElasticSearch Sink:數據發送到Elastic Search搜索服務器(集群)
  11. Kite Dataset Sink:寫數據到Kite Dataset,試驗性質的

當然,flume也是支持自定義的;

我們舉個本地文件系統的例子,配置如下即可:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

事件序列化(Serialization)

序列化在流程圖中所處的位置與Sink一樣,這里就不畫了,簡單地說,Sink負責將事件輸出到外部,那么以何種形式輸出(直接文本形式還是其它形式),需要包含哪些東西(body還是header還是其它內容...),就是由事件序列化來完成的;

Flume內置的事件序列化如下:

  1. Body Text Serializer:看名字就知道,直接將事件的body作為文本形式輸出,事件header將被忽略
  2. Avro Event Serializer:Avro序列化,包含事件全部信息

Flume同樣支持自定義事件序列化,需要實現EventSerializer接口;

下面舉個Body Text Serializer的配置示例:

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

 結語

上面對flume各個模塊,或者說組件,做了一個簡短的介紹,基本知道了Flume是個怎么回事,接下來將對各個組件做個介紹,並開發各個組件的自定義實現。

參考資料

http://flume.apache.org/FlumeUserGuide.html

http://shiyanjun.cn/archives/915.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM