術語:
process 工作流
process definition 工作流定義
process instance 工作流實例
task instance 任務實例
Master運行流程
啟動master
MasterServer的main()方法
MasterServer的run()方法
初始化netty遠程服務
注冊到zookeeper
啟動容錯
調用MasterSchedulerService.start方法,start方法實際調用了Thread.start方法,Thread.start調用的是MasterSchedulerService.run方法
MasterSchedulerService的run()方法
進入一個死循環(一定條件下可以退出)
檢查資源是否夠用(mem,cpu),如果資源不夠,睡1秒
調用scheduleProcess()方法
調用findOneCommand()方法
調用handleCommand()方法返回一個processInstance,將一個command轉換成processInstance,並刪除command
如果processInstance不為空,則利用processInstance信息生成一個WorkflowExecuteThread實例
調用masterExecService.execute(workflowExecuteThread),把workflowExecuteThread加入masterExecService線程池。
workflowExecuteThread的run()方法
run()調用 startProcess()方法
調用buildFlowDag來構建工作流dag圖(build process dag)
調用initTaskQueue來初始化任務隊列(init task queue)
調用submitPostNode(null)提交初始節點
submitPostNode方法第一次運行會將dag圖的初始節點(第一個task instance)加入standByList,然后調用submitStandByTask運行standByList中就緒的task instance.
而submitStandByTask運行成功后又會返回來再次調用submitPostNode()方法,使得運行成功的task instance節點的后續節點能夠加入到standByList。
submitStandByTask中也會調用submitTaskExec(task)方法運行。
submitTaskExec()方法會調用taskProcessor.submit()方法
taskProcessor.submit()調用時候有多種選擇,一般走CommonTaskProcessor類的submit()方法,當然還有ConditionTaskProcessor/DependentTaskProcessor/SubTaskProcessor/SwitchTaskProcessor等類的submit方法。
submit()方法會執行processService.submitTask()方法和CommonTaskProcessor.dispatchTask()方法
submitTask()方法會將task存入db,將子工作流存入command表。
dispatchTask()方法有待進一步探究
submitTaskExec()緊接着會調用taskProcessor.run()方法
run()調用 handleEvents()方法