在前面的章節Client的加載中,Spark的DriverRunner已開始執行用戶任務類(比如:org.apache.spark.examples.SparkPi),下面我們開始針對於用戶任務類(或者任務代碼)進行分析
一、整體預覽
基於上篇圖做了擴展,增加任務執行的相關交互


- Code:指的用戶編寫的代碼
- RDD:彈性分布式數據集,用戶編碼根據SparkContext與RDD的api能夠很好的將Code轉化為RDD數據結構(下文將做轉化細節介紹)
- DAGScheduler:有向無環圖調度器,將RDD封裝為JobSubmitted對象存入EventLoop(實現類DAGSchedulerEventProcessLoop)隊列中
- EventLoop: 定時掃描未處理JobSubmitted對象,將JobSubmitted對象提交給DAGScheduler
- DAGScheduler:針對於JobSubmitted進行處理,最終將RDD轉化為執行TaskSet,並將TaskSet提交至TaskScheduler
- TaskScheduler: 根據TaskSet創建TaskSetManager對象存入SchedulableBuilder的數據池(Pool)中,並調用DriverEndpoint喚起消費(ReviveOffers)操作
- DriverEndpoint:接受ReviveOffers指令后將TaskSet中的Tasks根據相關規則均勻分配給Executor
- Executor:啟動一個TaskRunner執行一個Task
二、Code轉化為初始RDDs
我們的用戶代碼通過調用Spark的Api(比如:SparkSession.builder.appName("Spark Pi").getOrCreate()),該Api會創建Spark的上下文(SparkContext),當我們調用transform類方法 (如:parallelize(),map())都會創建(或者裝飾已有的) Spark數據結構(RDD), 如果是action類操作(如:reduce()),那么將最后封裝的RDD作為一次Job提交,存入待調度隊列中(DAGSchedulerEventProcessLoop )待后續異步處理。
如果多次調用action類操作,那么封裝的多個RDD作為多個Job提交。
流程如下:


- ExecuteEnv(執行環境 )
-
- 這里可以是通過spark-submit提交的MainClass,也可以是spark-shell腳本
- MainClass : 代碼中必定會創建或者獲取一個SparkContext
- spark-shell:默認會創建一個SparkContext
- RDD(彈性分布式數據集)
-
- create:可以直接創建(如:sc.parallelize(1 until n, slices) ),也可以在其他地方讀取(如:sc.textFile("README.md"))等
- transformation:rdd提供了一組api可以進行對已有RDD進行反復封裝成為新的RDD,這里采用的是裝飾者設計模式,下面為部分裝飾器類圖
- action:當調用RDD的action類操作方法時(collect、reduce、lookup、save ),這觸發DAGScheduler的Job提交
- DAGScheduler:創建一個名為JobSubmitted的消息至DAGSchedulerEventProcessLoop阻塞消息隊列(LinkedBlockingDeque)中
- DAGSchedulerEventProcessLoop:啟動名為【dag-scheduler-event-loop】的線程實時消費消息隊列
- 【dag-scheduler-event-loop】處理完成后回調JobWaiter
- DAGScheduler:打印Job執行結果
- JobSubmitted:相關代碼如下(其中jobId為DAGScheduler全局遞增Id):
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
- 最終示例:

最終轉化的RDD分為四層,每層都依賴於上層RDD,將ShffleRDD封裝為一個Job存入DAGSchedulerEventProcessLoop待處理,如果我們的代碼中存在幾段上面示例代碼,那么就會創建對應對的幾個ShffleRDD分別存入DAGSchedulerEventProcessLoop
三、RDD分解為待執行任務集合(TaskSet
)
Job提交后,DAGScheduler根據RDD層次關系解析為對應的Stages,同時維護Job與Stage的關系。
將最上層的Stage根據並發關系(findMissingPartitions )分解為多個Task,將這個多個Task封裝為TaskSet提交給TaskScheduler。非最上層的Stage的存入處理的列表中(waitingStages += stage)
流程如下:


- DAGSchedulerEventProcessLoop中,線程【dag-scheduler-event-loop】處理到JobSubmitted
- 調用DAGScheduler進行handleJobSubmitted
-
- 首先根據RDD依賴關系依次創建Stage族,Stage分為ShuffleMapStage,ResultStage兩類
- 更新jobId與StageId關系Map
- 創建ActiveJob,調用LiveListenerBug,發送SparkListenerJobStart指令
- 找到最上層Stage進行提交,下層Stage存入waitingStage中待后續處理
-
- 調用OutputCommitCoordinator進行stageStart()處理
- 調用LiveListenerBug, 發送 SparkListenerStageSubmitted指令
- 調用SparkContext的broadcast方法獲取Broadcast對象
- 根據Stage類型創建對應多個Task,一個Stage根據findMissingPartitions分為多個對應的Task,Task分為ShuffleMapTask,ResultTask
- 將Task封裝為TaskSet,調用TaskScheduler.submitTasks(taskSet)進行Task調度,關鍵代碼如下:
- 首先根據RDD依賴關系依次創建Stage族,Stage分為ShuffleMapStage,ResultStage兩類
taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
四、TaskSet封裝為TaskSetManager並提交至Driver
TaskScheduler將TaskSet封裝為TaskSetManager(new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)),存入待處理任務池(Pool)中,發送DriverEndpoint喚起消費(ReviveOffers)指令


- DAGSheduler將TaskSet提交給TaskScheduler的實現類,這里是TaskChedulerImpl
- TaskSchedulerImpl創建一個TaskSetManager管理TaskSet,關鍵代碼如下:
new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
- 同時將TaskSetManager添加SchedduableBuilder的任務池Poll中
- 調用SchedulerBackend的實現類進行reviveOffers,這里是standlone模式的實現類StandaloneSchedulerBackend
- SchedulerBackend發送ReviveOffers指令至DriverEndpoint
五、Driver將TaskSetManager分解為TaskDescriptions並發布任務到Executor
Driver接受喚起消費指令后,將所有待處理的TaskSetManager與Driver中注冊的Executor資源進行匹配,最終一個TaskSetManager得到多個TaskDescription對象,按照TaskDescription想對應的Executor發送LaunchTask指令


當Driver獲取到ReviveOffers(請求消費)指令時
- 首先根據executorDataMap緩存信息得到可用的Executor資源信息(WorkerOffer),關鍵代碼如下
val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toIndexedSeq
- 接着調用TaskScheduler進行資源匹配,方法定義如下:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {..}
-
- 將WorkerOffer資源打亂(val shuffledOffers = Random.shuffle(offers))
- 將Poo中待處理的TaskSetManager取出(val sortedTaskSets = rootPool.getSortedTaskSetQueue),
- 並循環處理sortedTaskSets並與shuffledOffers循環匹配,如果shuffledOffers(i)有足夠的Cpu資源( if (availableCpus(i) >= CPUS_PER_TASK) ),調用TaskSetManager創建TaskDescription對象(taskSet.resourceOffer(execId, host, maxLocality)),最終創建了多個TaskDescription,TaskDescription定義如下:
new TaskDescription( taskId, attemptNum, execId, taskName, index, sched.sc.addedFiles, sched.sc.addedJars, task.localProperties, serializedTask)
- 如果TaskDescriptions不為空,循環TaskDescriptions,序列化TaskDescription對象,並向ExecutorEndpoint發送LaunchTask指令,關鍵代碼如下:
for (task <- taskDescriptions.flatten) { val serializedTask = TaskDescription.encode(task) val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) }