Sqoop:SQL-to-Hadoop (點擊查看官方英文文檔)
這個鏈接是簡潔的中文教程:https://www.yiibai.com/sqoop/sqoop_import_all_tables.html
Sqoop連接傳統關系型數據庫 和 Hadoop 的工具
Sqoop是一個轉換工具,用於在關系型數據庫與Hive等之間進行數據轉換
Sqoop導入
導入工具從RDBMS到HDFS導入單個表。表中的每一行被視為HDFS的記錄。所有記錄被存儲在文本文件的文本數據或者在Avro和序列文件的二進制數據。
Sqoop導出
導出工具從HDFS導出一組文件到一個RDBMS。作為輸入到Sqoop文件包含記錄,這被稱為在表中的行。那些被讀取並解析成一組記錄和分隔使用用戶指定的分隔符。
我的git(點擊3實現hive數據倉庫)
步驟:
- Sqoop抽取mysql數據到hive
- 和業務溝通需要的自主分析的數據指標,
- 在hive上做數據的聚合.
- linux服務器上的定時部署
- sqoop抽取數據到mysql
- 使用bi做展示,線上部署
Sqoop 工具的命令格式
Sqoop是一系列相關工具的集合,它的基本命令格式:
$ sqoop tool-name [tool-arguments]
幫助方法:
$ sqoop help [tool-name] 或者 $sqoop [tool-name] --help
Sqoop的導入
$ sqoop import (generic-args) (import-args)
例子:
$ sqoop import --connect jdbc:mysql://localhost/db --username foo --table TEST $ sqoop --options-file /users/homer/work/import.txt --table TEST ⚠️import.txt內包括:
import
--connect
jdbc:mysql://localhost/db
--username
foo
⚠️jdbc是Java數據庫連接,Java Database Connectivity,簡稱JDBC。
重要的參數
表1常用參數:
Argument | Description |
---|---|
--connect <jdbc-uri> |
Specify JDBC connect string |
--connection-manager <class-name> |
Specify connection manager class to use |
--driver <class-name> |
Manually specify JDBC driver class to use |
--hadoop-mapred-home <dir> |
Override $HADOOP_MAPRED_HOME |
--help |
Print usage instructions |
--password-file |
Set path for a file containing the authentication password |
-P |
Read password from console |
--password <password> |
Set authentication password |
--username <username> |
Set authentication username |
--driver參數:
Sqoop可以使用任何JDBC數據庫,但首先要下載相對應的JDBC驅動,並安裝。
例如: Mysql的驅動com.mysql.jdbc.Driver
--connect <jdbc-uri> 的參數對兒看:https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-configuration-properties.html
附加:
useUnicode=true #To use 3-byte UTF8 with Connector/J characterEncoding=utf-8 #What character encoding should the driver use when dealing with strings? (defaults is to 'autodetect') zeroDateTimeBehavior=convertToNull #當遇到datetime數據:0000-00-00這樣全是0的,轉化為null tinyInt1isBit=false #TINYINT(1) as the BIT type默認是true,改成false的話,就是數字1了。 dontTrackOpenResources=true# 防止內存泄露的。會自動close(), 默認false。 defaultFetchSize=50000 #和下面的useCursorFetch配合使用的。每次最多取n行數據。原因大概如下: # 簡單說就是大數據,小內存放不下,就用這個2個參數 useCursorFetch=true #使用cursor這種方式得到row,不是很理解。
選擇導入的數據
--table employees #導入表
--columns
"列名,列名, ..." #選擇要導入的列
--where "條件" #設置導入條件
自由表格的查詢導入
--query “標准的select語句”
配套的參數:
--target-dir "HDFS絕對路徑"
是否用並行的方式導入數據:
如果是,則每個map任務將需要執行這個query的副本,結果由綁定的conditions分組。
- query內需要包括$conditions變量
- 還必須使用參數--split-by 列名, 即選擇一個分隔列。
如果不是,這個query只被執行一次,使用參數-m 1
遷移過程使用1個map(開啟一個線程)
$ sqoop import \ --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' \ -m 1 --target-dir /user/foo/joinresults
⚠️嵌套引號的寫法: "SELECT * FROM x WHERE a='foo' AND \$CONDITIONS"
關於Parallelims並行機制
You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m
or --num-mappers
argument.
默認都是4個map task。
當你執行一個並行導入,Sqoop需要一個規則,來分割載入的工作workload。Sqoop使用一個被分割的列來分割這個workload。
默認,會使用一個表中的主鍵key列作為分割列。這個列的最小/大值是邊界。
比如,一個表的主鍵是id,范圍0~1000,Sqoop會被使用4個task, 運行4個進程。每個進程執行SQL語句:
SELECT * FROM sometable WHERE id >= lo AND id < hi
在不同的task中,(lo, hi)
被設置為 (0, 250), (250, 500), (500, 750), and (750, 1001)
使用參數--split-by,用於指定被分割的列名,這在缺少主鍵或又多重主鍵key時最有效。
7.29增量導入(Incremental imports)
Sqoop提供這周導入模式,用於之前已經導入,后來表的數據發生了改變的情況。無需全表重新導入,提高效率。
--check-column (col) |
Specifies the column to be examined when determining which rows to import. 指定被檢查的列。 |
--incremental (mode) |
Specifies how Sqoop determines which rows are new. append and lastmodified .明確新的變化是來自新增row,還是原有row的數據變更了。 |
--last-value (value) |
Specifies the maximum value of the check column from the previous import. 明確之前導入的數據中,被檢查的列的最后一個值。 |
用上面的3個參數:來明確檢查方法。