Flink的Job啟動JobManager端(源碼分析)


通過前面的文章了解到

Driver將用戶代碼轉換成streamGraph再轉換成Jobgraph后向Jobmanager端提交

JobManager啟動以后會在Dispatcher.java起來RPC方法submitJob(jobGraph),用於接收來自Driver轉化得到的JobGraph來啟動任務

具體來看jobGraph提交到JobManager的submitJob方法

前面都是一些調用鏈沒有什么好講的,最后到createJobManager( )方法這里

先看一下1,創建了一個jobmanagerRunner並且將中Driver端得到的JobGraph傳遞了進去

 在創建JobManagerRunner的過程中它調用了

這里主要是為了創建一個jobMaster,在jobMaster的構造方法中

在這里它先是create傳入了jobgraph然后又通過createAndRestoreExecutionGraph()方法轉換得到executionGraph

這個executionGraph就可以用來調度啟動任務了

具體看一下他的轉化邏輯

可以看到它從createExecutionGraph方法中得到了executionGraph

並且通過getCheckpointCoordinator()方法得到了一個coordinator(主要是用於周期性觸發checkpoint,調用對應TaskManager的rpc生成barriers往下游發送)

繼續看一下他的轉化邏輯

在createExecutionGraph中通過ExecutionGraphBuilder.buildGraph()返回了一個executionGraph

在buildGraph()方法中

創建了一個executionGraph

 

為executionGraph設置一些基礎信息,包括調度方式等(這里stream是eager的調度方法)

然后

1處得到了一個的拓撲圖包含了所有jobGraph的所有jobVertex節點

2處就是具體遍歷所有jobGraph的jobVertex生成executionGraph的頂點ExecutionJobVertex

遍歷所有jobGraph的頂點jobVertex

在這里就具體生成了ExecutionJobVertex中的每一個ExecutionVertex[] taskVertices

當然這里還會配置很多ExecutionGraph的信息,就不一一列舉了

配置了一些ExecutionGraph的屬性以后

調用了

可以看到我的注釋,就是說這個地方其實是和coordinator的創建有關,在這個方法中

創建了一個coordinator對象

在這里注冊了一個JobStatus的監聽

來看一下這個監聽的作用

可以看到源碼上的注解就是說用於監聽job狀態的改變,具體監聽

看到這里就非常明顯了

當監聽到jobstutes的狀態改變時

當jobstatus變成Running時調用了coordinator的.startCheckpointScheduler()方法其中

這里可以看到創建了一個周期的調度線程

看下線程的run方法

這里就真相大白了,調用了triggerCheckpoint方法觸發一次checkpoint(觸發checkpoint的邏輯以后隨緣更新到再講)

注意,前面說到只是注冊了一個監聽,也就是說這個coordinator現在其實還沒有啟動起來的!!要到監聽到jobStatus變成running才會啟動

回到最開始的這里

1處轉化成executionGraph以后

2處具體看一下這個startJobManagerRunner()方法

把jobManager啟動了起來

 

在其中

啟動了這個jobMasterService

在這里開啟了jobmaster的一些RPC,像什么cancel job的stop job 的還有register TM的

然后startJobExecution()方法中

這里其實會向jobManager中啟動的resourceManager的RPC請求solt信息初始化自己的的soltPool這里不細講了,我還沒有研究

后面

這個地方就是修改job狀態和調度運行了

其中調用了scheduleExecutionGraph(),在其中又調用了

這個地方比較重要,在其中先

這里它就通過CAS修改了jobStatue從Created變成了Running

修改完了以后還沒完,還通過這個方法notifyJobStatusChange(),這個方法里面具體看一看

他遍歷了所有的listener,也就是說會觸發我們前面注冊的那個coordinator的監聽監聽到job狀態改變為running

這里coordinator就啟動完成了

繼續往下,在修改完job狀態以后

因為流模式這里是用的EAGER,flink批處理我不熟這里就不展開了

在這個schduleEager方法中

然后

看到這里它創建了一個TaskDeploymentDescriptor一個用於調度TaskManager端任務的tdd對象

看過前面幾篇博客的同學,就應該有印象了,在TaskManager啟動會啟動很多的RPC接口

其中有一個

一目了然了,這個東西是用來發送給TaskManager用於啟動TaskManager端任務的!!!!

到這里jobManager端的job啟動任務就差不多完成了

接下來就是TaskManager端的任務了,隨緣更新的時候在說一下真正TaskManager節點是如何啟動我們job任務的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM