[Spark內核] 第34課:Stage划分和Task最佳位置算法源碼徹底解密


本課主題

  • Job Stage 划分算法解密
  • Task 最佳位置算法實現解密

 

引言

作業調度的划分算法以及 Task 的最佳位置的算法,因為 Stage 的划分是DAGScheduler 工作的核心,這也是關系到整個作業有集群中該怎么運行;其次就是數據本地性,Spark 一舨的代碼都是鏈式表達的,這就讓一個任務什么時候划分成 Stage,在大數據世界要追求最大化的數據本地性,所有最大化的數據本地性就是在數據計算的時候,數據就在內存中。最后就是 Spark 的實現算法時候的略的怎么樣。希望這篇文章能為讀者帶出以下的啟發:

  • 了解 Stage 的具體是如何划分的
  • 了解 數據本地性的最大化

 

Job Stage 划分算法解密

  1. Spark Application 中可以因為不同的Action 觸發眾多的Job,也就是一個Application 中可以有很多的Job ,每個Job 是由一個或者多個Stage 構成的,后面的Stage 依賴前面的Stage; 也就是說只有前面的依賴的Stage 計算完畢后,后面的Stage 才會運行;

  2. Stage 划分的依據就是寬依賴什么時侯產生寬依賴呢?例如 reduceByKey、groupByKey 等等;
  3. 由 Action (例如collect) 導致了SparkContext.runJob 最終導致了 DAGScheduler 中的 submitJob 執行。





    它會等待作業提交的結果,然后判斷一下成功或者是失敗來進行下一步操作

  4. 其核心是通過發送一個case class JobSubmitted 對象給 eventProcessLoop

    其中JobSubmitted 源碼如下:因為需要創建不同的實例,所以要弄一個case class 而不是case object,case object 一般是以全區唯一的變量去使用。
  5. 這里開了一條線程,用 post 的方式把消息交在隊例中,由於你把它放在隊例中它就會不斷的循環去拿消息,它轉過來就調用回調方法 onReceive( ),eventProcessLoop 是 一個消息循環器,它是 DAGSchedulerEvent 的具體實例,eventLoop 是一個 Link的blockingQueue。
      
    而DAGSchedulerEventProcessLoop 是 EventLoop 的子類,具體實現 eventLoop 的 onReceive 方法,onReceive方法轉過來回調 doOnReceive( )

  6. 在 doOnReceive 這個類中有接收 JobSubmitted 的判斷,轉過來調用 handleJobSubmitted 的方法

    思考題:為什么要再開一條線程搞一個消息循環器呢?因為有對例你就可以接受多個作業的提交,就是異步處理多 Job,這里背后有一個很重要的理念,就是如果無論是你自己發消息,還是別人發消息,你都采用一個線程去處理的話,這個時候處理的方式就是統一的,你的思路是一致的,這樣你的擴展性就會非常的好,代碼也會很乾凈。

處理 Job 時的過程和邏輯

handleJobSubmitted( ) --> 

  1. 調用 JobSubmitted 的方法,在這里用了一個消息循環器就可以統一對消息進行處理,在 handleJobSubmitted 中首先創建 finalStage,創建 finalStage 時會建立父 Stage 的依賴鏈條,這里是在這個算法里用的數據結構:




    如果沒有之前沒有 visited 就把放在 visited 的數據結構中,然后判斷一下它的依賴關系,如果是寬依賴的話就新增一個 Stage


處理 missingParent

  1. 處理 missingParent

SubmitJob

  1. submitJob

  

Task 最佳位置算法實現解密

  1. 從 submitMissingTask 開始找出它的數據本地算法
     
  2. 在具體算算法實現的時候,會首先查詢 DAGScheduler 的內存數據結構中是否存在當前 Partition 的數據本地性的信息,如果有得話就直接返回;如果沒有首先會調用 rdd.getPreferredLocations.例如想讓 Spark 運行在 HBase 上或者一種現在還沒有直接的數據庫上面,此時開發者需要自訂義 RDD,為了保証 Task 數據本地性,最為關鍵的方法就是必需實現 RDD 的 getPreferredLocations
  3. DAGScheduler 計算數據本地性的時候,巧妙的借助了RDD 自身的getPreferredLocations 中的數據,最大化的優化了效率,因為getPreferredLocations 中表明了每個Partition 的數據本地性,雖然當前Partition 可能被persists 或者是checkpoint,但是persists 或者是checkpoint默認情況下肯定是和getPreferredLocations 中的數據本地性是一致的,所以這就更大的優化了Task 的數據本地性算法的顯現和效率的優化

 

 

總結

 

 
 
 

參考資料 

資料來源來至 DT大數據夢工廠 大數據傳奇行動 第34課:Stage划分和Task最佳位置算法源碼徹底解密

Spark源碼圖片取自於 Spark 1.6.0版本

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM