本課主題
- Job Stage 划分算法解密
- Task 最佳位置算法實現解密
引言
作業調度的划分算法以及 Task 的最佳位置的算法,因為 Stage 的划分是DAGScheduler 工作的核心,這也是關系到整個作業有集群中該怎么運行;其次就是數據本地性,Spark 一舨的代碼都是鏈式表達的,這就讓一個任務什么時候划分成 Stage,在大數據世界要追求最大化的數據本地性,所有最大化的數據本地性就是在數據計算的時候,數據就在內存中。最后就是 Spark 的實現算法時候的略的怎么樣。希望這篇文章能為讀者帶出以下的啟發:
- 了解 Stage 的具體是如何划分的
- 了解 數據本地性的最大化
Job Stage 划分算法解密
- Spark Application 中可以因為不同的Action 觸發眾多的Job,也就是一個Application 中可以有很多的Job ,每個Job 是由一個或者多個Stage 構成的,后面的Stage 依賴前面的Stage; 也就是說只有前面的依賴的Stage 計算完畢后,后面的Stage 才會運行;
- Stage 划分的依據就是寬依賴,什么時侯產生寬依賴呢?例如 reduceByKey、groupByKey 等等;
- 由 Action (例如collect) 導致了SparkContext.runJob 最終導致了 DAGScheduler 中的 submitJob 執行。
它會等待作業提交的結果,然后判斷一下成功或者是失敗來進行下一步操作 - 其核心是通過發送一個case class JobSubmitted 對象給 eventProcessLoop
其中JobSubmitted 源碼如下:因為需要創建不同的實例,所以要弄一個case class 而不是case object,case object 一般是以全區唯一的變量去使用。 - 這里開了一條線程,用 post 的方式把消息交在隊例中,由於你把它放在隊例中它就會不斷的循環去拿消息,它轉過來就調用回調方法 onReceive( ),eventProcessLoop 是 一個消息循環器,它是 DAGSchedulerEvent 的具體實例,eventLoop 是一個 Link的blockingQueue。
而DAGSchedulerEventProcessLoop 是 EventLoop 的子類,具體實現 eventLoop 的 onReceive 方法,onReceive方法轉過來回調 doOnReceive( ) - 在 doOnReceive 這個類中有接收 JobSubmitted 的判斷,轉過來調用 handleJobSubmitted 的方法
思考題:為什么要再開一條線程搞一個消息循環器呢?因為有對例你就可以接受多個作業的提交,就是異步處理多 Job,這里背后有一個很重要的理念,就是如果無論是你自己發消息,還是別人發消息,你都采用一個線程去處理的話,這個時候處理的方式就是統一的,你的思路是一致的,這樣你的擴展性就會非常的好,代碼也會很乾凈。
處理 Job 時的過程和邏輯
handleJobSubmitted( ) -->
- 調用 JobSubmitted 的方法,在這里用了一個消息循環器就可以統一對消息進行處理,在 handleJobSubmitted 中首先創建 finalStage,創建 finalStage 時會建立父 Stage 的依賴鏈條,這里是在這個算法里用的數據結構:
如果沒有之前沒有 visited 就把放在 visited 的數據結構中,然后判斷一下它的依賴關系,如果是寬依賴的話就新增一個 Stage
處理 missingParent
- 處理 missingParent
SubmitJob
- submitJob
Task 最佳位置算法實現解密
- 從 submitMissingTask 開始找出它的數據本地算法
- 在具體算算法實現的時候,會首先查詢 DAGScheduler 的內存數據結構中是否存在當前 Partition 的數據本地性的信息,如果有得話就直接返回;如果沒有首先會調用 rdd.getPreferredLocations.例如想讓 Spark 運行在 HBase 上或者一種現在還沒有直接的數據庫上面,此時開發者需要自訂義 RDD,為了保証 Task 數據本地性,最為關鍵的方法就是必需實現 RDD 的 getPreferredLocations
-
DAGScheduler 計算數據本地性的時候,巧妙的借助了RDD 自身的getPreferredLocations 中的數據,最大化的優化了效率,因為getPreferredLocations 中表明了每個Partition 的數據本地性,雖然當前Partition 可能被persists 或者是checkpoint,但是persists 或者是checkpoint默認情況下肯定是和getPreferredLocations 中的數據本地性是一致的,所以這就更大的優化了Task 的數據本地性算法的顯現和效率的優化
總結
參考資料
資料來源來至 DT大數據夢工廠 大數據傳奇行動 第34課:Stage划分和Task最佳位置算法源碼徹底解密
Spark源碼圖片取自於 Spark 1.6.0版本