使用StreamSets從MySQL增量更新數據到Hive


使用StreamSets從MySQL增量更新數據到Hive

我們可以StreamSets實現數據采集,在實際生產中需要批量、實時捕獲MySQL、Oracle等數據源的變化數據並將其寫入大數據平台的Hive等。這里主要介紹如何使用StreamSets通過JDBC方式實時從MySQL抽取數據到Hive。

StreamSets實現的流程如下:

 

 

 大致的流程如下:

  1. Reads data from a JDBC source using a query
  2. Generates Hive metadata and write information fo HDFS
  3. Updates the Hive Metastore。通俗的講就是在目標庫中創建表。
  4. Writes to a Hadoop file system。通俗的講就是把數據寫入到目標庫的表中。

用到的各組件介紹:

這里除了看組件名稱,也注意右上角的字母,有的組件圖標很相似,但是功能是不一樣的,要選擇合適的組件。

第一個組件source, 如下圖, Source是JDBC Query Consumer,看其注釋:Reads data from a JDBC source using a query,可以知道是從數據庫讀取數據。

 

 

 

中間的組件,如下圖,Hive Metadata,注釋:Generates Hive metadata and write information fo HDFS。作用是根據源數據來產生相應的目標數據庫的metadata和寫入HDFS的需要的信息。

 

 

兩個目標組件如下圖:

目標組件一:Hadoop FS,注釋:Writes to a Hadoop file system,是將數據寫到HDFS,通俗的講就是把data寫入硬盤或者說Hive表。

目標組件二:Hive Metastore,注釋:Updates the Hive Metastore,是將medata寫入Hive,通俗的講就是創建Hive表。

 

准備StreamSets。

下載StreamSets,一定要下載all版本,大概5.6G,迅雷一晚上下載完。Core版本不包含有些組件,安裝不上。

https://archives.streamsets.com/datacollector/3.19.0/tarball/streamsets-datacollector-all-3.19.0.tgz

修改最大打開文件數

使用ulimit -a 可以查看當前系統的所有限制值,使用ulimit -n 可以查看當前的最大打開文件數。新裝的linux默認只有1024,當作負載較大的服務器時,很容易遇到error: too many open files或者Configuration of maximum open file limit is too low: 1024。因此,需要將其改大。

使用 ulimit -n 65535 可即時修改,但重啟后就無效了。

臨時修改:

ulimit -n

ulimit -n 65535

永久修改:

echo ulimit -n 65535 >>/etc/profile

source /etc/profile

解壓啟動StreamSets。直接解壓即可。

tar zxvf streamsets-datacollector-all-3.19.0.tgz

cd /home/cg/streamsets/

bin/streamsets dc

登錄。缺省端口18630,原始用戶名和秘密均是admin.目前只有251上的streamsets能解密並且寫入hive,以此為例。

http://ip:18630/

admin/admin

准備源數據庫的表及數據。注意:因為要做增量測試,也就是隨時需要向源MySQL中增加數據。

創建表:

drop table streamsetstest;

CREATE TABLE streamsetstest (

  id int auto_increment,

  no_encrypt varchar(255)   NULL,

  a nvarchar(255)   NULL,

  primary key (id)

);

插入數據:

INSERT INTO streamsetstest (no_encrypt,a) VALUES('BWP018930705', 'BWP018930705');

 

創建Pipeline.

創建一個Pipeline,依次創建所需要的組件,每次保證創建的組件能夠連通。

New Pipeline.

登錄http://ip:18630/,點擊Create New Pipeline

填寫Error Records相關的信息

寫入Error Records的方式見下圖:

 

寫入Error Records的位置見下圖:

 

創建源組件

我們需要從MySQL或者Oracle中通過JDBC讀取數據。可以根據圖標和注釋來選擇合適的組件。

選擇JDBC Query Consumer。如果Origin missing-> Select Origin能找到,就這么選擇JDBC Query Consumer

 

 

如果找不到,就在右側Stage選JDBC,找到JDBC Query Consumer

 

 

 

 

配置JDBC Query Consumer

JDBC Connection String,如圖,跟通常連JDBC的URL一致,無區別。

SQL Query: 有缺省的模板,需要將table名稱改成具體的名稱,where雖然能改成<,但是我們只有要測增量,所以最好還是>。注意和Initial Offset、Offset column值得對應關系。

Credential Tab里的配置,輸入用戶名和密碼,無特別需要注意的地方。

JDBC Tab配置如下圖:

 

 

 

這個源組件配置完畢之后,點擊Preview按鈕,看看什么地方有錯誤

Preview按鈕如下圖。

如果配置有錯,streamsets會提示,根據提示找到相應的配置項改過來。如果配置沒有錯,就能看到下圖中顯示輸出的數據。確定沒錯之后,退出Preview模式即可。

streamsets缺省的JDBC lib里並不含有MySQL的jdbc driver文件mysql.jar,我們需要點擊Install External Libraries來添加需要的driver。

 

 

 

配置Hive Metadata組件

這個組件的作用是Generates Hive metadata and write information fo HDFS。各個Tab里的配置會在下邊說明。

General:對於整個流程里涉及到的三個Hive相關的組件的Stage Library要選一樣的,其他的不用管了。

General Tab配置如下圖

 

Hive Tab:

JDBC URL,這個沒特別需要注意的地方

jdbc:hive2://ip:10000

JDBC Driver Name,也沒啥需要特別注意的地方。

org.apache.hive.jdbc.HiveDriver

Use Credentials,這個依賴於Hive本身的配置,如果不需要用戶密碼,就不用填。如果需要就填上。至於怎么配置hive可以遠程匿名訪問,可以在網上查,或者見后邊的附錄。

Hadoop Configuration Directory,這個需要特別注意,這是存放core-site.xml  hdfs-site.xml  hive-site.xml的配置文件目錄。我開始直覺的認為這是hadoop服務器上的directory,實際上不是。我們需要在StreamSets服務器所在的那台機器上創建一個目錄,然后從Hadoop那台機器下載那幾個文件,放在這個目錄里。

Hive Tab配置如下圖

 

 

Table Tab:

Database Expression:無特別需要注意的地方。這個應該是hive已經存在的數據庫名稱。

Table Name: 這個不需要在數據庫中手動創建此表。如果整個流程沒有問題,會自動創建這個表。咱們可以寫任何我們想要的表名稱。

Partition Configuration: 這里我們不需要分表存儲,所以刪掉,不用配置此項。

Table Tab配置如下圖:

Data Format Tab:

Data Format:選Avro即可。無特別需要注意的地方。

 

 

 

 

添加目標組件一:Hadoop FS,注釋:Writes to a Hadoop file system,是將數據寫到HDFS。通俗的講就是把data寫入。

Connection Tab:

File System URI,

hdfs://ip:9000

這個值可以從hadoop那台機器上的core-site.xml里找。

<property>

    <name>fs.defaultFS</name>

    <value>hdfs://ip:9000</value>

</property>

Connection Tab 配置如下:

 

Output Files Tab:

也是需要Show Advanced Options,注意

check上Directory in Header

修改Idle Timeout,缺省的可能是1小時,導致長時間沒有寫入到Hive。

Output Files Tab配置如下圖:

 

 

 

 

Data Format如下圖所示選擇即可:

 

添加目標組件二:Hive Metastore,注釋:Updates the Hive Metastore,是將medata寫入Hive,通俗的講就是創建Hive表。

整個組件的配置也沒有特別需要說明的地方,一些配置從上個Hive Metastore組件copy過來就行。

 

 

遇到的問題及解決方法

遠程匿名訪問hive

修改core-site.xml加上以下部分,然后重啟Hadoop。

遠程匿名訪問Hive,會直接以root用戶訪問Hive。

<property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.sdc.hosts</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.sdc.groups</name>
    <value>*</value>
</property>

重啟Hadoop

cd /usr/local/hadoop-2.10.1/

sbin/stop-all.sh

sbin/stop-all.sh

 

將數據寫入表時遇到如下錯誤

Permission denied: user=anonymous, access=WRITE, inode="/hive/warehouse":root:supergroup:drwxr-xr-x

       at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:3

原因是權限問題,根據錯誤提示,開放相應目錄權限。

hadoop dfs -chmod -R 777 /hive/warehouse

hadoop dfs -chmod -R 777 /tmp/hadoop-yarn

 

簡單程序驗證Hive是否可遠程創建表並插入數據。確保Hive可以遠程創建表並插入數據,再在streamsets上試驗。

需要用到的jar包如下:大概需要11個包

 

附簡單程序,已驗證通過。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveConnectionTest {
    public static void main(String[] args) throws  Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        String dropSQL="DROP TABLE streamsetstest";
        String createSQL="CREATE TABLE streamsetstest(id int,no_encrypt string,a string)";
        //String insterSQL="LOAD DATA LOCAL INPATH '/work/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE javabloger";
        String insertSQL="INSERT INTO streamsetstest (no_encrypt,a) VALUES('test1', 'test2')";
        String querySQL="select * from streamsetstest";
        Connection con = DriverManager.getConnection("jdbc:hive2://ip:10000/default", "", "");
        Statement stmt = con.createStatement();
        stmt.execute(dropSQL);
        stmt.execute(createSQL);  // 執行建表語句
        stmt.execute(insertSQL);  // 執行插入語句
        ResultSet res = stmt.executeQuery(querySQL);   // 執行查詢語句      
          while (res.next()) {
            System.out.println("Result: no_encrypt:"+res.getString(2) +"  –>  a:" +res.getString(3));
        }
    }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM