spark 源碼分析之二十 -- Stage的提交


引言

上篇 spark 源碼分析之十九 -- DAG的生成和Stage的划分 中,主要介紹了下圖中的前兩個階段DAG的構建和Stage的划分。

本篇文章主要剖析,Stage是如何提交的。

rdd的依賴關系構成了DAG,DAGScheduler根據shuffle依賴關系將DAG圖划分為一個一個小的stage。具體可以看 spark 源碼分析之十九 -- DAG的生成和Stage的划分 做進一步了解。

緊接上篇文章

上篇文章中,DAGScheduler的handleJobSubmitted方法我們只剖析了stage的生成部分,下面我們看一下stage的提交部分源碼。

提交Stage的思路

首先構造ActiveJob對象,其次清除緩存的block location信息,然后記錄jobId和job對象的映射關系到jobIdToActiveJob map集合中,並且將該jobId記錄到活動的job集合中。

獲取到Job所有的stage的唯一標識,並且根據唯一標識來獲取stage對象,並且調用其lastestInfo方法獲取其StageInfo對象。

然后進一步封裝成 SparkListenerJobStart 事件對象,並post到 listenerBus中,listenerBus 是一個 LiveListenerBus 對象,其內部封裝了四個消息隊列組成的集合,具體可以看 spark 源碼分析之三 -- LiveListenerBus介紹 文章做進一步了解。

最后調用submitStage 方法執行Stage的提交。

先來看一下ActiveJob的說明。

ActiveJob

類說明

A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a ResultStage to execute an action, or a map-stage job, which computes the map outputs for a ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive query planning, to look at map output statistics before submitting later stages. We distinguish between these two types of jobs using the finalStage field of this class. Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's submitJob or submitMapStage methods. However, either type of job may cause the execution of other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of these previous stages. These dependencies are managed inside DAGScheduler.

它代表了正運行在DAGScheduler中的一個job,job有兩種類型:result job,其通過計算一個ResultStage來執行一個action操作;map-stage job,它在下游的stage提交之前,為ShuffleMapStage計算map的輸出。

構造方法

finalStages是這個job的最后一個stage。

提交Stage前的准備

直接先來看submitStage方法,如下:

思路: 首先先獲取可能丟失的父stage信息,如果該stage的父stage被遺漏了,則遞歸調用查看其爺爺stage是否被遺漏。

查找遺漏父Stage

getMissingParentStages方法如下:

思路:不斷創建父stage,可以看上篇文章 spark 源碼分析之十九 -- DAG的生成和Stage的划分 做進一步了解。

提交Stage

submitMissingTasks方法過於長,為方便分析,按功能大致分為如下部分:

獲取Stage需要計算的partition信息

org.apache.spark.scheduler.ResultStage#findMissingPartitions 方法如下:

 org.apache.spark.scheduler.ShuffleMapStage#findMissingPartitions 方法如下:

org.apache.spark.MapOutputTrackerMaster#findMissingPartitions 方法如下:

將stage和分區記錄到OutputCommitCoordinator中

 OutputCommitCoordinator 的 stageStart實現如下:

本質上就是把它放入到一個map中了。

 

獲取分區的優先位置

 

思路:根據stage的RDD和分區id獲取到其rdd中的分區的優先位置。

下面看一下 getPreferredLocs 方法:

 

 

注釋中說到,它是線程安全的,下面看一下,它是如何實現的,即 getPrefferredLocsInternal 方法。

這個方法中提到四種情況:

1. 如果之前獲取到過,那么直接返回Nil即可。

2. 如果之前已經緩存在內存中,直接從緩存的內存句柄中取出返回即可。

3. 如果RDD對應的是HDFS輸入的文件等,則使用RDD記錄的優先位置。

4. 如果上述三種情況都不滿足,且是narrowDependency,則調用該方法,獲取子RDDpartition對應的父RDD的partition的優先位置。

下面仔細說一下中間兩種情況。

從緩存中取

getCacheLocs 方法如下:

思路:先查看rdd的存儲級別,如果沒有存儲級別,則直接返回Nil,否則根據RDD和分區id組成BlockId集合,請求存儲系統中的BlockManager來獲取block的位置,然后轉換為TaskLocation信息返回。

獲取RDD的優先位置

RDD的 preferredLocations 方法如下:

思路:先從checkpoint中找,如果checkpoint中沒有,則返回默認的為Nil。

 

返回對象是TaskLocation對象,做一下簡單的說明。

TaskLocation

類說明

A location where a task should run. This can either be a host or a (host, executorID) pair. In the latter case, we will prefer to launch the task on that executorID, but our next level of preference will be executors on the same host if this is not possible.

它有三個子類,如下:

這三個類定義如下:

很簡單,不做過多說明。

TaskLocation伴隨對象如下,現在用的方法是第二種 apply 方法:

創建新的StageInfo 

對應方法如下:

org.apache.spark.scheduler.Stage#makeNewStageAttempt 方法如下:

很簡單,主要是調用了StageInfo的fromStage方法。

先來看Stage類。

StageInfo

StageInfo封裝了關於Stage的一些信息,用於調度和SparkListener傳遞stage信息。

其伴生對象如下:

廣播要執行task函數

對應源碼如下:

通過broadcast機制,將數據廣播到spark集群中的driver和各個executor中。關於broadcast的實現細節,可以查看 spark 源碼分析之十四 -- broadcast 是如何實現的?做進一步了解。

生成Task集合

根據stage的類型生成不同的類型Task。關於過多Task 的內容,在階段四進行剖析。

TaskScheduler提交TaskSet

對應代碼如下:

其中taskScheduler是 TaskSchedulerImpl,它是TaskScheduler的唯一子類實現。它負責task的調度。

org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks方法實現如下:

其中 createTaskSetManager 方法如下:

SchedulableBuilder類是構建Schedulable樹的接口。

schedulableBuilder 定義如下:

其中schedulingMode 可以通過參數 spark.scheduler.mode 來調整,默認為FIFO。

schedulableBuilder 初始化如下:

schedulableBuilder的 addTaskSetManager (FIFO)方法如下:

即調用了內部Pool對象的addSchedulable 方法:

 

 

關於更多TaskSetManager的內容,將在階段四進行剖析。

backend是一個 SchedulerBackend 實例。在SparkContetx的初始化過程中調用 createTaskScheduler 初始化 backend,具體可以看 spark 源碼分析之四 -- TaskScheduler的創建和啟動過程 做深入了解。

在yarn 模式下,它有兩個實現yarn-client 模式下的 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend實現 和 yarn-cluster 模式下的 org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend 實現。

這兩個類在spark 項目的 resource-managers 目錄下的 yarn 目錄下定義實現,當然它也支持 kubernetes 和 mesos,不做過多說明。

這兩個類的繼承關系如下:

 

org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers 實現如下:

發送ReviveOffers 請求給driver。

driver端的 CoarseGrainedSchedulerBackend 的 receive 方法有如下事件處理分支:

其內部經過一系列RPC過程,關於 RPC 可以看 spark 源碼分析之十二--Spark RPC剖析之Spark RPC總結 做進一步了解。

即會調用driver端的makeOffsers方法,如下:

總結

本篇文章剖析了從DAGScheduler生成的Stage是如何被提交給TaskScheduler,以及TaskScheduler是如何把TaskSet提交給ResourceManager的。

下面就是task的運行部分了,下篇文章對其做詳細介紹。跟task執行關系很密切的TaskSchedulerBackend、Task等內容,也將在下篇文章做更詳細的說明。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM