Spark的job、stage和task的機制論述


Spark任務調度機制論述

在生產環境下,Spark集群的部署方式一般為YARN-Cluster模式。 Driver線程主要是初始化SparkContext對象,准備運行所需的上下文,然后一方面保持與ApplicationMaster的RPC連接,通過ApplicationMaster申請資源,另一方面根據用戶業務邏輯開始調度任務,將任務下發到已有的空閑Executor上。
當ResourceManager向ApplicationMaster返回Container資源時,ApplicationMaster就嘗試在對應的Container上啟動Executor進程,Executor進程起來后,會向Driver反向注冊,注冊成功后保持與Driver的心跳,同時等待Driver分發任務,當分發的任務執行完畢后,將任務狀態上報給Driver。

1. Spark任務調度概述

1.1 基礎概念

當Driver起來后,Driver則會根據用戶程序邏輯准備任務,並根據Executor資源情況逐步分發任務。在詳細闡述任務調度前,首先說明下Spark里的幾個概念。一個Spark應用程序包括Job、Stage以及Task三個概念:

job:以 action 方法為界,一個 action 觸發一個 job

stage:它是 job 的子集,以 RDD 寬依賴為界,遇到寬依賴即划分 stage

task:它是 stage 的子集,以分區數來衡量,分區數多少,task 就有多少

1.2 任務調度

spark 任務從發起到執行可用下圖表示

1.3 Client—>ResourceManage

(1). Client 端通過 spark-submit + 參數 發起任務,即向ResourceManage 提交 application,注意該 application 包含了一堆參數,如 Executor 數,Executor 內存,Driver 內存等;

(2). ResourceManage 需要先判斷現在資源是否能滿足該 application,如果滿足,則響應該 application,如果不滿足,報錯;

(3). 如果資源滿足,Client 端准備 ApplicationMaster 的啟動上下文,並交給 ResourceManage;

(4). 並且循環監控 application 的狀態;

1.4 ResourceManage—>ApplicationMaster

(1). ResourceManage 找一個 worker 啟動 ApplicationMaster;

(2). ApplicationMaster 向 ResourceManage 申請 Container;

(3). ResourceManage 收集可用資源,並告訴 ApplicationMaster;

(4). ApplicationMaster 嘗試在對應的 Container 上啟動 Executor 進程;

1.5 ApplicationMaster-Driver

(1). 有了資源,ApplicationMaster 啟動 Driver;

//Driver 線程主要是初始化 SparkContext 對象,准備運行所需上下文,並保持與 ApplicationMaster 的 RPC 連接,通過 ApplicationMaster 申請資源

(2). Driver 啟動成功后,告訴 ApplicationMaster;

1.6 Driver-Executor

(1). Executor 啟動成功后,反向注冊到 Driver 上,並持續向 Driver 發送心跳;

(2). Driver 啟動 task,分發給 Executor,並監控 task 狀態;

(3). 當 Executor 任務執行完畢后,將任務狀態發送給 Driver;

spark 的核心就是資源申請和任務調度,主要通過 ApplicationMaster、Driver、Executor 來完成

spark 任務調度分為兩層,一層是 stage 級的調度,一層是 task 級的調度

RDD 間的血緣關系,代表了計算的流程,構成了 有向無環圖,即 DAG;

最后通過 action 觸發 job 並調度執行;

DAGScheduler 負責 stage 級的調度,主要是將 DAG 切分成多個 stage,並將 stage 打包成 TaskSet 交給 TaskScheduler;

TaskScheduler 負責 task 級的調度,將 DAGScheduler 發過來的 TaskSet 按照指定的調度策略發送給 Executor;

SchedulerBackend 負責給 調度策略 提供可用資源,調度策略決定把 task 發送給哪個 Executor;【其中 SchedulerBackend 有多種實現,分別對接不同的資源管理系統】

基於上述認知,再來看一張圖

Driver 在啟動過程中,除了初始化 SparkContext 外,也初始化了 DAGScheduler、TaskScheduler、 SchedulerBackend 3個調度對象,同時初始化了 HeartbeatReceiver 心跳接收器;

並且各個線程之間保存通信;

SchedulerBackend 向 ApplicationMaster 申請資源,並不間斷地從 TaskScheduler 獲取 task 並發送給 合適的 Executor;

HeartbeatReceiver 負責接收 Executor 心跳報文,監控 Executor 存活狀態;

2. Spark Stage級調度

Spark的任務調度是從DAG切割開始,主要是由DAGScheduler來完成。當遇到一個Action操作后就會觸發一個Job的計算,並交給DAGScheduler來提交,下圖是涉及到Job提交的相關方法調用流程圖。

1) Job由最終的RDD和Action方法封裝而成;

2) SparkContext將Job交給DAGScheduler提交,它會根據RDD的血緣關系構成的DAG進行切分,將一個Job划分為若干Stages,具體划分策略是,由最終的RDD不斷通過依賴回溯判斷父依賴是否是寬依賴,即以Shuffle為界,划分Stage,窄依賴的RDD之間被划分到同一個Stage中,可以進行pipeline式的計算。划分的Stages分兩類,一類叫做ResultStage,為DAG最下游的Stage,由Action方法決定,另一類叫做ShuffleMapStage,為下游Stage准備數據,下面看一個簡單的例子WordCount。

Job由saveAsTextFile觸發,該Job由RDD-3和saveAsTextFile方法組成,根據RDD之間的依賴關系從RDD-3開始回溯搜索,直到沒有依賴的RDD-0,在回溯搜索過程中,RDD-3依賴RDD-2,並且是寬依賴,所以在RDD-2和RDD-3之間划分Stage,RDD-3被划到最后一個Stage,即ResultStage中,RDD-2依賴RDD-1,RDD-1依賴RDD-0,這些依賴都是窄依賴,所以將RDD-0、RDD-1和RDD-2划分到同一個Stage,形成pipeline操作,。即ShuffleMapStage中,實際執行的時候,數據記錄會一氣呵成地執行RDD-0到RDD-2的轉化。不難看出,其本質上是一個深度優先搜索(Depth First Search)算法。

一個Stage是否被提交,需要判斷它的父Stage是否執行,只有在父Stage執行完畢才能提交當前Stage,如果一個Stage沒有父Stage,那么從該Stage開始提交。Stage提交時會將Task信息(分區信息以及方法等)序列化並被打包成TaskSet交給TaskScheduler,一個Partition對應一個Task,另一方面TaskScheduler會監控Stage的運行狀態,只有Executor丟失或者Task由於Fetch失敗才需要重新提交失敗的Stage以調度運行失敗的任務,其他類型的Task失敗會在TaskScheduler的調度過程中重試。

相對來說DAGScheduler做的事情較為簡單,僅僅是在Stage層面上划分DAG,提交Stage並監控相關狀態信息。TaskScheduler則相對較為復雜,下面詳細闡述其細節。

3. Spark Task級調度

Spark Task的調度是由TaskScheduler來完成,由前文可知,DAGScheduler將Stage打包到交給TaskScheTaskSetduler,TaskScheduler會將TaskSet封裝為TaskSetManager加入到調度隊列中,TaskSetManager結構如下圖所示。

TaskSetManager負責監控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager為單元來調度任務。

前面也提到,TaskScheduler初始化后會啟動SchedulerBackend,它負責跟外界打交道,接收Executor的注冊信息,並維護Executor的狀態,所以說SchedulerBackend是管“糧食”的,同時它在啟動后會定期地去“詢問”TaskScheduler有沒有任務要運行,也就是說,它會定期地“問”TaskScheduler“我有這么余糧,你要不要啊”,TaskScheduler在SchedulerBackend“問”它的時候,會從調度隊列中按照指定的調度策略選擇TaskSetManager去調度運行,大致方法調用流程如下圖所示:

上圖中,將TaskSetManager加入rootPool調度池中之后,調用SchedulerBackend的riviveOffers方法給driverEndpoint發送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后調用makeOffers方法,過濾出活躍狀態的Executor(這些Executor都是任務啟動時反向注冊到Driver的Executor),然后將Executor封裝成WorkerOffer對象;准備好計算資源(WorkerOffer)后,taskScheduler基於這些資源調用resourceOffer在Executor上分配task。

3.1 調度策略

TaskScheduler支持兩種調度策略,一種是FIFO,也是默認的調度策略,另一種是FAIR。在TaskScheduler初始化過程中會實例化rootPool,表示樹的根節點,是Pool類型。

(1) FIFO調度策略

如果是采用FIFO調度策略,則直接簡單地將TaskSetManager按照先來先到的方式入隊,出隊時直接拿出最先進隊的TaskSetManager,其樹結構如下圖所示,TaskSetManager保存在一個FIFO隊列中。

(2) FAIR調度策略

FAIR調度策略的樹結構如下圖所示:

FAIR模式中有一個rootPool和多個子Pool,各個子Pool中存儲着所有待分配的TaskSetMagager。

在FAIR模式中,需要先對子Pool進行排序,再對子Pool里面的TaskSetMagager進行排序,因為Pool和TaskSetMagager都繼承了Schedulable特質,因此使用相同的排序算法。

排序過程的比較是基於Fair-share來比較的,每個要排序的對象包含三個屬性: runningTasks值(正在運行的Task數)、minShare值、weight值,比較時會綜合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平調度配置文件fairscheduler.xml中被指定,調度池在構建階段會讀取此文件的相關配置。

1) 如果A對象的runningTasks大於它的minShare,B對象的runningTasks小於它的minShare,那么B排在A前面;(runningTasks比minShare小的先執行)

2) 如果A、B對象的runningTasks都小於它們的minShare,那么就比較runningTasks與minShare的比值(minShare使用率),誰小誰排前面;(minShare使用率低的先執行)

3) 如果A、B對象的runningTasks都大於它們的minShare,那么就比較runningTasks與weight的比值(權重使用率),誰小誰排前面。(權重使用率低的先執行)

4) 如果上述比較均相等,則比較名字。

整體上來說就是通過minShare和weight這兩個參數控制比較過程,可以做到讓minShare使用率和權重使用率少(實際運行task比例較少)的先運行。

FAIR模式排序完成后,所有的TaskSetManager被放入一個ArrayBuffer里,之后依次被取出並發送給Executor執行。

從調度隊列中拿到TaskSetManager后,由於TaskSetManager封裝了一個Stage的所有Task,並負責管理調度這些Task,那么接下來的工作就是TaskSetManager按照一定的規則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。

3.2 本地化調度

DAGScheduler切割Job,划分Stage, 通過調用submitStage來提交一個Stage對應的tasks,submitStage會調用submitMissingTasks,submitMissingTasks 確定每個需要計算的 task 的preferredLocations,通過調用getPreferrdeLocations()得到partition 的優先位置,由於一個partition對應一個Task,此partition的優先位置就是task的優先位置,對於要提交到TaskScheduler的TaskSet中的每一個Task,該task優先位置與其對應的partition對應的優先位置一致。

從調度隊列中拿到TaskSetManager后,那么接下來的工作就是TaskSetManager按照一定的規則一個個取出task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。前面也提到,TaskSetManager封裝了一個Stage的所有Task,並負責管理調度這些Task。

根據每個Task的優先位置,確定Task的Locality級別,Locality一共有五種,優先級由高到低順序:

在調度執行時,大數據培訓Spark調度總是會盡量讓每個task以最高的本地性級別來啟動,當一個task以X本地性級別啟動,但是該本地性級別對應的所有節點都沒有空閑資源而啟動失敗,此時並不會馬上降低本地性級別啟動而是在某個時間長度內再次以X本地性級別來啟動該task,若超過限時時間則降級啟動,去嘗試下一個本地性級別,依次類推。

可以通過調大每個類別的最大容忍延遲時間,在等待階段對應的Executor可能就會有相應的資源去執行此task,這就在在一定程度上提到了運行性能。

3.3 失敗重試與黑名單機制

除了選擇合適的Task調度運行外,還需要監控Task的執行狀態,前面也提到,與外部打交道的是SchedulerBackend,Task被提交到Executor啟動執行后,Executor會將執行狀態上報給SchedulerBackend,SchedulerBackend則告訴TaskScheduler,TaskScheduler找到該Task對應的TaskSetManager,並通知到該TaskSetManager,這樣TaskSetManager就知道Task的失敗與成功狀態,對於失敗的Task,會記錄它失敗的次數,如果失敗次數還沒有超過最大重試次數,那么就把它放回待調度的Task池子中,否則整個Application失敗。

在記錄Task失敗次數過程中,會記錄它上一次失敗所在的Executor Id和Host,這樣下次再調度這個Task時,會使用黑名單機制,避免它被調度到上一次失敗的節點上,起到一定的容錯作用。黑名單記錄Task上一次失敗所在的Executor Id和Host,以及其對應的“拉黑”時間,“拉黑”時間是指這段時間內不要再往這個節點上調度這個Task了。

4. 總結

本圖有助於理解job,stage,task工作的原理。Spark通用運行流程圖,體現了基本的Spark應用程序在部署中的基本提交流程。

流程按照如下的核心步驟進行工作的:

1) 任務提交后,都會先啟動Driver程序;

2) 隨后Driver向集群管理器注冊應用程序;

3) 之后集群管理器根據此任務的配置文件分配Executor並啟動;

4) Driver開始執行main函數,Spark查詢為懶執行,當執行到Action算子時開始反向推算,根據寬依賴進行Stage的划分,隨后每一個Stage對應一個Taskset,Taskset中有多個Task,查找可用資源Executor進行調度;

5) 根據本地化原則,Task會被分發到指定的Executor去執行,在任務執行的過程中,Executor也會不斷與Driver進行通信,報告任務運行情況。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM