【Spark2.0源碼學習】-9.Job提交與Task的拆分


      在前面的章節Client的加載中,Spark的DriverRunner已開始執行用戶任務類(比如:org.apache.spark.examples.SparkPi),下面我們開始針對於用戶任務類(或者任務代碼)進行分析
 
一、整體預覽
          基於上篇圖做了擴展,增加任務執行的相關交互
     
  • Code:指的用戶編寫的代碼
  • RDD:彈性分布式數據集,用戶編碼根據SparkContext與RDD的api能夠很好的將Code轉化為RDD數據結構(下文將做轉化細節介紹)
  • DAGScheduler:有向無環圖調度器,將RDD封裝為JobSubmitted對象存入EventLoop(實現類DAGSchedulerEventProcessLoop)隊列中
  • EventLoop: 定時掃描未處理JobSubmitted對象,將JobSubmitted對象提交給DAGScheduler
  • DAGScheduler:針對於JobSubmitted進行處理,最終將RDD轉化為執行TaskSet,並將TaskSet提交至TaskScheduler
  • TaskScheduler: 根據TaskSet創建TaskSetManager對象存入SchedulableBuilder的數據池(Pool)中,並調用DriverEndpoint喚起消費(ReviveOffers)操作
  • DriverEndpoint:接受ReviveOffers指令后將TaskSet中的Tasks根據相關規則均勻分配給Executor
  • Executor:啟動一個TaskRunner執行一個Task
 
二、Code轉化為初始RDDs
          我們的用戶代碼通過調用Spark的Api(比如:SparkSession.builder.appName("Spark Pi").getOrCreate()),該Api會創建Spark的上下文(SparkContext),當我們調用transform類方法 (如:parallelize(),map())都會創建(或者裝飾已有的) Spark數據結構(RDD), 如果是action類操作(如:reduce()),那么將最后封裝的RDD作為一次Job提交,存入待調度隊列中(DAGSchedulerEventProcessLoop )待后續異步處理。
          如果多次調用action類操作,那么封裝的多個RDD作為多個Job提交。
     流程如下:
     
  • ExecuteEnv(執行環境 )
    • 這里可以是通過spark-submit提交的MainClass,也可以是spark-shell腳本
    • MainClass : 代碼中必定會創建或者獲取一個SparkContext
    • spark-shell:默認會創建一個SparkContext
  • RDD(彈性分布式數據集)
    • create:可以直接創建(如:sc.parallelize(1 until n, slices) ),也可以在其他地方讀取(如:sc.textFile("README.md"))等
    • transformation:rdd提供了一組api可以進行對已有RDD進行反復封裝成為新的RDD,這里采用的是裝飾者設計模式,下面為部分裝飾器類圖
    • action:當調用RDD的action類操作方法時(collect、reduce、lookup、save ),這觸發DAGScheduler的Job提交
  • DAGScheduler:創建一個名為JobSubmitted的消息至DAGSchedulerEventProcessLoop阻塞消息隊列(LinkedBlockingDeque)中
  • DAGSchedulerEventProcessLoop:啟動名為【dag-scheduler-event-loop】的線程實時消費消息隊列
  • 【dag-scheduler-event-loop】處理完成后回調JobWaiter
  • DAGScheduler:打印Job執行結果
  • JobSubmitted:相關代碼如下(其中jobId為DAGScheduler全局遞增Id):
eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties)))
  • 最終示例:

     

     最終轉化的RDD分為四層,每層都依賴於上層RDD,將ShffleRDD封裝為一個Job存入DAGSchedulerEventProcessLoop待處理,如果我們的代碼中存在幾段上面示例代碼,那么就會創建對應對的幾個ShffleRDD分別存入DAGSchedulerEventProcessLoop
 
三、RDD分解為待執行任務集合(TaskSet
     Job提交后,DAGScheduler根據RDD層次關系解析為對應的Stages,同時維護Job與Stage的關系。
     將最上層的Stage根據並發關系(findMissingPartitions )分解為多個Task,將這個多個Task封裝為TaskSet提交給TaskScheduler。非最上層的Stage的存入處理的列表中(waitingStages += stage)
 流程如下:
  
  • DAGSchedulerEventProcessLoop中,線程【dag-scheduler-event-loop】處理到JobSubmitted
  • 調用DAGScheduler進行handleJobSubmitted
    • 首先根據RDD依賴關系依次創建Stage族,Stage分為ShuffleMapStage,ResultStage兩類

    • 更新jobId與StageId關系Map
    • 創建ActiveJob,調用LiveListenerBug,發送SparkListenerJobStart指令
    • 找到最上層Stage進行提交,下層Stage存入waitingStage中待后續處理
      • 調用OutputCommitCoordinator進行stageStart()處理
      • 調用LiveListenerBug, 發送 SparkListenerStageSubmitted指令
      • 調用SparkContext的broadcast方法獲取Broadcast對象
      • 根據Stage類型創建對應多個Task,一個Stage根據findMissingPartitions分為多個對應的Task,Task分為ShuffleMapTask,ResultTask

      • 將Task封裝為TaskSet,調用TaskScheduler.submitTasks(taskSet)進行Task調度,關鍵代碼如下:
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
 
四、TaskSet封裝為TaskSetManager並提交至Driver
     TaskScheduler將TaskSet封裝為TaskSetManager(new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)),存入待處理任務池(Pool)中,發送DriverEndpoint喚起消費(ReviveOffers)指令
      
  • DAGSheduler將TaskSet提交給TaskScheduler的實現類,這里是TaskChedulerImpl
  • TaskSchedulerImpl創建一個TaskSetManager管理TaskSet,關鍵代碼如下:
     new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
  • 同時將TaskSetManager添加SchedduableBuilder的任務池Poll中
  • 調用SchedulerBackend的實現類進行reviveOffers,這里是standlone模式的實現類StandaloneSchedulerBackend
  • SchedulerBackend發送ReviveOffers指令至DriverEndpoint
 
五、Driver將TaskSetManager分解為TaskDescriptions並發布任務到Executor
     Driver接受喚起消費指令后,將所有待處理的TaskSetManager與Driver中注冊的Executor資源進行匹配,最終一個TaskSetManager得到多個TaskDescription對象,按照TaskDescription想對應的Executor發送LaunchTask指令
當Driver獲取到ReviveOffers(請求消費)指令時
  • 首先根據executorDataMap緩存信息得到可用的Executor資源信息(WorkerOffer),關鍵代碼如下
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
  new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
  • 接着調用TaskScheduler進行資源匹配,方法定義如下:
     def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {..}
    • 將WorkerOffer資源打亂(val shuffledOffers = Random.shuffle(offers))
    • 將Poo中待處理的TaskSetManager取出(val sortedTaskSets = rootPool.getSortedTaskSetQueue),
    • 並循環處理sortedTaskSets並與shuffledOffers循環匹配,如果shuffledOffers(i)有足夠的Cpu資源( if (availableCpus(i) >= CPUS_PER_TASK) ),調用TaskSetManager創建TaskDescription對象(taskSet.resourceOffer(execId, host, maxLocality)),最終創建了多個TaskDescription,TaskDescription定義如下:
new TaskDescription(
  taskId,
  attemptNum,
  execId,
  taskName,
  index,
  sched.sc.addedFiles,
  sched.sc.addedJars,
  task.localProperties,
  serializedTask)
  • 如果TaskDescriptions不為空,循環TaskDescriptions,序列化TaskDescription對象,並向ExecutorEndpoint發送LaunchTask指令,關鍵代碼如下:
for (task <- taskDescriptions.flatten) {
  val serializedTask = TaskDescription.encode(task)
  val executorData = executorDataMap(task.executorId)
  executorData.freeCores -= scheduler.CPUS_PER_TASK
  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM