轉自:http://blog.csdn.net/wzy0623/article/details/73650053
一、為什么要用到Flume
在以前搭建HAWQ數據倉庫實驗環境時,我使用Sqoop抽取從MySQL數據庫增量抽取數據到HDFS,然后用HAWQ的外部表進行訪問。這種方式只需要很少量的配置即可完成數據抽取任務,但缺點同樣明顯,那就是實時性。Sqoop使用MapReduce讀寫數據,而MapReduce是為了批處理場景設計的,目標是大吞吐量,並不太關心低延時問題。就像實驗中所做的,每天定時增量抽取數據一次。Flume是一個海量日志采集、聚合和傳輸的系統,支持在日志系統中定制各類數據發送方,用於收集數據。同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方的能力。Flume以流方式處理數據,可作為代理持續運行。當新的數據可用時,Flume能夠立即獲取數據並輸出至目標,這樣就可以在很大程度上解決實時性問題。
Flume是最初只是一個日志收集器,但隨着flume-ng-sql-source插件的出現,使得Flume從關系數據庫采集數據成為可能。下面簡單介紹Flume,並詳細說明如何配置Flume將MySQL表數據准實時抽取到HDFS。
二、Flume簡介
1. Flume的概念
Flume是分布式的日志收集系統,它將各個服務器中的數據收集起來並送到指定的地方去,比如說送到HDFS,簡單來說flume就是收集日志的,其架構如圖1所示。
2. Event的概念
在這里有必要先介紹一下Flume中event的相關概念:Flume的核心是把數據從數據源(source)收集過來,在將收集到的數據送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先緩存數據(channel),待數據真正到達目的地(sink)后,Flume再刪除自己緩存的數據。在整個數據的傳輸的過程中,流動的是event,即事務保證是在event級別進行的。那么什么是event呢?Event將傳輸的數據進行封裝,是Flume傳輸數據的基本單位,如果是文本文件,通常是一行記錄。Event也是事務的基本單位。Event從source,流向channel,再到sink,本身為一個字節數組,並可攜帶headers(頭信息)信息。Event代表着一個數據的最小完整單元,從外部數據源來,向外部的目的地去。
3. Flume架構介紹
Flume之所以這么神奇,是源於它自身的一個設計,這個設計就是agent。Agent本身是一個Java進程,運行在日志收集節點——所謂日志收集節點就是服務器節點。 Agent里面包含3個核心的組件:source、channel和sink,類似生產者、倉庫、消費者的架構。- Source:source組件是專門用來收集數據的,可以處理各種類型、各種格式的日志數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。
- Channel:source組件把數據收集來以后,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對采集到的數據進行簡單的緩存,可以存放在memory、jdbc、file等等。
- Sink:sink組件是用於把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。
4. Flume的運行機制
Flume的核心就是一個agent,這個agent對外有兩個進行交互的地方,一個是接受數據輸入的source,一個是數據輸出的sink,sink負責將數據發送到外部指定的目的地。source接收到數據之后,將數據發送給channel,chanel作為一個數據緩沖區會臨時存放這些數據,隨后sink會將channel中的數據發送到指定的地方,例如HDFS等。注意:只有在sink將channel中的數據成功發送出去之后,channel才會將臨時數據進行刪除,這種機制保證了數據傳輸的可靠性與安全性。三、安裝Hadoop和Flume
我的實驗在HDP 2.5.0上進行,HDP安裝中包含Flume,只要配置Flume服務即可。HDP的安裝步驟參見“ HAWQ技術解析(二) —— 安裝部署”四、配置與測試
1. 建立MySQL數據庫表
建立測試表並添加數據。- use test;
- create table wlslog
- (id int not null,
- time_stamp varchar(40),
- category varchar(40),
- type varchar(40),
- servername varchar(40),
- code varchar(40),
- msg varchar(40),
- primary key ( id )
- );
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,'apr-8-2014-7:06:16-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to standby');
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,'apr-8-2014-7:06:17-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to starting');
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,'apr-8-2014-7:06:18-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to admin');
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,'apr-8-2014-7:06:19-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to resuming');
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,'apr-8-2014-7:06:20-pm-pdt','notice','weblogicserver','adminserver','bea-000361','started weblogic adminserver');
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,'apr-8-2014-7:06:21-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to running');
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
- commit;
2. 建立相關目錄與文件
(1)創建本地狀態文件- mkdir -p /var/lib/flume
- cd /var/lib/flume
- touch sql-source.status
- chmod -R 777 /var/lib/flume
(2)建立HDFS目標目錄
- hdfs dfs -mkdir -p /flume/mysql
- hdfs dfs -chmod -R 777 /flume/mysql
3. 准備JAR包
從 http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下載flume-ng-sql-source-1.3.7.jar文件,並復制到Flume庫目錄。- cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/
- cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar
4. 建立HAWQ外部表
- create external table ext_wlslog
- (id int,
- time_stamp varchar(40),
- category varchar(40),
- type varchar(40),
- servername varchar(40),
- code varchar(40),
- msg varchar(40)
- ) location ('pxf://mycluster/flume/mysql?profile=hdfstextmulti') format 'csv' (quote=e'"');
5. 配置Flume
在Ambari -> Flume -> Configs -> flume.conf中配置如下屬性:- agent.channels.ch1.type = memory
- agent.sources.sql-source.channels = ch1
- agent.channels = ch1
- agent.sinks = HDFS
- agent.sources = sql-source
- agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
- agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test
- agent.sources.sql-source.user = root
- agent.sources.sql-source.password = 123456
- agent.sources.sql-source.table = wlslog
- agent.sources.sql-source.columns.to.select = *
- agent.sources.sql-source.incremental.column.name = id
- agent.sources.sql-source.incremental.value = 0
- agent.sources.sql-source.run.query.delay=5000
- agent.sources.sql-source.status.file.path = /var/lib/flume
- agent.sources.sql-source.status.file.name = sql-source.status
- agent.sinks.HDFS.channel = ch1
- agent.sinks.HDFS.type = hdfs
- agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql
- agent.sinks.HDFS.hdfs.fileType = DataStream
- agent.sinks.HDFS.hdfs.writeFormat = Text
- agent.sinks.HDFS.hdfs.rollSize = 268435456
- agent.sinks.HDFS.hdfs.rollInterval = 0
- agent.sinks.HDFS.hdfs.rollCount = 0
屬性 |
描述 |
agent.channels.ch1.type |
Agent的channel類型 |
agent.sources.sql-source.channels |
Source對應的channel名稱 |
agent.channels |
Channel名稱 |
agent.sinks |
Sink名稱 |
agent.sources |
Source名稱 |
agent.sources.sql-source.type |
Source類型 |
agent.sources.sql-source.connection.url |
數據庫URL |
agent.sources.sql-source.user |
數據庫用戶名 |
agent.sources.sql-source.password |
數據庫密碼 |
agent.sources.sql-source.table |
數據庫表名 |
agent.sources.sql-source.columns.to.select |
查詢的列 |
agent.sources.sql-source.incremental.column.name |
增量列名 |
agent.sources.sql-source.incremental.value |
增量初始值 |
agent.sources.sql-source.run.query.delay |
發起查詢的時間間隔,單位是毫秒 |
agent.sources.sql-source.status.file.path |
狀態文件路徑 |
agent.sources.sql-source.status.file.name |
狀態文件名稱 |
agent.sinks.HDFS.channel |
Sink對應的channel名稱 |
agent.sinks.HDFS.type |
Sink類型 |
agent.sinks.HDFS.hdfs.path |
Sink路徑 |
agent.sinks.HDFS.hdfs.fileType |
流數據的文件類型 |
agent.sinks.HDFS.hdfs.writeFormat |
數據寫入格式 |
agent.sinks.HDFS.hdfs.rollSize |
目標文件輪轉大小,單位是字節 |
agent.sinks.HDFS.hdfs.rollInterval |
hdfs sink間隔多長將臨時文件滾動成最終目標文件,單位是秒;如果設置成0,則表示不根據時間來滾動文件 |
agent.sinks.HDFS.hdfs.rollCount |
當events數據達到該數量時候,將臨時文件滾動成目標文件;如果設置成0,則表示不根據events數據來滾動文件 |
表1
6. 運行Flume代理
保存上一步的設置,然后重啟Flume服務,如圖2所示。
重啟后,狀態文件已經記錄了將最新的id值7,如圖3所示。

查看目標路徑,生成了一個臨時文件,其中有7條記錄,如圖4所示。

查詢HAWQ外部表,結果也有全部7條數據,如圖5所示。

至此,初始數據抽取已經完成。
7. 測試准實時增量抽取
在源表中新增id為8、9、10的三條記錄。- use test;
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(8,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(9,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
- insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(10,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
- commit;

五、方案優缺點
利用Flume采集關系數據庫表數據最大的優點是配置簡單,不用編程。相比tungsten-replicator的復雜性,Flume只要在flume.conf文件中配置source、channel及sink的相關屬性,已經沒什么難度了。而與現在很火的canal比較,雖然不夠靈活,但畢竟一行代碼也不用寫。再有該方案采用普通SQL輪詢的方式實現,具有通用性,適用於所有關系庫數據源。這種方案的缺點與其優點一樣突出,主要體現在以下幾方面。
- 在源庫上執行了查詢,具有入侵性。
- 通過輪詢的方式實現增量,只能做到准實時,而且輪詢間隔越短,對源庫的影響越大。
- 只能識別新增數據,檢測不到刪除與更新。
- 要求源庫必須有用於表示增量的字段。
參考:
Flume架構以及應用介紹Streaming MySQL Database Table Data to HDFS with Flume
how to read data from oracle using FLUME to kafka broker
https://github.com/keedio/flume-ng-sql-source
-
v