I have been reading the DolphinScheduler source code recently, and it is a classic design worth studying.
1. Architecture diagram
2. Activity diagram
3.1. Master assigns tasks to workers
3.1.1. Main line
MasterServer starts NettyRemotingServer, MasterSchedulerService, QuartzExecutors, and ZKMasterClient
MasterSchedulerService constructs a ThreadPoolExecutor thread pool
The ThreadPoolExecutor runs MasterExecThread (one per workflow)
MasterExecThread builds a DAG of tasks from the workflow and writes the tasks into the PriorityQueue
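The last step of the main line can be sketched roughly as follows. This is a minimal illustration, not the DolphinScheduler code: the Task class, its priority field (lower value runs first), and the rule "submit a task once all of its upstream tasks have finished" are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;

class DagSubmitSketch {
    /** Hypothetical task node; a lower priority value means "run earlier". */
    static final class Task {
        final String name;
        final int priority;
        final List<String> dependsOn;

        Task(String name, int priority, List<String> dependsOn) {
            this.name = name;
            this.priority = priority;
            this.dependsOn = dependsOn;
        }
    }

    /** Walks the DAG: tasks whose upstream tasks are all finished go into the priority queue. */
    static List<String> runOrder(List<Task> tasks) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(
                11, Comparator.comparingInt((Task t) -> t.priority).thenComparing(t -> t.name));
        Set<String> finished = new HashSet<>();
        Set<String> submitted = new HashSet<>();
        List<String> order = new ArrayList<>();
        while (finished.size() < tasks.size()) {
            for (Task t : tasks) {
                boolean ready = !finished.contains(t.name) && finished.containsAll(t.dependsOn);
                if (ready && submitted.add(t.name)) {
                    queue.add(t);
                }
            }
            Task next = queue.poll();
            if (next == null) {
                throw new IllegalStateException("cycle or unknown dependency in the DAG");
            }
            // In this sketch "polling" stands in for dispatching and waiting for completion.
            finished.add(next.name);
            order.add(next.name);
        }
        return order;
    }

    /** A small diamond-shaped workflow: A -> (B, C) -> D. */
    static List<Task> demoTasks() {
        return Arrays.asList(
                new Task("A", 2, Collections.<String>emptyList()),
                new Task("B", 1, Arrays.asList("A")),
                new Task("C", 0, Arrays.asList("A")),
                new Task("D", 1, Arrays.asList("B", "C")));
    }

    public static void main(String[] args) {
        System.out.println(runOrder(demoTasks())); // prints [A, C, B, D]
    }
}
```

C runs before B here because both become ready at the same time and C has the lower priority value; D has to wait for both.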
3.1.2. Consumer branch (starts once the object is initialized)
TaskPriorityQueueConsumer takes a task from the PriorityQueue and calls ExecutorDispatcher
ExecutorDispatcher calls NettyExecutorManager
NettyExecutorManager calls NettyRemotingClient.send() to send the task to a worker
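The call chain above (consumer, then dispatcher, then executor manager, then client) might look like the sketch below. All types are simplified stand-ins named after their roles; the string command format, the fixed worker host, and the polling interval are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class DispatchChainSketch {
    /** Stand-in for the network client (NettyRemotingClient in the notes above). */
    interface RemotingClient {
        void send(String host, String command) throws Exception;
    }

    /** Stand-in for NettyExecutorManager: wraps the task in a command and sends it. */
    static final class ExecutorManager {
        private final RemotingClient client;
        private final String workerHost;

        ExecutorManager(RemotingClient client, String workerHost) {
            this.client = client;
            this.workerHost = workerHost;
        }

        boolean execute(String task) {
            try {
                client.send(workerHost, "TASK_EXECUTE_REQUEST:" + task);
                return true;
            } catch (Exception e) {
                return false; // a real implementation would log and handle the failure
            }
        }
    }

    /** Stand-in for ExecutorDispatcher: delegates to the executor manager. */
    static final class Dispatcher {
        private final ExecutorManager manager;

        Dispatcher(ExecutorManager manager) {
            this.manager = manager;
        }

        boolean dispatch(String task) {
            return manager.execute(task);
        }
    }

    /** Stand-in for TaskPriorityQueueConsumer: a thread that drains the queue. */
    static final class QueueConsumer extends Thread {
        private final BlockingQueue<String> queue;
        private final Dispatcher dispatcher;
        private volatile boolean running = true;

        QueueConsumer(BlockingQueue<String> queue, Dispatcher dispatcher) {
            this.queue = queue;
            this.dispatcher = dispatcher;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    String task = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (task != null) {
                        dispatcher.dispatch(task);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void shutdown() {
            running = false;
        }
    }

    /** Wires the chain together and returns what the fake client "sent", in order. */
    static List<String> runDemo(List<String> tasks) {
        final List<String> sent = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch done = new CountDownLatch(tasks.size());
        RemotingClient client = (host, command) -> {
            sent.add(host + " <- " + command);
            done.countDown();
        };
        // Strings sort naturally, so a numeric prefix acts as the priority in this sketch.
        BlockingQueue<String> queue = new PriorityBlockingQueue<>(tasks);
        QueueConsumer consumer =
                new QueueConsumer(queue, new Dispatcher(new ExecutorManager(client, "worker-1")));
        consumer.start();
        try {
            done.await(2, TimeUnit.SECONDS);
            consumer.shutdown();
            consumer.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        for (String line : runDemo(Arrays.asList("2_load", "1_extract", "3_report"))) {
            System.out.println(line);
        }
    }
}
```

Keeping the consumer on its own thread means the workflow threads only ever write to the queue and never block on the network.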
3.1.3. Scheduling branch
QuartzExecutors starts the Scheduler
The Scheduler receives scheduled jobs submitted through the API
3.1.4. DB branch
After initialization, TaskResponseService starts a TaskResponseWorker thread
TaskResponseWorker persists the events in eventQueue and sends a message over the channel, in one of two cases:
ACK: sends DBTaskAckCommand
Response: sends DBTaskResponseCommand
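The persist-then-confirm step described above can be sketched like this. The Event fields, the in-memory map standing in for the database, and the string commands are assumptions for the example; the real worker thread would block on the queue in a loop instead of draining it on demand.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ResponsePersistSketch {
    enum EventType { ACK, RESPONSE }

    /** Stand-in for the Netty channel that leads back to the worker. */
    interface Channel {
        void send(String command);
    }

    /** Hypothetical event shape: what happened, to which task, and where to reply. */
    static final class Event {
        final EventType type;
        final int taskId;
        final String state;
        final Channel channel;

        Event(EventType type, int taskId, String state, Channel channel) {
            this.type = type;
            this.taskId = taskId;
            this.state = state;
            this.channel = channel;
        }
    }

    private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();

    /** Stand-in for the database: taskId -> last persisted state. */
    final Map<Integer, String> db = new HashMap<>();

    void addEvent(Event event) {
        eventQueue.add(event);
    }

    /** Persists one event, then tells the worker that the database write happened. */
    void persist(Event event) {
        db.put(event.taskId, event.state);
        switch (event.type) {
            case ACK:
                event.channel.send("DB_TASK_ACK:" + event.taskId);
                break;
            case RESPONSE:
                event.channel.send("DB_TASK_RESPONSE:" + event.taskId);
                break;
        }
    }

    /** Handles everything currently queued and returns how many events were processed. */
    int drain() {
        int handled = 0;
        Event event;
        while ((event = eventQueue.poll()) != null) {
            persist(event);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> replies = new ArrayList<>();
        ResponsePersistSketch service = new ResponsePersistSketch();
        service.addEvent(new Event(EventType.ACK, 7, "RUNNING", replies::add));
        service.addEvent(new Event(EventType.RESPONSE, 7, "SUCCESS", replies::add));
        service.drain();
        System.out.println(service.db + " " + replies);
    }
}
```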
3.1.5. Processors
The NettyRemotingServer in the master has three processors: task response, task ack, and task-kill response
TaskResponseProcessor: puts the message into eventQueue
TaskAckProcessor: puts the message into eventQueue
TaskKillResponseProcessor: prints a log line
3.1.6. Master election
ZKMasterClient
3.2. Worker receives tasks
3.2.1. Main line
WorkerServer.run() starts NettyRemotingServer, WorkerManagerThread, RetryReportTaskStatusThread, and WorkerRegistry (which registers the worker in ZooKeeper)
WorkerManagerThread takes a TaskExecuteThread from workerExecuteQueue and runs it
TaskExecuteThread.run() calls AbstractTask.handle() to do the work; when it finishes, it calls TaskCallbackService.sendResult(), which uses a NettyRemoteChannel to send a responseCommand message to the master
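3.2.2. Task execution branch
NettyRemotingServer registers NettyServerHandler
NettyServerHandler.processReceived() handles each command received from the master and picks a processor based on the command type
TaskExecuteProcessor runs its process() method, which calls TaskCallbackService
TaskCallbackService uses a NettyRemoteChannel to send a task ack message to the master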
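The "pick a processor by command type" step can be sketched as a lookup table. This is a simplified illustration: the enum values, the Processor interface, and the string message body are assumptions for the example and do not reproduce the actual Netty handler.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class ProcessorRegistrySketch {
    /** Illustrative command types, one per worker-side processor. */
    enum CommandType { TASK_EXECUTE_REQUEST, TASK_KILL_REQUEST, DB_TASK_ACK, DB_TASK_RESPONSE }

    /** Hypothetical processor contract: handle the body of one command. */
    interface Processor {
        void process(String body);
    }

    private final Map<CommandType, Processor> processors = new EnumMap<>(CommandType.class);

    /** Called once per processor while the server is being set up. */
    void registerProcessor(CommandType type, Processor processor) {
        processors.put(type, processor);
    }

    /** Routes a received command; returns false when no processor is registered for its type. */
    boolean processReceived(CommandType type, String body) {
        Processor processor = processors.get(type);
        if (processor == null) {
            return false; // a real handler would log the unsupported command type
        }
        processor.process(body);
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ProcessorRegistrySketch server = new ProcessorRegistrySketch();
        server.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, body -> log.add("execute " + body));
        server.registerProcessor(CommandType.TASK_KILL_REQUEST, body -> log.add("kill " + body));
        server.processReceived(CommandType.TASK_EXECUTE_REQUEST, "task-42");
        server.processReceived(CommandType.TASK_KILL_REQUEST, "task-42");
        System.out.println(log);
    }
}
```

Adding a new message type then only needs one more registerProcessor() call, with no change to the receiving code.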
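3.2.3. Processors
The NettyRemotingServer in the worker has four processors: task-execute request, task-kill request, DB ack, and DB response
TaskExecuteProcessor: initializes the taskExecutionContext, creates a new channel, sends the master an ACK message, and adds the command to workerExecuteQueue
TaskKillProcessor: kills the process and sends the master a taskKillResponseCommand
DBTaskAckProcessor: if the taskAckCommand status is success, removes the corresponding task ID from the cache
DBTaskResponseProcessor: if the taskResponseCommand status is success, removes the task ID from the cache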
3.2.4. Kill-thread request
TaskKillProcessor.process() calls WorkerManagerThread.killTaskBeforeExecuteByInstanceId() and sends a responseCommand to the master
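Judging by its name, killTaskBeforeExecuteByInstanceId() deals with tasks that are still waiting in workerExecuteQueue. The sketch below assumes it simply removes the matching entry from the queue; that behavior is inferred from the method name, and the QueuedTask class is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class KillBeforeExecuteSketch {
    /** Hypothetical queue entry: a task that has been accepted but not started yet. */
    static final class QueuedTask {
        final int taskInstanceId;
        final String name;

        QueuedTask(int taskInstanceId, String name) {
            this.taskInstanceId = taskInstanceId;
            this.name = name;
        }
    }

    private final BlockingQueue<QueuedTask> workerExecuteQueue = new LinkedBlockingQueue<>();

    void offer(QueuedTask task) {
        workerExecuteQueue.offer(task);
    }

    /** Assumed behavior: drop the waiting task so it never starts; true if something was removed. */
    boolean killTaskBeforeExecuteByInstanceId(int taskInstanceId) {
        return workerExecuteQueue.removeIf(task -> task.taskInstanceId == taskInstanceId);
    }

    int pending() {
        return workerExecuteQueue.size();
    }

    public static void main(String[] args) {
        KillBeforeExecuteSketch worker = new KillBeforeExecuteSketch();
        worker.offer(new QueuedTask(1, "extract"));
        worker.offer(new QueuedTask(2, "load"));
        System.out.println(worker.killTaskBeforeExecuteByInstanceId(2) + ", pending=" + worker.pending());
    }
}
```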
3.2.5. Execute-thread request
TaskExecuteProcessor calls TaskCallbackService.sendAck(), which uses a NettyRemoteChannel to send the message to the master
3.2.6. Retry thread
RetryReportTaskStatusThread keeps reading command information from the cache and calls TaskCallbackService.sendResult()/sendAck() to send the messages to the master
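Together with the DB ack and DB response processors, the retry thread gives the worker an at-least-once way of reporting task status. The sketch below shows only the response half: the cache layout, the string command format, and the MasterChannel interface are assumptions, and a real retry thread would call retryOnce() in a timed loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RetryCacheSketch {
    /** Stand-in for the channel used to reach the master. */
    interface MasterChannel {
        void send(String command);
    }

    /** taskId -> last response command that the master has not yet confirmed as persisted. */
    private final Map<Integer, String> cache = new ConcurrentHashMap<>();
    private final MasterChannel channel;

    RetryCacheSketch(MasterChannel channel) {
        this.channel = channel;
    }

    /** Caches the command first, then sends it, so a lost message can be resent later. */
    void sendResult(int taskId, String state) {
        String command = "TASK_EXECUTE_RESPONSE:" + taskId + ":" + state;
        cache.put(taskId, command);
        channel.send(command);
    }

    /** Called when the master reports the database write; only success clears the entry. */
    void onDbResponse(int taskId, boolean success) {
        if (success) {
            cache.remove(taskId);
        }
    }

    /** One pass of the retry loop: resend everything still cached; returns the resend count. */
    int retryOnce() {
        int resent = 0;
        for (String command : cache.values()) {
            channel.send(command);
            resent++;
        }
        return resent;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        RetryCacheSketch worker = new RetryCacheSketch(sent::add);
        worker.sendResult(1, "SUCCESS");
        worker.sendResult(2, "FAILURE");
        worker.onDbResponse(1, true); // the master confirmed task 1 only
        worker.retryOnce();           // so only task 2 is resent
        System.out.println(sent);
    }
}
```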
3.2.7. ZooKeeper registration
WorkerRegistry registers the worker in ZooKeeper