4.Spark 任務調度機制
在工廠環境下,Spark 集群的部署方式一般為 YARN-Cluster 模式,之后的內核
分析內容中我們默認集群的部署方式為 YARN-Cluster 模式。
4.1 Spark 任務提交流程
在上一章中我們講解了 Spark YARN-Cluster 模式下的任務提交流程,
如下圖所示:

下面的時序圖清晰地說明了一個 Spark 應用程序從提交到運行的完整流程:

提交一個 Spark 應用程序,首先通過 Client 向 ResourceManager 請求啟動一個
Application,同時檢查是否有足夠的資源滿足 Application 的需求,如果資源條件滿
足,則准備 ApplicationMaster 的啟動上下文,交給 ResourceManager,並循環監控
Application 狀態。
當提交的資源隊列中有資源時,ResourceManager 會在某個 NodeManager 上啟
動 ApplicationMaster 進程,ApplicationMaster 會單獨啟動 Driver 后台線程,當 Driver
啟動后,ApplicationMaster 會通過本地的 RPC 連接 Driver,並開始向 ResourceManager
申請 Container 資源運行 Executor 進程(一個 Executor 對應與一個 Container),當
ResourceManager 返回 Container 資源,ApplicationMaster 則在對應的 Container 上啟
動 Executor。
Driver 線程主要是初始化 SparkContext 對象,准備運行所需的上下文,然后一
方面保持與 ApplicationMaster 的 RPC 連接,通過 ApplicationMaster 申請資源,另一
方面根據用戶業務邏輯開始調度任務,將任務下發到已有的空閑 Executor 上。
當 ResourceManager 向 ApplicationMaster 返 回 Container 資源時,
ApplicationMaster 就嘗試在對應的 Container 上啟動 Executor 進程,Executor 進程起
來后,會向 Driver 反向注冊,注冊成功后保持與 Driver 的心跳,同時等待 Driver
分發任務,當分發的任務執行完畢后,將任務狀態上報給 Driver。
從上述時序圖可知,Client 只負責提交 Application 並監控 Application 的狀態。
對於 Spark 的任務調度主要是集中在兩個方面: 資源申請和任務分發,其主要是通
過 ApplicationMaster、Driver 以及 Executor 之間來完成。
4.2 Spark 任務調度概述
當 Driver 起來后,Driver 則會根據用戶程序邏輯准備任務,並根據 Executor 資
源情況逐步分發任務。在詳細闡述任務調度前,首先說明下 Spark 里的幾個概念。
一個 Spark 應用程序包括 Job、Stage 以及 Task 三個概念:
Job 是以 Action 方法為界,遇到一個 Action 方法則觸發一個 Job;
Stage 是 Job 的子集,以 RDD 寬依賴(即 Shuffle)為界,遇到 Shuffle 做一次
划分;
Task 是 Stage 的子集,以並行度(分區數)來衡量,分區數是多少,則有多少
個 task。
Spark 的任務調度總體來說分兩路進行,一路是 Stage 級的調度,一路是 Task
級的調度,總體調度流程如下圖所示:

Spark RDD 通過其 Transactions 操作,形成了 RDD 血緣關系圖,即 DAG,最后
通過 Action 的調用,觸發 Job 並調度執行。
DAGScheduler 負責 Stage 級的調度,
主要是將 DAG 切分成若干 Stages,並將每個 Stage 打包成 TaskSet 交給 TaskScheduler
調度。
TaskScheduler 負責 Task 級的調度,將 DAGScheduler 給過來的 TaskSet 按照
指定的調度策略分發到 Executor 上執行,調度過程中 SchedulerBackend 負責提供可
用資源,其中
SchedulerBackend 有多種實現,分別對接不同的資源管理系統。有了
上述感性的認識后,下面這張圖描述了 Spark-On-Yarn 模式下在任務調度期間,
ApplicationMaster、Driver 以及 Executor 內部模塊的交互過程:

Driver 初始化 SparkContext 過 程 中 , 會 分 別 初 始 化 DAGScheduler 、
TaskScheduler、SchedulerBackend 以及 HeartbeatReceiver,並啟動 SchedulerBackend
以及 HeartbeatReceiver。SchedulerBackend 通過 ApplicationMaster 申請資源,並不
斷從 TaskScheduler 中拿到合適的 Task 分發到 Executor 執行。HeartbeatReceiver 負
責接收 Executor 的心跳信息,監控 Executor 的存活狀況,並通知到 TaskScheduler。
4.3 Spark Stage 級調度
Spark 的任務調度是從 DAG 切割開始,主要是由 DAGScheduler 來完成。當遇
到一個 Action 操作后就會觸發一個 Job 的計算,並交給 DAGScheduler 來提交,下
圖是涉及到 Job 提交的相關方法調用流程圖。

Job 由 最 終 的 RDD 和 Action 方 法 封 裝 而 成 , SparkContext 將 Job 交 給
DAGScheduler 提交,它會根據 RDD 的血緣關系構成的 DAG 進行切分,將一個 Job
划分為若干 Stages,具體划分策略是,由最終的 RDD 不斷通過依賴回溯判斷父依賴
是否是寬依賴,即以 Shuffle 為界,划分 Stage,窄依賴的 RDD 之間被划分到同一個
Stage 中,可以進行 pipeline 式的計算,如上圖紫色流程部分。划分的 Stages 分兩類,
一類叫做 ResultStage,為 DAG 最下游的 Stage,由 Action 方法決定,另一類叫做
ShuffleMapStage,為下游 Stage 准備數據,下面看一個簡單的例子 WordCount。

Job 由 saveAsTextFile 觸發,該 Job 由 RDD-3 和 saveAsTextFile 方法組成,根據
RDD 之間的依賴關系從 RDD-3 開始回溯搜索,直到沒有依賴的 RDD-0,在回溯搜
索過程中,RDD-3 依賴 RDD-2,並且是寬依賴,所以在 RDD-2 和 RDD-3 之間划分
Stage,RDD-3 被划到最后一個 Stage,即
ResultStage 中,RDD-2 依賴 RDD-1,RDD-1
依賴 RDD-0,這些依賴都是窄依賴,所以將 RDD-0、RDD-1 和 RDD-2 划分到同一
個 Stage,即
ShuffleMapStage 中,實際執行的時候,數據記錄會一氣呵成地執行
RDD-0 到 RDD-2 的轉化。不難看出,其本質上是一個深度優先搜索算法。
一個 Stage 是否被提交,需要判斷它的父 Stage 是否執行,只有在父 Stage 執行
完畢才能提交當前 Stage,如果一個 Stage 沒有父 Stage,那么從該 Stage 開始提交。
Stage 提交時會將 Task 信息(分區信息以及方法等)序列化並被打包成 TaskSet 交給
TaskScheduler,一個 Partition 對應一個 Task,另一方面 TaskScheduler 會監控 Stage
的運行狀態,只有 Executor 丟失或者 Task 由於 Fetch 失敗才需要重新提交失敗的
Stage 以調度運行失敗的任務,其他類型的 Task 失敗會在 TaskScheduler 的調度過程
中重試。
相對來說 DAGScheduler 做的事情較為簡單,僅僅是在 Stage 層面上划分 DAG,
提交 Stage 並監控相關狀態信息。TaskScheduler 則相對較為復雜,下面詳細闡述其
細節。
4.4 Spark Task 級調度
Spark Task 的調度是由 TaskScheduler 來完成,由前文可知,DAGScheduler 將
Stage 打 包 到 TaskSet 交 給 TaskScheduler, TaskScheduler 會 將 TaskSet 封裝為
TaskSetManager 加入到調度隊列中,TaskSetManager 結構如下圖所示。

TaskSetManager 負責監控管理同一個 Stage 中的 Tasks,TaskScheduler 就是以
TaskSetManager 為單元來調度任務。
前面也提到,
TaskScheduler 初始化后會啟動 SchedulerBackend,它負責跟外界
打交道,接收 Executor 的注冊信息,並維護 Executor 的狀態,所以說 SchedulerBackend
是管“糧食”的,同時它在啟動后會定期地去“詢問”TaskScheduler 有沒有任務要運行,
也就是說,
它會定期地 “ 問 ”TaskScheduler“ 我有這么余量,你 要不要啊 ” ,
TaskScheduler 在 SchedulerBackend“問”它的時候,會從調度隊列中按照指定的調度
策略選擇 TaskSetManager 去調度運行,大致方法調用流程如下圖所示:

圖 3-7 中,將 TaskSetManager 加入 rootPool 調度池中之后,調用 SchedulerBackend
的 riviveOffers 方法給 driverEndpoint 發送 ReviveOffer 消息;
driverEndpoint 收到
ReviveOffer 消息后調用 makeOffers 方法,過濾出活躍狀態的 Executor(這些 Executor
都是任務啟動時反向注冊到 Driver 的 Executor),然后將 Executor 封裝成 WorkerOffer
對 象 ; 准 備 好 計 算 資 源 ( WorkerOffer ) 后 , taskScheduler 基 於 這 些 資 源 調 用
resourceOffer 在 Executor 上分配 task。
4.4.1 調度策略
前 面 講 到 , TaskScheduler 會 先 把 DAGScheduler 給 過 來 的 TaskSet 封裝成
TaskSetManager 扔到任務隊列里,然后再從任務隊列里按照一定的規則把它們取出
來在 SchedulerBackend 給過來的 Executor 上運行。這個調度過程實際上還是比較粗
粒度的,是面向 TaskSetManager 的。
TaskScheduler 是以樹的方式來管理任務隊列,樹中的節點類型為 Schdulable,
葉子節點為 TaskSetManager,非葉子節點為 Pool,下圖是它們之間的繼承關系。

TaskScheduler 支持兩種調度策略,一種是
FIFO,也是默認的調度策略,另一種
是
FAIR。在 TaskScheduler 初始化過程中會實例化 rootPool,表示樹的根節點,是
Pool 類型。
1. FIFO 調度策略
如果是采用 FIFO 調度策略,則直接簡單地將 TaskSetManager 按照先來先到的
方式入隊,出隊時直接拿出最先進隊的 TaskSetManager,其樹結構如下圖所示,
TaskSetManager 保存在一個 FIFO 隊列中。

2. FAIR 調度策略
FAIR 調度策略的樹結構如下圖所示:

FAIR 模式中有一個 rootPool 和多個子 Pool,各個子 Pool 中存儲着所有待分配
的 TaskSetMagager。
在 FAIR 模 式 中 , 需 要 先 對 子 Pool 進 行 排 序 , 再 對 子 Pool 里 面 的
TaskSetMagager 進行排序,因為 Pool 和 TaskSetMagager 都繼承了 Schedulable 特
質,因此使用相同的排序算法。
排序過程的比較是基於
Fair-share 來比較的,每個要排序的對象包含三個屬性:
runningTasks 值(正在運行的 Task 數)、minShare 值、weight 值,比較時會綜合考
量 runningTasks 值,minShare 值以及 weight 值。
注意,minShare、weight 的值均在公平調度配置文件 fairscheduler.xml 中被指定,
調度池在構建階段會讀取此文件的相關配置。
1) 如果 A 對象的 runningTasks 大於它的 minShare,B 對象的 runningTasks 小
於它的 minShare,那么 B 排在 A 前面;(
runningTasks 比 minShare 小的先執行)
2) 如果 A、B 對象的 runningTasks 都小於它們的 minShare,那么就比較
runningTasks 與 minShare 的比值(
minShare 使用率),誰小誰排前面;(
minShare
使用率低的先執行)
3) 如果 A、B 對象的 runningTasks 都大於它們的 minShare,那么就比較
runningTasks 與 weight 的比值(
權重使用率),誰小誰排前面。(
權重使用率低的
先執行)
4) 如果上述比較均相等,則比較名字。
整體上來說就是通過 minShare 和 weight 這兩個參數控制比較過程,可以做到
讓 minShare 使用率和權重使用率少(實際運行 task 比例較少)的先運行。
FAIR 模式排序完成后,所有的 TaskSetManager 被放入一個 ArrayBuffer 里,之
后依次被取出並發送給 Executor 執行。
從調度隊列中拿到 TaskSetManager 后,由於 TaskSetManager 封裝了一個 Stage
的所有 Task,並負責管理調度這些 Task,那么接下來的工作就是 TaskSetManager
按照一定的規則一個個取出 Task 給 TaskScheduler , TaskScheduler 再交給
SchedulerBackend 去發到 Executor 上執行。
4.4.2 本地化調度
DAGScheduler 切割 Job,划分 Stage, 通過調用 submitStage 來提交一個 Stage
對應的 tasks,submitStage 會調用 submitMissingTasks,submitMissingTasks 確定每
個需要計算的 task 的 preferredLocations,
通過調用 getPreferrdeLocations()得到
partition 的優先位置,由於一個 partition 對應一個 task,此 partition 的優先位置就
是 task 的優先位置,對於要提交到 TaskScheduler 的 TaskSet 中的每一個 task,該 task
優先位置與其對應的 partition 對應的優先位置一致。
從調度隊列中拿到 TaskSetManager 后,那么接下來的工作就是 TaskSetManager 按照一
定的規則一個個取出 task 給 TaskScheduler,TaskScheduler 再交給 SchedulerBackend 去發到
Executor 上執行。前面也提到,TaskSetManager 封裝了一個 Stage 的所有 task,並負責管理
調度這些 task。
根據每個 task 的優先位置,確定 task 的 Locality 級別,Locality 一共有五種,優先級由
高到低順序:

在調度執行時,Spark 調度總是會盡量讓每個 task 以最高的本地性級別來啟動,
當一個 task 以 X 本地性級別啟動,但是該本地性級別對應的所有節點都沒有空閑資
源而啟動失敗,此時並不會馬上降低本地性級別啟動而是在某個時間長度內再次以
X 本地性級別來啟動該 task,若超過限時時間則降級啟動,去嘗試下一個本地性級
別,依次類推。
可以通過調大每個類別的最大容忍延遲時間,在等待階段對應的 Executor 可能
就會有相應的資源去執行此 task,這就在在一定程度上提到了運行性能。
4.4.3 失敗重試與黑名單機制
除了選擇合適的 Task 調度運行外,還需要監控 Task 的執行狀態,前面也提到,
與外部打交道的是 SchedulerBackend,Task 被提交到 Executor 啟動執行后,Executor
會將執行狀態上報給 SchedulerBackend,SchedulerBackend 則告訴 TaskScheduler,
TaskScheduler 找到該 Task 對應的 TaskSetManager,並通知到該 TaskSetManager,這
樣 TaskSetManager 就知道 Task 的失敗與成功狀態,
對於失敗的 Task,會記錄它失
敗的次數,如果失敗次數還沒有超過最大重試次數,那么就把它放回待調度的 Task
池子中,否則整個 Application 失敗。
在記錄 Task 失敗次數過程中,會記錄它上一次失敗所在的 Executor Id 和 Host,
這樣下次再調度這個 Task 時,會使用黑名單機制,避免它被調度到上一次失敗的節
點上,起到一定的容錯作用。黑名單記錄 Task 上一次失敗所在的 Executor Id 和 Host,
以及其對應的“拉黑”時間,“拉黑”時間是指這段時間內不要再往這個節點上調
度這個 Task 了。