Sqoop切分數據及自定義boundary-query


1、指定切分的字段

Sqoop通過--split-by指定切分的字段,--m設置mapper的數量。通過這兩個參數分解生成m個where子句,進行分段查詢。因此sqoop的split可以理解為where子句的切分。

sqoop import \
--connect jdbc:mysql://192.168.1.100:3306/test \
--username root \
--password zxasqw12/* \
--query 'SELECT * FROM directory_excel_md5_mac1 WHERE $CONDITIONS' \
--delete-target-dir \
--target-dir /user/sqoop2/directory_excel_md5_mac1  \
--null-string '\\N'  \
--null-non-string '\\N'  \
--fields-terminated-by '\t'  \
-m 7  \
--split-by 'id'

可以看到

sqoop會根據切分字段的MIN()和MAX()來切分

具體如下:

第一步,獲取切分字段的MIN()和MAX()

為了根據mapper的個數切分table,sqoop首先會執行一個sql,用於獲取table中該字段的最小值和最大值,源碼片段為org.apache.sqoop.mapreduce.DataDrivenImportJob 224行,大體為:

private String buildBoundaryQuery(String col, String query) {
    ....
    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
        + "FROM (" + query + ") AS " + alias;
  }

獲取到最大值和最小值,就可以根據不同的字段類型進行切分。

第二步,根據MIN和MAX不同的類型采用不同的切分方式

支持有Date,Text,Float,Integer,Boolean,NText,BigDecimal等等。

數字都是一個套路,就是

步長=(最大值-最小值)/mapper個數

,生成的區間為

[最小值,最小值+步長) [最小值+2*步長,最小值+3*步長) ... [最大值-步長,最大值]

可以參考下面的代碼片段org.apache.sqoop.mapreduce.db.FloatSplitter 43行

 List<InputSplit> splits = new ArrayList<InputSplit>();
    ...
    int numSplits = ConfigurationHelper.getConfNumMaps(conf);
    double splitSize = (maxVal - minVal) / (double) numSplits;
...
    double curLower = minVal;
    double curUpper = curLower + splitSize;

    while (curUpper < maxVal) {
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          lowClausePrefix + Double.toString(curLower),
          highClausePrefix + Double.toString(curUpper)));
        curLower = curUpper;
        curUpper += splitSize;
    }

這樣最后每個mapper會執行自己的sql語句,比如第一個mapper執行:

select * from t where splitcol >= min and splitcol < min+splitsize

第二個mapper又會執行

select * from t where splitcol >= min+splitsize and splitcol < min+2*splitsize

2、自定義切分鍵和boundary-query

sqoop import  \
--username reWork \
--password reWork \
--connect jdbc:oracle:thin:@"(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.0.67)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=192.168.0.68)(PORT=1521))(LOAD_BALANCE = yes)(FAILOVER = on))(CONNECT_DATA=(SERVICE_NAME=FKBIGDAT)(SRVR=DEDICATED)))" \
--query "
......
select GATHER_TIME,ID,PAP_R,PRP_R,PAP_R1,PAP_R2,PAP_R3,PAP_R4,DATA_DATE,METER_ID from reWork.loss_yc_mrcjdldjsj_consgzb partition(P_20160829) 
union all
select GATHER_TIME,ID,PAP_R,PRP_R,PAP_R1,PAP_R2,PAP_R3,PAP_R4,DATA_DATE,METER_ID from reWork.loss_yc_mrcjdldjsj_consgzb partition(P_20160830)  where \$CONDITIONS" \
--target-dir /inceptor1/user/hive/warehouse/rework.db/hive/loss_yc_mrcjdldjsj_consgzb_txt3/pdata_date=p_201608 \
--null-string '\\N' \
--null-non-string '\\N' \
--fields-terminated-by "\001" \
--map-column-java GATHER_TIME=java.sql.Timestamp,DATA_DATE=java.sql.Date \
--map-column-hive GATHER_TIME=string,DATA_DATE=string \
--hive-drop-import-delims \
-m 7 \
--split-by  "MOD(ORA_HASH(concat(METER_ID, Data_date)),7)"  \
--boundary-query "select 0,7 from dual"

oracle中的hash分區就是利用的ora_hash函數

partition by hash(object_id) 等價於 ora_hash(object_id,4294967295)

ora_hash(列,hash桶) hash桶默認是4294967295 可以設置0到4294967295

ora_hash(object_id,4) 會把object_id的值進行hash運算,然后放到 0,1,2,3,4 這些桶里面,也就是說 ora_hash(object_id,4) 只會產生 0 1 2 3 4

 

By default sqoop will use query select min(<split-by>), max(<split-by>) from <table name> to find out boundaries for creating splits. In some cases this query is not the most optimal so you can specify any arbitrary query returning two numeric columns using --boundary-query argument.





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM