Flume介紹
Flume是Apache基金會組織的一個提供的高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。
當前Flume有兩個版本,Flume 0.9x版本之前的統稱為Flume-og,Flume1.X版本被統稱為Flume-ng。
參考文檔:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html
Flume-og和Flume-ng的區別
主要區別如下:
1. Flume-og中采用master結構,為了保證數據的一致性,引入zookeeper進行管理。Flume-ng中取消了集中master機制和zookeeper管理機制,變成了一個純粹的傳輸工具。
2. Flume-ng中采用不同的線程進行數據的讀寫操作;在Flume-og中,讀數據和寫數據是由同一個線程操作的,如果寫出比較慢的話,可能會阻塞flume的接收數據的能力。
Flume結構
Flume中以Agent為基本單位,一個agent可以包括source、channel、sink,三種組件都可以有多個。其中source組件主要功能是接收外部數據,並將數據傳遞到channel中;sink組件主要功能是發送flume接收到的數據到目的地;channel的主要作用就是數據傳輸和保存的一個作用。Flume主要分為三類結構:單agent結構、多agent鏈式結構和多路復用agent結構。
單agent結構
多agent鏈式結構
多路復用agent結構
Source介紹
Source的主要作用是接收客戶端發送的數據,並將數據發送到channel中,source和channel之間的關系是多對多關系,不過一般情況下使用一個source對應多個channel。通過名稱區分不同的source。Flume常用source有:Avro Source、Thrift Source、Exec Source、Kafka Source、Netcat Source等。設置格式如下:
<agent-name>.sources=source_names
<agent-name>.sources.<source_name>.type=指定類型
<agent-name>.sources.<source_name>.channels=channels
.... 其他對應source類型需要的參數
Channel介紹
Channel的主要作用是提供一個數據傳輸通道,提供數據傳輸和數據存儲(可選)等功能。source將數據放到channel中,sink從channel中拿數據。通過不同的名稱來區分channel。Flume常用channel有:Memory Channel、JDBC Channel、Kafka Channel、File Channel等。設置格式如下:
<agent-name>.channels=channel_names
<agent-name>.channels.<channel_name>.type=指定類型
.... 其他對應channel類型需要的參數
Sink介紹
Sink的主要作用是定義數據寫出方式,一般情況下sink從channel中獲取數據,然后將數據寫出到file、hdfs或者網絡上。channel和sink之間的關系是一對多的關系。通過不同的名稱來區分sink。Flume常用sink有:
Hdfs Sink、Hive Sink、File Sink、HBase Sink、Avro Sink、Thrift Sink、Logger Sink等。
設置格式如下:
<agent-name>.sinks = sink_names
<agent-name>.sinks.<sink_name1>.type=指定類型
<agent-name>.sinks.<sink_name1>.channel=<channe_name>
.... 其他對應sink類型需要的參數
Flume中常用的source、channel、sink組件
1.2.2.1 source組件
Source類型 |
說明 |
Avro Source |
支持Avro協議(實際上是Avro RPC),內置支持 |
Thrift Source |
支持Thrift協議,內置支持 |
Exec Source |
基於Unix的command在標准輸出上生產數據 |
JMS Source |
從JMS系統(消息、主題)中讀取數據,ActiveMQ已經測試過 |
Spooling Directory Source |
監控指定目錄內數據變更 |
Twitter 1% firehose Source |
通過API持續下載Twitter數據,試驗性質 |
Netcat Source |
監控某個端口,將流經端口的每一個文本行數據作為Event輸入 |
Sequence Generator Source |
序列生成器數據源,生產序列數據 |
Syslog Sources |
讀取syslog數據,產生Event,支持UDP和TCP兩種協議 |
HTTP Source |
基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式 |
Legacy Sources |
兼容老的Flume OG中Source(0.9.x版本) |
1.2.2.2 Channel組件
Channel類型 |
說明 |
Memory Channel |
Event數據存儲在內存中 |
JDBC Channel |
Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby |
File Channel |
Event數據存儲在磁盤文件中 |
Spillable Memory Channel |
Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件(當前試驗性的,不建議生產環境使用) |
Pseudo Transaction Channel |
測試用途 |
Custom Channel |
自定義Channel實現 |
1.2.2.3 sink組件
Sink類型 |
說明 |
HDFS Sink |
數據寫入HDFS |
Logger Sink |
數據寫入日志文件 |
Avro Sink |
數據被轉換成Avro Event,然后發送到配置的RPC端口上 |
Thrift Sink |
數據被轉換成Thrift Event,然后發送到配置的RPC端口上 |
IRC Sink |
數據在IRC上進行回放 |
File Roll Sink |
存儲數據到本地文件系統 |
Null Sink |
丟棄到所有數據 |
HBase Sink |
數據寫入HBase數據庫 |
Morphline Solr Sink |
數據發送到Solr搜索服務器(集群) |
ElasticSearch Sink |
數據發送到Elastic Search搜索服務器(集群) |
Kite Dataset Sink |
寫數據到Kite Dataset,試驗性質的 |
Custom Sink |
自定義Sink實現 |
Flume支持眾多的source、channel、sink類型,詳細手冊可參考官方文檔
Flume安裝
安裝步驟如下:
1. 下載flume:wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz
2. 解壓flume。
3. 修改conf/flume-env.sh文件,如果沒有就新建一個。
4. 添加flume的bin目錄到環境變量中去。
5. 驗證是否安裝成功, flume-ng version
監聽Hive日志信息並寫入到hdfs
1. 編寫nginx配置信息
在hive根目錄下創建log文件夾,
2. 編寫flume的agent配置信息
配置如下
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/hadoop/bigdatasoftware/apache-hive-0.13.1-bin/log/hive.log #要監控的log文件
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop-001:9000/logs/%Y%m%d/%H0 #生成hdfs文件的目錄格式
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位創建一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#重新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k2.hdfs.rollInterval = 600
#設置每個文件的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.k2.hdfs.rollCount = 0
#最小冗余數
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3. 移動hdfs依賴包到flume的lib文件夾中。
cp ~/bigdatasoftware/hadoop-2.7.2/share/hadoop/common/lib/commons-configuration-1.6.jar ./
cp ~/bigdatasoftware/hadoop-2.7.2/share/hadoop/common/lib/hadoop-auth-2.7.2.jar ./
cp ~/bigdatasoftware/hadoop-2.7.2/share/hadoop/common/hadoop-common-2.7.2.jar ./
cp ~/bigdatasoftware/hadoop-2.7.2/share/hadoop/hdfs/hadoop-hdfs-2.7.2.jar ./
4. 進入flume根目錄,啟動
flume: ./bin/flume-ng agent --conf ./conf/ --name a2 --conf-file ./conf/flume-file-hdfs.conf
5.在hive上進行一系列操作
在hdfs上查看即可發現已生成文件