Analysis of DataX's Execution Flow [Repost]


Source: https://www.jianshu.com/p/b10fbdee7e56

Opening

I first heard of DataX when a former Alibaba colleague introduced it at my current company, and I had long wanted to find time to read through its code, because the DataX framework is well designed and very friendly to secondary development. While getting familiar with the code I did not have time to study the read/write implementations for each data source (that part is well worth studying — you can see how reads and writes are done against essentially every mainstream data source); what I mainly read was DataX's startup and job-execution code.

In my view the core of the DataX framework comes down to two things: configuration runs through everything ("all in configuration", with the JSON configuration pushed to the extreme), and plugins are hot-loaded through URLClassLoader.

DataX on GitHub: https://github.com/alibaba/DataX

 

DataX Overview

DataX is an offline data-synchronization tool/platform widely used inside Alibaba Group. It implements efficient data synchronization between heterogeneous data sources such as MySQL, SQL Server, Oracle, PostgreSQL, HDFS, Hive, HBase, OTS and ODPS.
As a synchronization framework, DataX abstracts the synchronization of different data sources into Reader plugins, which read data from a source, and Writer plugins, which write data to a target; in principle the framework can therefore synchronize between any types of data sources. The plugin system also works as an ecosystem: every newly added data source immediately becomes interoperable with all existing ones.

 

Job & Task Concepts

DataX's logical model has two levels, job and task: a job is split into tasks, and the tasks are then assembled into task groups for execution.

  • A job instance runs inside a JobContainer. It is the master of all tasks and is responsible for initialization, splitting, scheduling, running, cleanup, monitoring and reporting, but it does not perform any data synchronization itself.

  • Job: a Job describes a synchronization run from one source to one target and is DataX's smallest unit of business work. For example: syncing one MySQL table into a specific partition of an ODPS table.

  • Task: a Task is the smallest execution unit obtained by splitting a Job to maximize concurrency. For example, a Job reading a MySQL database sharded into 1024 tables is split into 1024 read Tasks, which run with a configurable degree of parallelism.

  • TaskGroup: a set of Tasks. The Tasks executed under one TaskGroupContainer are collectively called a TaskGroup.

  • JobContainer: the Job executor, the unit of work responsible for job-level splitting, scheduling, pre- and post-statements and so on; similar to the JobTracker in Yarn.

  • TaskGroupContainer: the TaskGroup executor, the unit of work that runs a group of Tasks; similar to the TaskTracker in Yarn.

  • In short, a Job is split into Tasks, which then run in containers provided by the framework; a plugin only has to implement the Job and Task parts of the logic. A small numeric example of how these quantities relate follows.
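The sketch below is not DataX source; it is a minimal illustration of the arithmetic that links tasks, channels and task groups, assuming the default of 5 channels per task group (the core.container.taskGroup.channel value shown in core.json later in this article) and made-up task/channel counts.

import java.lang.Math;

public class TaskGroupMath {
    public static void main(String[] args) {
        int taskNumber = 1024;          // e.g. a sharded MySQL job split into 1024 read tasks
        int needChannelNumber = 20;     // channels the framework decided to run concurrently
        int channelsPerTaskGroup = 5;   // core.container.taskGroup.channel (default 5)

        // concurrency is capped at the channel count, and channels are packed into task groups
        int taskGroupNumber = (int) Math.ceil(1.0 * needChannelNumber / channelsPerTaskGroup);

        System.out.printf("%d tasks share %d channels spread over %d task groups%n",
                taskNumber, needChannelNumber, taskGroupNumber);
    }
}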

 

Startup Process

 
 

Notes:

  • In the flow diagram above (not reproduced in this repost), yellow marks the Job-phase steps, blue the Task-phase steps, and green the framework-phase steps.



 
 


Notes:

  • A custom reader or writer plugin only needs to implement the Job and Task parts of the corresponding interface; a minimal skeleton is sketched below.
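The following is a minimal sketch of such a plugin, following the Reader.Job / Reader.Task structure that DataX reader plugins implement (writers are symmetric, with startWrite(RecordReceiver) on the task side). The package and class names (com.example, DemoReader) are made up for illustration, and error handling is omitted.

package com.example.demo.reader;   // hypothetical package, for illustration only

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;

import java.util.ArrayList;
import java.util.List;

public class DemoReader extends Reader {

    // Job side: runs once in the JobContainer and splits the work into task configurations
    public static class Job extends Reader.Job {
        private Configuration jobConf;

        @Override
        public void init() { this.jobConf = super.getPluginJobConf(); }

        @Override
        public List<Configuration> split(int adviceNumber) {
            // one task config per advised channel; real readers split by table, file or range
            List<Configuration> configs = new ArrayList<Configuration>();
            for (int i = 0; i < adviceNumber; i++) {
                configs.add(this.jobConf.clone());
            }
            return configs;
        }

        @Override
        public void destroy() { }
    }

    // Task side: runs inside a TaskGroupContainer and pushes Records into the channel
    public static class Task extends Reader.Task {
        @Override
        public void init() { }

        @Override
        public void startRead(RecordSender recordSender) {
            Record record = recordSender.createRecord();
            record.addColumn(new StringColumn("hello datax"));
            recordSender.sendToWriter(record);
            recordSender.flush();
        }

        @Override
        public void destroy() { }
    }
}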

 

Debugging DataX

The best way to read the source is to debug the whole project. Working out how to debug DataX took some effort, so the steps are shared here for anyone interested.
The debug setup goes as follows:

  • 1. Download the DataX source from GitHub and build it with the commands below (they are also documented on the project's GitHub page). If some dependencies cannot be downloaded, you can drop the affected reader/writer modules from the build; this does not affect debugging.
(1) Download the DataX source:

    $ git clone git@github.com:alibaba/DataX.git

(2) Package it with Maven:

    $ cd {DataX_source_code_home}
    $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true

    On success the log ends with:

    [INFO] BUILD SUCCESS
    [INFO] -----------------------------------------------------------------
    [INFO] Total time: 08:12 min
    [INFO] Finished at: 2015-12-13T16:26:48+08:00
    [INFO] Final Memory: 133M/960M
    [INFO] -----------------------------------------------------------------

    The packaged DataX lives in {DataX_source_code_home}/target/datax/datax/, laid out as:

    $ cd {DataX_source_code_home}
    $ ls ./target/datax/datax/
    bin  conf  job  lib  log  log_perf  plugin  script  tmp
  • 2. DataX is launched through a Python script, so print the assembled launch command inside that script. The key line is `print startCommand`; with it we can capture the full startup command and its arguments.
if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print the assembled java command so it can be copied into the IDE
    print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)
  • 3. Capture the command that launches DataX:
java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax -Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml -classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:. -Dlog.file.name=s_datax_job_job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json 
  • 4. Configure the IDEA run configuration:
 
 
Put the following in "VM options":

-server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log
-Dloglevel=info -Dfile.encoding=UTF-8
-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener
-Djava.security.egd=file:///dev/urandom
-Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax
-Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml
-classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:.
-Dlog.file.name=s_datax_job_job_json com.alibaba.datax.core.Engine

Put the following in "Program arguments":

-mode standalone -jobid -1 -job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json
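If you prefer not to copy the whole command, another option (not from the original post, just a convenience) is a tiny launcher class that calls the engine entry point directly; the VM options above, in particular -Ddatax.home and the logback settings, still need to be set on its run configuration, and the job path below is a placeholder. Engine.entry(String[]) is the public entry method shown in the source walkthrough later in this article.

import com.alibaba.datax.core.Engine;

// Hypothetical launcher for IDE debugging; run it with the VM options listed above
// (-Ddatax.home=..., -Dlogback.configurationFile=..., etc.).
public class DataxDebugLauncher {
    public static void main(String[] args) throws Throwable {
        Engine.entry(new String[]{
                "-mode", "standalone",
                "-jobid", "-1",
                "-job", "/path/to/datax/job/job.json"   // placeholder job file
        });
    }
}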

 

Startup Steps

  • 1. Parse the configuration: job.json, core.json and the plugin.json files.

  • 2. Put the jobId into the configuration.

  • 3. Start the engine; Engine.start() is the entry into the startup sequence.

  • 4. Put RUNTIME_MODE into the configuration.

  • 5. Start the job through JobContainer's start() method.

  • 6. Execute the job's preHandle(), init(), prepare(), split(), schedule(), post() and postHandle() methods in turn.

  • 7. init() initializes the reader and writer plugins from the configuration; this is where jar hot-loading happens, each plugin's init() is called, and the reader and writer configurations are set.

  • 8. prepare() calls the reader and writer plugins' prepare() methods. Each plugin has its own jarLoader, implemented by extending URLClassLoader.

  • 9. split() adjusts the channel count via adjustChannelNumber() and then performs the finest-grained splitting of the reader and writer. Note that the writer is split against the reader's result so that both produce the same number of pieces, satisfying the 1:1 channel model.

  • 10. The channel count is derived from the byte and record rate limits; computing it is the first step inside split().

  • 11. In split(), the reader plugin splits itself according to the channel count (although some readers may ignore it), and the writer plugin splits strictly 1:1 against the reader's result.

  • 12. Inside split(), mergeReaderAndWriterTaskConfigs() combines the reader, writer and transformer settings into the task configurations and rewrites job.content with them.

  • 13. schedule() takes the task configurations produced by split() and assigns them to taskGroups, based on the number of tasks and the number of tasks a single taskGroup supports; dividing the one by the other gives the number of taskGroups.

  • 14. Internally, schedule() delegates to AbstractScheduler.schedule(), which calls startAllTaskGroup() to create all TaskGroupContainers and organize their tasks; a TaskGroupContainerRunner is responsible for running each TaskGroupContainer and its assigned tasks.

  • 15. taskGroupContainerExecutorService starts a fixed-size thread pool to run the TaskGroupContainerRunner objects; TaskGroupContainerRunner.run() calls taskGroupContainer.start(), which creates one TaskExecutor per channel and starts each task via taskExecutor.doStart().

 

Startup Source Code Walkthrough

Entry point: main

public class Engine {

    public static void main(String[] args) throws Exception {
        int exitCode = 0;
        try {
            Engine.entry(args);
        } catch (Throwable e) {
            System.exit(exitCode);
        }
    }

    public static void entry(final String[] args) throws Throwable {
        // argument-parsing code omitted

        // path of the job configuration file
        String jobPath = cl.getOptionValue("job");

        // if the user did not specify a jobid explicitly, datax.py passes the default value -1
        String jobIdString = cl.getOptionValue("jobid");
        RUNTIME_MODE = cl.getOptionValue("mode");

        // parse the configuration
        Configuration configuration = ConfigParser.parse(jobPath);

        // code omitted
        boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);

        configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

        // start the engine with the parsed configuration
        Engine engine = new Engine();
        engine.start(configuration);
    }
}

Notes:
main() does two things:

  • 1. Parse the job-related configuration into a Configuration object.
  • 2. Start the Engine with that configuration.

 

Parsing the configuration

public final class ConfigParser {
    private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class);

    /**
     * Given the job configuration path, ConfigParser parses the Job, Plugin and Core
     * configurations and returns them as a single Configuration.
     */
    public static Configuration parse(final String jobPath) {
        // load the job's configuration file, which follows a fixed JSON template
        Configuration configuration = ConfigParser.parseJobConfig(jobPath);

        // merge in the conf/core.json configuration
        configuration.merge(
                ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
                false);

        // todo config optimization: only pick up the plugins that are actually needed
        // fixed node path: job.content[0].reader.name
        String readerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        // fixed node path: job.content[0].writer.name
        String writerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
        // fixed node path: job.preHandler.pluginName
        String preHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
        // fixed node path: job.postHandler.pluginName
        String postHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

        // collect the reader/writer (and handler) plugins that need to be loaded
        Set<String> pluginList = new HashSet<String>();
        pluginList.add(readerPluginName);
        pluginList.add(writerPluginName);
        if (StringUtils.isNotEmpty(preHandlerName)) {
            pluginList.add(preHandlerName);
        }
        if (StringUtils.isNotEmpty(postHandlerName)) {
            pluginList.add(postHandlerName);
        }

        try {
            // parsePluginConfig(...) loads the configuration of the requested plugins
            // and merges it into the global configuration
            configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
        } catch (Exception e) {
        }

        // at this point the configuration combines three sources:
        // the job config, the core config and the requested plugins' configs
        return configuration;
    }

    // look up the requested plugins in the reader and writer directories and parse their configs
    public static Configuration parsePluginConfig(List<String> wantPluginNames) {
        // create an empty configuration object
        Configuration configuration = Configuration.newDefault();

        Set<String> replicaCheckPluginSet = new HashSet<String>();
        int complete = 0;

        // all readers live under /plugin/reader; walk every reader directory,
        // read the configuration of the plugins we want and merge it into the empty config above
        // e.g. //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/reader
        for (final String each : ConfigParser
                .getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {
            // parse a single reader directory; eachReaderConfig keys are plugin.reader.<pluginname>,
            // values are the content of that plugin's plugin.json
            Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(
                    each, "reader", replicaCheckPluginSet, wantPluginNames);
            if (eachReaderConfig != null) {
                // merge with overwrite
                configuration.merge(eachReaderConfig, true);
                complete += 1;
            }
        }

        // e.g. //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/writer
        for (final String each : ConfigParser
                .getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
            Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(
                    each, "writer", replicaCheckPluginSet, wantPluginNames);
            if (eachWriterConfig != null) {
                configuration.merge(eachWriterConfig, true);
                complete += 1;
            }
        }

        if (wantPluginNames != null && wantPluginNames.size() > 0
                && wantPluginNames.size() != complete) {
            throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR,
                    "插件加載失敗,未完成指定插件加載:" + wantPluginNames);
        }

        return configuration;
    }
}

Notes:
Configuration parsing merges three sources and returns the combined result (a small sketch of the merge behavior follows the list):

  • 1. The job configuration, from the job.json file given on the command line.
  • 2. DataX's own configuration, from the default core.json file.
  • 3. The reader and writer plugin configurations, for the plugins named in job.json.
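The merge order matters: the job file is loaded first, core.json is merged in without overwriting existing keys (merge(..., false)), and the plugin.json entries are merged with overwrite (merge(..., true)) under the plugin.* subtree. Below is a minimal sketch of that behavior, assuming the com.alibaba.datax.common.util.Configuration API used in the excerpt above (from / merge / getString with path expressions); the JSON literals are toy stand-ins for the real files.

import com.alibaba.datax.common.util.Configuration;

public class MergeDemo {
    public static void main(String[] args) {
        // stands in for job.json
        Configuration job = Configuration.from(
                "{\"job\": {\"content\": [{\"reader\": {\"name\": \"streamreader\"}}]}}");

        // stands in for conf/core.json
        Configuration core = Configuration.from(
                "{\"core\": {\"container\": {\"taskGroup\": {\"channel\": 5}}}}");

        // merge(other, false): keep existing values, only add keys that are missing
        job.merge(core, false);

        // fixed node paths, the same ones ConfigParser reads through CoreConstant
        System.out.println(job.getString("job.content[0].reader.name"));    // streamreader
        System.out.println(job.getInt("core.container.taskGroup.channel")); // 5
    }
}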

 

The configuration pieces

The job.json configuration

{ "job": { "setting": { "speed": { "byte":10485760, "record":1000 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "value": "DataX", "type": "string" }, { "value": 19890604, "type": "long" }, { "value": "1989-06-04 00:00:00", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 100000 } }, "writer": { "name": "streamwriter", "parameter": { "print": false, "encoding": "UTF-8" } } } ] } } 



The core.json configuration


{ "entry": { "jvm": "-Xms1G -Xmx1G", "environment": {} }, "common": { "column": { "datetimeFormat": "yyyy-MM-dd HH:mm:ss", "timeFormat": "HH:mm:ss", "dateFormat": "yyyy-MM-dd", "extraFormats":["yyyyMMdd"], "timeZone": "GMT+8", "encoding": "utf-8" } }, "core": { "dataXServer": { "address": "http://localhost:7001/api", "timeout": 10000, "reportDataxLog": false, "reportPerfLog": false }, "transport": { "channel": { "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "speed": { "byte": 100, "record": 10 }, "flowControlInterval": 20, "capacity": 512, "byteCapacity": 67108864 }, "exchanger": { "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger", "bufferSize": 32 } }, "container": { "job": { "reportInterval": 10000 }, "taskGroup": { "channel": 5 }, "trace": { "enable": "false" } }, "statistics": { "collector": { "plugin": { "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector", "maxDirtyNumber": 10 } } } } } 



The plugin.json configuration

{ "name": "streamreader", "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "useScene": "only for developer test.", "mechanism": "use datax framework to transport data from stream.", "warn": "Never use it in your real job." }, "developer": "alibaba" } { "name": "streamwriter", "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter", "description": { "useScene": "only for developer test.", "mechanism": "use datax framework to transport data to stream.", "warn": "Never use it in your real job." }, "developer": "alibaba" } 



The merged configuration

{ "common": { "column": { "dateFormat": "yyyy-MM-dd", "datetimeFormat": "yyyy-MM-dd HH:mm:ss", "encoding": "utf-8", "extraFormats": ["yyyyMMdd"], "timeFormat": "HH:mm:ss", "timeZone": "GMT+8" } }, "core": { "container": { "job": { "id": -1, "reportInterval": 10000 }, "taskGroup": { "channel": 5 }, "trace": { "enable": "false" } }, "dataXServer": { "address": "http://localhost:7001/api", "reportDataxLog": false, "reportPerfLog": false, "timeout": 10000 }, "statistics": { "collector": { "plugin": { "maxDirtyNumber": 10, "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector" } } }, "transport": { "channel": { "byteCapacity": 67108864, "capacity": 512, "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "flowControlInterval": 20, "speed": { "byte": -1, "record": -1 } }, "exchanger": { "bufferSize": 32, "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger" } } }, "entry": { "jvm": "-Xms1G -Xmx1G" }, "job": { "content": [{ "reader": { "name": "streamreader", "parameter": { "column": [{ "type": "string", "value": "DataX" }, { "type": "long", "value": 19890604 }, { "type": "date", "value": "1989-06-04 00:00:00" }, { "type": "bool", "value": true }, { "type": "bytes", "value": "test" }], "sliceRecordCount": 100000 } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": false } } }], "setting": { "errorLimit": { "percentage": 0.02, "record": 0 }, "speed": { "byte": 10485760 } } }, "plugin": { "reader": { "streamreader": { "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "mechanism": "use datax framework to transport data from stream.", "useScene": "only for developer test.", "warn": "Never use it in your real job." }, "developer": "alibaba", "name": "streamreader", "path": "//Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/reader/streamreader" } }, "writer": { "streamwriter": { "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter", "description": { "mechanism": "use datax framework to transport data to stream.", "useScene": "only for developer test.", "warn": "Never use it in your real job." }, "developer": "alibaba", "name": "streamwriter", "path": "//Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/writer/streamwriter" } } } } 

 

Engine's start process

public class Engine {
    private static final Logger LOG = LoggerFactory.getLogger(Engine.class);
    private static String RUNTIME_MODE;

    /* check job model (job/task) first */
    public void start(Configuration allConf) {
        // code omitted

        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
                .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));

        AbstractContainer container;
        if (isJob) {
            allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
            // the key object: the JobContainer
            container = new JobContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
        }

        Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);

        // start the core container
        container.start();
    }
}

Notes:
start() does two things:

  • 1. Create the JobContainer object.
  • 2. Start the JobContainer object.

 

Starting the JobContainer

public class JobContainer extends AbstractContainer {

    /**
     * Everything the JobContainer is responsible for happens inside start():
     * init, prepare, split, schedule, post, as well as destroy and statistics.
     */
    @Override
    public void start() {
        try {
            isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
            if (isDryRun) {
                // code omitted
            } else {
                // clone the configuration for thread safety
                userConf = configuration.clone();

                // run preHandle()
                LOG.debug("jobContainer starts to do preHandle ...");
                this.preHandle();

                // initialize the reader, transformer and writer
                LOG.debug("jobContainer starts to do init ...");
                this.init();

                // run the plugins' prepare()
                LOG.info("jobContainer starts to do prepare ...");
                this.prepare();

                // split the job into tasks
                LOG.info("jobContainer starts to do split ...");
                this.totalStage = this.split();

                // schedule the tasks
                LOG.info("jobContainer starts to do schedule ...");
                this.schedule();

                // run the post step
                LOG.debug("jobContainer starts to do post ...");
                this.post();

                // run postHandle()
                LOG.debug("jobContainer starts to do postHandle ...");
                this.postHandle();

                LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

                this.invokeHooks();
            }
        } catch (Throwable e) {
            // code omitted
        } finally {
            // code omitted
        }
    }
}

Notes:
JobContainer.start() runs a sequence of job-level operations:

  • 1. The job's preHandle(); not important here.
  • 2. The job's init(); needs attention.
  • 3. The job's prepare(); not important here.
  • 4. The job's split(); needs attention.
  • 5. The job's schedule(); needs attention.
  • 6. The job's post() and postHandle(); not important here.

 

The job's init() step

public class JobContainer extends AbstractContainer {

    private void init() {
        this.jobId = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

        if (this.jobId < 0) {
            LOG.info("Set jobId = 0");
            this.jobId = 0;
            this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, this.jobId);
        }

        Thread.currentThread().setName("job-" + this.jobId);

        // initialize the plugin collector
        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
                this.getContainerCommunicator());

        // the Reader must be initialized before the Writer
        this.jobReader = this.initJobReader(jobPluginCollector);
        this.jobWriter = this.initJobWriter(jobPluginCollector);
    }

    private Reader.Job initJobReader(JobPluginCollector jobPluginCollector) {
        // name of the reader plugin
        this.readerPluginName = this.configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);

        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.readerPluginName));

        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
                PluginType.READER, this.readerPluginName);

        // give the reader its own job parameter
        jobReader.setPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        // give the reader its peer's (the writer's) parameter
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        jobReader.setJobPluginCollector(jobPluginCollector);

        // from here on we are inside the concrete plugin's init()
        jobReader.init();

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }

    private Writer.Job initJobWriter(JobPluginCollector jobPluginCollector) {
        this.writerPluginName = this.configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.WRITER, this.writerPluginName));

        Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(
                PluginType.WRITER, this.writerPluginName);

        // give the writer its own job parameter
        jobWriter.setPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        // give the writer its peer's (the reader's) parameter
        jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        jobWriter.setPeerPluginName(this.readerPluginName);
        jobWriter.setJobPluginCollector(jobPluginCollector);

        jobWriter.init();

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobWriter;
    }
}

Notes:
The job's init() does two things:

  • 1. Create the reader's Job object, loading its class through URLClassLoader.
  • 2. Create the writer's Job object, loading its class through URLClassLoader. The class-loader swap pattern is sketched below.
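The jar hot-loading itself is not magic: each plugin directory is wrapped in a JarLoader (a URLClassLoader subclass), and the framework temporarily swaps it in as the thread context class loader while it instantiates and initializes the plugin. The following is a simplified sketch of that pattern, not the actual LoadUtil/JarLoader code; the plugin jar path is a made-up placeholder, and the class name comes from the plugin's plugin.json "class" field.

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class PluginLoaderSketch {
    public static void main(String[] args) throws Exception {
        // hypothetical plugin directory, e.g. .../plugin/reader/streamreader/
        File pluginJar = new File("/path/to/plugin/reader/streamreader/streamreader-0.0.1.jar");

        // JarLoader in DataX extends URLClassLoader in essentially this way
        URLClassLoader jarLoader = new URLClassLoader(
                new URL[]{pluginJar.toURI().toURL()},
                PluginLoaderSketch.class.getClassLoader());

        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        try {
            // swap in the plugin's loader, mirroring classLoaderSwapper.setCurrentThreadClassLoader(...)
            Thread.currentThread().setContextClassLoader(jarLoader);

            // the class name is taken from the plugin's plugin.json
            Class<?> pluginClass = Class.forName(
                    "com.alibaba.datax.plugin.reader.streamreader.StreamReader", true, jarLoader);
            Object plugin = pluginClass.getDeclaredConstructor().newInstance();
            System.out.println("Loaded plugin: " + plugin.getClass().getName());
        } finally {
            // mirror classLoaderSwapper.restoreCurrentThreadClassLoader()
            Thread.currentThread().setContextClassLoader(previous);
        }
    }
}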

 

The job's split() step

public class JobContainer extends AbstractContainer {

    private int split() {
        this.adjustChannelNumber();

        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }

        List<Configuration> readerTaskConfigs = this
                .doReaderSplit(this.needChannelNumber);
        int taskNumber = readerTaskConfigs.size();
        List<Configuration> writerTaskConfigs = this
                .doWriterSplit(taskNumber);

        List<Configuration> transformerList =
                this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
        LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));

        /**
         * Input: the reader and writer parameter lists.
         * Output: the list of elements under job.content.
         */
        List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
                readerTaskConfigs, writerTaskConfigs, transformerList);

        LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig));

        this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

        return contentConfig.size();
    }

    private void adjustChannelNumber() {
        int needChannelNumberByByte = Integer.MAX_VALUE;
        int needChannelNumberByRecord = Integer.MAX_VALUE;

        boolean isByteLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
        if (isByteLimit) {
            long globalLimitedByteSpeed = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
            Long channelLimitedByteSpeed = this.configuration
                    .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);

            needChannelNumberByByte = (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
            needChannelNumberByByte = needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
            LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
        }

        boolean isRecordLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
        if (isRecordLimit) {
            long globalLimitedRecordSpeed = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
            Long channelLimitedRecordSpeed = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);

            needChannelNumberByRecord = (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
            needChannelNumberByRecord = needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
        }

        // take the smaller of the two
        this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord
                ? needChannelNumberByByte : needChannelNumberByRecord;

        boolean isChannelLimit = (this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
        if (isChannelLimit) {
            this.needChannelNumber = this.configuration.getInt(
                    CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
            LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");
            return;
        }

        throw DataXException.asDataXException(
                FrameworkErrorCode.CONFIG_ERROR, "Job運行速度必須設置");
    }
}

Notes:
The job's split() mainly computes the channel count from the rate-limit settings and, from that, the number of tasks:

  • 1. adjustChannelNumber() derives the channel count from the byte-rate and record-rate limits.
  • 2. The number of reader splits is computed from the channel count.
  • 3. The number of writer splits is computed from the reader splits, so reader and writer are bound 1:1.
  • 4. mergeReaderAndWriterTaskConfigs() builds the reader+writer task configurations; at this point the task configs exist. A worked example of the channel arithmetic follows.
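To make the adjustChannelNumber() arithmetic concrete, the sketch below replays it with made-up numbers (not values taken from the configs shown earlier): a global byte limit of 10 MB/s with a per-channel limit of 1 MB/s allows 10 channels, a global record limit of 1000 rec/s with a per-channel limit of 100 rec/s also allows 10, and the smaller of the two wins.

public class ChannelMath {
    public static void main(String[] args) {
        long globalLimitedByteSpeed = 10L * 1024 * 1024;   // job.setting.speed.byte
        long channelLimitedByteSpeed = 1L * 1024 * 1024;   // core.transport.channel.speed.byte
        int byByte = Math.max(1, (int) (globalLimitedByteSpeed / channelLimitedByteSpeed));

        long globalLimitedRecordSpeed = 1000;              // job.setting.speed.record
        long channelLimitedRecordSpeed = 100;              // core.transport.channel.speed.record
        int byRecord = Math.max(1, (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed));

        // same rule as adjustChannelNumber(): take the smaller of the two candidates
        int needChannelNumber = Math.min(byByte, byRecord);

        System.out.println("byte-limited channels   = " + byByte);     // 10
        System.out.println("record-limited channels = " + byRecord);   // 10
        System.out.println("needChannelNumber       = " + needChannelNumber);
    }
}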

 

The job's schedule() step

public class JobContainer extends AbstractContainer {

    private void schedule() {
        /**
         * From the configuration, work out which tasks each taskGroup should run.
         */
        List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(
                this.configuration, this.needChannelNumber, channelsPerTaskGroup);

        ExecuteMode executeMode = null;
        AbstractScheduler scheduler;
        try {
            executeMode = ExecuteMode.STANDALONE;
            scheduler = initStandaloneScheduler(this.configuration);

            // set the execute mode on every task group configuration
            for (Configuration taskGroupConfig : taskGroupConfigs) {
                taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
            }

            // start scheduling all taskGroups
            scheduler.schedule(taskGroupConfigs);
        } catch (Exception e) {
            // code omitted
        }
    }
}

Notes:
The job's schedule() does two things:

  • 1. Group the tasks into taskGroups, producing List<Configuration> taskGroupConfigs.
  • 2. Start the taskGroups via scheduler.schedule(taskGroupConfigs).

 

The TaskGroup schedule process

public abstract class AbstractScheduler {

    public void schedule(List<Configuration> configurations) {
        int totalTasks = calculateTaskCount(configurations);

        // start all TaskGroups
        startAllTaskGroup(configurations);

        try {
            while (true) {
                // code omitted
            }
        } catch (InterruptedException e) {
        }
    }
}

public abstract class ProcessInnerScheduler extends AbstractScheduler {

    private ExecutorService taskGroupContainerExecutorService;

    @Override
    public void startAllTaskGroup(List<Configuration> configurations) {
        // fixed-size thread pool, one thread per task group
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());

        // one TaskGroupContainerRunner per TaskGroup
        for (Configuration taskGroupConfiguration : configurations) {
            // create the runner and submit it to the pool
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }

        // the pool shuts down after all submitted tasks finish; no new tasks are accepted after this call
        this.taskGroupContainerExecutorService.shutdown();
    }
}

public class TaskGroupContainerRunner implements Runnable {

    private TaskGroupContainer taskGroupContainer;
    private State state;

    public TaskGroupContainerRunner(TaskGroupContainer taskGroup) {
        this.taskGroupContainer = taskGroup;
        this.state = State.SUCCEEDED;
    }

    @Override
    public void run() {
        try {
            Thread.currentThread().setName(
                    String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
            this.taskGroupContainer.start();
            this.state = State.SUCCEEDED;
        } catch (Throwable e) {
        }
    }
}

Notes:
The scheduler's schedule() does the following:

  • 1. Create a TaskGroupContainerRunner for every TaskGroup.
  • 2. Submit the TaskGroupContainerRunners to a thread pool, which executes their run() methods.
  • 3. Inside run(), call this.taskGroupContainer.start().

 

Starting the TaskGroupContainer

public class TaskGroupContainer extends AbstractContainer {
    // (the body of this class was omitted in the original post; its behavior is summarized below)
}

Notes:
Internally, TaskGroupContainer does the following:

  • 1. For each task in the list assigned to this TaskGroupContainer, create a TaskExecutor object.
  • 2. Use those TaskExecutor objects to start the tasks assigned to this TaskGroup.
  • 3. At this point the tasks of the Job have been successfully started. A toy simulation of this loop is sketched below.
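Since the original post truncated the TaskGroupContainer source, here is only a toy, self-contained simulation of the loop just described: wrap each assigned task in an executor and start it. The class names are stand-ins, not the DataX classes; the real container also handles retries, state reporting and timeouts.

import java.util.Arrays;
import java.util.List;

public class TaskGroupSketch {

    static class ToyTaskExecutor {
        private final String taskId;

        ToyTaskExecutor(String taskId) { this.taskId = taskId; }

        void doStart() {
            // in DataX the writer thread is started first, then the reader thread
            Thread writer = new Thread(() -> System.out.println(taskId + " writer running"));
            Thread reader = new Thread(() -> System.out.println(taskId + " reader running"));
            writer.start();
            reader.start();
        }
    }

    public static void main(String[] args) {
        // stands in for the task configurations assigned to one task group
        List<String> assignedTasks = Arrays.asList("task-0", "task-1", "task-2");
        for (String task : assignedTasks) {
            new ToyTaskExecutor(task).doStart();
        }
    }
}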

Starting a Task

class TaskExecutor {
    private Channel channel;
    private Thread readerThread;
    private Thread writerThread;
    private ReaderRunner readerRunner;
    private WriterRunner writerRunner;

    /**
     * The taskCommunication here is used in several places:
     * 1. the channel
     * 2. the readerRunner and writerRunner
     * 3. the reader's and writer's taskPluginCollector
     */
    public TaskExecutor(Configuration taskConf, int attemptCount) {
        // configuration for this taskExecutor
        this.taskConfig = taskConf;

        // the task id
        this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
        this.attemptCount = attemptCount;

        /**
         * The Communication for this taskExecutor is looked up by taskId.
         * It is passed to the readerRunner and writerRunner, and also to the channel for statistics.
         */
        this.channel = ClassUtil.instantiate(channelClazz, Channel.class, configuration);
        // the channel is created here, one per task; generateRunner() creates the reader/writer
        // runners and injects this channel into them
        this.channel.setCommunication(this.taskCommunication);

        /**
         * fetch the transformer parameters
         */
        List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);

        /**
         * create the writer thread
         */
        writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
        this.writerThread = new Thread(writerRunner,
                String.format("%d-%d-%d-writer", jobId, taskGroupId, this.taskId));
        // by setting the thread's contextClassLoader, the plugin runs with a class loader
        // different from the main program's
        this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
                PluginType.WRITER, this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME)));

        /**
         * create the reader thread
         */
        readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
        this.readerThread = new Thread(readerRunner,
                String.format("%d-%d-%d-reader", jobId, taskGroupId, this.taskId));
        /**
         * by setting the thread's contextClassLoader, the plugin runs with a class loader
         * different from the main program's
         */
        this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.taskConfig.getString(CoreConstant.JOB_READER_NAME)));
    }

    public void doStart() {
        this.writerThread.start();
        this.readerThread.start();
    }
}

Notes:
TaskExecutor startup does the following:

  • 1. Create the reader and writer thread tasks; the reader and writer share one channel.
  • 2. Start the writer thread first, then the reader thread.
  • 3. At this point the data-synchronization Task is running.

 

Data Transfer in DataX

Like any producer-consumer setup, Reader and Writer plugins exchange data through a channel. The channel may be in-memory or persistent; the plugins do not care. A plugin writes data into the channel through a RecordSender and reads data from it through a RecordReceiver.

Each item in the channel is a Record object, and a Record can hold multiple Column objects; think of them roughly as a database row and its columns.
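On the plugin side this looks roughly like the sketch below: the reader's Task hands Records to a RecordSender and the writer's Task pulls them from a RecordReceiver, both backed by the same channel. The method names (createRecord, sendToWriter, getFromReader, addColumn) are the ones exposed by DataX's common module, but the surrounding task classes are abbreviated here, so treat this as an illustration rather than a complete plugin.

import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.RecordSender;

public class TransportSketch {

    // reader Task side: produce records into the channel
    static void startRead(RecordSender recordSender) {
        Record record = recordSender.createRecord();
        record.addColumn(new StringColumn("DataX"));
        record.addColumn(new LongColumn(19890604L));
        recordSender.sendToWriter(record);   // pushed into the channel
        recordSender.flush();
    }

    // writer Task side: consume records from the channel
    static void startWrite(RecordReceiver recordReceiver) {
        Record record;
        while ((record = recordReceiver.getFromReader()) != null) {
            // a real writer would map the columns to its target data source here
            System.out.println(record.getColumn(0).asString() + ", " + record.getColumn(1).asLong());
        }
    }
}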

public class DefaultRecord implements Record {

    private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16;

    private List<Column> columns;
    private int byteSize;

    // memory occupied by the Record object itself
    private int memorySize = ClassSize.DefaultRecordHead;

    public DefaultRecord() {
        this.columns = new ArrayList<Column>(RECORD_AVERGAE_COLUMN_NUMBER);
    }

    @Override
    public void addColumn(Column column) {
        columns.add(column);
        incrByteSize(column);
    }

    @Override
    public Column getColumn(int i) {
        if (i < 0 || i >= columns.size()) {
            return null;
        }
        return columns.get(i);
    }

    @Override
    public void setColumn(int i, final Column column) {
        if (i < 0) {
            throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR,
                    "不能給index小於0的column設置值");
        }

        if (i >= columns.size()) {
            expandCapacity(i + 1);
        }

        decrByteSize(getColumn(i));
        this.columns.set(i, column);
        incrByteSize(getColumn(i));
    }
}

 

