Spark任務提交底層原理


Driver的任務提交過程


1、Driver程序的代碼運行到action操作,觸發了SparkContext的runJob方法。
2、SparkContext調用DAGScheduler的runJob函數。
3、DAGScheduler把Job划分stage,然后把stage轉化為相應的Tasks,把Tasks交給TaskScheduler。
4、通過TaskScheduler把Tasks添加到任務隊列當中,交給SchedulerBackend進行資源分配和任務調度。
5、調度器給Task分配執行Executor,ExecutorBackend負責執行Task。

Spark調度管理

本文主要介紹在單個任務內Spark的調度管理,Spark調度相關概念如下:

  • Task(任務):單個分區數據及上的最小處理流程單元。
  • TaskSet(任務集):由一組關聯的,但互相之間沒有Shuffle依賴關系的任務所組成的任務集。
  • Stage(調度階段):一個任務集對應的調度階段。
  • Job(作業):有一個RDD Action生成的一個或多個調度階段所組成的一次計算作業。
  • Application(應用程序):Spark應用程序,由一個或多個作業組成。

各概念間的邏輯關系如下圖所示:

resource_manager

Spark的調度管理模塊中,最重要的類是DAGScheduler和TaskScheduler,TaskScheduler負責每個具體任務的實際物理調度,DAGScheduler負責將作業拆分成不同階段的具有依賴關系的多批任務,可以理解為DAGScheduler負責任務的邏輯調度。Spark調度管理示意圖如下:
Spark調度管理示意圖

調度階段的拆分

一個Spark任務提交后,DAGScheduler從RDD依賴鏈末端的RDD出發,遍歷整個RDD依賴鏈,將Job分解成具有前后依賴關系的多個stage。DAGScheduler是根據ShuffleDependency划分stage的,也就是說當某個RDD的運算需要將數據進行shuffle操作時,這個包含了shuffle依賴關系的RDD將被用來作為輸入信息,構建一個新的調度階段。以此為依據划分調度階段,可以確保有依賴關系的數據能夠按照正確的順序得到處理和運算。

如何進行Stage划分?下圖給出的是對應Spark應用程序代碼生成的Stage。它的具體划分依據是根據RDD的依賴關系進行,在遇到寬依賴時將兩個RDD划分為不同的Stage。

這里寫圖片描述

這里寫圖片描述

從上圖中可以看到,RDD G與RDD F間的依賴是寬依賴,所以RDD F與 RDD G被划分為不同的Stage,而RDD G 與 RDD 間為窄依賴,因此 RDD B 與 RDD G被划分為同一個Stage。通過這種遞歸的調用方式,將所有RDD進行划分。

Stage划分算法

由於Spark的算子構建一般都是鏈式的,這就涉及了要如何進行這些鏈式計算,Spark的策略是對這些算子,先划分Stage,然后在進行計算。

由於數據是分布式的存儲在各個節點上的,所以為了減少網絡傳輸的開銷,就必須最大化的追求數據本地性,所謂的數據本地性是指,在計算時,數據本身已經在內存中或者利用已有緩存無需計算的方式獲取數據。

Stage划分算法思想

(1)一個Job由多個Stage構成

一個Job可以有一個或者多個Stage,Stage划分的依據就是寬依賴,產生寬依賴的算子:reduceByKey、groupByKey等等

(2)根據依賴關系,從前往后依次執行多個Stage

SparkApplication 中可以因為不同的Action觸發眾多的Job,也就是說一個Application中可以有很多的Job,每個Job是有一個或者多個Stage構成,后面的Stage依賴前面的Stage,也就是說只有前面的Stage計算完后,后面的Stage才會運行。

(3)Stage的執行時Lazy級別的

所有的Stage會形成一個DAG(有向無環圖),由於RDD的Lazy特性,導致Stage也是Lazy級別的,只有遇到了Action才會真正發生作業的執行,在Action之前,Spark框架只是將要進行的計算記錄下來,並沒有真的執行。

調度階段的提交

在划分Stage的步驟中會得到一個或多個有依賴關系的Stage,其中直接觸發作業的RDD關聯的調度階段被稱為FinalStage,DAGScheduler從FinalStage開始生成一個Job。Job和Stage的關系存儲在一個映射表中,用於在該調度階段全部完成時做一些后續處理,如報告狀態、清理作業相關數據等。

具體提交一個Stage時,首先判斷其依賴的所有父Stage的結果是否可用。如果所有父Stage的結果都可用,則提交該Stage。如果有任何一個父Stage的結果不可用,則嘗試迭代提交當前不可用的父Stage。在迭代過程中,父Stage還未運行的Stage都被放到等待隊列中,等待將來被提交。

下圖是一個具有四個調度階段的Job的Stage提交順序:

resource_manager

當一個屬於中間過程調度階段的任務(這種類型的任務所對應的類為ShuffleMapTask)完成后,DAGScheduler會檢查對應調度階段的所有任務是否都完成了。如果完成了,則DAGScheduler將重新掃描一次等待列表中所有的Stage,檢查它們是否還有依賴的Stage沒有完成。如果所有依賴的Stage都已執行完畢,則提交該Stage。

任務結果的獲取

根據任務結果的大小不同,ResultTask返回的結果分為兩中形式:

  • 如果結果足夠小,則直接放在DirectTaskResult對象內。
  • 如果超過特定尺寸(默認約10MB),則在Executor端會將DirectTaskResult序列化,將序列化的結果作為一個數據塊存放在BlockManager中,然后將BlockManager返回的BlockId放在IndirectTaskResult對象中返回給TaskScheduler,TaskScheduler進而調用TaskResultGetter將IndirectTaskResult中的BlockId取出並通過BlockManager最終取得對應的DirectTaskResult。
轉自:http://www.cnblogs.com/BYRans/

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM