海豚調度活動圖解析


最近在看dolphinScheduler的源碼,還是很經典的

一、架構圖

 

二、活動圖

三、執行流程

3.1.master分配任務給worker

3.1.1.主線

MasterServer啟動NettyRemotingServer,MasterSchedulerService,QuartzExecutors,ZKMasterClient

MasterSchedulerService構造了ThreadPoolExecutor線程池

ThreadPoolExecutor線程池執行MasterExecThread(對應一個任務流)

MasterExecThread將任務流構造出一張task構成的DAG,並將task寫入PriorityQueue

3.1.2.消費支線-對象初始化后啟動

TaskPriorityQueueConsumer從PriorityQueue獲取task,調用ExecutorDispatcher

ExecutorDispatcher調用NettyExecutorManager

NettyExecutorManager調用NettyRemotingClient.send()發送到worker

3.1.3.調度支線

QuartzExecutors啟動Scheduler

Scheduler接收從API提交的調度任務

3.1.4.DB支線

TaskResponseService初始化后啟動一個TaskResponseWorker線程

TaskResponseWorker負責將eventQueue中的事件進行持久化,並向channel發送一條信息,分兩種情況:

ACK:發送DBTaskAckCommand

Response:發送DBTaskResponseCommand

3.1.5.處理器

master中的NettyRemotingServer有三個processor:task回復、task-確認,task-kill回復

TaskResponseProcessor:將信息更新到eventQueue
TaskAckProcessor:將信息更新到eventQueue
TaskKillResponseProcessor:打印一條日志
3.1.6.競選master

ZKMasterClient

 

3.2.worker接收任務

3.2.1.主線

WorkerServer.run()啟動NettyRemotingServer,WorkerManagerThread,RetryReportTaskStatusThread,WorkerRegistry(將自己注冊到zookeeper)

WorkerManagerThread從workerExecuteQueue獲取TaskExecuteThread並執行

TaskExecuteThread.run()進行AbstractTask.handle()處理,處理完后調用TaskCallbackService.sendResult()中的NettyRemoteChannel發送responseCommand消息給master

3.2.2.執行任務支線

NettyRemotingServer注冊NettyServerHandler

NettyServerHandler.processReceived()處理從Master接收到的command,根據type得到不同的processor

TaskExecuteProcessor執行process()方法,調用TaskCallbackService

TaskCallbackService執行NettyRemoteChannel發送task:ack消息給master

3.2.3.處理器

worker里的NettyRemotingServer有4個processor:task-execute請求、task-kill請求、db-確認、db-回復

TaskExecuteProcessor:初始化taskExecutionContext,創建一條新channel,發送master一條ACK信息,並將command加入到workerExecuteQueue
TaskKillProcessor:殺掉進程,並給master發送一條taskKillResponseCommand
DBTaskAckProcessor:如果taskAckCommand狀態成功,從緩存中刪除對應taskID
DBTaskResponseProcessor:如果taskResponseCommand狀態為成功,從緩存中刪除taskID
3.2.4.殺掉線程請求

TaskKillProcessor.process()調用WorkerManagerThread.killTaskBeforeExecuteByInstanceId()發送responseCommand給master

3.2.5.執行線程請求

TaskExecuteProcessor調用TaskCallbackService.sendAck()中的NettyRemoteChannel發送消息給master

3.2.6.重試線程

RetryReportTaskStatusThread不斷從緩存讀command信息,調用TaskCallbackService.sendResult()/sendAck()發送消息給master

3.2.7.注冊zookeeper

WorkerRegistry將自己注冊到zookeeper

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM