flume


@

flume簡介

flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(比如文本、HDFS、Hbase等)的能力 。flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日志數據(字節數組形式)並且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。

flume架構

flume運行的核心是Agent。flume以Agent為最小的獨立運行單位。一個Agent就是一個JVM
flume是一個完整的數據收集工具,含有三個核心組件。分別為:source、channel、sink,

Source是數據的收集端,負責將數據捕獲后進行特殊的格式化,將數據封裝到事件(event) 里,然后將事件推入Channel中。 Flume提供了很多內置的Source, 支持 Avro, log4j, syslog 和 http post(body為json格式)。可以讓應用程序同已有的Source直接打交道,如AvroSource
如果內置的Source無法滿足需要, Flume還支持自定義Source。

Channel是連接Source和Sink的組件,大家可以將它看做一個數據的緩沖區(數據隊列),它可以將事件暫存到內存中也可以持久化到本地磁盤上, 直到Sink處理完該事件。介紹兩個較為常用的Channel, MemoryChannel和FileChannel。

Sink從Channel中取出事件,然后將數據發到別處,可以向文件系統、數據庫、 hadoop存數據, 也可以是其他agent的Source。在日志數據較少時,可以將數據存儲在文件系統中,並且設定一定的時間間隔保存數據。

  • Client:Client生產數據,運行在一個獨立的線程。
  • Event: 一個數據單元,消息頭和消息體組成。(Events可以是日志記錄、 avro 對象等。)
  • Flow: Event從源點到達目的點的遷移的抽象。
  • Agent: 一個獨立的Flume進程,包含組件Source、 Channel、 Sink。(Agent使用JVM 運行Flume。每台機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。)
  • Source: 數據收集組件。(source從Client收集數據,傳遞給Channel)
    -Channel: 中轉Event的一個臨時存儲,保存由Source組件傳遞過來的Event。(Channel連接 sources 和 sinks ,這個有點像一個隊列。)
    -Sink: 從Channel中讀取並移除Event, 將Event傳遞到FlowPipeline中的下一個Agent(如果有的話)(Sink從Channel收集數據,運行在一個獨立線程。)

Flume數據流

Flume 的核心是把數據從數據源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地后,刪除自己緩存的數據

Flume 傳輸的數據的基本單位是 Event,如果是文本文件,通常是一行記錄,這也是事務的基本單位。 Event 從 Source,流向Channel,再到 Sink,本身為一個** byte 數組**,並可攜帶 headers 信息。 Event 代表着一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。

值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。組合方式基於用戶設置的配置文件,非常靈活。

比如:Channel可以把事件暫存在內存里,也可以持久化到本地硬盤上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。Flume支持用戶建立多級流,

也就是說,多個agent可以協同工作。

Flume可靠性

Flume 使用事務性的方式保證傳送Event整個過程的可靠性。 Sink 必須在Event 已經被傳達到下一站agent里,又或者,已經被存入外部數據目的地之后,才能把 Event 從 Channel 中 remove 掉。這樣數據流里的 event 無論是在一個 agent 里還是多個 agent 之間流轉,都能保證可靠,因為以上的事務保證了 event 會被成功存儲起來。比如 Flume支持在本地保存一份channel文件作為備份,而memory channel 將event存在內存 queue 里,速度快,但丟失的話無法恢復。

Flume的安裝及使用

Flume的安裝

1、上傳至虛擬機,並解壓

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/

在環境變量中增加如下命令,可以使用 soft 快速切換到 /usr/local/soft

alias soft='cd /usr/local/soft/'

2、重命名目錄,並配置環境變量

mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
source /etc/profile

3、查看flume版本

flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]# 

4、測試flume

  • 監控一個目錄,將數據打印出來

    • 配置文件
    # 首先先給agent起一個名字 叫a1
    # 分別給source channel sink取名字
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    # 分別對source、channel、sink進行配置
    
    # 配置source
    # 將source的類型指定為 spooldir 用於監聽一個目錄下文件的變化
    # 因為每個組件可能會出現相同的屬性名稱,所以在對每個組件進行配置的時候 
    # 需要加上 agent的名字.sources.組件的名字.屬性 = 屬性值
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/data/
    a1.sources.r1.fileSuffix = .ok
    #在采集之后的文件后面添加后綴,區分哪些文件是采集過得,哪些是沒有采集的
    a1.sources.r1.fileHeader = true
    
    #攔截器
    # 給r1這個souces配置一個攔截器並取名為 i1
    a1.sources.r1.interceptors = i1
    # 將攔截器i1的類型設置為timestamp 會將處理數據的時間以毫秒的格式插入event的header中
    # a1.sources.r1.interceptors.i1.type = timestamp
    # 將攔截器i1的類型設置為regex_filter 會根據正則表達式過濾數據
    a1.sources.r1.interceptors.i1.type = regex_filter
    # 配置正則表達式,匹配3-6位的數字
    a1.sources.r1.interceptors.i1.regex = \\d{3,6}
    # excludeEvents = true 表示將匹配到的過濾,未匹配到的放行
    a1.sources.r1.interceptors.i1.excludeEvents = true
    
    # 配置sink
    # 使用logger作為sink組件,可以將收集到數據直接打印到控制台
    a1.sinks.k1.type = logger
    
    # 配置channel
    # 將channel的類型設置為memory,表示將event緩存在內存中
    a1.channels.c1.type = memory
    
    # 組裝
    # 將sources的channels屬性指定為c1
    a1.sources.r1.channels = c1
    
    # 將sinks的channel屬性指定為c1
    a1.sinks.k1.channel = c1
    
    • 啟動agent,-Dflume.root.logger=DEBUG,console 配置日志打印級別(非必選)
    flume-ng agent -n a1 -f ./spoolingtest.conf -Dflume.root.logger=DEBUG,console
    
    • 新建/root/data目錄
    mkdir /root/data
    

    在這里插入圖片描述

    • 在/root/data/目錄下新建文件,輸入內容,觀察flume進程打印的日志
    # 隨意在a.txt中加入一些內容
    vim /root/data/a.txt
    

5、flume的使用

spoolingToHDFS.conf
  • 配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1 
# 給channel組件命名為c1
a.channels = c1
#指定spooldir的屬性
a.sources.r1.type = spooldir 
a.sources.r1.spoolDir = /root/data 
a.sources.r1.fileHeader = true 
a.sources.r1.interceptors = i1 
a.sources.r1.interceptors.i1.type = timestamp
#指定sink的類型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
# 指定文件名前綴
a.sinks.k1.hdfs.filePrefix = student
# 指定達到多少數據量寫一次文件 單位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少條寫一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件類型為 流 來什么輸出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件輸出格式 為text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后綴
a.sinks.k1.hdfs.fileSuffix = .txt

#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 1000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1
  • 在 /root/data/目錄下准備數據
The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
  • 啟動agent
flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console

在這里插入圖片描述

hbaseLogToHDFS
  • 配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1 
# 給channel組件命名為c1
a.channels = c1
#指定exec的屬性
a.sources.r1.type = exec 
a.sources.r1.command = tail -f /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
#指定sink的類型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir2
# 指定文件名前綴
a.sinks.k1.hdfs.filePrefix = hbaselog
# 指定達到多少數據量寫一次文件 單位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少條寫一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件類型為 流 來什么輸出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件輸出格式 為text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后綴
a.sinks.k1.hdfs.fileSuffix = .txt

#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 1000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1
hbaselogToHBase
  • 在hbase中創建log表
create 'log','cf1'
  • 配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1 
# 給channel組件命名為c1
a.channels = c1
#指定exec的屬性
a.sources.r1.type = exec 
a.sources.r1.command = cat /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
#指定sink的類型
a.sinks.k1.type = hbase
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1

#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 100000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1
  • netcatLogger

    監聽telnet端口

    • 安裝telnet
    yum install telnet
    
    • 配置文件
    # a表示給agent命名為a
    # 給source組件命名為r1
    a.sources = r1
    # 給sink組件命名為k1
    a.sinks = k1 
    # 給channel組件命名為c1
    a.channels = c1
    #指定netcat的屬性
    a.sources.r1.type = netcat 
    # 0.0.0.0 表示任意ip
    a.sources.r1.bind = 0.0.0.0 
    a.sources.r1.port = 8888 
    
    #指定sink的類型
    a.sinks.k1.type = logger
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次會從channel里取多少數據
    a.channels.c1.transactionCapacity = 100
    # 組裝
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 啟動

      • 先啟動agent
      flume-ng agent -n a -f ./netcatToLogger.conf -Dflume.root.logger=DEBUG,console
      
      • 在啟動telnet
      telnet master 8888
      可以測試端口有沒有進程
      
httpToLogger
  • 配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1 
# 給channel組件命名為c1
a.channels = c1
#指定http的屬性
a.sources.r1.type = http
a.sources.r1.port = 6666 

#指定sink的類型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 1000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1
  • 啟動

    • 先啟動agent
    flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
    
    • 再使用curl發起一個http請求
    curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"},{ "headers" :{"a2" : "a11","b2" : "b11"},"body" : "hello~http~flume2~"}]' http://master:6666
    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM