鏈接:https://www.jianshu.com/p/b10fbdee7e56
開篇
最早接觸DataX是在前阿里同事在現在的公司引入的時候提到的,一直想抽空好好看看這部分代碼,因為DataX的代碼框架設計的很好,非常適合二次開發。在熟悉DataX的代碼過程中,沒有時間針對每個數據源的讀寫部分代碼進行研究(這部分代碼非常值得研究,基本上主流數據源的讀寫操作都能看到),主要閱讀的還是DataX的啟動和工作部分代碼。
DataX的框架的核心部分我個人看來就兩大塊,一塊是配置貫穿DataX,all in configuration,將配置的json用到了極致;另一塊是通過URLClassLoader實現插件的熱加載。
DataX的github地址:https://github.com/alibaba/DataX
DataX介紹
DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各種異構數據源之間高效的數據同步功能。
DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。
Job&Task概念
在DataX的邏輯模型中包括job、task兩個維度,通過將job進行task拆分,然后將task合並到taskGroup進行運行。
-
job實例運行在jobContainer容器中,它是所有任務的master,負責初始化、拆分、調度、運行、回收、監控和匯報,但它並不做實際的數據同步操作。
-
Job: Job是DataX用以描述從一個源頭到一個目的端的同步作業,是DataX數據同步的最小業務單元。比如:從一張mysql的表同步到odps的一個表的特定分區。
-
Task: Task是為最大化而把Job拆分得到的最小執行單元。比如:讀一張有1024個分表的mysql分庫分表的Job,拆分成1024個讀Task,用若干個並發執行。
-
TaskGroup: 描述的是一組Task集合。在同一個TaskGroupContainer執行下的Task集合稱之為TaskGroup。
-
JobContainer: Job執行器,負責Job全局拆分、調度、前置語句和后置語句等工作的工作單元。類似Yarn中的JobTracker。
-
TaskGroupContainer: TaskGroup執行器,負責執行一組Task的工作單元,類似Yarn中的TaskTracker。
-
簡而言之, Job拆分成Task,在分別在框架提供的容器中執行,插件只需要實現Job和Task兩部分邏輯。
啟動過程
說明:
- 上圖中,黃色表示
Job部分的執行階段,藍色表示Task部分的執行階段,綠色表示框架執行階段。
說明:
- reader和writer的自定義插件內部需要實現job和task的接口即可
DataX開啟Debug
閱讀源碼的最好方法是debug整個項目工程,在如何調試DataX項目的過程中還是花費了一些精力在里面的,現在一並共享出來供有興趣的程序員一並研究。
整個debug過程需要按照下列步驟進行:
- 1、github上下載DataX的源碼並通過以下命令進行編譯,github官網有編譯命令,如果遇到依賴包無法下載可以省去部分writer或reader插件,不影響debug。
(1)、下載DataX源碼: $ git clone git@github.com:alibaba/DataX.git (2)、通過maven打包: $ cd {DataX_source_code_home} $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true 打包成功,日志顯示如下: [INFO] BUILD SUCCESS [INFO] ----------------------------------------------------------------- [INFO] Total time: 08:12 min [INFO] Finished at: 2015-12-13T16:26:48+08:00 [INFO] Final Memory: 133M/960M [INFO] ----------------------------------------------------------------- 打包成功后的DataX包位於 {DataX_source_code_home}/target/datax/datax/ ,結構如下: $ cd {DataX_source_code_home} $ ls ./target/datax/datax/ bin conf job lib log log_perf plugin script tmp
- 2、由於DataX是通過python腳本進行啟動的,所以在python腳本中把啟動參數打印出來,核心在於print startCommand這句,繼而我們就能夠獲取啟動命令參數了。
if __name__ == "__main__": printCopyright() parser = getOptionParser() options, args = parser.parse_args(sys.argv[1:]) if options.reader is not None and options.writer is not None: generateJobConfigTemplate(options.reader,options.writer) sys.exit(RET_STATE['OK']) if len(args) != 1: parser.print_help() sys.exit(RET_STATE['FAIL']) startCommand = buildStartCommand(options, args) print startCommand child_process = subprocess.Popen(startCommand, shell=True) register_signal() (stdout, stderr) = child_process.communicate() sys.exit(child_process.returncode)
- 3、獲取啟動DataX的啟動命令
java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax -Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml -classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:. -Dlog.file.name=s_datax_job_job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json
- 4、配置Idea啟動腳本
以下配置在VM options當中 -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax -Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml -classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:. -Dlog.file.name=s_datax_job_job_json com.alibaba.datax.core.Engine 以下配置在Program arguments當中 -mode standalone -jobid -1 -job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json
啟動步驟解析
-
1、解析配置,包括job.json、core.json、plugin.json三個配置
-
2、設置jobId到configuration當中
-
3、啟動Engine,通過Engine.start()進入啟動程序
-
4、設置RUNTIME_MODE奧configuration當中
-
5、通過JobContainer的start()方法啟動
-
6、依次執行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。
-
7、init()方法涉及到根據configuration來初始化reader和writer插件,這里涉及到jar包熱加載以及調用插件init()操作方法,同時設置reader和writer的configuration信息
-
8、prepare()方法涉及到初始化reader和writer插件的初始化,通過調用插件的prepare()方法實現,每個插件都有自己的jarLoader,通過集成URLClassloader實現而來
-
9、split()方法通過adjustChannelNumber()方法調整channel個數,同時執行reader和writer最細粒度的切分,需要注意的是,writer的切分結果要參照reader的切分結果,達到切分后數目相等,才能滿足1:1的通道模型
-
10、channel的計數主要是根據byte和record的限速來實現的,在split()的函數中第一步就是計算channel的大小
-
11、split()方法reader插件會根據channel的值進行拆分,但是有些reader插件可能不會參考channel的值,writer插件會完全根據reader的插件1:1進行返回
-
12、split()方法內部的mergeReaderAndWriterTaskConfigs()負責合並reader、writer、以及transformer三者關系,生成task的配置,並且重寫job.content的配置
-
13、schedule()方法根據split()拆分生成的task配置分配生成taskGroup對象,根據task的數量和單個taskGroup支持的task數量進行配置,兩者相除就可以得出taskGroup的數量
-
14、schdule()內部通過AbstractScheduler的schedule()執行,繼續執行startAllTaskGroup()方法創建所有的TaskGroupContainer組織相關的task,TaskGroupContainerRunner負責運行TaskGroupContainer執行分配的task。
-
15、taskGroupContainerExecutorService啟動固定的線程池用以執行TaskGroupContainerRunner對象,TaskGroupContainerRunner的run()方法調用taskGroupContainer.start()方法,針對每個channel創建一個TaskExecutor,通過taskExecutor.doStart()啟動任務
啟動過程源碼分析
入口main函數
public class Engine { public static void main(String[] args) throws Exception { int exitCode = 0; try { Engine.entry(args); } catch (Throwable e) { System.exit(exitCode); } } public static void entry(final String[] args) throws Throwable { // 省略相關參數的解析代碼 // 獲取job的配置路徑信息 String jobPath = cl.getOptionValue("job"); // 如果用戶沒有明確指定jobid, 則 datax.py 會指定 jobid 默認值為-1 String jobIdString = cl.getOptionValue("jobid"); RUNTIME_MODE = cl.getOptionValue("mode"); // 解析配置信息 Configuration configuration = ConfigParser.parse(jobPath); // 省略相關代碼 boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE); configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId); // 根據配置啟動參數 Engine engine = new Engine(); engine.start(configuration); } }
說明:
main函數主要做兩件事情,分別是:
- 1、解析job相關配置生成configuration。
- 2、依據配置啟動Engine。
configuration解析過程
public final class ConfigParser { private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class); /** * 指定Job配置路徑,ConfigParser會解析Job、Plugin、Core全部信息,並以Configuration返回 */ public static Configuration parse(final String jobPath) { // 加載任務的指定的配置文件,這個配置是有固定的json的固定模板格式的 Configuration configuration = ConfigParser.parseJobConfig(jobPath); // 合並conf/core.json的配置文件 configuration.merge( ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH), false); // todo config優化,只捕獲需要的plugin // 固定的節點路徑 job.content[0].reader.name String readerPluginName = configuration.getString( CoreConstant.DATAX_JOB_CONTENT_READER_NAME); // 固定的節點路徑 job.content[0].writer.name String writerPluginName = configuration.getString( CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME); // 固定的節點路徑 job.preHandler.pluginName String preHandlerName = configuration.getString( CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME); // 固定的節點路徑 job.postHandler.pluginName String postHandlerName = configuration.getString( CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME); // 添加讀寫插件的列表待加載 Set<String> pluginList = new HashSet<String>(); pluginList.add(readerPluginName); pluginList.add(writerPluginName); if(StringUtils.isNotEmpty(preHandlerName)) { pluginList.add(preHandlerName); } if(StringUtils.isNotEmpty(postHandlerName)) { pluginList.add(postHandlerName); } try { // parsePluginConfig(new ArrayList<String>(pluginList))加載指定的插件的配置信息,並且和全局的配置文件進行合並 configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false); }catch (Exception e){ } // configuration整合了三方的配置,包括 任務配置、core核心配置、指定插件的配置。 return configuration; } // 在指定的reader和writer目錄獲取指定的插件並解析其配置 public static Configuration parsePluginConfig(List<String> wantPluginNames) { // 創建一個空的配置信息對象 Configuration configuration = Configuration.newDefault(); Set<String> replicaCheckPluginSet = new HashSet<String>(); int complete = 0; // 所有的reader在/plugin/reader目錄,遍歷獲取所有reader的目錄 // 獲取待加載插件的配資信息,並合並到上面創建的空配置對象 // //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/reader for (final String each : ConfigParser .getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) { // 解析單個reader目錄,eachReaderConfig保存的是key是plugin.reader.pluginname,value是對應的plugin.json內容 Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames); if(eachReaderConfig!=null) { // 采用覆蓋式的合並 configuration.merge(eachReaderConfig, true); complete += 1; } } // //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/writer for (final String each : ConfigParser .getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) { Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames)