Sqoop(四)增量導入、全量導入、減量導入


增量導入

一、說明

  當在生產環境中,我們可能會定期從與業務相關的關系型數據庫向Hadoop導入數據,導入數倉后進行后續離線分析。這種情況下我們不可能將所有數據重新再導入一遍,所以此時需要數據增量導入。

  增量導入數據分為兩種方式:

    一是基於遞增列的增量數據導入(Append方式)。

    二是基於時間列的數據增量導入(LastModified方式)。

二、增量導入

方式一:Append方式

  比如:有一個訂單表,里面每個訂單有一個唯一標識自增列ID,在關系型數據庫中以主鍵形式存在,之前已經將id在1-3的編號的訂單導入到了Hive中,現在一段時間后我們需要將近期產生的新的訂單數據(id為4、5的兩條數據)導入Hive,供后續數倉進行分析。此時我們只需要指定-incremental參數為append,-last-value參數為3即可。表示只從大於3后開始導入。

1、MYSQL建表

CREATE TABLE `appendTest` (
  `id` int(11) ,
  `name` varchar(255)
) 

2、導入數據

insert into appendTest(id,name) values(1,'name1');
insert into appendTest(id,name) values(2,'name2');
insert into appendTest(id,name) values(3,'name3');

3、創建一張跟mysql中的appendTest表一樣的hive表appendTest

sqoop create-hive-table \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--password 010209 \
--table appendTest \
--hive-table appendTest

4、進行導入,將id>0的三條數據進行導入

sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--P \
--table appendTest \
--hive-import \
-m 1  \
--hive-table appendTest \
--incremental append \ --check-column id \ --last-value 0

結果:

5、查看

 6、向mysql表appendTest再次插入數據

insert into appendTest(id,name) values(4,'name4');
insert into appendTest(id,name) values(5,'name5');

7、再次執行增量導入

由於上一次導入的時候,,將--last-value設置為0,將id>0的三條數據導入后,現在進行導入了時候需要將last-value設置為3
sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \ --username root \ --P \ --table appendTest \ --hive-import \ -m 1 \
--hive-table appendTest \
--incremental append \ --check-column id \ --last-value 3

結果:

8、查看hive表appendTest

重要參數說明:

9、說明

說明:
增量抽取,需要指定--incremental append,同時指定按照源表中哪個字段進行增量--check-column id,
並指定hive表appendTest當前最大值--last-value 3。創建sqoop job的目的是,每次執行job以后,sqoop會自動記錄appedndTest的last-value,
下次再執行時,就會自動指定last-value,不需要手工去改了。

方式二:lastModify方式

 基於lastModify的方式,要求原表中有time字段,它能指定一個時間戳,讓SQoop把該時間戳之后的數據導入至Hive,因為后續訂單可能狀態會發生變化,變化后time字段時間戳也會發生變化,此時SQoop依然會將相同狀態更改后的訂單導入Hive,當然我們可以指定merge-key參數為id,表示將后續新的記錄與原有記錄合並。

 1、Mysql建表

CREATE TABLE lastModifyTest (
id INT,
name VARCHAR (20),
last_mod TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

2、導入數據

insert into lastModifyTest(id,name) values(1,'enzo');
insert into lastModifyTest(id,name) values(2,'din');
insert into lastModifyTest(id,name) values(3,'fz');
insert into lastModifyTest(id,name) values(4,'dx');
insert into lastModifyTest(id,name) values(5,'ef');

3、HIve建表

 
         
sqoop create-hive-table \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \ --password 010209 \ --table lastModifyTest \ --hive-table lastModifyTest 

4、導入數據,將時間以后的數據進行導入

sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--P \
--table lastModifyTest \
--hive-import \
-m 1  \
--hive-table lastModifyTest \
--incremental lastmodified \
--check-column last_mod \
--last-value "2019-05-14 15:17:23"

結果:

5、查看數據導入結果

 6、參數說明

 

全量導入

將mysql表中全部數據都導入Hive,下面來查看實例:

1、MYSQL數據

2、一次性將mysql表im數據全量導入hive中

sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--password 010209 \
--table im \
--hive-import \
--hive-table im \
-m 1

減量導入

設置where條件,通過判斷條件可以判斷減少的數據和增加的數據,控制更加靈活。

sqoop import \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \
--P \
--table appendTest \
--hive-import \
-m 1  \
--incremental append \
--where "age>30"
--check-column id \
--last-value 0

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM