DataX啟動步驟解析


啟動步驟解析

  • 1、解析配置,包括job.json、core.json、plugin.json三個配置

  • 2、設置jobId到configuration當中

  • 3、啟動Engine,通過Engine.start()進入啟動程序

  • 4、設置RUNTIME_MODE奧configuration當中

  • 5、通過JobContainer的start()方法啟動

  • 6、依次執行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。

  • 7、init()方法涉及到根據configuration來初始化reader和writer插件,這里涉及到jar包熱加載以及調用插件init()操作方法,同時設置reader和writer的configuration信息

  • 8、prepare()方法涉及到初始化reader和writer插件的初始化,通過調用插件的prepare()方法實現,每個插件都有自己的jarLoader,通過集成URLClassloader實現而來

  • 9、split()方法通過adjustChannelNumber()方法調整channel個數,同時執行reader和writer最細粒度的切分,需要注意的是,writer的切分結果要參照reader的切分結果,達到切分后數目相等,才能滿足1:1的通道模型

  • 10、channel的計數主要是根據byte和record的限速來實現的,在split()的函數中第一步就是計算channel的大小

  • 11、split()方法reader插件會根據channel的值進行拆分,但是有些reader插件可能不會參考channel的值,writer插件會完全根據reader的插件1:1進行返回

  • 12、split()方法內部的mergeReaderAndWriterTaskConfigs()負責合並reader、writer、以及transformer三者關系,生成task的配置,並且重寫job.content的配置

  • 13、schedule()方法根據split()拆分生成的task配置分配生成taskGroup對象,根據task的數量和單個taskGroup支持的task數量進行配置,兩者相除就可以得出taskGroup的數量

  • 14、schdule()內部通過AbstractScheduler的schedule()執行,繼續執行startAllTaskGroup()方法創建所有的TaskGroupContainer組織相關的task,TaskGroupContainerRunner負責運行TaskGroupContainer執行分配的task。

  • 15、taskGroupContainerExecutorService啟動固定的線程池用以執行TaskGroupContainerRunner對象,TaskGroupContainerRunner的run()方法調用taskGroupContainer.start()方法,針對每個channel創建一個TaskExecutor,通過taskExecutor.doStart()啟動任務




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM