Flume與Kafka集成


一、Flume介紹

  Flume是一個分布式、可靠、和高可用的海量日志聚合的系統,支持在系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。

設計目標:

  • 可靠性當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,可以重新發送。),Store on failure(這也是scribe采用的策略,當數據接收方crash時,將數據寫到本地,待恢復后,繼續發送),Best effort(數據發送到接收方后,不會進行確認)。
  • 可擴展性Flume采用了三層架構,分別為agent,collector和storage,每一層均可以水平擴展。其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。
  • 可管理性所有agent和colletor由master統一管理,這使得系統便於維護。多master情況,Flume利用ZooKeeper和gossip,保證動態配置數據的一致性。用戶可以在master上查看各個數據源或者數據流執行情況,且可以對各個數據源配置和動態加載。Flume提供了web 和shell script command兩種形式對數據流進行管理。
  • 功能可擴展性用戶可以根據需要添加自己的agent,collector或者storage。此外,Flume自帶了很多組件,包括各種agent(file, syslog等),collector和storage(file,HDFS等)。

一般實時系統,所選用組件如下:

數據采集 :負責從各節點上實時采集數據,選用cloudera的flume來實現 
數據接入 :由於采集數據的速度和數據處理的速度不一定同步,因此添加一個消息中間件來作為緩沖,選用apache的kafka 
流式計算 :對采集到的數據進行實時分析,選用apache的storm 
數據輸出 :對分析后的結果持久化,暫定用mysql ,另一方面是模塊化之后,假如當Storm掛掉了之后,數據采集和數據接入還是繼續在跑着,數據不會丟失,storm起來之后可以繼續進行流式計算; 

二、Flume 的 一些核心概念

其它參數配置請參見:http://itindex.net/detail/56260-flume-kafka-hdfs

http://www.jianshu.com/p/f0a08bd4f975

三、Flume的整體構成圖

注意:(1)源將事件寫到一個多或者多個通道中;(2)接收器只從一個通道接收事件;(3)代理可能會有多個源、通道與接收器。

四、常用配置模式

常用配置模式一:掃描指定文件

agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /Users/it-od-m/Downloads/abc.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

#設置Kafka接收器
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#設置Kafka的broker地址和端口號
agent.sinks.k1.brokerList=127.0.0.1:9092
#設置Kafka的Topic
agent.sinks.k1.topic=testKJ1
#設置序列化方式
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

agent.sinks.k1.channel=c1

 常用配置模式二

Agent名稱定義為agent.   
Source:可以理解為輸入端,定義名稱為s1  
channel:傳輸頻道,定義為c1,設置為內存模式  
sinks:可以理解為輸出端,定義為sk1,  

agent.sources = s1    
agent.channels = c1  
agent.sinks = sk1  

#設置Source的內省為netcat 端口為5678,使用的channel為c1  
agent.sources.s1.type = netcat  
agent.sources.s1.bind = localhost  
agent.sources.s1.port = 3456  
agent.sources.s1.channels = c1  

#設置Sink為logger模式,使用的channel為c1  
agent.sinks.sk1.type = logger  
agent.sinks.sk1.channel = c1  
#設置channel信息  
agent.channels.c1.type = memory #內存模式  
agent.channels.c1.capacity = 1000     
agent.channels.c1.transactionCapacity = 100 #傳輸參數設置。

 常用配置模式三:掃描目錄新增文件

agent.sources = s1  
agent.channels = c1  
agent.sinks = sk1  

#設置spooldir  
agent.sources.s1.type = spooldir  
agent.sources.s1.spoolDir = /Users/it-od-m/logs  
agent.sources.s1.fileHeader = true  

agent.sources.s1.channels = c1  
agent.sinks.sk1.type = logger  
agent.sinks.sk1.channel = c1  

#In Memory !!!  
agent.channels.c1.type = memory  
agent.channels.c1.capacity = 10004  
agent.channels.c1.transactionCapacity = 100

 與Kafka結合時,通常采用第一種模式。

 配置好參數以后,使用如下命令啟動Flume:

./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console

 最后一行顯示Component type:SINK,name:k1 started表示啟動成功.

說明:在啟動Flume之前,Zookeeper和Kafka要先啟動成功,不然啟動Flume會報連不上Kafka的錯誤。

原文參見:http://www.jianshu.com/p/f0a08bd4f975


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM