鏈接:https://pan.baidu.com/s/1q4Frq77GBDxTw26eY5ADRQ
提取碼:lycc
一、Flume簡介
1、概念
flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。支持在日志系統中定制各類數據發送方,用於收集數據
flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日志數據(字節數組形式)並且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。
2、flume架構

Flume 運行的核心是 Agent。Flume以agent為最小的獨立運行單位。一個agent就是一個JVM。它是一個完整的數據收集工具,含有三個核心組件,分別是source、 channel、 sink。通過這些組件, Event 可以從一個地方流向另一個地方,
3、flume組件
- Client:Client生產數據,運行在一個獨立的線程。
- Event: 一個數據單元,消息頭和消息體組成。(Events可以是日志記錄、 avro 對象等。)
- Flow: Event從源點到達目的點的遷移的抽象。
- Agent: 一個獨立的Flume進程,包含組件Source、 Channel、 Sink。(Agent使用JVM 運行Flume。每台機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。)
- Source: 數據收集組件。(source從Client收集數據,傳遞給Channel)

- Channel: 中轉Event的一個臨時存儲,保存由Source組件傳遞過來的Event。(Channel連接 sources 和 sinks ,這個有點像一個隊列。)
- Sink: 從Channel中讀取並移除Event, 將Event傳遞到FlowPipeline中的下一個Agent(如果有的話)(Sink從Channel收集數據,運行在一個獨立線程。)

4、flume數據流
1、Flume 的核心是把數據從數據源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地后,刪除自己緩存的數據
2、Flume 傳輸的數據的基本單位是 Event,如果是文本文件,通常是一行記錄,這也是事務的基本單位。 Event 從 Source,流向 Channel,再到 Sink,本身為一個 byte 數組,並可攜帶 headers 信息。 Event 代表着一個數據流的最小完整單元,從外部數據源來,向外部的目的地去。
值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。組合方式基於用戶設置的配置文件,非常靈活。
比如:Channel可以把事件暫存在內存里,也可以持久化到本地硬盤上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。Flume支持用戶建立多級流,
也就是說,多個agent可以協同工作。
5、flume可靠性
Flume 使用事務性的方式保證傳送Event整個過程的可靠性。 Sink 必須在Event 已經被傳達到下一站agent里,又或者,已經被存入外部數據目的地之后,才能把 Event 從 Channel 中 remove 掉。這樣數據流里的 event 無論是在一個 agent 里還是多個 agent 之間流轉,都能保證可靠,因為以上的事務保證了 event 會被成功存儲起來。比如 Flume支持在本地保存一份channel文件作為備份,而memory channel 將event存在內存 queue 里,速度快,但丟失的話無法恢復。
6、agent連接方式
順序連接

並連連接(個人理解)

多級流

負載均衡功能

二、flume安裝
1、上傳至虛擬機,解壓,重命名


2、配置環境變量


3、測試使用
查看flume版本
flume-ng version

測試使用
其他需要啥直接往上加或修改即可
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channels = c1
監視root用戶data目錄下文件,並打印在控制台上
# 首先先給agent起一個名字 叫a1
# 分別給source channel sink取名字
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 分別對source、channel、sink進行配置
# 配置source
# 將source的類型指定為 spooldir 用於監聽一個目錄下文件的變化
# 因為每個組件可能會出現相同的屬性名稱,所以在對每個組件進行配置的時候
# 需要加上 agent的名字.sources.組件的名字.屬性 = 屬性值
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.fileSuffix = .ok
a1.sources.r1.fileHeader = true
# 給r1這個souces配置一個攔截器並取名為 i1
a1.sources.r1.interceptors = i1
# 將攔截器i1的類型設置為timestamp 會將處理數據的時間以毫秒的格式插入event的header中
# a1.sources.r1.interceptors.i1.type = timestamp
# 將攔截器i1的類型設置為regex_filter 會根據正則表達式過濾數據
a1.sources.r1.interceptors.i1.type = regex_filter
# 配置正則表達式
a1.sources.r1.interceptors.i1.regex = \\d{3,6}
# excludeEvents = true 表示將匹配到的過濾,未匹配到的放行
a1.sources.r1.interceptors.i1.excludeEvents = true
# 配置sink
# 使用logger作為sink組件,可以將收集到數據直接打印到控制台
a1.sinks.k1.type = logger
# 配置channel
# 將channel的類型設置為memory,表示將event緩存在內存中
a1.channels.c1.type = memory
# 組裝
# 將sources的channels屬性指定為c1
a1.sources.r1.channels = c1
# 將sinks的channel屬性指定為c1
a1.sinks.k1.channel = c1


在root用戶新建目錄data,編寫日志


開始監聽
flume-ng agent -n a1 -f ./spoolingtest.conf


三、Flume使用
查看官網幫助文檔

1、spoolingToHDFS.conf

配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1
# 給channel組件命名為c1
a.channels = c1
#指定spooldir的屬性
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /root/data
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp
#指定sink的類型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
# 指定文件名前綴
a.sinks.k1.hdfs.filePrefix = student
# 指定達到多少數據量寫一次文件 單位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少條寫一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件類型為 流 來什么輸出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件輸出格式 為text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后綴
a.sinks.k1.hdfs.fileSuffix = .txt
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
在 /root/data/目錄下准備數據
The Zen of Python, by Tim PetersBeautiful is better than ugly.Explicit is better than implicit.Simple is better than complex.Complex is better than complicated.Flat is better than nested.Sparse is better than dense.Readability counts.Special cases aren't special enough to break the rules.Although practicality beats purity.Errors should never pass silently.Unless explicitly silenced.In the face of ambiguity, refuse the temptation to guess.There should be one-- and preferably only one --obvious way to do it.Although that way may not be obvious at first unless you're Dutch.Now is better than never.Although never is often better than *right* now.If the implementation is hard to explain, it's a bad idea.If the implementation is easy to explain, it may be a good idea.Namespaces are one honking great idea -- let's do more of those!
啟動agent
flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
2、hbaseLogToHDFS
配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1
# 給channel組件命名為c1
a.channels = c1
#指定exec的屬性
a.sources.r1.type = exec
a.sources.r1.command = tail -f /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
#指定sink的類型
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir2
# 指定文件名前綴
a.sinks.k1.hdfs.filePrefix = hbaselog
# 指定達到多少數據量寫一次文件 單位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少條寫一次文件
a.sinks.k1.hdfs.rollCount = 1000
# 指定文件類型為 流 來什么輸出什么
a.sinks.k1.hdfs.fileType = DataStream
# 指定文件輸出格式 為text
a.sinks.k1.hdfs.writeFormat = text
# 指定文件名后綴
a.sinks.k1.hdfs.fileSuffix = .txt
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
3、hbaselogToHBase
在hbase中創建log表
create 'log','cf1'
配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1
# 給channel組件命名為c1
a.channels = c1
#指定exec的屬性
a.sources.r1.type = exec
a.sources.r1.command = cat /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
#指定sink的類型
a.sinks.k1.type = hbase
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 100000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
4、netcatLogger
監聽telnet端口
安裝telnet
yum install telnet
配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1
# 給channel組件命名為c1
a.channels = c1
#指定netcat的屬性
a.sources.r1.type = netcat
a.sources.r1.bind = 0.0.0.0
a.sources.r1.port = 8888
#指定sink的類型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
啟動
先啟動agent
flume-ng agent -n a -f ./netcatToLogger.conf -Dflume.root.logger=DEBUG,console
在啟動telnet
telnet master 8888
5、httpToLogger
配置文件
# a表示給agent命名為a
# 給source組件命名為r1
a.sources = r1
# 給sink組件命名為k1
a.sinks = k1
# 給channel組件命名為c1
a.channels = c1
#指定http的屬性
a.sources.r1.type = http
a.sources.r1.port = 6666
#指定sink的類型
a.sinks.k1.type = logger
#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# 表示sink每次會從channel里取多少數據
a.channels.c1.transactionCapacity = 100
# 組裝
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
啟動
先啟動agent
flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
再使用curl發起一個http請求
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"},{ "headers" :{"a2" : "a11","b2" : "b11"},"body" : "hello~http~flume2~"}]' http://master:6666
