DAG有向無環圖生成
DAG是什么
DAG(Directed Acyclic Graph) 叫做有向無環圖(有方向,無閉環,代表着數據的流向),原始的RDD通過一系列的轉換就形成了DAG。
下圖是基於單詞統計邏輯得到的DAG有向無環圖
DAG划分stage(★★★★★)
stage是什么
一個Job會被拆分為多組Task,每組任務被稱為一個stage
stage表示不同的調度階段,一個spark job會對應產生很多個stage
stage類型一共有2種
- ShuffleMapStage
- 最后一個shuffle之前的所有變換的Stage叫ShuffleMapStage
- 它對應的task是shuffleMapTask
- ResultStage
- 最后一個shuffle之后操作的Stage叫ResultStage,它是最后一個Stage。
- 它對應的task是ResultTask
為什么要划分stage
根據RDD之間依賴關系的不同將DAG划分成不同的Stage(調度階段)
- 對於窄依賴,partition的轉換處理在一個Stage中完成計算
- 對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,
由於划分完stage之后,在同一個stage中只有窄依賴,沒有寬依賴,可以實現流水線計算,
stage中的每一個分區對應一個task,在同一個stage中就有很多可以並行運行的task。
如何划分stage
划分stage的依據就是寬依賴
划分流程:
(1) 首先根據rdd的算子操作順序生成DAG有向無環圖,接下里從最后一個rdd往前推,創建一個新的stage,把該rdd加入到該stage中,它是最后一個stage。
(2) 在往前推的過程中運行遇到了窄依賴就把該rdd加入到本stage中,如果遇到了寬依賴,就從寬依賴切開,那么最后一個stage也就結束了。
(3) 重新創建一個新的stage,按照第二個步驟繼續往前推,一直到最開始的rdd,整個划分stage也就結束了
stage與stage之間的關系
划分完stage之后,每一個stage中有很多可以並行運行的task,后期把每一個stage中的task封裝在一個taskSet集合中,最后把一個一個的taskSet集合提交到worker節點上的executor進程中運行。
rdd與rdd之間存在依賴關系,stage與stage之前也存在依賴關系,前面stage中的task先運行,運行完成了再運行后面stage中的task,也就是說后面stage中的task輸入數據是前面stage中task的輸出結果數據。
spark的任務調度
(1) Driver端運行客戶端的main方法,構建SparkContext對象,在SparkContext對象內部依次構建DAGScheduler和TaskScheduler
(2) 按照rdd的一系列操作順序,來生成DAG有向無環圖
(3) DAGScheduler拿到DAG有向無環圖之后,按照寬依賴進行stage的划分。每一個stage內部有很多可以並行運行的task,最后封裝在一個一個的taskSet集合中,然后把taskSet發送給TaskScheduler
(4) TaskScheduler得到taskSet集合之后,依次遍歷取出每一個task提交到worker節點上的executor進程中運行。
(5) 所有task運行完成,整個任務也就結束了
spark的運行架構
(1) Driver端向資源管理器Master發送注冊和申請計算資源的請求
(2) Master通知對應的worker節點啟動executor進程(計算資源)
(3) executor進程向Driver端發送注冊並且申請task請求
(4) Driver端運行客戶端的main方法,構建SparkContext對象,在SparkContext對象內部依次構建DAGScheduler和TaskScheduler
(5) 按照客戶端代碼洪rdd的一系列操作順序,生成DAG有向無環圖
(6) DAGScheduler拿到DAG有向無環圖之后,按照寬依賴進行stage的划分。每一個stage內部有很多可以並行運行的task,最后封裝在一個一個的taskSet集合中,然后把taskSet發送給TaskScheduler
(7) TaskScheduler得到taskSet集合之后,依次遍歷取出每一個task提交到worker節點上的executor進程中運行
(8) 所有task運行完成,Driver端向Master發送注銷請求,Master通知Worker關閉executor進程,Worker上的計算資源得到釋放,最后整個任務也就結束了。