1.1.1.計算流程
1.1.2. 從代碼構建DAG圖
Spark program
Val lines1 = sc.textFile(inputPath1).map(...).map(...)
Val lines2 = sc.textFile(inputPath2).map(...)
Val lines3 = sc.textFile(inputPath3)
Val dtinone1 = lines2.union(lines3)
Val dtinone = lines1.join(dtinone1)
dtinone.saveAsTextFile(...)
dtinone.filter(...).foreach(...)
Spark的計算發生在RDD的Action操作,而對Action之前的所有Transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。
Spark內核會在需要計算發生的時刻繪制一張關於計算路徑的有向無環圖,也就是DAG。
1.1.3. 將DAG划分為Stage核心算法
Application多個job多個Stage:
Spark Application中可以因為不同的Action觸發眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,后面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢后,后面的Stage才會運行。
划分依據:
Stage划分的依據就是寬依賴,何時產生寬依賴,reduceByKey, groupByKey等算子,會導致寬依賴的產生。
核心算法:
- 從后往前回溯/反向解析,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。
- Spark內核會從觸發Action操作的那個RDD開始從后往前推,
- 首先會為最后一個RDD創建一個stage,
- 然后繼續倒推,如果發現對某個RDD是寬依賴,那么就會將寬依賴的那個RDD創建一個新的stage,那個RDD就是新的stage的最后一個RDD。
- 然后依次類推,繼續倒推,根據窄依賴或者寬依賴進行stage的划分,直到所有的RDD全部遍歷完成為止。
1.1.4. 將DAG划分為Stage剖析
從HDFS中讀入數據生成3個不同的RDD,通過一系列transformation操作后再將計算結果保存回HDFS。
可以看到這個DAG中只有join操作是一個寬依賴,Spark內核會以此為邊界將其前后划分成不同的Stage.
同時我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行union操作,這樣大大提高了計算的效率。
1.1.5. 提交Stages
調度階段的提交,最終會被轉換成一個任務集的提交,
DAGScheduler通過TaskScheduler接口提交任務集,
這個任務集最終會觸發TaskScheduler構建一個TaskSetManager的實例來管理這個任務集的生命周期,
對於DAGScheduler來說,提交調度階段的工作到此就完成了。
而TaskScheduler的具體實現則會在得到計算資源的時候,進一步通過TaskSetManager調度具體的任務到對應的Executor節點上進行運算。
1.1.6. 監控Job、Task、Executor
l DAGScheduler監控Job與Task:
要保證相互依賴的作業調度階段能夠得到順利的調度執行,DAGScheduler需要監控當前作業調度階段乃至任務的完成情況。
這通過對外暴露一系列的回調函數來實現的,對於TaskScheduler來說,這些回調函數主要包括任務的開始結束失敗、任務集的失敗,DAGScheduler根據這些任務的生命周期信息進一步維護作業和調度階段的狀態信息。
l DAGScheduler監控Executor的生命狀態:
TaskScheduler通過回調函數通知DAGScheduler具體的Executor的生命狀態,如果某一個Executor崩潰了,則對應的調度階段任務集的ShuffleMapTask的輸出結果也將標志為不可用,這將導致對應任務集狀態的變更,進而重新執行相關計算任務,以獲取丟失的相關數據。
1.1.7. 獲取任務執行結果
l結果DAGScheduler:
一個具體的任務在Executor中執行完畢后,其結果需要以某種形式返回給DAGScheduler,根據任務類型的不同,任務結果的返回方式也不同。
l兩種結果,中間結果與最終結果:
對於FinalStage所對應的任務,返回給DAGScheduler的是運算結果本身,而對於中間調度階段對應的任務ShuffleMapTask,返回給DAGScheduler的是一個MapStatus里的相關存儲信息,而非結果本身,這些存儲位置信息將作為下一個調度階段的任務獲取輸入數據的依據。
l兩種類型,DirectTaskResult與IndirectTaskResult:
根據任務結果大小的不同,ResultTask返回的結果又分為兩類:
-
如果結果足夠小,則直接放在DirectTaskResult對象內中,
-
如果超過特定尺寸則在Executor端會將DirectTaskResult先序列化,再把序列化的結果作為一個數據塊存放在BlockManager中,然后將BlockManager返回的BlockID放在IndirectTaskResult對象中返回給TaskScheduler,TaskScheduler進而調用TaskResultGetter將IndirectTaskResult中的BlockID取出並通過BlockManager最終取得對應的DirectTaskResult。
1.1.8. 任務調度總體詮釋