本課主題
- Spark Worker 原理
- Worker 啟動 Driver 源碼鑒賞
- Worker 啟動 Executor 源碼鑒賞
- Worker 與 Master 的交互關系
[引言部份:你希望讀者看完這篇博客后有那些啟發、學到什么樣的知識點]
更新中......
Spark Worker 原理圖
Worker 啟動 Driver 源碼鑒賞
- 因為 Worker 中有消息的循環體,可以用來接收消息,接上一章介紹當 Master 把一個 LaunchDriver 發送到 Worker 的時候,Worker 接收這個 LaunchDriver 然后創建一個新的 DriverRunner 實例,我們這里重點研究 LaunchDriver,當啟動 Driver 或者是 Executor 的時候,它必需是滿足內存的要求的。當實際上不一定會滿足 Core 的要求的,也就是說實際分配的 Core 可能比你期待的 Core 多、也有可能比它少 (為什么呢?)
在這里首先創建一個 DriverRunner 的實例對象,然后把實例交給 drivers 數據結構 (HashMap[String, DriverRunner]) 來保存信息,這個數據結構很重要,因為在 Worker 下可能啟動很多不同的 Executor,你可以理解 DriverRunner 為Driver 進程本身的一個proxy [代理模式],調用它的start( ) 方法並記錄一下 coreUsed 和 memoryUsed 的數據。
Cluster 中的 Driver 失敗的時候,如果 Supervise 為 true,則啟動該 Driver 的Worker 會負責重新啟動該 Driver; - 在start( )的方法中會創建一個新的進程;具體代碼運行順序:new Thread( ) --> 創建一個本地目錄和下載相關的 Jar包 --> launchDriver( ) --> 判斷並收集它的狀態 --> 再發送給 Worker 一個狀態變化的消息。補充說明:Executor 和 ExecutorBackend 是一對一的關系,一個ExecutorBackend進程里面有一個Executor,而在Executor內部它是通過線程池並發處理的方式來處理我們 Spark 提交過來的 Task。Executor 啟動后需要向 Driver 注冊,具體是注冊給 SparkDeploySchedulerBackend實例。
在本地創建了的一個工作目錄
從 HDFS 上獲取相關的依賴包 Jar 到本地,因為你提交程序的時候是提交給Spark集群的。 - Worker 是實現RPC通信的,否則別人無法給你發消息的,可以初步看一下類的說明,你會發現它是繼承著 ThreadRpcEndPoint (在這里先不深入探討 RpcEndPoint 的機制,如果想了解可以看點擊這篇博客)
- 通過Command PrcoessBuilder
- 啟動Driver e.g. launchDriver
DriverRunner 啟動進程是通過 ProcessBuilder 中的 process.get.waitFor 來完成的。
當Driver 的狀態改變的時候
- Worker 接收 DriverStatedChanged 信息
然后把信息發送給Master
Master 收到 Worker 發送過來的 Driver狀態信息
- 判斷Driver的狀態然后把 ERROR、FINISHED、KILLED 和 FAILED 的 driver 刪取掉
Worker 啟動 Executor 源碼鑒賞
- Worker 收到 LaunchExecutor 的信息
當Executor 的狀態改變的時候
- 向 Worker 發送一個 ExecutorStatedChange 的信息
在Worker 中收到這個信息后調用 handleExecutorStateChanged 方法
Master 收到 Worker 發送過來的 Executor 狀態信息
- Master 收到 Worker 的發送的 ExecutorStateChanged 信息
Worker 與 Master 的交互關系
[總結部份]
更新中......
參考資料
資料來源來至 DT大數據夢工廠 大數據傳奇行動 第32課:Spark Worker原理和源碼剖析解密:Worker工作流程圖、Worker啟動Driver源碼解密、Worker啟動Executor源碼解密等
Spark源碼圖片取自於 Spark 1.6.0版本