sqoop關系型數據遷移原理以及map端內存為何不會爆掉窺探
序:map客戶端使用jdbc向數據庫發送查詢語句,將會拿到所有數據到map的客戶端,安裝jdbc的原理,數據全部緩存在內存中,但是內存沒有出現爆掉情況,這是因為1.3以后,對jdbc進行了優化,改進jdbc內部原理,將數據寫入磁盤存儲了。
Sqoop是apache旗下一款“Hadoop和關系數據庫服務器之間傳送數據”的工具。Sqoop架構非常簡單,其整合了Hive、Hbase和Oozie,通過map-reduce任務來傳輸數據,從而提供並發特性和容錯。
導入數據:MySQL,Oracle導入數據到Hadoop的HDFS、HIVE、HBASE等數據存儲系統。
導出數據:從Hadoop的文件系統中導出數據到關系數據庫mysql等。
工作機制
將導入或導出命令翻譯成mapreduce程序來實現,在翻譯出的mapreduce中主要是對inputformat和outputformat進行定制。
Sqoop的數據導入
從RDBMS導入單個表到HDFS。表中的每一行被視為HDFS的記錄。所有記錄都存儲為文本文件的文本數據(或者Avro、sequence文件等二進制數據)
表數據:在mysql中有一個庫test中intsmaze表。
導入intsmaze表數據到HDFS
bin/sqoop import \ --connect jdbc:mysql://192.168.19.131:3306/test \ --username root \ --password hadoop \ --table intsmaze \ --m 1
如果成功執行,那么會得到下面的輸出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
17/04/25 03:15:06 INFO mapreduce.Job: Running job: job_1490356790522_0018
17/04/25 03:15:52 INFO mapreduce.Job: Job job_1490356790522_0018 running in uber mode : false
17/04/25 03:15:52 INFO mapreduce.Job: map 0% reduce 0%
17/04/25 03:16:13 INFO mapreduce.Job: map 100% reduce 0%
17/04/25 03:16:14 INFO mapreduce.Job: Job job_1490356790522_0018 completed successfully
17/04/25 03:16:15 INFO mapreduce.Job: Counters: 30
File System Counters......
Job Counters ......
Map-Reduce Framework......
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=22
17/04/25 03:16:15 INFO mapreduce.ImportJobBase: Transferred 22 bytes in 98.332 seconds (0.2237 bytes/sec)
17/04/25 03:16:15 INFO mapreduce.ImportJobBase: Retrieved 3 records.
|
原理解析:
查看HDFS導入的數據,intsmaze表的數據和字段之間用逗號(,)表示。
1
2
3
|
1,2,22
2,3,33
3,ad,12
|
默認情況下,Sqoop會將我們導入的數據保存為逗號分隔的文本文件。如果導入數據的字段內容存在逗號分隔符,我們可以另外指定分隔符,字段包圍字符和轉義字符。使用命令行參數可以指定分隔符,文件格式,壓縮等。支持文本文件(--as-textfile)、avro(--as-avrodatafile)、SequenceFiles(--as-sequencefile)。默認為文本。
Sqoop啟動的mapreduce作業會用到一個InputFormat,它可以通過JDBC從一個數據庫表中讀取部分內容。Hadoop提供的DataDrivenDBInputFormat能夠為幾個map任務對查詢結果進行划分。
使用一個簡單的查詢通常就可以讀取一張表的內容
1
|
select col1,col2,... form tablename
|
但是為了更好的導入性能,可以將查詢划分到多個節點上執行。查詢時根據一個划分列(確定根據哪一個列划分)來進行划分。根據表中的元數據,Sqoop會選擇一個合適的列作為划分列(通常是表的主鍵)。主鍵列中的最小值和最大值會被讀出,與目標任務數一起來確定每個map任務要執行的查詢。當然用戶也可以使用split-by參數自己指定一個列作為划分列。
例如:person表中有10000條記錄,其id列值為0~9999。在導入這張表時,Sqoop會判斷出id是表的主鍵列。啟動MapReduce作業時,用來執行導入的DataDrivenDBInputFormat便會發出一條類似於select min(id),max(id) form intsmaze的查詢語句。假設我們制定運行5個map任務(使用-m 5),這樣便可以確認每個map任務要執行的查詢分別為select id,name,... form intsmaze where id>=0 and id<2000,select id,name,... form intsmaze where id>=2000 and id<4000,...,依次類推。
注意:划分列的選擇是影響並行執行效率的重要因素。如果id列的值不是均勻分布的(比如id值從2000到4000的范圍是沒有記錄的),那么有一部分map任務可能只有很少或沒有工作要做,而其他任務則有很多工作要做。
嚴重注意:在1.3之前,map的並行度一定要設置好,因為map客戶端會向數據庫發送查詢語句,將會拿到所有數據到map的客戶端緩存到,然后在執行map()方法一條一條處理,所有如果設置不好,一個map拿到的表數據過大就會內存溢出,畢竟里面是用jdbc去獲取的,所有數據都裝在jdbc的對象中,爆是必然的。在1.3以后改寫jdbc的內部原理,拿到一條數據就寫入硬盤中,就沒有內存溢出了。
增量導入
Sqoop不需要每次都導入整張表。例如,可以指定僅導入表的部分列。用戶也可以在查詢中加入where子句,來限定需要導入的記錄。例如,如果上個月已經將id為0~9999的記錄導入,而本月新增了1000條記錄,那么在導入時的查詢語句中加入子句where id>=10000,來實現只導入所有新增的記錄。
它需要添加incremental,check-column,和last-value選項來執行增量導入。
下面的語法用於Sqoop導入命令增量選項。
--incremental <mode> --check-column <column name> --last value <last check column value>
假設新添加的數據轉換成intsmaze表如下:
下面的命令用於在intsmaze表執行增量導入。
1
2
3
4
5
6
|
bin/sqoop import --connect jdbc:mysql://192.168.19.131:3306/test --username root --password hadoop \
--table person \
--m 1 \
--incremental append \
--check-column id \
--last-value 3
|
執行增量導入時,則會在hdfs上默認路徑下新增一個文件來存儲導入的新增數據,如上面的part-m-00001。
part-m-00001文件的數據內容為:
4,aa,4 5,bb,5 6,cc,6
導入到HDFS指定目錄
在使用Sqoop導入表數據到HDFS,我們可以指定目標目錄。
--target-dir <new or exist directory in HDFS>
下面的命令是用來導入emp_add表數據到'/queryresult'目錄。
bin/sqoop import \ --connect jdbc:mysql://192.168.19.131:3306/test \ --username root \ --password hadoop \ --target-dir /queryresult \ --table intsmaze \ --m 1
實際場景的分析:我一開始擔心在導入增量數據時,數據文件的位置等問題,想過通過每次執行增量導入時來根據時間作為文件名來指定每一次導入時文件存儲在hdfs上的路徑來解決。現在看來不需要擔心這個問題了。但是考慮這樣一種情況:關系庫中的某張表每天增量導入到hdfs上,然后使用hive對導入的數據加載進hive表時,我們不應該每次都情況hive表再進行全局導入hive,這樣太耗費效率了。當然可以根據文件的生成時間來確定每次把那個文件導入到hive中,但是不便於維護,可以直接根據目錄名來導入該目錄下的數據到hive中,且導入到hive中的數據可以按天設置分區,每次導入的數據進入一個新的分區。
有些業務場景只需要對hive表中每天新增的那些數據進行etl即可,完全沒有必要每次都是將整個hive表進行清理,那么可以結合hive的分區,按天進行分區,這樣每次進行etl處理就處理那一個分區數據即可。當然有些數據比如兩表的join操作,則必須對全表進行處理,那么在join時不限制分區即可,數據倒入時仍然時間分區裝載數據。
導入關系表到HIVE
bin/sqoop import --connect jdbc:mysql://192.168.19.131:3306/test --username root --password root --table intsmaze --hive-import --m 1
sqoop import --connect jdbc:mysql://192.168.19.131:3306/hive --username root --password admin --table intsmaze --fields-terminated-by '\t' --null-string '**' -m 1 --append --hive-import --check-column 'TBL_ID' --incremental append --last-value 6
導入表數據子集
Sqoop導入"where"子句的一個子集。它執行在各自的數據庫服務器相應的SQL查詢,並將結果存儲在HDFS的目標目錄。
where子句的語法如下。
--where <condition>
導入intsmaze表數據的子集。子集查詢檢所有列但是居住城市為:sec-bad
bin/sqoop import \ --connect jdbc:mysql://192.168.19.131:3306/test \ --username root \ --password root \ --where "city ='sec-bad'" \ --target-dir /wherequery \ --table intsmaze --m 1
按需導入
bin/sqoop import \ --connect jdbc:mysql://192.168.19.131:3306/test \ --username root \ --password root \ --target-dir /wherequery2 \ --query 'select id,name,deg from intsmaze WHERE id>1207 and $CONDITIONS' \ --split-by id \ --fields-terminated-by '\t' \ --m 1
$CONDITIONS參數是固定的,必須要寫上。
支持將關系數據庫中的數據導入到Hive(--hive-import)、HBase(--hbase-table)
數據導入Hive分三步:1)導入數據到HDFS 2)Hive建表 3)使用“LOAD DATA INPAHT”將數據LOAD到表中
數據導入HBase分二部:1)導入數據到HDFS 2)調用HBase put操作逐行將數據寫入表
導入表數據由於字段存在空字符串或null導致的問題
bin/sqoop import --connect jdbc:mysql://192.168.19.131:3306/test --username root --password hadoop \ --table intsmaze \ --m 1 \ --incremental append \ --check-column id \ --last-value 6
我們查看hdfs上的數據
7,null,7 8,null,8
MySQL(或者別的RDBMS)導入數據到hdfs后會發現原來在mysql中字段值明明是NULL, 到Hive查詢后 where field is null 會沒有結果呢,然后通過檢查一看,NULL值都變成了字段串'null'。其實你在導入的時候加上以下兩個參數就可以解決了,
--null-string '\\N' --null-non-string '\\N'
這里要注意一點。在hive里面。NULL是用\N來表示的。你可以自己做個實驗 insert overwrite table tb select NULL from tb1 limit 1;然后在去查看原文件就可以發現了。多提一點,如果在導入后發現數據錯位了,或者有好多原來有值的字段都變成了NULL, 這是因為你原表varchar類型的字段中可能含有\n\r等一些特殊字符。可以加上 --hive-drop-import-delims