通過前面的文章了解到
Driver將用戶代碼轉換成streamGraph再轉換成Jobgraph后向Jobmanager端提交
JobManager啟動以后會在Dispatcher.java起來RPC方法submitJob(jobGraph),用於接收來自Driver轉化得到的JobGraph來啟動任務
具體來看jobGraph提交到JobManager的submitJob方法
前面都是一些調用鏈沒有什么好講的,最后到createJobManager( )方法這里
先看一下1,創建了一個jobmanagerRunner並且將中Driver端得到的JobGraph傳遞了進去
在創建JobManagerRunner的過程中它調用了
這里主要是為了創建一個jobMaster,在jobMaster的構造方法中
在這里它先是create傳入了jobgraph然后又通過createAndRestoreExecutionGraph()方法轉換得到executionGraph
這個executionGraph就可以用來調度啟動任務了
具體看一下他的轉化邏輯
可以看到它從createExecutionGraph方法中得到了executionGraph
並且通過getCheckpointCoordinator()方法得到了一個coordinator(主要是用於周期性觸發checkpoint,調用對應TaskManager的rpc生成barriers往下游發送)
繼續看一下他的轉化邏輯
在createExecutionGraph中通過ExecutionGraphBuilder.buildGraph()返回了一個executionGraph
在buildGraph()方法中
創建了一個executionGraph
為executionGraph設置一些基礎信息,包括調度方式等(這里stream是eager的調度方法)
然后
1處得到了一個的拓撲圖包含了所有jobGraph的所有jobVertex節點
2處就是具體遍歷所有jobGraph的jobVertex生成executionGraph的頂點ExecutionJobVertex
遍歷所有jobGraph的頂點jobVertex
在這里就具體生成了ExecutionJobVertex中的每一個ExecutionVertex[] taskVertices
當然這里還會配置很多ExecutionGraph的信息,就不一一列舉了
配置了一些ExecutionGraph的屬性以后
調用了
可以看到我的注釋,就是說這個地方其實是和coordinator的創建有關,在這個方法中
創建了一個coordinator對象
在這里注冊了一個JobStatus的監聽
來看一下這個監聽的作用
可以看到源碼上的注解就是說用於監聽job狀態的改變,具體監聽
看到這里就非常明顯了
當監聽到jobstutes的狀態改變時
當jobstatus變成Running時調用了coordinator的.startCheckpointScheduler()方法其中
這里可以看到創建了一個周期的調度線程
看下線程的run方法
這里就真相大白了,調用了triggerCheckpoint方法觸發一次checkpoint(觸發checkpoint的邏輯以后隨緣更新到再講)
注意,前面說到只是注冊了一個監聽,也就是說這個coordinator現在其實還沒有啟動起來的!!要到監聽到jobStatus變成running才會啟動
回到最開始的這里
1處轉化成executionGraph以后
2處具體看一下這個startJobManagerRunner()方法
把jobManager啟動了起來
在其中
啟動了這個jobMasterService
在這里開啟了jobmaster的一些RPC,像什么cancel job的stop job 的還有register TM的
然后startJobExecution()方法中
這里其實會向jobManager中啟動的resourceManager的RPC請求solt信息初始化自己的的soltPool這里不細講了,我還沒有研究
后面
這個地方就是修改job狀態和調度運行了
其中調用了scheduleExecutionGraph(),在其中又調用了
這個地方比較重要,在其中先
這里它就通過CAS修改了jobStatue從Created變成了Running
修改完了以后還沒完,還通過這個方法notifyJobStatusChange(),這個方法里面具體看一看
他遍歷了所有的listener,也就是說會觸發我們前面注冊的那個coordinator的監聽監聽到job狀態改變為running
這里coordinator就啟動完成了
繼續往下,在修改完job狀態以后
因為流模式這里是用的EAGER,flink批處理我不熟這里就不展開了
在這個schduleEager方法中
然后
看到這里它創建了一個TaskDeploymentDescriptor一個用於調度TaskManager端任務的tdd對象
看過前面幾篇博客的同學,就應該有印象了,在TaskManager啟動會啟動很多的RPC接口
其中有一個
一目了然了,這個東西是用來發送給TaskManager用於啟動TaskManager端任務的!!!!
到這里jobManager端的job啟動任務就差不多完成了
接下來就是TaskManager端的任務了,隨緣更新的時候在說一下真正TaskManager節點是如何啟動我們job任務的