增量導入
一、說明
當在生產環境中,我們可能會定期從與業務相關的關系型數據庫向Hadoop導入數據,導入數倉后進行后續離線分析。這種情況下我們不可能將所有數據重新再導入一遍,所以此時需要數據增量導入。
增量導入數據分為兩種方式:
一是基於遞增列的增量數據導入(Append方式)。
二是基於時間列的數據增量導入(LastModified方式)。
二、增量導入
方式一:Append方式
比如:有一個訂單表,里面每個訂單有一個唯一標識自增列ID,在關系型數據庫中以主鍵形式存在,之前已經將id在1-3的編號的訂單導入到了Hive中,現在一段時間后我們需要將近期產生的新的訂單數據(id為4、5的兩條數據)導入Hive,供后續數倉進行分析。此時我們只需要指定-incremental參數為append,-last-value參數為3即可。表示只從大於3后開始導入。
1、MYSQL建表
CREATE TABLE `appendTest` ( `id` int(11) , `name` varchar(255) )
2、導入數據
insert into appendTest(id,name) values(1,'name1'); insert into appendTest(id,name) values(2,'name2'); insert into appendTest(id,name) values(3,'name3');
3、創建一張跟mysql中的appendTest表一樣的hive表appendTest
sqoop create-hive-table \ --connect jdbc:mysql://192.168.200.100:3306/yang \ --username root \ --password 010209 \ --table appendTest \ --hive-table appendTest
4、進行導入,將id>0的三條數據進行導入
sqoop import \ --connect jdbc:mysql://192.168.200.100:3306/yang \ --username root \ --P \ --table appendTest \ --hive-import \ -m 1 \
--hive-table appendTest \ --incremental append \ --check-column id \ --last-value 0
結果:
5、查看
6、向mysql表appendTest再次插入數據
insert into appendTest(id,name) values(4,'name4');
insert into appendTest(id,name) values(5,'name5');
7、再次執行增量導入
由於上一次導入的時候,,將--last-value設置為0,將id>0的三條數據導入后,現在進行導入了時候需要將last-value設置為3
sqoop import \ --connect jdbc:mysql://192.168.200.100:3306/yang \ --username root \ --P \ --table appendTest \ --hive-import \ -m 1 \
--hive-table appendTest \ --incremental append \ --check-column id \ --last-value 3
結果:
8、查看hive表appendTest
重要參數說明:
9、說明
說明:
增量抽取,需要指定--incremental append,同時指定按照源表中哪個字段進行增量--check-column id,
並指定hive表appendTest當前最大值--last-value 3。創建sqoop job的目的是,每次執行job以后,sqoop會自動記錄appedndTest的last-value,
下次再執行時,就會自動指定last-value,不需要手工去改了。
方式二:lastModify方式
基於lastModify的方式,要求原表中有time字段,它能指定一個時間戳,讓SQoop把該時間戳之后的數據導入至Hive,因為后續訂單可能狀態會發生變化,變化后time字段時間戳也會發生變化,此時SQoop依然會將相同狀態更改后的訂單導入Hive,當然我們可以指定merge-key參數為id,表示將后續新的記錄與原有記錄合並。
1、Mysql建表
CREATE TABLE lastModifyTest ( id INT, name VARCHAR (20), last_mod TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );
2、導入數據
insert into lastModifyTest(id,name) values(1,'enzo'); insert into lastModifyTest(id,name) values(2,'din'); insert into lastModifyTest(id,name) values(3,'fz'); insert into lastModifyTest(id,name) values(4,'dx'); insert into lastModifyTest(id,name) values(5,'ef');
3、HIve建表
sqoop create-hive-table \
--connect jdbc:mysql://192.168.200.100:3306/yang \
--username root \ --password 010209 \ --table lastModifyTest \ --hive-table lastModifyTest
4、導入數據,將時間以后的數據進行導入
sqoop import \ --connect jdbc:mysql://192.168.200.100:3306/yang \ --username root \ --P \ --table lastModifyTest \ --hive-import \ -m 1 \ --hive-table lastModifyTest \ --incremental lastmodified \ --check-column last_mod \ --last-value "2019-05-14 15:17:23"
結果:
5、查看數據導入結果
6、參數說明
全量導入
將mysql表中全部數據都導入Hive,下面來查看實例:
1、MYSQL數據
2、一次性將mysql表im數據全量導入hive中
sqoop import \ --connect jdbc:mysql://192.168.200.100:3306/yang \ --username root \ --password 010209 \ --table im \ --hive-import \ --hive-table im \ -m 1
減量導入
設置where條件,通過判斷條件可以判斷減少的數據和增加的數據,控制更加靈活。
sqoop import \ --connect jdbc:mysql://192.168.200.100:3306/yang \ --username root \ --P \ --table appendTest \ --hive-import \ -m 1 \ --incremental append \ --where "age>30" --check-column id \ --last-value 0