- DAG,有向無環圖,Directed Acyclic Graph的縮寫,常用於建模。
- Spark中使用DAG對RDD的關系進行建模,描述了RDD的依賴關系,這種關系也被稱之為lineage,RDD的依賴關系使用Dependency維護,參考Spark RDD之Dependency,DAG在Spark中的對應的實現為DAGScheduler。
- DAGScheduler
- 作業(Job)調用RDD的一個action,如count,即觸發一個Job,spark中對應實現為ActiveJob,DAGScheduler中使用集合activeJobs和jobIdToActiveJob維護Job
-
調度階段(Stage ) 代表一個Job的DAG,會在發生shuffle處被切分,切分后每一個部分即為一個Stage,Stage實現分為ShuffleMapStage和ResultStage,一個Job切分的結果是0個或多個ShuffleMapStage加一個ResultStage,
-
任務(Task ) 最終被發送到Executor執行的任務,和stage的ShuffleMapStage和ResultStage對應,其實現分為ShuffleMapTask和ResultTask
-
DAG中每個節點是一個RDD
- RDD依賴關系
- 窄依賴 Narrow Dependency:
- 從父RDD角度看:一個父RDD只被一個子RDD分區使用。父RDD的每個分區最多只能被一個Child RDD的一個分區使用
- 從子RDD角度看: 依賴上級RDD的部分分區,精確知道依賴的上級RDD分區,會選擇和自己在同一節點的上級RDD分區,沒有網絡IO開銷,高效。如map,flatmap,filter
寬依賴 Shffule Dependency:
-
從父RDD角度看:一個父RDD被多個子RDD分區使用。父RDD的每個分區可以被多個Child RDD分區依賴
- 從子RDD角度看:依賴上級RDD的所有分區 無法精確定位依賴的上級RDD分區,相當於依賴所有分區(例如reduceByKey) 計算就涉及到節點間網絡傳輸
- 需要shuffle
-
窄依賴可以支持在同一個集群Executor上,以pipeline管道形式順序執行多條命令,例如在執行了map后,緊接着執行filter。分區內的計算收斂,不需要依賴所有分區的數據,可以並行地在不同節點進行計算。所以它的失敗恢復也更有效,因為它只需要重新計算丟失的parent partition即可。
-
寬依賴需要所有的父分區都是可用的,必須等RDD的parent partition數據全部ready之后才能開始計算,可能還需要調用類似MapReduce之類的操作進行跨節點傳遞。從失敗恢復的角度看,寬依賴牽涉RDD各級的多個parent partition。
- 窄依賴 Narrow Dependency:
- 划分stage
- 由於寬依賴必須等RDD的parent RDD partition數據全部ready之后才能開始計算,因此spark的設計是讓parent RDD將結果寫在本地,完全寫完之后,通知后面的RDD。后面的RDD則首先去讀之前的本地數據作為input,然后進行運算。
- 由於上述特性,將shuffle依賴就必須分為兩個階段(stage)去做
- 第一個階段(stage)需要把結果shuffle到本地,例如reduceByKey,首先要聚合某個key的所有記錄,才能進行下一步的reduce計算,這個匯聚的過程就是shuffle
- 第二個階段(stage)則讀入數據進行處理
-
對於transformation操作,以寬依賴為分隔,分為不同的Stages。
窄依賴------>tasks會歸並在同一個stage中,(相同節點上的task運算可以像pipeline一樣順序執行,不同節點並行計算,互不影響)
寬依賴------>前后拆分為兩個stage,前一個stage寫完文件后下一個stage才能開始
action操作------>和其他tasks會歸並在同一個stage(在沒有shuffle依賴的情況下,生成默認的stage,保證至少一個stage)。
-
job划分原則
-
每個action函數內會調用runJob,進而調用submitJob,所以每個action會觸發一個job。
job間按順序執行,待前一個job完全成功,才能執行下一個job,所有job執行成功后,本application執行完成
- DAG划分:
- 各個RDD之間存在着依賴關系,這些依賴關系形成有向無環圖DAG,DAGScheduler對這些依賴關系形成的DAG,進行Stage划分,划分的規則很簡單,從后往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。完成了Stage的划分,DAGScheduler基於每個Stage生成TaskSet,並將TaskSet提交給TaskScheduler。TaskScheduler 負責具體的task調度,在Worker節點上啟動task。
- 當RDD觸發一個Action操作(如:colllect)后,導致SparkContext.runJob的執行。而在SparkContext的run方法中會調用DAGScheduler的run方法最終調用了DAGScheduler的submit方法:
- 設計:盡量多設計窄依賴,減少寬依賴。最大化本地化處理優勢,減少網絡IO.