Sqoop是一個用於在Hadoop和關系數據庫或大型機之間傳輸數據的工具。
您可以使用Sqoop將關系數據庫管理系統(RDBMS)中的數據導入Hadoop分布式文件系統(HDFS),在Hadoop MapReduce中轉換數據,然后將數據導出回RDBMS。
Sqoop自動化了這個過程的大部分,它依賴於數據庫來描述要導入的數據的模式。Sqoop使用MapReduce導入和導出數據,提供並行操作和容錯
使用Sqoop,您可以將數據從關系數據庫系統或大型機導入HDFS。導入過程的輸入要么是數據庫表,要么是大型機數據集。對於數據庫,Sqoop將逐行將表讀入HDFS。對於大型機數據集,Sqoop將從每個大型機數據集中讀取記錄到HDFS。這個導入過程的輸出是一組文件,其中包含導入的表或數據集的副本。導入過程是並行執行的。由於這個原因,輸出將在多個文件中。這些文件可以是帶分隔符的文本文件(例如,用逗號或制表符分隔每個字段),也可以是包含序列化記錄數據的二進制Avro或sequencefile。導入過程的副產品是生成的Java類,該類可以封裝導入表的一行。該類在導入過程中由Sqoop本身使用。還為您提供了該類的Java源代碼,以便在后續的MapReduce數據處理中使用。該類可以在SequenceFile格式之間序列化和反序列化數據。它還可以解析記錄的分隔文本形式。這些功能允許您快速開發使用處理管道中存儲的hdfs記錄的MapReduce應用程序。您還可以使用自己喜歡的任何其他工具自由地解析delimiteds記錄數據。在操作導入的記錄之后(例如,使用MapReduce或Hive),您可能有一個結果數據集,然后可以將其導出回關系數據庫。
Sqoop的導出過程將並行地從HDFS讀取一組帶分隔符的文本文件,將它們解析為記錄,並將它們作為新行插入目標數據庫表中,供外部應用程序或用戶使用。
Sqoop還包含一些其他命令,允許您檢查正在使用的數據庫。例如,可以列出可用的數據庫模式(使用sqop -list-databases工具)和模式中的表(使用sqop -list-tables工具)。
Sqoop還包括一個基本的SQL執行shell (sqop -eval工具)。導入、代碼生成和導出過程的大多數方面都可以定制。對於數據庫,可以控制導入的特定行范圍或列。
您可以為基於文件的數據表示以及使用的文件格式指定特定的分隔符和轉義字符。您還可以控制生成代碼中使用的類名或包名。
sqoop-import
導入工具將單個表從RDBMS導入到HDFS。表中的每一行在HDFS中都表示為一條單獨的記錄。記錄可以存儲為文本文件(每行一條記錄),或者以Avro或sequencefile的二進制表示形式存儲。
--append 將數據追加到HDFS中的現有數據集 --as-avrodatafile 將數據導入Avro數據文件 --as-sequencefile 將數據導入到sequencefile --as-textfile 以純文本形式導入數據(默認) --as-parquetfile 導入數據到拼花文件 --boundary-query <statement> 用於創建分割的邊界查詢 --columns <col,col,col…> 要從表導入的列 --delete-target-dir 如果導入目標目錄存在,則刪除它 --direct 如果數據庫存在,請使用直接連接器 --fetch-size <n> 一次從數據庫讀取的條目數。 --inline-lob-limit <n> 設置內聯LOB的最大大小 -m,--num-mappers <n> 使用n個map任務並行導入 -e,--query <statement> 導入語句的結果。 --split-by <column-name> 用於分割工作單元的表的列。不能與——autoreset-to-one-mapper選項一起使用。 --autoreset-to-one-mapper 如果一個表沒有主鍵,也沒有提供split-by列,那么導入應該使用一個映射器。不能與——split-by 選項一起使用。 --table <table-name> 表名 --target-dir <dir> HDFS目的地dir --warehouse-dir <dir> 表目標的HDFS父節點 --where <where clause> 進口時使用WHERE子句 -z,--compress 啟用壓縮 --compression-codec <c> 使用Hadoop編解碼器(默認gzip) --null-string <null-string> 要為字符串列的空值編寫的字符串 --null-non-string <null-string> 要為非字符串列的空值編寫的字符串
輸出行格式化參數:
--enclosed-by <char> 設置所需字段的封閉字符 --escaped-by <char> 設置轉義字符 --fields-terminated-by <char> 設置字段分隔符 --lines-terminated-by <char> 設置行尾字符 --mysql-delimiters 使用MySQL默認的分隔符集:fields:, lines: \n escape -by: \ option - closed-by: ' --optionally-enclosed-by <char> 設置包含字符的字段
hive 的參數
--hive-home <dir> 覆蓋HIVE_HOME --hive-import 將表導入到Hive中(如果沒有設置任何分隔符,則使用Hive的默認分隔符)。 --hive-overwrite 重寫Hive表中的現有數據。 --create-hive-table 如果設置好,表存在,則任務將在目標hive中失敗 。默認情況下,此屬性為false。 --hive-table <table-name> 設置導入到Hive時要使用的表名。 --hive-drop-import-delims 將\n、\r和\01從字符串字段導入Hive時刪除。 --hive-delims-replacement 當導入到Hive時,用用戶定義的字符串替換字符串字段中的\n、\r和\01。 --hive-partition-key 要分區的hive字段的名稱被分片 --hive-partition-value <v> String-value作為該作業中導入到hive的分區鍵。 --map-column-hive <map> 為已配置列重寫從SQL類型到Hive類型的默認映射。
sqoop-export
導出工具將一組文件從HDFS導出回RDBMS。目標表必須已經存在於數據庫中。根據用戶指定的分隔符,讀取輸入文件並將其解析為一組記錄。
默認操作是將這些轉換為一組INSERT語句,將記錄注入數據庫。在“更新模式”中,Sqoop將生成更新語句來替換數據庫中的現有記錄,在“調用模式”中,Sqoop將對每個記錄執行存儲過程調用。
常見的參數
--connect <jdbc-uri> 指定JDBC連接字符串 --connection-manager <class-name> 指定要使用的連接管理器類 --driver <class-name> 手動指定要使用的JDBC驅動程序類 --hadoop-mapred-home <dir> 覆蓋HADOOP_MAPRED_HOME美元 --help 打印使用說明 --password-file 為包含身份驗證密碼的文件設置路徑 -P 從控制台讀取密碼 --password <password> 設置身份驗證密碼 --username <username> 設置身份驗證的用戶名 --verbose 工作時打印更多信息 --connection-param-file <filename> 提供連接參數的可選屬性文件
導出的控制參數
--columns <col,col,col…> 要導出到表中的列 --direct 使用直接導出快速路徑 --export-dir <dir> 導出的HDFS源路徑 -m,--num-mappers <n> 使用n個map任務並行導出 --table <table-name> 表名 --call <stored-proc-name> 要調用的存儲過程 --update-key <col-name> 用於更新的錨列。如果有多個列,則使用逗號分隔的列列表。 --update-mode <mode> 指定在數據庫中發現具有不匹配鍵的新行時如何執行更新。模式的合法值包括updateonly(默認值)和allowinsert。" --input-null-string <null-string> 要解釋為空的字符串用於字符串列 --input-null-non-string <null-string> 對於非字符串列,要解釋為null的字符串 --staging-table <staging-table-name> 將數據暫存於其中的表,然后插入目標表。 --clear-staging-table 指示可以刪除登台表中存在的任何數據。 --batch 使用批處理模式執行底層語句。
說明:
需要—export-dir參數和—table或—調用中的一個。它們指定要填充到數據庫中的表(或要調用的存儲過程),以及包含源數據的HDFS目錄。
默認情況下,表中的所有列都選擇導出。您可以選擇列的子集,並使用——columns參數控制它們的順序。這應該包括要導出的列的逗號分隔列表。例如:——列“col1,col2,col3”。注意,沒有包含在——columns參數中的列需要定義默認值或允許空值。否則,數據庫將拒絕導入的數據,從而導致Sqoop作業失敗。
您可以獨立於目錄中文件的數量控制映射器的數量。導出性能取決於並行度。默認情況下,Sqoop將為導出過程並行使用四個任務。這可能不是最優的;您需要對自己的特定設置進行試驗。額外的任務可能提供更好的並發性,但是如果數據庫已經在更新索引、調用觸發器等方面遇到瓶頸,那么額外的負載可能會降低性能。- nummappers或-m參數控制映射任務的數量,這是使用的並行度。
一些數據庫還為導出提供了直接模式。使用——direct參數指定這個代碼路徑。
——input-null-string和——input-null-non-string參數是可選的。如果——input-null-string沒有指定,那么字符串“null”將被解釋為string類型列的null。如果——input-null-non-string沒有指定,那么字符串“null”和空字符串都將被解釋為非字符串列的null。注意,對於非字符串列,空字符串總是被解釋為null,如果使用——input-null-non-string指定其他字符串,則空字符串將被解釋為null。
由於Sqoop將導出過程分解為多個事務,所以失敗的導出作業可能導致部分數據提交到數據庫。這可能進一步導致后續作業在某些情況下由於插入沖突而失敗,或者在其他情況下導致數據重復。
您可以通過—stage -table選項指定一個staging表來克服這個問題,—stage -table選項充當一個輔助表,用於對導出的數據進行分段。階段數據最終在單個事務中移動到目標表。
為了使用登台功能,您必須在運行導出作業之前創建登台表。此表的結構必須與目標表相同。這個表應該在導出作業運行之前為空,或者必須指定——clear- stage -table選項。如果staging表包含數據,並且指定了——clear- stage -table選項,Sqoop將在啟動導出作業之前刪除所有數據。
sqoop 支持的數據庫
Database version --direct support connect string matches HSQLDB 1.8.0+ No jdbc:hsqldb:*// MySQL 5.0+ Yes jdbc:mysql:// Oracle 10.2.0+ No jdbc:oracle:*// PostgreSQL 8.3+ Yes (import only) jdbc:postgresql:// CUBRID 9.2+ NO jdbc:cubrid:*
源庫是oracle
1.全表導入
sqoop import \ --connect "jdbc:oracle:thin:@192.168.xxx.xxx:1521:CMIS" \ --username="glloans" \ --password="glloans" \ --outdir \/BIG_DATA/tmp/.sqoop/java/ \ --table "GLLOANS_XF.LM_ACCT_INFO" \ --columns "LOAN_NO,ACCT_BANK_CDE,ACCT_BCH_CDE,LOAN_ACCT_TYP,ACCT_NO,ACCT_NAME,ACCT_VAL_DT,ACCT_CREATE_DT,ACCT_EXPIRY_DT,ACCT_RMK,ATPY_STS" \ -m 3 \ --hive-import \ --delete-target-dir \ --hive-overwrite \ --hive-table "SDATA.LM_ACCT_INFO" \ --hive-partition-key "dt" \ --hive-partition-value "20190504" \ --hive-drop-import-delims \ --fields-terminated-by "\001" \ --lines-terminated-by "\n" \ --null-string "\\\N" \ --null-non-string "\\\N"
hive-drop-import-delims --將\n、\r和\01從字符串字段導入Hive時刪除。
null-string --要為字符串列的空值編寫的字符串
null-non-strin --要為非字符串列的空值編寫的字符串
delete-target-dir --如果導入目標目錄存在,則刪除
2.條件查詢導入
sqoop import \ --connect "jdbc:oracle:thin:@192.168.xxx.xxx:1521:CMIS" \ --username="glloans" --password="glloans" \ --outdir \/BIG_DATA/tmp/.sqoop/java/ \ --table "GLLOANS.INTF_TRADE_LOAN_LOG" \ --columns "GEN_GL_NO,TX_LOG_SEQ,LOAN_NO,LOAN_CONT_NO,BCH_CDE,TX_DT,TX_TIME,TX_TYP,HOST_TX_NO,HOST_TX_DT" \ -m 3 \ --hive-import \ --hive-overwrite \ --hive-table "SDATA.INTF_TRADE_LOAN_LOG" \ --hive-partition-key "dt" \ --hive-partition-value "20190504" \ --where "SUBSTR(TX_DT,1,4)||SUBSTR(TX_DT,6,2)||SUBSTR(TX_DT,9,2)='20190504'" \ --hive-drop-import-delims \ --fields-terminated-by "\001" \ --lines-terminated-by "\n" \ --null-string "\\\N" --null-non-string "\\\N"
3.查詢關聯
sqoop import \ --connect "jdbc:oracle:thin:@192.168.xxx.xxx:1521:CMIS" \ --username="glloans" \ --password="glloans" \ --outdir \/BIG_DATA/tmp/.sqoop/java/ \ -m 1 \ --hive-import \ --hive-overwrite \ --target-dir /user/taskctl/GLLOANS_XF.LM_WV_FEE_DTL_T \ --hive-table "SDATA.LM_WV_FEE_DTL_T" \ --hive-partition-key "dt" \ --hive-partition-value "20190504" \ --query "select GEN_GL_NO,SEQ_NO,WV_TX_LOG_SEQ from glloans.LM_WV_FEE_DTL_T t1 WHERE exists (select t2.GEN_GL_NO
from glloans.LM_WV_FEE_T t2 where t1.GEN_GL_NO = t2.GEN_GL_NO and t2.ADJ_DT=TO_CHAR(TO_DATE(20190504,'YYYY-MM-DD'),'YYYY-MM-DD')) AND \$CONDITIONS" \ --hive-drop-import-delims \ --fields-terminated-by "\001" \ --lines-terminated-by "\n" \ --null-string "\\\N" \ --null-non-string "\\\N"
(2)源庫是mysql
1.全表抽取
sqoop import \ --connect "jdbc:mysql://192.168.xxx.xxx:3306/mysqldb" \ --username="user" \ --password="password" \ --outdir \/BIG_DATA/tmp/.sqoop/java/ \ --table "MCS_UPSTREAM" \ --columns "MU_COMMAND_ID,MU_COMMAND,MU_COMMAND_TYPE,MU_MOBILE,MU_EXT,MU_CONTENT,MU_TIME,MU_CREATETIME" \ -m 3 \ --hive-import \ --delete-target-dir \ --hive-overwrite \ --hive-table "SDATA.MCS_UPSTREAM" \ --hive-partition-key "dt" \ --hive-partition-value "20190504" \ --hive-drop-import-delims \ --fields-terminated-by "\001" \ --lines-terminated-by "\n" \ --null-string "\\\N" \ --null-non-string "\\\N"
2.增量抽取
sqoop import \ --connect "jdbc:mysql://192.168.xxx.xxx:3306/mysqldb" \ --username="user" \ --password="password" \ --outdir \/BIG_DATA/tmp/.sqoop/java/ \ --table "MCS_MSG_WX" \ --columns "MMW_WXID,MMW_MODELCODE,MMW_MESSAGE,MMW_MOBILE,MMW_CERTNO,MMW_SENDFLAG,MMW_SENDDATE,MMW_SENDTIME,MMW_STATE,MMW_CHANNEL,MMW_RETCODE,MMW_CREATETIME,LAST_CHG_DT" \ -m 3 \ --hive-import \ --delete-target-dir \ --hive-overwrite \ --hive-table "SDATA.MCS_MSG_WX" \ --hive-partition-key "dt" \ --hive-partition-value "20190504" \ --where "concat(substr(MMW_CREATETIME,1,4),substr(MMW_CREATETIME,6,2),substr(MMW_CREATETIME,9,2))='20190504'" \ --hive-drop-import-delims \ --fields-terminated-by "\001" \ --lines-terminated-by "\n" \ --null-string "\\\N" \ --null-non-string "\\\N"
3.條件抽取
sqoop import \ --connect "jdbc:mysql://192.168.xxx.xxx:3306/mysqldb" \ --username="user" \ --password="password" \ --outdir \/BIG_DATA/tmp/.sqoop/java/ \ -m 1 \ --hive-import \ --hive-overwrite \ --target-dir /user/taskctl/MCS_MSG_WARN_MOBILE \ --hive-table "SDATA.S101_MCS_MSG_WARN_MOBILE" \ --hive-partition-key "dt" \ --hive-partition-value "20190504" \ --query "select * from mcs.MCS_MSG_WARN_MOBILE where date_format(LAST_CHG_DT,'%Y%m%d')='20190504' AND \$CONDITIONS" \ --hive-drop-import-delims \ --fields-terminated-by "\001" \ --lines-terminated-by "\n" \ --null-string "\\\N" \ --null-non-string "\\\N"