啟動步驟解析
-
1、解析配置,包括job.json、core.json、plugin.json三個配置
-
2、設置jobId到configuration當中
-
3、啟動Engine,通過Engine.start()進入啟動程序
-
4、設置RUNTIME_MODE奧configuration當中
-
5、通過JobContainer的start()方法啟動
-
6、依次執行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。
-
7、init()方法涉及到根據configuration來初始化reader和writer插件,這里涉及到jar包熱加載以及調用插件init()操作方法,同時設置reader和writer的configuration信息
-
8、prepare()方法涉及到初始化reader和writer插件的初始化,通過調用插件的prepare()方法實現,每個插件都有自己的jarLoader,通過集成URLClassloader實現而來
-
9、split()方法通過adjustChannelNumber()方法調整channel個數,同時執行reader和writer最細粒度的切分,需要注意的是,writer的切分結果要參照reader的切分結果,達到切分后數目相等,才能滿足1:1的通道模型
-
10、channel的計數主要是根據byte和record的限速來實現的,在split()的函數中第一步就是計算channel的大小
-
11、split()方法reader插件會根據channel的值進行拆分,但是有些reader插件可能不會參考channel的值,writer插件會完全根據reader的插件1:1進行返回
-
12、split()方法內部的mergeReaderAndWriterTaskConfigs()負責合並reader、writer、以及transformer三者關系,生成task的配置,並且重寫job.content的配置
-
13、schedule()方法根據split()拆分生成的task配置分配生成taskGroup對象,根據task的數量和單個taskGroup支持的task數量進行配置,兩者相除就可以得出taskGroup的數量
-
14、schdule()內部通過AbstractScheduler的schedule()執行,繼續執行startAllTaskGroup()方法創建所有的TaskGroupContainer組織相關的task,TaskGroupContainerRunner負責運行TaskGroupContainer執行分配的task。
-
15、taskGroupContainerExecutorService啟動固定的線程池用以執行TaskGroupContainerRunner對象,TaskGroupContainerRunner的run()方法調用taskGroupContainer.start()方法,針對每個channel創建一個TaskExecutor,通過taskExecutor.doStart()啟動任務