一、Flume介紹
Flume是一個分布式、可靠、和高可用的海量日志聚合的系統,支持在系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。
設計目標:
- 可靠性當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,可以重新發送。),Store on failure(這也是scribe采用的策略,當數據接收方crash時,將數據寫到本地,待恢復后,繼續發送),Best effort(數據發送到接收方后,不會進行確認)。
- 可擴展性Flume采用了三層架構,分別為agent,collector和storage,每一層均可以水平擴展。其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。
- 可管理性所有agent和colletor由master統一管理,這使得系統便於維護。多master情況,Flume利用ZooKeeper和gossip,保證動態配置數據的一致性。用戶可以在master上查看各個數據源或者數據流執行情況,且可以對各個數據源配置和動態加載。Flume提供了web 和shell script command兩種形式對數據流進行管理。
- 功能可擴展性用戶可以根據需要添加自己的agent,collector或者storage。此外,Flume自帶了很多組件,包括各種agent(file, syslog等),collector和storage(file,HDFS等)。
一般實時系統,所選用組件如下:
數據采集 :負責從各節點上實時采集數據,選用cloudera的flume來實現
數據接入 :由於采集數據的速度和數據處理的速度不一定同步,因此添加一個消息中間件來作為緩沖,選用apache的kafka
流式計算 :對采集到的數據進行實時分析,選用apache的storm
數據輸出 :對分析后的結果持久化,暫定用mysql ,另一方面是模塊化之后,假如當Storm掛掉了之后,數據采集和數據接入還是繼續在跑着,數據不會丟失,storm起來之后可以繼續進行流式計算;
二、Flume 的 一些核心概念
其它參數配置請參見:http://itindex.net/detail/56260-flume-kafka-hdfs
http://www.jianshu.com/p/f0a08bd4f975
三、Flume的整體構成圖
注意:(1)源將事件寫到一個多或者多個通道中;(2)接收器只從一個通道接收事件;(3)代理可能會有多個源、通道與接收器。
四、常用配置模式
常用配置模式一:掃描指定文件
agent.sources.s1.type=exec agent.sources.s1.command=tail -F /Users/it-od-m/Downloads/abc.log agent.sources.s1.channels=c1 agent.channels.c1.type=memory agent.channels.c1.capacity=10000 agent.channels.c1.transactionCapacity=100 #設置Kafka接收器 agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #設置Kafka的broker地址和端口號 agent.sinks.k1.brokerList=127.0.0.1:9092 #設置Kafka的Topic agent.sinks.k1.topic=testKJ1 #設置序列化方式 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder agent.sinks.k1.channel=c1
常用配置模式二
Agent名稱定義為agent. Source:可以理解為輸入端,定義名稱為s1 channel:傳輸頻道,定義為c1,設置為內存模式 sinks:可以理解為輸出端,定義為sk1, agent.sources = s1 agent.channels = c1 agent.sinks = sk1 #設置Source的內省為netcat 端口為5678,使用的channel為c1 agent.sources.s1.type = netcat agent.sources.s1.bind = localhost agent.sources.s1.port = 3456 agent.sources.s1.channels = c1 #設置Sink為logger模式,使用的channel為c1 agent.sinks.sk1.type = logger agent.sinks.sk1.channel = c1 #設置channel信息 agent.channels.c1.type = memory #內存模式 agent.channels.c1.capacity = 1000 agent.channels.c1.transactionCapacity = 100 #傳輸參數設置。
常用配置模式三:掃描目錄新增文件
agent.sources = s1 agent.channels = c1 agent.sinks = sk1 #設置spooldir agent.sources.s1.type = spooldir agent.sources.s1.spoolDir = /Users/it-od-m/logs agent.sources.s1.fileHeader = true agent.sources.s1.channels = c1 agent.sinks.sk1.type = logger agent.sinks.sk1.channel = c1 #In Memory !!! agent.channels.c1.type = memory agent.channels.c1.capacity = 10004 agent.channels.c1.transactionCapacity = 100
與Kafka結合時,通常采用第一種模式。
配置好參數以后,使用如下命令啟動Flume:
./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console
最后一行顯示Component type:SINK,name:k1 started表示啟動成功.
說明:在啟動Flume之前,Zookeeper和Kafka要先啟動成功,不然啟動Flume會報連不上Kafka的錯誤。