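This article analyzes the source code of DolphinScheduler 2.0.0.
For how to set up an environment for analyzing the DolphinScheduler 2.0.0 source code, see the official website and my earlier articles.
Let's begin the analysis:
Step 1: In IDEA, start the ApiApplicationServer, MasterServer, and WorkerServer processes.
Step 2: Start the front end. Change into the dolphinscheduler-ui subfolder and run npm run start from cmd.
Step 3: Open localhost:8888 in a browser and log in. The initial user name and password are: dolphinscheduler / dolphinscheduler123
After logging in, create a new project.
Step 4: Create a new workflow.
Step 5: Switch to the back-end MySQL database and look at its current state.
Open the t_ds_task_definition table and check the task definition information saved there.
Step 6: Bring the task just created online and run it.
After submitting, look at the log output of the back-end MasterServer process: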
[INFO] 2021-11-23 11:43:00.387 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1162] - remove task from stand by list, id: 1 name:test_shell
[INFO] 2021-11-23 11:43:00.392 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[291] - process event: State Event :key: null type: TASK_STATE_CHANGE executeStatus: FAILURE task instance id: 1 process instance id: 1 context: null
[INFO] 2021-11-23 11:43:00.397 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[369] - work flow 1 task 1 state:FAILURE
[INFO] 2021-11-23 11:43:00.397 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:43:00.397 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[376] - failure task will be submitted: process id: 1, task instance id: 1 state:FAILURE retry times:0 / 1, interval:30
[INFO] 2021-11-23 11:44:00.038 org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob:[74] - scheduled fire time :Tue Nov 23 11:44:00 CST 2021, fire time :Tue Nov 23 11:44:00 CST 2021, process id :1
[INFO] 2021-11-23 11:44:00.917 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[243] - find command 2, slot:0 :
[INFO] 2021-11-23 11:44:00.918 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[186] - find one command: id: 2, type: SCHEDULER
[INFO] 2021-11-23 11:44:00.936 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[209] - handle command end, command 2 process 2 start...
[INFO] 2021-11-23 11:44:00.951 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:44:00.954 org.apache.dolphinscheduler.service.process.ProcessService:[1093] - start submit task : test_shell, instance id:2, state: RUNNING_EXECUTION
[INFO] 2021-11-23 11:44:00.966 org.apache.dolphinscheduler.service.process.ProcessService:[1106] - end submit task to db successfully:2 test_shell state:SUBMITTED_SUCCESS complete, instance id:2 state: RUNNING_EXECUTION
[INFO] 2021-11-23 11:44:00.967 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[120] - task ready to submit: TaskInstance{id=2, name='test_shell', taskType='SHELL', processInstanceId=2, processInstanceName='null', state=SUBMITTED_SUCCESS, firstSubmitTime=Tue Nov 23 11:44:00 CST 2021, submitTime=Tue Nov 23 11:44:00 CST 2021, startTime=null, endTime=null, host='null', executePath='null', logPath='null', retryTimes=0, alertFlag=NO, processInstance=null, processDefine=null, pid=0, appLink='null', flag=YES, dependency='null', duration=null, maxRetryTimes=1, retryInterval=30, taskInstancePriority=MEDIUM, processInstancePriority=MEDIUM, dependentResult='null', workerGroup='default', environmentCode=-1, environmentConfig='null', executorId=1, executorName='null', delayTime=5, dryRun=0}
[ERROR] 2021-11-23 11:44:00.981 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[334] - tenant not exists,process instance id : 2,task instance id : 2
[INFO] 2021-11-23 11:44:00.993 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[130] - master submit success, task : test_shell
[ERROR] 2021-11-23 11:44:00.993 org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer:[116] - dispatcher task error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.dispatch(TaskPriorityQueueConsumer.java:131)
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.run(TaskPriorityQueueConsumer.java:100)
[INFO] 2021-11-23 11:44:00.997 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1162] - remove task from stand by list, id: 2 name:test_shell
[INFO] 2021-11-23 11:44:01.000 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[291] - process event: State Event :key: null type: TASK_STATE_CHANGE executeStatus: FAILURE task instance id: 2 process instance id: 2 context: null
[INFO] 2021-11-23 11:44:01.002 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[369] - work flow 2 task 2 state:FAILURE
[INFO] 2021-11-23 11:44:01.003 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:44:01.003 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[376] - failure task will be submitted: process id: 2, task instance id: 2 state:FAILURE retry times:0 / 1, interval:30
One minute later, when the schedule fires again, new log output appears:
[INFO] 2021-11-23 11:45:00.079 org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob:[74] - scheduled fire time :Tue Nov 23 11:45:00 CST 2021, fire time :Tue Nov 23 11:45:00 CST 2021, process id :1
[INFO] 2021-11-23 11:45:00.657 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[243] - find command 3, slot:0 :
[INFO] 2021-11-23 11:45:00.657 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[186] - find one command: id: 3, type: SCHEDULER
[INFO] 2021-11-23 11:45:00.748 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[209] - handle command end, command 3 process 3 start...
[INFO] 2021-11-23 11:45:00.767 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:45:00.769 org.apache.dolphinscheduler.service.process.ProcessService:[1093] - start submit task : test_shell, instance id:3, state: RUNNING_EXECUTION
[INFO] 2021-11-23 11:45:00.799 org.apache.dolphinscheduler.service.process.ProcessService:[1106] - end submit task to db successfully:3 test_shell state:SUBMITTED_SUCCESS complete, instance id:3 state: RUNNING_EXECUTION
[INFO] 2021-11-23 11:45:00.800 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[120] - task ready to submit: TaskInstance{id=3, name='test_shell', taskType='SHELL', processInstanceId=3, processInstanceName='null', state=SUBMITTED_SUCCESS, firstSubmitTime=Tue Nov 23 11:45:00 CST 2021, submitTime=Tue Nov 23 11:45:00 CST 2021, startTime=null, endTime=null, host='null', executePath='null', logPath='null', retryTimes=0, alertFlag=NO, processInstance=null, processDefine=null, pid=0, appLink='null', flag=YES, dependency='null', duration=null, maxRetryTimes=1, retryInterval=30, taskInstancePriority=MEDIUM, processInstancePriority=MEDIUM, dependentResult='null', workerGroup='default', environmentCode=-1, environmentConfig='null', executorId=1, executorName='null', delayTime=5, dryRun=0}
[ERROR] 2021-11-23 11:45:00.810 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[334] - tenant not exists,process instance id : 3,task instance id : 3
[INFO] 2021-11-23 11:45:00.821 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[130] - master submit success, task : test_shell
[ERROR] 2021-11-23 11:45:00.821 org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer:[116] - dispatcher task error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.dispatch(TaskPriorityQueueConsumer.java:131)
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.run(TaskPriorityQueueConsumer.java:100)
[INFO] 2021-11-23 11:45:00.825 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1162] - remove task from stand by list, id: 3 name:test_shell
[INFO] 2021-11-23 11:45:00.827 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[291] - process event: State Event :key: null type: TASK_STATE_CHANGE executeStatus: FAILURE task instance id: 3 process instance id: 3 context: null
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[369] - work flow 3 task 3 state:FAILURE
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[376] - failure task will be submitted: process id: 3, task instance id: 3 state:FAILURE retry times:0 / 1, interval:30
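When console output is pasted as one long run of text, the individual entries can be split apart again before reading them. The following is a minimal sketch, assuming every entry starts with a bracketed level followed by a date, as in the output above. The class name LogSplitter and the list of levels are my own choices for illustration and are not part of DolphinScheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class LogSplitter {

    // Zero-width lookahead: split in front of "[LEVEL] yyyy-MM-dd " without consuming it.
    // The set of levels is an assumption; only INFO and ERROR appear in the article's log.
    private static final Pattern ENTRY_START =
            Pattern.compile("(?=\\[(?:DEBUG|INFO|WARN|ERROR)\\] \\d{4}-\\d{2}-\\d{2} )");

    /** Splits a concatenated log dump into one string per log entry. */
    public static List<String> split(String dump) {
        List<String> entries = new ArrayList<>();
        for (String part : ENTRY_START.split(dump)) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                entries.add(trimmed);
            }
        }
        return entries;
    }

    public static void main(String[] args) {
        String dump = "[INFO] 2021-11-23 11:45:00.657 "
                + "org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[243]"
                + " - find command 3, slot:0 : "
                + "[INFO] 2021-11-23 11:45:00.657 "
                + "org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[186]"
                + " - find one command: id: 3, type: SCHEDULER";
        // Print each entry on its own line.
        for (String entry : split(dump)) {
            System.out.println(entry);
        }
    }
}
```

A stack trace that follows an ERROR entry stays attached to that entry, because its lines do not begin with a bracketed level and a date.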
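Step 7: Based on the output of the previous step, locate in the source code where each log line is produced.
Let's start with the first line: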
[INFO] 2021-11-23 11:45:00.079 org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob:[74] - scheduled fire time :Tue Nov 23 11:45:00 CST 2021, fire time :Tue Nov 23 11:45:00 CST 2021, process id :1
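Take the fixed text of the message (highlighted in red in the original post) as the keyword, and use IDEA's Find in Path across the whole project to find which Java file, class, and method produce this output.
First, scheduling begins with the execute() method in src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java.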
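Besides searching for the message text, the logger name and the number in square brackets in each log line already point at a class and a line. The following is a minimal sketch of extracting them, assuming the layout seen in this log: level, timestamp, logger name, line number in brackets, then the message. LogLineParser and Location are hypothetical names of my own. Note that the logger name is not always the class that contains the statement, since a logger may be created for a runtime subclass.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineParser {

    // [LEVEL] yyyy-MM-dd HH:mm:ss.SSS logger.Name:[line] - message
    private static final Pattern LINE = Pattern.compile(
            "^\\[(\\w+)\\] (\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) "
                    + "([\\w.$]+):\\[(\\d+)\\] - (.*)$",
            Pattern.DOTALL);

    /** Where a log entry claims to come from, plus its message text. */
    public static final class Location {
        public final String loggerName;
        public final int line;
        public final String message;

        Location(String loggerName, int line, String message) {
            this.loggerName = loggerName;
            this.line = line;
            this.message = message;
        }

        /** Relative source path, assuming the logger is named after a top-level class. */
        public String sourcePath() {
            return "src/main/java/" + loggerName.replace('.', '/') + ".java";
        }
    }

    /** Parses one log entry; throws if the entry does not follow the assumed layout. */
    public static Location parse(String entry) {
        Matcher m = LINE.matcher(entry.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("unrecognized log entry: " + entry);
        }
        return new Location(m.group(3), Integer.parseInt(m.group(4)), m.group(5));
    }

    public static void main(String[] args) {
        Location loc = parse("[INFO] 2021-11-23 11:45:00.079 "
                + "org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob:[74] - "
                + "scheduled fire time :Tue Nov 23 11:45:00 CST 2021, "
                + "fire time :Tue Nov 23 11:45:00 CST 2021, process id :1");
        System.out.println(loc.sourcePath() + " line " + loc.line);
    }
}
```

With the path and line number in hand, IDEA's file navigation and Go to Line can be used as a cross-check on the Find in Path result.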
Now the next line:
[INFO] 2021-11-23 11:45:00.657 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[243] - find command 3, slot:0 :
This is the findOneCommand() method in src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java.
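The "slot:0" in this message suggests that commands are divided among master nodes, with each master picking up only the commands that fall into its own slot. The following is a minimal sketch of one such scheme, assuming a command belongs to the master whose slot equals the command id modulo the number of masters. This rule and the names SlotCheck and belongsToSlot are assumptions for illustration, not code taken from MasterSchedulerService.

```java
public class SlotCheck {

    /**
     * Hypothetical partitioning rule: a command is handled by the master whose
     * slot equals commandId modulo the number of active masters.
     */
    public static boolean belongsToSlot(int commandId, int masterCount, int slot) {
        if (masterCount <= 0) {
            // No known masters: nothing can be assigned.
            return false;
        }
        return commandId % masterCount == slot;
    }

    public static void main(String[] args) {
        // With a single master in slot 0, every command belongs to it,
        // which would be consistent with "find command 3, slot:0" in the log.
        System.out.println(belongsToSlot(3, 1, 0));
        // With two masters, command 3 would go to slot 1 instead.
        System.out.println(belongsToSlot(3, 2, 0));
    }
}
```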
Then the next line:
[INFO] 2021-11-23 11:45:00.657 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[186] - find one command: id: 3, type: SCHEDULER
This is the scheduleProcess() method in src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java.
Next log line:
[INFO] 2021-11-23 11:45:00.748 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService:[209] - handle command end, command 3 process 3 start...
This is still the scheduleProcess() method in src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java.
Then the next line:
[INFO] 2021-11-23 11:45:00.767 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
This is the addTaskToStandByList(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
Next log line:
[INFO] 2021-11-23 11:45:00.769 org.apache.dolphinscheduler.service.process.ProcessService:[1093] - start submit task : test_shell, instance id:3, state: RUNNING_EXECUTION
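This is reached from the submitTask(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java. Note that the logger name in the line is org.apache.dolphinscheduler.service.process.ProcessService, so the statement itself sits in ProcessService (around line 1093), not in WorkflowExecuteThread.
Then the next log line: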
[INFO] 2021-11-23 11:45:00.799 org.apache.dolphinscheduler.service.process.ProcessService:[1106] - end submit task to db successfully:3 test_shell state:SUBMITTED_SUCCESS complete, instance id:3 state: RUNNING_EXECUTION
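This is again reached from the submitTask(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java; as the logger name shows, the statement itself is in ProcessService (around line 1106).
Then the next log line: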
[INFO] 2021-11-23 11:45:00.800 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[120] - task ready to submit: TaskInstance{id=3, name='test_shell', taskType='SHELL', processInstanceId=3, processInstanceName='null', state=SUBMITTED_SUCCESS, firstSubmitTime=Tue Nov 23 11:45:00 CST 2021, submitTime=Tue Nov 23 11:45:00 CST 2021, startTime=null, endTime=null, host='null', executePath='null', logPath='null', retryTimes=0, alertFlag=NO, processInstance=null, processDefine=null, pid=0, appLink='null', flag=YES, dependency='null', duration=null, maxRetryTimes=1, retryInterval=30, taskInstancePriority=MEDIUM, processInstancePriority=MEDIUM, dependentResult='null', workerGroup='default', environmentCode=-1, environmentConfig='null', executorId=1, executorName='null', delayTime=5, dryRun=0}
This is the dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java.
Next log line:
[ERROR] 2021-11-23 11:45:00.810 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[334] - tenant not exists,process instance id : 3,task instance id : 3
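This is the verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java. The logger name in the line is CommonTaskProcessor, presumably because the logger is created for the runtime subclass.
Reaching this point means an error has occurred: the tenant does not exist.
Despite the error, we will keep reading the log to the end. Treat this run as a failed execution and see what the failure flow looks like.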
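One plausible reading of this failure is that the missing tenant is only logged and the submission carries on without a usable execution context, so a later step trips over a null. The following is a minimal sketch of that pattern, assuming a simplified model. Tenant, ExecutionContext, buildContext, dispatch, and dispatchChecked are all hypothetical names for illustration, not DolphinScheduler classes or methods.

```java
public class TenantCheckSketch {

    /** Hypothetical stand-in for a tenant record. */
    public static final class Tenant {
        public final String code;

        public Tenant(String code) {
            this.code = code;
        }
    }

    /** Hypothetical stand-in for the context a worker needs to run a task. */
    public static final class ExecutionContext {
        public final String tenantCode;

        public ExecutionContext(String tenantCode) {
            this.tenantCode = tenantCode;
        }
    }

    /** Logs and returns null when the tenant is missing, instead of failing. */
    public static ExecutionContext buildContext(Tenant tenant, int processInstanceId, int taskInstanceId) {
        if (tenant == null) {
            System.err.printf("tenant not exists,process instance id : %d,task instance id : %d%n",
                    processInstanceId, taskInstanceId);
            return null;
        }
        return new ExecutionContext(tenant.code);
    }

    /** A later step that assumes the context is present; throws NullPointerException otherwise. */
    public static String dispatch(ExecutionContext context) {
        return "run as " + context.tenantCode;
    }

    /** Safer variant: fail with a message that names the real cause. */
    public static String dispatchChecked(ExecutionContext context) {
        if (context == null) {
            throw new IllegalStateException("no execution context: tenant is missing");
        }
        return "run as " + context.tenantCode;
    }

    public static void main(String[] args) {
        ExecutionContext context = buildContext(null, 3, 3);
        try {
            dispatch(context);
        } catch (NullPointerException e) {
            System.err.println("dispatcher task error: " + e);
        }
    }
}
```

In practice the fix on the user side is configuration rather than code: the error message says the tenant does not exist, so the user who runs the workflow needs a tenant assigned.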
Going further down:
[INFO] 2021-11-23 11:45:00.821 org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessor:[130] - master submit success, task : test_shell
This is the dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java.
Continuing down the log:
[ERROR] 2021-11-23 11:45:00.821 org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer:[116] - dispatcher task error
java.lang.NullPointerException: null
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.dispatch(TaskPriorityQueueConsumer.java:131)
at org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer.run(TaskPriorityQueueConsumer.java:100)
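This message is logged in the run() method in src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java. According to the stack trace, the NullPointerException itself is thrown in dispatch() (TaskPriorityQueueConsumer.java:131), which is called from run() (TaskPriorityQueueConsumer.java:100).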
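The stack trace shows the general shape of a queue consumer: run() calls dispatch(), and an exception from dispatch() is logged instead of ending the consumer. The following is a minimal sketch of that shape, assuming a simplified model. ConsumerLoopSketch, consumeAll, and the map of task contexts are hypothetical, not the actual TaskPriorityQueueConsumer code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class ConsumerLoopSketch {

    /** Dispatches one task; throws NullPointerException when the task has no context. */
    public static String dispatch(String taskName, Map<String, String> contexts) {
        String context = contexts.get(taskName); // null when no context was built
        return taskName + "@" + context.trim();
    }

    /**
     * Drains the queue. A failure in dispatch() is recorded and the loop moves on
     * to the next task, so one bad task does not stop the consumer.
     * Returns the number of tasks dispatched successfully.
     */
    public static int consumeAll(Queue<String> queue, Map<String, String> contexts, List<String> errorLog) {
        int dispatched = 0;
        String task;
        while ((task = queue.poll()) != null) {
            try {
                dispatch(task, contexts);
                dispatched++;
            } catch (Exception e) {
                errorLog.add("dispatcher task error: " + e);
            }
        }
        return dispatched;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("test_shell"); // no context registered for this task
        Map<String, String> contexts = new HashMap<>();
        List<String> errors = new ArrayList<>();
        int ok = consumeAll(queue, contexts, errors);
        System.out.println(ok + " dispatched, " + errors.size() + " failed");
    }
}
```

Catching the exception inside the loop keeps the consumer alive for later tasks, at the price that the failure surfaces only as a log entry and a FAILURE state on the task.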
Moving on:
[INFO] 2021-11-23 11:45:00.825 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1162] - remove task from stand by list, id: 3 name:test_shell
This is the removeTaskFromStandbyList(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
Next comes:
[INFO] 2021-11-23 11:45:00.827 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[291] - process event: State Event :key: null type: TASK_STATE_CHANGE executeStatus: FAILURE task instance id: 3 process instance id: 3 context: null
This is the stateEventHandler(StateEvent stateEvent) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
Moving on:
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[369] - work flow 3 task 3 state:FAILURE
This is the taskFinished(TaskInstance task) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
Next comes:
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[1146] - add task to stand by list: test_shell
This is the addTaskToStandByList(TaskInstance taskInstance) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
Moving on:
[INFO] 2021-11-23 11:45:00.829 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread:[376] - failure task will be submitted: process id: 3, task inst
This is the taskFinished(TaskInstance task) method in src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java.
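The last two messages, "add task to stand by list" followed by "failure task will be submitted ... retry times:0 / 1, interval:30", read like a retry decision: the failed task has used 0 of its 1 allowed retries, so it is queued again. The following is a minimal sketch of such a decision, assuming a failed task is retried while its retry count is below the maximum. RetrySketch, canRetry, and describe are hypothetical names, not methods of WorkflowExecuteThread.

```java
public class RetrySketch {

    /** A failed task may be resubmitted while it has retries left. */
    public static boolean canRetry(boolean failed, int retryTimes, int maxRetryTimes) {
        return failed && retryTimes < maxRetryTimes;
    }

    /** Builds a message in the same shape as the one seen in the MasterServer log. */
    public static String describe(int processId, int taskInstanceId,
                                  int retryTimes, int maxRetryTimes, int interval) {
        return String.format(
                "failure task will be submitted: process id: %d, task instance id: %d "
                        + "state:FAILURE retry times:%d / %d, interval:%d",
                processId, taskInstanceId, retryTimes, maxRetryTimes, interval);
    }

    public static void main(String[] args) {
        int retryTimes = 0;
        int maxRetryTimes = 1;
        if (canRetry(true, retryTimes, maxRetryTimes)) {
            // In this model the task goes back onto the standby list for another attempt.
            System.out.println(describe(3, 3, retryTimes, maxRetryTimes, 30));
        }
    }
}
```

The values retryTimes=0, maxRetryTimes=1, and retryInterval=30 match the TaskInstance fields printed in the "task ready to submit" entry earlier in the log.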
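This completes one pass through the code flow. Of course, this log records a failed scheduling run. A failed flow is still worth studying; in the next article we will go on to run the correct flow.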