Flink的Job啟動Driver端(源碼分析)


整個Flink的Job啟動是通過在Driver端通過用戶的Envirement的execute()方法將用戶的算子轉化成StreamGraph

然后得到JobGraph通過遠程RPC將這個JobGraph提交到JobManager對應的接口

JobManager轉化成executionGraph.deploy(),然后生成TDD發給TaskManager,然后整個Job就啟動起來了

這里來看一下Driver端的實現從用戶的Envirement.execute()方法作為入口

這里的Envirement分為

RemoteStreamEnvironment

LocalStreamEnvironment

因為local模式比較簡單這里就不展開了,主要是看下RemoteStreamEnvironment的execute方法

可以看到這里先獲取到了streamGraph,具體獲取的實現

這里傳入了一個transformations其中就包含了我們用戶的所有operator

這個地方就是遍歷了用戶端所有的operator生成StreamGraph,遍歷的每一個算子具體轉化成streamGraph的邏輯

1處會遞歸遍歷input直到input已經transfor,然后拿到了上游的ids

然后將operator加入到了streamGraph中調用addNode()方法將operator作為一個node,包含了一些信息,上下游的類型,並行度,soltGroup

最后遍歷上游的ids,創建邊添加到streamGraph

到這里streamGraph就創建完成了

回到最開始的地方,創建完streamGraph以后,會將streamGraph傳入executeRemotely(streamGraph, jarFiles)這個方法,這里就是streamGraph轉化成jobgraph的邏輯

其中創建了一個RestClusterClient

可以看到這里,通過getJobGraph方法將streamGraph轉換成了jobgraph

然后就submitJob將這個JobGraph提交Jobmanager了

先看一下streamGraph如何轉化成jobgraph的

通過getJobGraph方法然后

這個createJobGraph方法是主要的轉化邏輯

廣度優先遍歷為所有streamGraph的node 即operator生成hash散列值,為什么要生成這個operator的hash?

因為這個hash需要作為每一個operator的唯一標示,標示每一個operator用於cp的恢復,當用戶代碼沒有修改時,這個hash值是不會改變的

接下來

這里會將flink中上下游的operator操作根據是否滿足chain條件鏈在一起,在createChian中

這個isChainable()方法就是是否可以chain的判斷條件

1.下游的輸入邊只有一條

2.下游操作operator不為空

3.上游操作operator不為空

4.上游必須有相同的solt組

5.下游chain策略為always

6.上游chain策略為head或上游chain策略為always

7.forwardpartition的邊

8.上下游並行度相同

9.用戶代碼設置的operator是否可以chian

 

將可以chain的streamnode 鏈在一起以后就可以創建成為jobGraph的jobVertex了

然后通過RestClusterClient會將這個jobGraph往jobmanager的Dispatcher對應的RPC接口上面發送

整個job的啟動Driver端的任務就結束了

 

總結:

  在Driver端用戶的算子會被創建成為streamGraph,其中包含了一些邊,角,上下游類型,並行度等一些信息

  然后將streamGraph通過一些chain條件將可以chain的頂點chain在了一起轉化成了JobGraph

  streamEdge變成了jobEdge,chain在一起的streamnode變成了jobVertex

  最后然后通過RPC將整個jobGraph向jobmanager提交。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM