spark DAG 筆記


  • DAG,有向無環圖,Directed Acyclic Graph的縮寫,常用於建模。
  • Spark中使用DAG對RDD的關系進行建模,描述了RDD的依賴關系,這種關系也被稱之為lineage,RDD的依賴關系使用Dependency維護,參考Spark RDD之Dependency,DAG在Spark中的對應的實現為DAGScheduler。
  • DAGScheduler
    • 作業(Job)調用RDD的一個action,如count,即觸發一個Job,spark中對應實現為ActiveJob,DAGScheduler中使用集合activeJobs和jobIdToActiveJob維護Job
    • 調度階段(Stage )   代表一個Job的DAG,會在發生shuffle處被切分,切分后每一個部分即為一個Stage,Stage實現分為ShuffleMapStage和ResultStage,一個Job切分的結果是0個或多個ShuffleMapStage加一個ResultStage, 

    • 任務(Task )   最終被發送到Executor執行的任務,和stage的ShuffleMapStage和ResultStage對應,其實現分為ShuffleMapTask和ResultTask

  • DAG中每個節點是一個RDD

  • RDD依賴關系
    • 窄依賴 Narrow Dependency:
      • 從父RDD角度看:一個父RDD只被一個子RDD分區使用。父RDD的每個分區最多只能被一個Child RDD的一個分區使用
      • 從子RDD角度看:  依賴上級RDD的部分分區,精確知道依賴的上級RDD分區,會選擇和自己在同一節點的上級RDD分區,沒有網絡IO開銷,高效。如map,flatmap,filter
    • 寬依賴 Shffule Dependency:
      • 從父RDD角度看:一個父RDD被多個子RDD分區使用。父RDD的每個分區可以被多個Child RDD分區依賴

      • 從子RDD角度看:依賴上級RDD的所有分區     無法精確定位依賴的上級RDD分區,相當於依賴所有分區(例如reduceByKey)  計算就涉及到節點間網絡傳輸
      • 需要shuffle
    • 窄依賴可以支持在同一個集群Executor上,以pipeline管道形式順序執行多條命令,例如在執行了map后,緊接着執行filter。分區內的計算收斂,不需要依賴所有分區的數據,可以並行地在不同節點進行計算。所以它的失敗恢復也更有效,因為它只需要重新計算丟失的parent partition即可。

    • 寬依賴需要所有的父分區都是可用的,必須等RDD的parent partition數據全部ready之后才能開始計算,可能還需要調用類似MapReduce之類的操作進行跨節點傳遞。從失敗恢復的角度看,寬依賴牽涉RDD各級的多個parent partition。

  • 划分stage
    • 由於寬依賴必須等RDD的parent RDD partition數據全部ready之后才能開始計算,因此spark的設計是讓parent RDD將結果寫在本地,完全寫完之后,通知后面的RDD。后面的RDD則首先去讀之前的本地數據作為input,然后進行運算。
    • 由於上述特性,將shuffle依賴就必須分為兩個階段(stage)去做
      • 第一個階段(stage)需要把結果shuffle到本地,例如reduceByKey,首先要聚合某個key的所有記錄,才能進行下一步的reduce計算,這個匯聚的過程就是shuffle
      • 第二個階段(stage)則讀入數據進行處理
  • 對於transformation操作,以寬依賴為分隔,分為不同的Stages。

    窄依賴------>tasks會歸並在同一個stage中,(相同節點上的task運算可以像pipeline一樣順序執行,不同節點並行計算,互不影響)

    寬依賴------>前后拆分為兩個stage,前一個stage寫完文件后下一個stage才能開始

    action操作------>和其他tasks會歸並在同一個stage(在沒有shuffle依賴的情況下,生成默認的stage,保證至少一個stage)。

  • job划分原則

     

  • 每個action函數內會調用runJob,進而調用submitJob,所以每個action會觸發一個job。

    job間按順序執行,待前一個job完全成功,才能執行下一個job,所有job執行成功后,本application執行完成

  • DAG划分:
    • 各個RDD之間存在着依賴關系,這些依賴關系形成有向無環圖DAG,DAGScheduler對這些依賴關系形成的DAG,進行Stage划分,划分的規則很簡單,從后往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。完成了Stage的划分,DAGScheduler基於每個Stage生成TaskSet,並將TaskSet提交給TaskScheduler。TaskScheduler 負責具體的task調度,在Worker節點上啟動task。
    • 當RDD觸發一個Action操作(如:colllect)后,導致SparkContext.runJob的執行。而在SparkContext的run方法中會調用DAGScheduler的run方法最終調用了DAGScheduler的submit方法:
    • 設計:盡量多設計窄依賴,減少寬依賴。最大化本地化處理優勢,減少網絡IO.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM