DataX源碼分析(1)


 

開始(以mysql為例)

從 https://github.com/alibaba/DataX  下載源碼,通過idea閱讀。

Datx根目錄下core包包含了整個執行框架,

其中com.alibaba.datax.core.Engine是整個Java任務的入口,

core/src/main/bin/datax.py是服務端打包后執行的入口。

 

datax.py片段

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)

Engine接收三個主要參數:job、jobid、mode 

    job代表任務配置json的位置,

    jobid表示此次任務的id,默認為-1,

    mode表示執行的模式。

將job路徑的文件內容轉換成Configuration,將項目本身存在在${datax.home}/conf/core.json中的文件與自定義的配置文件合並,更新core.json有而自定義job中沒有的配置。

public Configuration merge(final Configuration another,
        boolean updateWhenConflict) {
    Set<String> keys = another.getKeys();

    for (final String key : keys) {
        // 如果使用更新策略,凡是another存在的key,均需要更新
        if (updateWhenConflict) {
            this.set(key, another.get(key));
            continue;
        }

        // 使用忽略策略,只有another Configuration存在但是當前Configuration不存在的key,才需要更新
        boolean isCurrentExists = this.get(key) != null;
        if (isCurrentExists) {
            continue;
        }

        this.set(key, another.get(key));
    }
    return this;
}

初始化plugin中的配置,配置string,btye,date類型的格式。

public static void bind(final Configuration configuration) {
        StringCast.init(configuration);
        DateCast.init(configuration);
        BytesCast.init(configuration);
}

根據core.container.model變量來判斷是任務組還是一個任務,實例化對應的JobContainer或TaskGroupContainer,啟動對應的start方法。

重點

根據isDry屬性判斷是不是任務預檢查,如果isDry為true,則只執行preCheck方法。

該方法設置基本的jobPlugin,communicator,檢查讀寫plugin是否有每個表的讀寫權限,以及querySql,splikPK是否正確。

 /*verify query*/
ResultSet rs = null;
try {
    DBUtil.sqlValid(querySql,dataBaseType);
    if(i == 0) {
        rs = DBUtil.query(conn, querySql, fetchSize);
    }
} catch (ParserException e) {
    throw RdbmsException.asSqlParserException(this.dataBaseType, e, querySql);
} catch (Exception e) {
    throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, userName);
} finally {
    DBUtil.closeDBResources(rs, null, null);
}
/*verify splitPK*/
try{
    if (splitPkSqls != null && !splitPkSqls.isEmpty()) {
        splitPkSql = splitPkSqls.get(i).toString();
        DBUtil.sqlValid(splitPkSql,dataBaseType);
        if(i == 0) {
            SingleTableSplitUtil.precheckSplitPk(conn, splitPkSql, fetchSize, table, userName);
        }
    }
} catch (ParserException e) {
    throw RdbmsException.asSqlParserException(this.dataBaseType, e, splitPkSql);
} catch (DataXException e) {
    throw e;
} catch (Exception e) {
    throw RdbmsException.asSplitPKException(this.dataBaseType, e, splitPkSql,this.splitPkId.trim());
}

 

如果isDry為false,則依次執行七個步驟:preHandle,init,split,schedule,post,postHandle,invokeHooks。

preHandle方法:配置對應plugin的handler init方法:預處理jobId,初始化reader,writer對應的plugin split方法(核心): 首先Reader根據json配置中的channel參數指定建議的切分個數,然后按照isTableMode參數來判斷是指定Table方式還是querySql方式。isTableMode為True,計算eachTableShouldSplittedNumber的數值,即eachTableShouldSplittedNumber是單表應該切分的份數。如果splitPK有值且eachTableShouldSplittedNumber>1且是單表,則eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;嘗試對每個表,切分為eachTableShouldSplittedNumber 份

 for (String table : tables) {
    tempSlice = sliceConfig.clone();
    tempSlice.set(Key.TABLE, table);

    List<Configuration> splittedSlices = SingleTableSplitUtil
            .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);

    splittedConfigs.addAll(splittedSlices);
}

其中,SingleTableSplitUtil.splitSingleTable有如下作用: 1.用於查詢出對應的表按splitPk切分對應的對大值和最小值。

 public static String genPKSql(String splitPK, String table, String where){
        String minMaxTemplate = "SELECT MIN(%s),MAX(%s) FROM %s";
        String pkRangeSQL = String.format(minMaxTemplate, splitPK, splitPK,
                table);
        if (StringUtils.isNotBlank(where)) {
            pkRangeSQL = String.format("%s WHERE (%s AND %s IS NOT NULL)",
                    pkRangeSQL, where, splitPK);
        }
        return pkRangeSQL;
}

2.對splitPk的跨度做eachTableShouldSplittedNumber粒度的區分,將column,table,where和上一步的splitPk做拼接。

for (String range : rangeList) {
                Configuration tempConfig = configuration.clone();

                tempQuerySql = buildQuerySql(column, table, where)
                        + (hasWhere ? " and " : " where ") + range;

                allQuerySql.add(tempQuerySql);
                tempConfig.set(Key.QUERY_SQL, tempQuerySql);
                pluginParams.add(tempConfig);
}

isTableMode為False,表示是querySql模式,不拼接,直接返回list。 然后,依據Reader的任務數,Writer做對應的切分。分為單表和多表,單表即:

//處理單表的情況
if (tableNumber == 1) {
    //由於在之前的  master prepare 中已經把 table,jdbcUrl 提取出來,所以這里處理十分簡單
    for (int j = 0; j < adviceNumber; j++) {
        splitResultConfigs.add(simplifiedConf.clone());
    }

    return splitResultConfigs;
}

其中,adviceNumber是Reader切分的數量。 多表則得判斷配置的表的個數是否與adviceNumber一樣,否則跑出異常。一樣則分別為每個表添加一份sql。

for (String table : tables) {
        Configuration tempSlice = sliceConfig.clone();
        tempSlice.set(Key.TABLE, table);
        tempSlice.set(Key.PRE_SQL, renderPreOrPostSqls(preSqls, table));
        tempSlice.set(Key.POST_SQL, renderPreOrPostSqls(postSqls, table));
        splitResultConfigs.add(tempSlice);
    }

接着整合上幾步的Reader配置和Writer配置,一一對應

for (int i = 0; i < readerTasksConfigs.size(); i++) {
    Configuration taskConfig = Configuration.newDefault();
    taskConfig.set(CoreConstant.JOB_READER_NAME,
            this.readerPluginName);
    taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
            readerTasksConfigs.get(i));
    taskConfig.set(CoreConstant.JOB_WRITER_NAME,
            this.writerPluginName);
    taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
            writerTasksConfigs.get(i));

    if(transformerConfigs!=null && transformerConfigs.size()>0){
        taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
    }

    taskConfig.set(CoreConstant.TASK_ID, i);
    contentConfigs.add(taskConfig);
}

這里的taskConfig就是整合后的reader和wirter一一對應的配置。 schedule方法:未完待續…….


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM