Flume最早是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統。
Flume特性
1.提供上下文路由特征
2.Flume的管道是基於事務,保證了數據在傳送和接收時的一致性
3.Flume是可靠的,容錯性高的,可升級的,易管理的,並且可定制的
4.Flume可用將應用產生的數據存儲到任何集中存儲器中,比如HDFS,HBase
5.可以被水平擴展
6.當收集數據的速度超過將寫入數據的時候,也就是當收集信息遇到峰值時,這時候收集的信息非常大,甚至超過了系統的寫入數據能力,這時候,Flume會在數據生產者和數據收容器間做出調整,保證其能夠在兩者之間提供平穩的數據
Flume核心概念
-
agent
flume最核心的角色就是agent。flume日志采集系統是由一個個agent連接起來的數據傳輸通道
對於每個agent來說就是一個獨立的守護進程(JVM)。它負責從數據源接收數據,並發送到下一個目的地。
agent內部有三個重要的組件:source、channel、sink -
source
從數據發生器接收數據,並將接收的數據以Event的形式傳遞給一個或多個channel,Flume提供多種數據接收方式,比如Avro,Thrift等。 -
channel
channel是一種短暫的存儲容器,它從source處接收到event格式數據后進行緩存,直到被消費掉。
它在source和sink之間起到了橋梁作用,channel是一個完整的事務,這一點保障了數據在收發時的一致性,並且可以和任意數量的source和sink連接。
支持的類型有:JDBC channel,FileSystem Channel, Memory channel等。 -
sink
sink將數據存儲到集中存儲器比如Hbase和HDFS,它從channels消費數據(events)並將其傳遞給目標地,目標地可能是另一個sink,也可能HDFS,HBase -
Event
數據在flume內部是以Event封裝的形式存在的。因此source組件在獲取到原始數據后,需要封裝成Event后發送到channel中,然后sink從channel取出Event后,根據配置要求再轉成其他的形式進行數據輸出。
Event封裝的對象主要有兩部分:Headers和Body
Headers是一個集合Map類型,用於存儲元數據(如標志、描述等)
Body就是一個字節數組,裝載具體的數據內容 -
transaction
Flume的事務機制,類似於數據庫的事務機制
Flume使用獨立的事務分別從source到channel,以及從channel到sink的event傳遞。
注意:在任何時刻,Event至少在一個Channel是完整有效的 -
Interceptor
攔截器,攔截工作在source組件之后,source產生的event會被傳入的攔截器根據需要進行攔截處理。
攔截器可以組成攔截器鏈。
Flume組件詳解
Source
Source | Desc |
---|---|
Avor Source | 通過監聽一個網絡端口來接受數據,而且接受的數據必須是使用Avor序列化框架序列化后的數據。 |
Thrift Source | 監聽Thrift端口並從外部Thrift客戶端流接收事件 |
Exec Source | 啟動一個用戶所指定的linux shell命令,采集這個Linux shell命令的標准輸出,作出收集到的數據,轉為event寫入channel |
JMS Source | 從JMS目標(例如隊列或主題)讀取消息;作為JMS應用程序,它應可與任何JMS提供程序一起使用,但僅經過ActiveMQ的測試;注意,應該使用plugins.d目錄(首選),命令行上的-classpath或通過flume-env.sh中的FLUME_CLASSPATH變量將提供的JMS jar包含在Flume類路徑中。 |
Spooling Directory Source | 監視一個指定的文件夾,如果文件夾下有沒采集過的新文件,則將這些新文件中的數據采集,並轉成event寫入channel。(注意:spooling目錄中的文件必須是不可變的,而且是不能重名的!否則,source會loudly fail !) |
Taildir Source | 監視指定目錄下的一批文件,只要某個文件中有新寫入的行,則會被tail到;它會記錄每一個文件所tail到的位置,記錄到一個指定的positionfile保存目錄中,格式為json(如果需要的時候,可以人為修改,就可以讓source從任意指定的位置開始讀取數據);它對采集完成的文件,不會做任何修改。(公司項目采用的Taildir Source) |
Kafka Source | 就是用Kafka Consumer連接Kafka,讀取數據,然后轉換成event,寫入channel |
NetCat Source | 啟動一個socket服務,監聽一個端口,將端口上收到的數據,轉成event寫入channel |
Sequence Generator Source | 一個簡單的序列生成器,它使用從0開始,遞增1並在totalEvents處停止的計數器連續生成事件;當無法發送event到channel時會進行重試。通常用於測試。 |
Syslog Sources | 讀取系統日志數據生成event |
Http Source | 通過http post/get來接收數據,通常get用於測試,該source基於Jetty9.4,並提供了設置其他特定於Jetty的參數的功能,這些參數將直接傳遞給Jetty組件 |
Stress Source | 主要用於壓測,用於可以配置要發生的event總數以及要發送成功event的最大數 |
Custom Source | 自定義Source |
taildir Source | 監聽指定目錄的一批文件,只要某個文件被寫入,那么就會被tail到。這里原理其實就是source會記錄每個文件所讀取到的位置,然后記錄到一個指定的positionfile目錄文件中,通常為json格式,而且是可見的,因此可以人為修改。由於該種機制,可以實現從任意指定位置讀取數據,所以這個source是可以保障可靠性的。但是會有數據重復的問題。 |
Channel
Channel | Desc |
---|---|
Memory Channel | event存儲在內存中,且可以配置最大值。對於需要高吞吐而且可以容忍數據丟失的情況下,可以選擇該channel |
JDBC Channel | event被持久到數據庫中,目前支持derby.適用於可恢復的場景 |
Kafka Channel | agent利用Kafka作為channel數據緩存,Kafka Channel要跟Kafka Source,Kafka sink區別開來,Kafka Cannel在應用時,可以沒有source |
File Channel | event被緩存在本地磁盤文件中,可靠性高,不會丟失;但在極端情況下可能會重復數據 |
Spillable Memory Channel | event存儲在內存和磁盤上。內存充當主存儲,磁盤充當溢出 |
Sink
Sink | Desc |
---|---|
HDFS Sink | 數據最終被發往hdfs,可以生成text文件或sequence文件,而且支持壓縮;支持生成文件的周期性roll機制;基於文件size,或者時間間隔,或者event數量;目標路徑,可以使用動態通配符替換,比如用%D代表當前日期;當然,它也能從event的header中,取到一些標記來作為通配符替換 |
Hive Sink | 可將text或json數據直接存儲到hive分區表 |
Logger Sink | 數據輸出到日志中,通常用於debug |
Avro Sink | avro sink用來向avro source 發送avro序列化數據,這樣就可以實現agent之間的級聯 |
Thrift Sink | 同avro sink |
IRC Sink | 同avro sink |
File Roll Sink | 數據存儲到本地文件系統 |
Null Sink | 直接丟棄 |
Hbase Sink | 數據存儲到hbase中 |
Hbase2 Sink | 等同於hbase 2版本的HBaseSink |
AsyncHBaseSink | 異步模式寫入hbase |
MorphlineSolrSink | 該接收器從Flume事件中提取數據,對其進行轉換,並將其幾乎實時地加載到Apache Solr服務器中,后者再為最終用戶或搜索應用程序提供查詢 |
ElasticSearchSink | 直接存儲到es中 |
Kite Dataset Sink | 將事件寫入Kite數據集。該接收器將反序列化每個傳入事件的主體,並將結果記錄存儲在Kite數據集中。它通過按URI加載數據集來確定目標數據集 |
Kafka Sink | 存儲到Kafka中 |
HTTP Sink | 將接收到的數據通過post請求發送到遠程服務,event內容作為請求體發送 |
Custom Sink | 自定義Sink |
Interceptor
攔截器,就是工作在source之后,可以從source獲得event,做一個邏輯處理,然后再返回處理之后的event。這也就可以讓用戶不需要改動source代碼的情況下,插入一些處理邏輯。
Interceptor | Desc |
---|---|
host | 往event的header中插入主機名信息 |
timestamp | 向event中,寫入一個kv到header里;key的名稱可以隨意配置,value就是當前時間戳 |
static | 讓用戶往event中添加一個自定義header,key-value形式的,當然這個kv在配置文件中是寫死的 |
regex_filter | 將event中的body內容和指定的正則表達式進行匹配 |
custom type as FQCN | 自定義實現攔截器 |
uuid | 用於在每個event header中生成一個uuid字符串 |
search_replace | 該攔截器基於Java正則表達式提供簡單的基於字符串的搜索和替換功能,類似於Java中的Matcher.replaceAll方法 |
RegexExtractorInterceptorMillisSerializer | 該攔截器使用指定的正則表達式提取正則表達式匹 |
配組,並將匹配組附加到事件的header里 |
Selector
一個source可以對接多個channel,那么問題來了,source的數據是怎么在多個channel之間進行傳遞的呢?這就是selector的功能了,通過selector選擇器根據策略可以將event從source傳遞到指定的channel中去。
Selector | DESC |
---|---|
replication selector | 默認的選擇器,將event進行復制分發給下游所有的節點 |
Multiplexing selector | 多路選擇器,可以根據event中的一個指定key對應的value來決定這條消息會被寫入到哪個channel中 |
Custom Selector | 自定義選擇器 |
Processor
一個agent中,多個sink可以被組裝到一個組中,而數據在組內多個sink之間發送。接收處理器可以在組內提供負載均衡的功能,或者是在臨時故障的情況下實現從一個接收器轉移到另一個接收器上。
Processor | DESC |
---|---|
default | 默認的接收處理器僅接受一個sink,當然用戶也沒有必要為了一個sink去創建processor |
Failover | 故障轉移模式,即一個組內只有優先級高的sink在工作,而其他的sink處於等待中 |
load_balance | 負載均衡模式,允許channel中的數據在一組sink中的多個sink之間進行輪轉,具體的策略有:round-robin(輪流發送);random(隨機發送) |
Custom processor | 自定義處理器 |
Flume安裝部署與使用