1. Introduction
Apache Flume is a tool/service (or data-aggregation mechanism) for collecting data such as logs and events from many kinds of sources and bringing these large volumes of data together into centralized storage. Flume is highly available, distributed, and driven by simple configuration; it is designed around streaming data flows, e.g. gathering log data from many web servers and writing it into centralized stores such as HDFS or HBase. Official guide: http://flume.apache.org/FlumeUserGuide.html
2. Architecture
1. Basic architecture

Components:
Source (the origin of data):
Receives data from a data generator and passes it on, in Flume's event format, to one or more channels. Flume offers many ways of receiving data, e.g. Avro, Thrift, Twitter 1%, and so on.
Channel (an intermediate relay):
A channel is a transient store that buffers the events received from a source until they are consumed by sinks; it acts as a bridge between sources and sinks. Channels are fully transactional, which guarantees consistency between sending and receiving, and a channel can be connected to any number of sources and sinks. Supported types include the JDBC channel, file system channel, and memory channel.
Sink (the final writer):
A sink writes data into centralized storage such as HBase or HDFS. It consumes events from channels and delivers them to the destination, which may be the next agent in a chain (via an Avro sink/source pair) or a terminal store such as HDFS or HBase.
2. Extended architectures
2.1 Relaying through Avro

2.2 A typical layout when there are multiple sources

3.1 Installation
1. Upload the release tarball to the server.
2. Unpack it.
3. Set the JDK path in conf/flume-env.sh.
Note: if transferring very large files triggers an out-of-memory error, adjust the JAVA_OPTS settings in this file.
4. Verify the installation: ./flume-ng version
5. Configure the environment variable:
export FLUME_HOME=/home/apache-flume-1.6.0-bin
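The five steps above can be sketched as a shell session. The paths and the JAVA_HOME value are examples, not fixed locations; the destructive commands are shown as comments so you can adapt them:

```shell
# Example path from this guide; adjust to your environment.
FLUME_HOME=/home/apache-flume-1.6.0-bin

# 1-2. Upload the release tarball to the server, then unpack it:
#        tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /home
# 3.   Point conf/flume-env.sh at your JDK (and raise the JAVA_OPTS
#      heap settings if large transfers cause OutOfMemoryError):
#        echo "export JAVA_HOME=/usr/java/default" >> "$FLUME_HOME/conf/flume-env.sh"
# 4.   Verify the installation:
#        "$FLUME_HOME/bin/flume-ng" version
# 5.   Export the environment variables, e.g. in ~/.bashrc:
export FLUME_HOME
export PATH="$PATH:$FLUME_HOME/bin"
```

With FLUME_HOME/bin on the PATH, the later `flume-ng agent ...` commands can be run from any directory.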
3.2 Source, Channel, and Sink types
Flume Source
Source type | Description
Avro Source | Supports the Avro protocol (really Avro RPC); built in
Thrift Source | Supports the Thrift protocol; built in
Exec Source | Produces data from the standard output of a Unix command
JMS Source | Reads data from a JMS system (queues or topics)
Spooling Directory Source | Watches a given directory for newly arriving files
Twitter 1% firehose Source | Continuously downloads Twitter data through the API; experimental
Netcat Source | Listens on a port and turns each line of text passing through it into an event
Sequence Generator Source | Sequence generator that produces sequential data
Syslog Sources | Read syslog data and produce events; both UDP and TCP are supported
HTTP Source | Accepts data via HTTP POST or GET; JSON and BLOB representations are supported
Legacy Sources | Compatible with sources from the old Flume OG (0.9.x)
Flume Channel
Channel type | Description
Memory Channel | Events are stored in memory
JDBC Channel | Events are stored in a persistent database; Derby is currently supported out of the box
File Channel | Events are stored in files on disk
Spillable Memory Channel | Events are stored in memory and spill over to disk files when the in-memory queue fills up
Pseudo Transaction Channel | For testing only
Custom Channel | A user-supplied Channel implementation
Flume Sink
Sink type | Description
HDFS Sink | Writes data to HDFS
Logger Sink | Writes data to the log
Avro Sink | Converts data into Avro events and sends them to the configured RPC port
Thrift Sink | Converts data into Thrift events and sends them to the configured RPC port
IRC Sink | Replays data on IRC
File Roll Sink | Stores data on the local filesystem
Null Sink | Discards all data it receives
HBase Sink | Writes data to HBase
Morphline Solr Sink | Sends data to a Solr search server (or cluster)
ElasticSearch Sink | Sends data to an Elasticsearch search server (or cluster)
Kite Dataset Sink | Writes data to a Kite Dataset; experimental
Custom Sink | A user-supplied Sink implementation
Example 1: A simple example
http://flume.apache.org/FlumeUserGuide.html#a-simple-example
Configuration file (simple.conf):
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Start Flume (-n names the agent, -c points at the configuration directory, -f at the agent's config file):
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console
Install telnet as a test client and connect to the netcat source:
yum install telnet
telnet localhost 44444
To quit telnet, press Ctrl+] and then type quit.
Memory Channel settings:
capacity: maximum number of events the channel can store (default 100)
transactionCapacity: maximum number of events taken from a source or delivered to a sink per transaction (default 100)
keep-alive: how long an add to or a take from the channel may wait
byte**: limits on the byte volume of events; only the event body counts
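As a sketch, these knobs map onto the channel definition used in the example above (values are illustrative, not recommendations; see the Memory Channel section of the user guide for the full list):

```properties
a1.channels.c1.type = memory
# Maximum number of events held in the channel (default 100)
a1.channels.c1.capacity = 1000
# Maximum events per transaction with a source or sink (default 100)
a1.channels.c1.transactionCapacity = 100
# Seconds an add/take may wait when the channel is full/empty (default 3)
a1.channels.c1.keep-alive = 3
```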
Example 2: Chaining two Flume agents (the first agent's sink feeds the second agent's source)
Configuration file on node01:
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
a1.sources.r1.port = 44444
# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2
a1.sinks.k1.port = 60000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Install Flume on node02 as well (steps omitted).
Configuration file:
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 60000
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Start Flume on node02 first:
flume-ng agent -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
Then start Flume on node01:
flume-ng agent -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
Open a telnet session against node1:44444 to test; the events appear on node02's console.
Example 3: Exec Source (tailing a file)
http://flume.apache.org/FlumeUserGuide.html#exec-source
Configuration file (exec.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/flume.exec.log
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Start Flume:
flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
Create an empty file for the demo: touch flume.exec.log
Append data to it in a loop:
for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
Example 4: Spooling Directory Source (watching a directory)
http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
Configuration file (spool.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Start Flume:
flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console
Copy a file into the watched directory to demonstrate:
mkdir logs
cp flume.exec.log logs/
Example 5: HDFS Sink
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
Configuration file (hdfs.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
# Describe the sink
# Only the sink block changes from the spool example above
# (it replaces a1.sinks.k1.type = logger):
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M
## Roll a new file every 60 s or when the current file grows past rollSize
# Roll after this many events; 0 = never roll on event count
a1.sinks.k1.hdfs.rollCount=0
# Roll after the file has been open this many seconds; 0 = never roll on time
a1.sinks.k1.hdfs.rollInterval=60
# Roll when the file reaches this many bytes (10240 B = 10 KB); 0 = never roll on size
a1.sinks.k1.hdfs.rollSize=10240
# If the open temp file receives no writes for this many seconds,
# close it and rename it to its final file name
a1.sinks.k1.hdfs.idleTimeout=3
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true
## Create a new directory every five minutes:
# Whether to round the event timestamp down; if enabled, this affects
# all time-based escape sequences except %t
a1.sinks.k1.hdfs.round=true
# The multiple to round down to
a1.sinks.k1.hdfs.roundValue=5
# The unit of roundValue: second, minute or hour
a1.sinks.k1.hdfs.roundUnit=minute
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
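With round=true, roundValue=5 and roundUnit=minute as configured above, the timestamp used for the %H%M escape is floored to a 5-minute boundary, so an event arriving at 10:57 lands in a .../1055 directory. The floor arithmetic, as a quick shell sketch (timestamp values are made up for illustration):

```shell
# Floor the minute to a multiple of roundValue (5), as hdfs.round does
HOUR=10
MIN=57
ROUND=5
ROUNDED=$(( MIN - MIN % ROUND ))
printf '%02d%02d\n' "$HOUR" "$ROUNDED"   # prints 1055
```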
Create the target directory on HDFS:
hadoop fs -mkdir /flume
Start Flume:
flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console
Inspect the resulting files on HDFS:
hadoop fs -ls /flume/...
hadoop fs -get /flume/...
Exercises:
1. How would you use Flume to collect request data from a Java application?
2. How would you set this up in a real project, where logs live under /log/ with a yyyyMMdd subdirectory per day holding that day's data?
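For exercise 1, one common approach (a sketch, not the only answer) is Flume's Log4j appender: the Java application logs through log4j, and the appender ships each log event over Avro RPC to an agent. The appender class and its Hostname/Port properties are from the Flume user guide; the host name and port here are examples:

```properties
# log4j.properties on the Java application side
log4j.rootLogger = INFO, flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = node1
log4j.appender.flume.Port = 41414

# The receiving agent then needs an Avro source on the same port:
# a1.sources.r1.type = avro
# a1.sources.r1.bind = node1
# a1.sources.r1.port = 41414
```

Note the flume-ng-sdk and flume-ng-log4jappender jars must be on the application's classpath for this to work.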
