Apache Flume是一個分布式、可靠、可用的系統,用於從大量不同的源有效地收集、聚合、移動大量日志數據進行集中式數據存儲。
Flume簡介
Flume的核心是Agent,Agent中包含Source、Channel、Sink。Agent是最小的獨立運行單位。在Agent中,數據流向為Source->Channel->Sink。
其中,
Source:收集數據,傳遞給Channel。支持多種收集方式,如RPC、syslog、監控目錄。
Channel:數據通道,接收Source的數據並儲存,傳遞給Sink。Channel中的數據在被Sink消費前會一直保存,等Sink成功把數據發送到下一跳Channel或最終目的地后才會刪除緩存的數據。
Sink:消費Channel中的數據,傳遞到下一跳Channel或最終目的地,完成后將數據從Channel中移除。
Flume傳輸的數據的基本單位是Event,Event同時也是事務操作的基本單位。通常傳輸的日志內容存儲在Event中。Event由可選的header和載有數據的byte array構成。
Flume支持多個Agent相連,形成多級Agent。此時上一級Sink和下一級Source都必須使用Avro協議。
使用多級Flume可以實現日志的聚合,第一層Agent接收日志,第二層Agent統一處理。
Flume支持將流從一個Source扇出到多個Channel。有兩種模式的扇出,復制和復用。在復制流程中,事件被發送到所有配置的通道。在復用的情況下,事件僅發送到合格信道的子集。
Avro
Apache Avro是一種數據序列化系統。它是一個基於 RPC 的框架,被 Apache 項目廣泛用於數據存儲和通信。Avro提供了豐富的數據結構、緊湊快速的二進制數據格式、與動態語言的簡單集成。
Avro 依賴於與數據存儲在一起的模式。因為沒有每個值的開銷,實現了輕松而又快速的序列化。當在RPC中使用Avro時,客戶端和服務器在連接握手中交換模式。Avro模式是使用JSON定義的,字段在客戶端和服務器之間的對應很容易得到解決。
Source
Flume支持多種類型的Source,包括Avro、Thrift、Exec、JMS、Spooling Directory、Taildir、Kafka、NetCat、Sequence Generator、Syslog Sources、HTTP、Stress、Custom、Scribe。
安裝后測試時,可以使用NetCat Source監聽一個端口,然后Telnet登錄該端口輸入字符串即可。
程序接入最便捷的方式是讓Flume讀取現有的日志文件,可以使用如下Source:
Taildir Source:觀察指定的文件,並在檢測到添加到每個文件的新行后幾乎實時地尾隨它們。
Spooling Directory Source:監測配置的目錄下新增的文件,並將文件中的數據讀取出來。需要注意兩點:拷貝到 spool 目錄下的文件不可以再打開編輯;spool 目錄下不可包含相應的子目錄。
Exec Source:以運行Linux命令的方式,持續的輸出最新的數據,如tail -F文件名指令。
Channel
Flume支持多種類型的Channel,包括Memory、JDBC、Kafka、File、Spillable Memory、Custom、Pseudo Transaction。其中,Memory Channel 可以實現高速的吞吐,但是無法保證數據的完整性;File Channel 是一個持久化的隧道(channel),它持久化所有的事件,並將其存儲到磁盤中。
Sink
Flume支持多種類型的Sink,包括HDFS、Hive、Logger、Avro、Thrift、IRC、File Roll、Null、HBase、MorphlineSolr、Elastic Search、Kite Dataset、Kafka、Custom。Sink在設置存儲數據時,可以向文件系統、數據庫、Hadoop存數據,在日志數據較少時,可以將數據存儲在文件系中,並且設定一定的時間間隔保存數據。在日志數據較多時,可以將相應的日志數據存儲到Hadoop中,便於日后進行相應的數據分析。
簡單使用示例
創建example.conf文件,內容如下
- # 配置一個agent,名稱為a1,Source、Channel、Sink分別只有1個
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # 配置Source,類型為netcat,監聽本機的44444端口
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
- # 配置Sink,類型為logger,輸出日志到console
- a1.sinks.k1.type = logger
- # 配置Channel,類型為memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # 綁定Source、Sink和Channel的對應關系
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
啟動Flume agent
- bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
打開另一個終端,Telnet連接44444端口,並發送數據
- $ telnet localhost 44444
- Trying 127.0.0.1...
- Connected to localhost.localdomain (127.0.0.1).
- Escape character is '^]'.
- Hello world! <ENTER>
- OK
可看到Flume在console輸出如下內容
- 12/06/19 15:32:19 INFO source.NetcatSource: Source starting
- 12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
- 12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
原文地址:Apache Flume日志收集系統簡介