Apache Sqoop是用來實現結構型數據(如關系數據庫)和Hadoop之間進行數據遷移的工具。它充分利用了MapReduce的並行特點以批處理的方式加快數據的傳輸,同時也借助MapReduce實現了容錯。
項目地址: http://sqoop.apache.org/
目前為止,已經演化出了2個版本:sqoop1和sqoop2。
sqoop1的最新版本是1.4.5,sqoop2的最新版本是1.99.3;1.99.3和1.4.5是不兼容的,並且功能尚未開發完成,還不適合在生產環境部署。
sqoop支持的數據庫:
Database |
version |
--direct support? |
connect string matches |
HSQLDB |
1.8.0+ |
No |
|
MySQL |
5.0+ |
Yes |
jdbc:mysql:// |
Oracle |
10.2.0+ |
No |
|
PostgreSQL |
8.3+ |
Yes (import only) |
jdbc:postgresql:/ |
guojian@localtest:~/work$ sudo apt-get install sqoop
guojian@localtest:~/work$ sqoop help
usage: sqoop COMMAND [ARGS]
Available commands:
codegen Generate code to interact with database records
create-hive-table Import a table definition into Hive
eval Evaluate a SQL statement and display the results
export Export an HDFS directory to a database table
help List available commands
import Import a table from a database to HDFS
import-all-tables Import tables from a database to HDFS
job Work with saved jobs
list-databases List available databases on a server
list-tables List available tables in a database
merge Merge results of incremental imports
metastore Run a standalone Sqoop metastore
version Display version information
See 'sqoop help COMMAND' for information on a specific command.
import是將關系數據庫遷移到HDFS上
guojian@localtest:~/work$ sqoop import --connectjdbc:mysql://192.168.81.176/hivemeta2db --username root -password passwd --table sds
guojian@localtest:~/work$ hadoop fs -ls /user/guojian/sds
Found 5 items
-rw-r--r-- 3 guojian cug_test 0 2014-09-11 16:04 /user/guojian/sds/_SUCCESS
-rw-r--r-- 3 guojian cug_test 483 2014-09-11 16:03 /user/guojian/sds/part-m-00000.snappy
-rw-r--r-- 3 guojian cug_test 504 2014-09-11 16:04 /user/guojian/sds/part-m-00001.snappy
-rw-r--r-- 3 guojian cug_test 1001 2014-09-11 16:03 /user/guojian/sds/part-m-00002.snappy
-rw-r--r-- 3 guojian cug_test 952 2014-09-11 16:03 /user/guojian/sds/part-m-00003.snappy
可以通過--m設置並行數據,即map的數據,決定文件的個數。
默認目錄是/user/${user.name}/${tablename},可以通過--target-dir設置hdfs上的目標目錄。
如果想要將整個數據庫中的表全部導入到hdfs上,可以使用import-all-tables命令。
sqoop import-all-tables –connect jdbc:mysql://192.168.81.176/hivemeta2db --username root -password passwd
如果想要指定所需的列,使用如下:
sqoop import --connect jdbc:mysql://192.168.81.176/hivemeta2db --username root -password passwd --table sds --columns "SD_ID,CD_ID,LOCATION"
指定導出文件為SequenceFiles,並且將生成的類文件命名為com.ctrip.sds:
sqoop import --connect jdbc:mysql://192.168.81.176/hivemeta2db --username root -password passwd --table sds --class-name com.ctrip.sds --as-sequencefile
導入文本時可以指定分隔符:
sqoop import --connect jdbc:mysql://192.168.81.176/hivemeta2db --username root -password passwd --table sds --fields-terminated-by '\t' --lines-terminated-by '\n' --optionally-enclosed-by '\"'
可以指定過濾條件:
sqoop import --connect jdbc:mysql://192.168.81.176/hivemeta2db --username root -password passwd --table sds --where "sd_id > 100"
export是import的反向過程,將hdfs上的數據導入到關系數據庫中
sqoop export --connect jdbc:mysql://192.168.81.176/sqoop --username root -password passwd --table sds --export-dir /user/guojian/sds
上例中sqoop數據中的sds表需要先把表結構創建出來,否則export操作會直接失敗。
由於sqoop是通過map完成數據的導入,各個map過程是獨立的,沒有事物的概念,可能會有部分map數據導入失敗的情況。為了解決這一問題,sqoop中有一個折中的辦法,即是指定中間 staging 表,成功后再由中間表導入到結果表。
這一功能是通過 --staging-table <staging-table-name> 指定,同時staging表結構也是需要提前創建出來的:
sqoop export --connect jdbc:mysql://192.168.81.176/sqoop --username root -password passwd --table sds --export-dir /user/guojian/sds --staging-table sds_tmp
需要說明的是,在使用 --direct , --update-key 或者--call存儲過程的選項時,staging中間表是不可用的。
驗證結果:
(1)數據會首先寫到sds_tmp表,導入操作成功后,再由sds_tmp表導入到sds結果表中,同時會清除sds_tmp表。
(2)如果有map失敗,則成功的map會將數據寫入tmp表,export任務失敗,同時tmp表的數據會被保留。
(3)如果tmp中已有數據,則此export操作會直接失敗,可以使用 --clear-staging-table 指定在執行前清除中間表。
export選項:
--direct |
直接使用 mysqlimport 工具導入mysql |
--export-dir <dir> |
需要export的hdfs數據路徑 |
-m,--num-mappers <n> |
並行export的map個數n |
--table <table-name> |
導出到的目標表 |
--call <stored-proc-name> |
調用存儲過程 |
--update-key <col-name> |
指定需要更新的列名,可以將數據庫中已經存在的數據進行更新 |
--update-mode <mode> |
更新模式,包括 updateonly (默認)和allowinsert |
前者只允許更新,后者允許新的列數據寫入 |
|
--input-null-string <null-string> |
The string to be interpreted as null for string columns |
--input-null-non-string <null-string> |
The string to be interpreted as null for non-string columns |
--staging-table <staging-table-name> |
指定中間staging表 |
--clear-staging-table |
執行export前將中間staging表數據清除 |
--batch |
Use batch mode for underlying statement execution. |
Argument |
Description |
create-hive-table將關系數據庫表導入到hive表中
參數 |
說明 |
–hive-home <dir> |
Hive的安裝目錄,可以通過該參數覆蓋掉默認的hive目錄 |
–hive-overwrite |
覆蓋掉在hive表中已經存在的數據 |
–create-hive-table |
默認是false,如果目標表已經存在了,那么創建任務會失敗 |
–hive-table |
后面接要創建的hive表 |
–table |
指定關系數據庫表名 |
sqoop create-hive-table --connect jdbc:mysql://192.168.81.176/sqoop --username root -password passwd --table sds --hive-table sds_bak
默認sds_bak是在default數據庫的。
這一步需要依賴HCatalog,需要先安裝HCatalog,否則報如下錯誤:
Hive history file=/tmp/guojian/hive_job_log_cfbe2de9-a358-4130-945c-b97c0add649d_1628102887.txt
FAILED: ParseException line 1:44 mismatched input ')' expecting Identifier near '(' in column specification
list-databases列出一台server上可用的數據庫
sqoop list-databases --connect jdbc:mysql://192.168.81.176/ --username root -password passwd
list-tables列出一個數據庫中的表
sqoop list-tables --connect jdbc:mysql://192.168.81.176/sqoop --username root -password passwd
codegen:將關系數據庫表映射為一個java文件、java class類、以及相關的jar包
sqoop codegen --connect jdbc:mysql://192.168.81.176/sqoop --username root -password passwd --table sds
Note: /tmp/sqoop-guojian/compile/d58f607b046a061593ba708ec5f3d608/sds.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
guojian@localtest:~/work$ ll /tmp/sqoop-guojian/compile/d58f607b046a061593ba708ec5f3d608/
total 48
drwxrwxr-x 2 guojian guojian 4096 9月 12 14:15 ./
drwxrwxr-x 11 guojian guojian 4096 9月 12 14:15 ../
-rw-rw-r-- 1 guojian guojian 11978 9月 12 14:15 sds.class
-rw-rw-r-- 1 guojian guojian 4811 9月 12 14:15 sds.jar
-rw-rw-r-- 1 guojian guojian 17525 9月 12 14:15 sds.java
merge是將兩個數據集合並的工具,對於相同的key會覆蓋老的值。
--class-name <class> |
指定merge job使用的類名稱 |
--jar-file <file> |
合並時引入的jar包,該jar包是通過Codegen工具生成的jar包 |
--merge-key <col> |
指定作為merge key的列名 |
--new-data <path> |
指定newer數據目錄 |
--onto <path> |
指定older數據目錄 |
--target-dir <path> |
指定目標輸出目錄 |
參數 |
說明 |
sqoop merge --new-data /user/guojian/sds --onto /user/guojian/sqoop --target-dir /user/guojian/sds_new --jar-file sds.jar --class-name sds --merge-key SD_ID
值得注意的是,--target-dir如果設置為已經存在的目錄,sqoop會報錯退出。
eval用戶可以很快的使用sql語句對數據庫進行操作。這使得用戶在執行import操作之前檢查sql語句是否正確。
sqoop eval --connect jdbc:mysql://192.168.81.176/sqoop --username root -password passwd --query "SELECT SD_ID,CD_ID,LOCATION FROM sds LIMIT 10"
job用來生成sqoop任務。
--create <job-id> |
創業一個新的sqoop作業. |
--delete <job-id> |
刪除一個sqoop job |
--exec <job-id> |
執行一個 --create 保存的作業 |
--show <job-id> |
顯示一個作業的參數 |
--list |
顯示所有創建的sqoop作業 |
參數 |
說明 |
例子:
sqoop job --create myimportjob -- import --connectjdbc:mysql://192.168.81.176/hivemeta2db --username root -password passwd --table TBLS
guojian@localtest:~/work$ sqoop job --listAvailable jobs:
myimportjob
guojian@localtest:~/work$ sqoop job --show myimportjob
Job: myimportjob
Tool: import
Options:----------------------------
verbose = false
db.connect.string = jdbc:mysql://192.168.81.176/hivemeta2db
codegen.output.delimiters.escape = 0
codegen.output.delimiters.enclose.required = false
codegen.input.delimiters.field = 0
hbase.create.table = false
db.require.password = true
hdfs.append.dir = false
db.table = TBLS
import.fetch.size = null
accumulo.create.table = false
codegen.input.delimiters.escape = 0
codegen.input.delimiters.enclose.required = false
db.username = root
codegen.output.delimiters.record = 10
import.max.inline.lob.size = 16777216
hbase.bulk.load.enabled = false
hcatalog.create.table = false
db.clear.staging.table = false
codegen.input.delimiters.record = 0
enable.compression = false
hive.overwrite.table = false
hive.import = false
codegen.input.delimiters.enclose = 0
accumulo.batch.size = 10240000
hive.drop.delims = false
codegen.output.delimiters.enclose = 0
hdfs.delete-target.dir = false
codegen.output.dir = .
codegen.auto.compile.dir = true
mapreduce.num.mappers = 4
accumulo.max.latency = 5000
import.direct.split.size = 0
codegen.output.delimiters.field = 44
export.new.update = UpdateOnly
incremental.mode = None
hdfs.file.format = TextFile
codegen.compile.dir = /tmp/sqoop-guojian/compile/bd9c7f7b651276067b3f7b341b7fa4cb
direct.import = false
hive.fail.table.exists = false
db.batch = false
執行:
sqoop job -exec myimportjob
metastore 配置sqoop job的共享元數據信息,這樣多個用戶定義和執行sqoop job在這一 metastore中。默認存儲在~/.sqoop
啟動:sqoop metastore
關閉:sqoop metastore --shutdown
metastore文件的存儲位置是在 conf/sqoop-site.xml中sqoop.metastore.server.location 配置,指向本地文件。
metastore可以通過TCP/IP訪問,端口號可以通過sqoop.metastore.server.port 配置,默認是16000。
客戶端可以通過 指定 sqoop.metastore.client.autoconnect.url 或使用 --meta-connect ,配置為 jdbc:hsqldb:hsql://<server-name>:<port>/sqoop,例如 jdbc:hsqldb:hsql://metaserver.example.com:16000/sqoop 。
更多說明見 http://sqoop.apache.org/docs/1.4.5/SqoopUserGuide.html