使用StreamSets從MySQL增量更新數據到Hive
我們可以StreamSets實現數據采集,在實際生產中需要批量、實時捕獲MySQL、Oracle等數據源的變化數據並將其寫入大數據平台的Hive等。這里主要介紹如何使用StreamSets通過JDBC方式實時從MySQL抽取數據到Hive。
StreamSets實現的流程如下:

大致的流程如下:
- Reads data from a JDBC source using a query
- Generates Hive metadata and write information fo HDFS
- Updates the Hive Metastore。通俗的講就是在目標庫中創建表。
- Writes to a Hadoop file system。通俗的講就是把數據寫入到目標庫的表中。
用到的各組件介紹:
這里除了看組件名稱,也注意右上角的字母,有的組件圖標很相似,但是功能是不一樣的,要選擇合適的組件。
第一個組件source, 如下圖, Source是JDBC Query Consumer,看其注釋:Reads data from a JDBC source using a query,可以知道是從數據庫讀取數據。

中間的組件,如下圖,Hive Metadata,注釋:Generates Hive metadata and write information fo HDFS。作用是根據源數據來產生相應的目標數據庫的metadata和寫入HDFS的需要的信息。

兩個目標組件如下圖:


目標組件一:Hadoop FS,注釋:Writes to a Hadoop file system,是將數據寫到HDFS,通俗的講就是把data寫入硬盤或者說Hive表。
目標組件二:Hive Metastore,注釋:Updates the Hive Metastore,是將medata寫入Hive,通俗的講就是創建Hive表。
准備StreamSets。
下載StreamSets,一定要下載all版本,大概5.6G,迅雷一晚上下載完。Core版本不包含有些組件,安裝不上。
https://archives.streamsets.com/datacollector/3.19.0/tarball/streamsets-datacollector-all-3.19.0.tgz
修改最大打開文件數
使用ulimit -a 可以查看當前系統的所有限制值,使用ulimit -n 可以查看當前的最大打開文件數。新裝的linux默認只有1024,當作負載較大的服務器時,很容易遇到error: too many open files或者Configuration of maximum open file limit is too low: 1024。因此,需要將其改大。
使用 ulimit -n 65535 可即時修改,但重啟后就無效了。
臨時修改:
ulimit -n
ulimit -n 65535
永久修改:
echo ulimit -n 65535 >>/etc/profile
source /etc/profile
解壓啟動StreamSets。直接解壓即可。
tar zxvf streamsets-datacollector-all-3.19.0.tgz
cd /home/cg/streamsets/
bin/streamsets dc
登錄。缺省端口18630,原始用戶名和秘密均是admin.目前只有251上的streamsets能解密並且寫入hive,以此為例。
admin/admin
准備源數據庫的表及數據。注意:因為要做增量測試,也就是隨時需要向源MySQL中增加數據。
創建表:
drop table streamsetstest;
CREATE TABLE streamsetstest (
id int auto_increment,
no_encrypt varchar(255) NULL,
a nvarchar(255) NULL,
primary key (id)
);
插入數據:
INSERT INTO streamsetstest (no_encrypt,a) VALUES('BWP018930705', 'BWP018930705');
創建Pipeline.
創建一個Pipeline,依次創建所需要的組件,每次保證創建的組件能夠連通。
New Pipeline.
登錄http://ip:18630/,點擊Create New Pipeline
填寫Error Records相關的信息
寫入Error Records的方式見下圖:

寫入Error Records的位置見下圖:

創建源組件
我們需要從MySQL或者Oracle中通過JDBC讀取數據。可以根據圖標和注釋來選擇合適的組件。
選擇JDBC Query Consumer。如果Origin missing-> Select Origin能找到,就這么選擇JDBC Query Consumer

如果找不到,就在右側Stage選JDBC,找到JDBC Query Consumer

配置JDBC Query Consumer。
JDBC Connection String,如圖,跟通常連JDBC的URL一致,無區別。
SQL Query: 有缺省的模板,需要將table名稱改成具體的名稱,where雖然能改成<,但是我們只有要測增量,所以最好還是>。注意和Initial Offset、Offset column值得對應關系。
Credential Tab里的配置,輸入用戶名和密碼,無特別需要注意的地方。
JDBC Tab配置如下圖:

這個源組件配置完畢之后,點擊Preview按鈕,看看什么地方有錯誤
Preview按鈕如下圖。

如果配置有錯,streamsets會提示,根據提示找到相應的配置項改過來。如果配置沒有錯,就能看到下圖中顯示輸出的數據。確定沒錯之后,退出Preview模式即可。

streamsets缺省的JDBC lib里並不含有MySQL的jdbc driver文件mysql.jar,我們需要點擊Install External Libraries來添加需要的driver。

配置Hive Metadata組件。
這個組件的作用是Generates Hive metadata and write information fo HDFS。各個Tab里的配置會在下邊說明。
General:對於整個流程里涉及到的三個Hive相關的組件的Stage Library要選一樣的,其他的不用管了。
General Tab配置如下圖

Hive Tab:
JDBC URL,這個沒特別需要注意的地方
jdbc:hive2://ip:10000
JDBC Driver Name,也沒啥需要特別注意的地方。
org.apache.hive.jdbc.HiveDriver
Use Credentials,這個依賴於Hive本身的配置,如果不需要用戶密碼,就不用填。如果需要就填上。至於怎么配置hive可以遠程匿名訪問,可以在網上查,或者見后邊的附錄。
Hadoop Configuration Directory,這個需要特別注意,這是存放core-site.xml hdfs-site.xml hive-site.xml的配置文件目錄。我開始直覺的認為這是hadoop服務器上的directory,實際上不是。我們需要在StreamSets服務器所在的那台機器上創建一個目錄,然后從Hadoop那台機器下載那幾個文件,放在這個目錄里。
Hive Tab配置如下圖

Table Tab:
Database Expression:無特別需要注意的地方。這個應該是hive已經存在的數據庫名稱。
Table Name: 這個不需要在數據庫中手動創建此表。如果整個流程沒有問題,會自動創建這個表。咱們可以寫任何我們想要的表名稱。
Partition Configuration: 這里我們不需要分表存儲,所以刪掉,不用配置此項。
Table Tab配置如下圖:

Data Format Tab:
Data Format:選Avro即可。無特別需要注意的地方。

添加目標組件一:Hadoop FS,注釋:Writes to a Hadoop file system,是將數據寫到HDFS。通俗的講就是把data寫入。
Connection Tab:
File System URI,
hdfs://ip:9000
這個值可以從hadoop那台機器上的core-site.xml里找。
<property>
<name>fs.defaultFS</name>
<value>hdfs://ip:9000</value>
</property>
Connection Tab 配置如下:

Output Files Tab:
也是需要Show Advanced Options,注意
check上Directory in Header
修改Idle Timeout,缺省的可能是1小時,導致長時間沒有寫入到Hive。
Output Files Tab配置如下圖:

Data Format如下圖所示選擇即可:

添加目標組件二:Hive Metastore,注釋:Updates the Hive Metastore,是將medata寫入Hive,通俗的講就是創建Hive表。
整個組件的配置也沒有特別需要說明的地方,一些配置從上個Hive Metastore組件copy過來就行。

遇到的問題及解決方法
遠程匿名訪問hive
修改core-site.xml加上以下部分,然后重啟Hadoop。
遠程匿名訪問Hive,會直接以root用戶訪問Hive。
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.sdc.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.sdc.groups</name>
<value>*</value>
</property>
重啟Hadoop
cd /usr/local/hadoop-2.10.1/
sbin/stop-all.sh
sbin/stop-all.sh
將數據寫入表時遇到如下錯誤
Permission denied: user=anonymous, access=WRITE, inode="/hive/warehouse":root:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:3
原因是權限問題,根據錯誤提示,開放相應目錄權限。
hadoop dfs -chmod -R 777 /hive/warehouse
hadoop dfs -chmod -R 777 /tmp/hadoop-yarn
簡單程序驗證Hive是否可遠程創建表並插入數據。確保Hive可以遠程創建表並插入數據,再在streamsets上試驗。
需要用到的jar包如下:大概需要11個包

附簡單程序,已驗證通過。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class HiveConnectionTest {
public static void main(String[] args) throws Exception {
Class.forName("org.apache.hive.jdbc.HiveDriver");
String dropSQL="DROP TABLE streamsetstest";
String createSQL="CREATE TABLE streamsetstest(id int,no_encrypt string,a string)";
//String insterSQL="LOAD DATA LOCAL INPATH '/work/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE javabloger";
String insertSQL="INSERT INTO streamsetstest (no_encrypt,a) VALUES('test1', 'test2')";
String querySQL="select * from streamsetstest";
Connection con = DriverManager.getConnection("jdbc:hive2://ip:10000/default", "", "");
Statement stmt = con.createStatement();
stmt.execute(dropSQL);
stmt.execute(createSQL); // 執行建表語句
stmt.execute(insertSQL); // 執行插入語句
ResultSet res = stmt.executeQuery(querySQL); // 執行查詢語句
while (res.next()) {
System.out.println("Result: no_encrypt:"+res.getString(2) +" –> a:" +res.getString(3));
}
}
}
