Sqoop-1.4.4工具import和export使用詳解
Sqoop可以在HDFS/Hive和關系型數據庫之間進行數據的導入導出,其中主要使用了import和export這兩個工具。這兩個工具非常強大,提供了很多選項幫助我們完成數據的遷移和同步。比如,下面兩個潛在的需求:
- 業務數據存放在關系數據庫中,如果數據量達到一定規模后需要對其進行分析或同統計,單純使用關系數據庫可能會成為瓶頸,這時可以將數據從業務數據庫數據導入(import)到Hadoop平台進行離線分析。
- 對大規模的數據在Hadoop平台上進行分析以后,可能需要將結果同步到關系數據庫中作為業務的輔助數據,這時候需要將Hadoop平台分析后的數據導出(export)到關系數據庫。
這里,我們介紹Sqoop完成上述基本應用場景所使用的import和export工具,通過一些簡單的例子來說明這兩個工具是如何做到的。
工具通用選項
import和export工具有些通用的選項,如下表所示:
選項 |
含義說明 |
--connect <jdbc-uri> |
指定JDBC連接字符串 |
--connection-manager <class-name> |
指定要使用的連接管理器類 |
--driver <class-name> |
指定要使用的JDBC驅動類 |
--hadoop-mapred-home <dir> |
指定$HADOOP_MAPRED_HOME路徑 |
--help |
打印用法幫助信息 |
--password-file |
設置用於存放認證的密碼信息文件的路徑 |
-P |
從控制台讀取輸入的密碼 |
--password <password> |
設置認證密碼 |
--username <username> |
設置認證用戶名 |
--verbose |
打印詳細的運行信息 |
--connection-param-file <filename> |
可選,指定存儲數據庫連接參數的屬性文件 |
數據導入工具import
import工具,是將HDFS平台外部的結構化存儲系統中的數據導入到Hadoop平台,便於后續分析。我們先看一下import工具的基本選項及其含義,如下表所示:
選項 |
含義說明 |
--append |
將數據追加到HDFS上一個已存在的數據集上 |
--as-avrodatafile |
將數據導入到Avro數據文件 |
--as-sequencefile |
將數據導入到SequenceFile |
--as-textfile |
將數據導入到普通文本文件(默認) |
--boundary-query <statement> |
邊界查詢,用於創建分片(InputSplit) |
--columns <col,col,col…> |
從表中導出指定的一組列的數據 |
--delete-target-dir |
如果指定目錄存在,則先刪除掉 |
--direct |
使用直接導入模式(優化導入速度) |
--direct-split-size <n> |
分割輸入stream的字節大小(在直接導入模式下) |
--fetch-size <n> |
從數據庫中批量讀取記錄數 |
--inline-lob-limit <n> |
設置內聯的LOB對象的大小 |
-m,--num-mappers <n> |
使用n個map任務並行導入數據 |
-e,--query <statement> |
導入的查詢語句 |
--split-by <column-name> |
指定按照哪個列去分割數據 |
--table <table-name> |
導入的源表表名 |
--target-dir <dir> |
導入HDFS的目標路徑 |
--warehouse-dir <dir> |
HDFS存放表的根路徑 |
--where <where clause> |
指定導出時所使用的查詢條件 |
-z,--compress |
啟用壓縮 |
--compression-codec <c> |
指定Hadoop的codec方式(默認gzip) |
--null-string <null-string> |
果指定列為字符串類型,使用指定字符串替換值為null的該類列的值 |
--null-non-string <null-string> |
如果指定列為非字符串類型,使用指定字符串替換值為null的該類列的值 |
下面,我們通過實例來說明,在實際中如何使用這些選項。
- 將MySQL數據庫中整個表數據導入到Hive表
1 |
bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/workflow --table project --username shirdrn -P --hive-import -- --default-character-set=utf-8 |
- 將MySQL數據庫workflow中project表的數據導入到Hive表中。
- 將MySQL數據庫中多表JION后的數據導入到HDFS
1 |
bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/workflow --username shirdrn -P --query 'SELECT users.*, tags.tag FROM users JOIN tags ON (users.id = tags.user_id) WHERE $CONDITIONS' --split-byusers.id --target-dir /hive/tag_db/user_tags -- --default-character-set=utf-8 |
- 這里,使用了--query選項,不能同時與--table選項使用。而且,變量$CONDITIONS必須在WHERE語句之后,供Sqoop進程運行命令過程中使用。上面的--target-dir指向的其實就是Hive表存儲的數據目錄。
- 將MySQL數據庫中某個表的數據增量同步到Hive表
1 |
bin/sqoop job --create your-sync-job -- import --connect jdbc:mysql://10.95.3.49:3306/workflow --table project --username shirdrn -P --hive-import --incremental append --check-column id --last-value 1 -- --default-character-set=utf-8 |
- 這里,每次運行增量導入到Hive表之前,都要修改--last-value的值,否則Hive表中會出現重復記錄。
- 將MySQL數據庫中某個表的幾個字段的數據導入到Hive表
1 |
bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/workflow --username shirdrn --P --table tags --columns 'id,tag' --create-hive-table -target-dir /hive/tag_db/tags -m 1 --hive-table tags --hive-import -- --default-character-set=utf-8 |
- 我們這里將MySQL數據庫workflow中tags表的id和tag字段的值導入到Hive表tag_db.tags。其中--create-hive-table選項會自動創建Hive表,--hive-import選項會將選擇的指定列的數據導入到Hive表。如果在Hive中通過SHOW TABLES無法看到導入的表,可以在conf/hive-site.xml中顯式修改如下配置選項:
1 |
<property> |
2 |
<name>javax.jdo.option.ConnectionURL</name> |
3 |
<value>jdbc:derby:;databaseName=hive_metastore_db;create=true</value> |
4 |
</property> |
- 然后再重新運行,就能看到了。
- 使用驗證配置選項
1 |
sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --validate --validator org.apache.sqoop.validation.RowCountValidator --validation-threshold org.apache.sqoop.validation.AbsoluteValidationThreshold --validation-failurehandler org.apache.sqoop.validation.AbortOnFailureHandler |
- 上面這個是官方用戶手冊上給出的用法,我們在實際中還沒用過這個,有感興趣的可以驗證嘗試一下。
數據導出工具export
export工具,是將HDFS平台的數據,導出到外部的結構化存儲系統中,可能會為一些應用系統提供數據支持。我們看一下export工具的基本選項及其含義,如下表所示:
選項 |
含義說明 |
--validate <class-name> |
啟用數據副本驗證功能,僅支持單表拷貝,可以指定驗證使用的實現類 |
--validation-threshold <class-name> |
指定驗證門限所使用的類 |
--direct |
使用直接導出模式(優化速度) |
--export-dir <dir> |
導出過程中HDFS源路徑 |
-m,--num-mappers <n> |
使用n個map任務並行導出 |
--table <table-name> |
導出的目的表名稱 |
--call <stored-proc-name> |
導出數據調用的指定存儲過程名 |
--update-key <col-name> |
更新參考的列名稱,多個列名使用逗號分隔 |
--update-mode <mode> |
指定更新策略,包括:updateonly(默認)、allowinsert |
--input-null-string <null-string> |
使用指定字符串,替換字符串類型值為null的列 |
--input-null-non-string <null-string> |
使用指定字符串,替換非字符串類型值為null的列 |
--staging-table <staging-table-name> |
在數據導出到數據庫之前,數據臨時存放的表名稱 |
--clear-staging-table |
清除工作區中臨時存放的數據 |
--batch |
使用批量模式導出 |
下面,我們通過實例來說明,在實際中如何使用這些選項。這里,我們主要結合一個實例,講解如何將Hive中的數據導入到MySQL數據庫。
首先,我們准備幾個表,MySQL數據庫為tag_db,里面有兩個表,定義如下所示:
CREATE TABLE tag_db.users (
id INT(11) NOT NULL AUTO_INCREMENT,name VARCHAR(100) NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE tag_db.tags (
id INT(11) NOT NULL AUTO_INCREMENT,
user_id INT NOT NULL,
tag VARCHAR(100) NOT NULL,
PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
這兩個表中存儲的是基礎數據,同時對應着Hive中如下兩個表:
CREATE TABLE users (id INT,name STRING);
CREATE TABLE tags (id INT,user_id INT,tag STRING);
我們首先在上述MySQL的兩個表中插入一些測試數據:
1 |
INSERT INTO tag_db.users(name) VALUES('jeffery'); |
2 |
INSERT INTO tag_db.users(name) VALUES('shirdrn'); |
3 |
INSERT INTO tag_db.users(name) VALUES('sulee'); |
4 |
5 |
INSERT INTO tag_db.tags(user_id, tag) VALUES(1, 'Music'); |
6 |
INSERT INTO tag_db.tags(user_id, tag) VALUES(1, 'Programming'); |
7 |
INSERT INTO tag_db.tags(user_id, tag) VALUES(2, 'Travel'); |
8 |
INSERT INTO tag_db.tags(user_id, tag) VALUES(3, 'Sport'); |
然后,使用Sqoop的import工具,將MySQL兩個表中的數據導入到Hive表,執行如下命令行:
1 |
bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/tag_db --table users --username shirdrn -P --hive-import -- --default-character-set=utf-8 |
2 |
bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/tag_db --table tags --username shirdrn -P --hive-import -- --default-character-set=utf-8 |
導入成功以后,再在Hive中創建一個用來存儲users和tags關聯后數據的表:
1 |
CREATE TABLE user_tags ( |
2 |
id STRING, |
3 |
name STRING, |
4 |
tag STRING |
5 |
); |
執行如下HQL語句,將關聯數據插入user_tags表:
1 |
FROM users u JOIN tags t ON u.id=t.user_id INSERT INTO TABLE user_tags SELECT CONCAT(CAST(u.id AS STRING),CAST(t.id AS STRING)), u.name, t.tag; |
將users.id與tags.id拼接的字符串,作為新表的唯一字段id,name是用戶名,tag是標簽名稱。
再在MySQL中創建一個對應的user_tags表,如下所示:
1 |
CREATE TABLE tag_db.user_tags ( |
2 |
id varchar(200) NOT NULL, |
3 |
name varchar(100) NOT NULL, |
4 |
tag varchar(100) NOT NULL |
5 |
); |
使用Sqoop的export工具,將Hive表user_tags的數據同步到MySQL表tag_db.user_tags中,執行如下命令行:
1 |
bin/sqoop export --connect jdbc:mysql://10.95.3.49:3306/tag_db --username shirdrn --P --table user_tags --export-dir /hive/user_tags --input-fields-terminated-by '\001' -- --default-character-set=utf-8 |
執行導出成功后,可以在MySQL的tag_db.user_tags表中看到對應的數據。
如果在導出的時候出現類似如下的錯誤:
14/02/27 17:59:06 INFO mapred.JobClient: Task Id : attempt_201402260008_0057_m_000001_0, Status : FAILED
java.io.IOException: Can't export data, please check task tracker logs
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:112)
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.util.NoSuchElementException
at java.util.AbstractList$Itr.next(AbstractList.java:350)
at user_tags.__loadFromFields(user_tags.java:225)
at user_tags.parse(user_tags.java:174)
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:83)
... 10 more
通過指定字段分隔符選項--input-fields-terminated-by,指定Hive中表字段之間使用的分隔符,供Sqoop讀取解析,就不會報錯了。