Spark Source Code Analysis: DAG Division and Submission in the DAGScheduler


1. Spark Runtime Architecture

At a high level, the Spark runtime architecture works as follows:
RDDs depend on one another, and these dependencies form a directed acyclic graph (DAG). The DAGScheduler divides this DAG into stages using a simple rule: walk backwards from the final RDD; a narrow dependency is folded into the current stage, while a wide (shuffle) dependency marks a stage boundary. Once the stages are determined, the DAGScheduler builds a TaskSet for each stage and submits it to the TaskScheduler, which handles the actual task scheduling and launches the tasks on the Worker nodes.
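As a concrete illustration (a made-up example, not taken from the Spark sources), the lineage below splits into exactly two stages: flatMap and map are narrow dependencies and stay in the first stage, reduceByKey introduces a ShuffleDependency and therefore starts a new stage, and collect is the action that triggers the job.

import org.apache.spark.{SparkConf, SparkContext}

object StageSplitExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-split").setMaster("local[*]"))

    val counts = sc.parallelize(Seq("a b a", "b c"), numSlices = 2)
      .flatMap(_.split(" "))            // narrow dependency: stays in the first stage
      .map(word => (word, 1))           // narrow dependency: stays in the first stage
      .reduceByKey(_ + _)               // wide (shuffle) dependency: stage boundary
      .filter { case (_, n) => n > 1 }  // narrow dependency: stays in the second stage

    counts.collect().foreach(println)   // the action: triggers SparkContext.runJob for this DAG
    sc.stop()
  }
}

Run with local[*], the Spark UI shows the first stage ending at the shuffle write for reduceByKey and the second stage reading that shuffle and finishing with collect.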




2. Source Code Analysis: DAG Division in the DAGScheduler
When an action (such as collect) is triggered on an RDD, SparkContext.runJob is executed. SparkContext.runJob calls DAGScheduler.runJob, which ultimately calls DAGScheduler.submitJob.
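Before diving into submitJob, a small (hypothetical) example of that entry point: every action is a thin wrapper around SparkContext.runJob, which forwards to DAGScheduler.runJob and finally to DAGScheduler.submitJob.

import org.apache.spark.{SparkConf, SparkContext}

object RunJobEntryPoint {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("run-job-entry").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // collect(), count(), etc. are thin wrappers over SparkContext.runJob.
    // Calling runJob directly makes the entry point explicit: one function is applied
    // to each partition, and the results are gathered per partition index.
    val perPartitionSums: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)
    println(perPartitionSums.mkString(", "))   // four partial sums, one per partition

    sc.stop()
  }
}

With the entry point in mind, here is DAGScheduler.submitJob: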
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // Post a JobSubmitted message to eventProcessLoop
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

In DAGScheduler.submitJob, a JobSubmitted message is posted to the eventProcessLoop object; eventProcessLoop is an instance of DAGSchedulerEventProcessLoop:

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

DAGSchedulerEventProcessLoop receives the various events and handles them; the handling logic lives in its doOnReceive method:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // Job submission
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}

DAGSchedulerEventProcessLoop can be thought of as the DAGScheduler's external interface: it hides the internal implementation details. Whether a message originates inside or outside the DAGScheduler, it flows through the same handling code, which keeps the logic clear and the handling uniform.
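Under the hood this is the classic event-loop pattern: producers post events onto a blocking queue, and a single dispatcher thread takes them off and handles them one at a time. The sketch below is a simplified, self-contained illustration of that pattern; it is not Spark's actual EventLoop class, which additionally deals with errors, interruption, and orderly shutdown.

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// Minimal event-loop sketch: callers post events, one daemon thread dispatches them.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        val event = eventQueue.take()  // blocks until an event arrives
        onReceive(event)               // dispatch, analogous to doOnReceive
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = stopped.set(true)
  def post(event: E): Unit = eventQueue.put(event)

  // Subclasses supply the dispatch logic, e.g. a pattern match over event types.
  protected def onReceive(event: E): Unit
}

object EventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new SimpleEventLoop[String]("demo-loop") {
      protected def onReceive(event: String): Unit = println(s"handling: $event")
    }
    loop.start()
    loop.post("JobSubmitted")
    Thread.sleep(100)  // give the daemon dispatcher thread time to drain the queue
    loop.stop()
  }
}

DAGSchedulerEventProcessLoop fills the role of the concrete subclass: it hands each event to doOnReceive, which pattern-matches on the event type as shown above.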

Next comes the stage division itself. handleJobSubmitted first creates the final ResultStage:

try {
  // Creating the new stage may fail, for example if an HDFS file the job depends on has been deleted
  finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
  case e: Exception =>
    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
    listener.jobFailed(e)
    return
}

It then calls submitStage, which drives the stage division and submission.




Starting from the finalRDD, the scheduler walks the RDD's parent dependencies and checks their type: for a narrow dependency the parent RDD is pushed onto a stack so the walk can continue backwards, while a wide dependency becomes the boundary of a parent stage.

Here is how the source does it:

private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]   // parent stages that still need to be computed
  val visited = new HashSet[RDD[_]]  // RDDs that have already been visited
  // Maintain an explicit stack instead of recursing, to avoid a StackOverflowError on long lineages
  val waitingForVisit = new Stack[RDD[_]]

  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage               // wide dependency: record it as a parent stage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd) // narrow dependency: keep walking backwards
          }
        }
      }
    }
  }

  // Start the backwards traversal from the stage's own RDD
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}

getMissingParentStages takes the current stage and returns its parent stages. The parent stages themselves come from getShuffleMapStage, which ultimately calls newOrUsedShuffleStage to return a ShuffleMapStage:

private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // This shuffle has already been computed: recover the map output locations from the MapOutputTracker
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

Now that the parent stages have been worked out, let's look at the stage submission logic:

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // No missing parent stages: submit this stage's tasks
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          // Missing parent stages: recursively submit them first
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

The submission flow is simple: first look up the current stage's missing parent stages. If there are none, the current stage is a starting stage and is handed to submitMissingTasks; if there are missing parents, submitStage is called recursively to submit them first, and the current stage is parked in waitingStages until they complete.
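To make the recursion concrete, here is a hypothetical job with two shuffles (the stage numbers in the comments are illustrative). submitStage on the final ResultStage finds its ShuffleMapStage parent missing, recurses into it, finds its parent missing in turn, and only the stage with no missing parents is actually submitted; the others wait in waitingStages.

import org.apache.spark.{SparkConf, SparkContext}

object SubmitOrderExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("submit-order").setMaster("local[*]"))

    val pairs   = sc.parallelize(1 to 1000, 8).map(i => (i % 10, i)) // Stage 0 (ShuffleMapStage)
    val sums    = pairs.reduceByKey(_ + _)                           // shuffle #1
    val regroup = sums.map { case (k, v) => (v % 3, k) }             // Stage 1 (ShuffleMapStage)
    val counts  = regroup.groupByKey()                               // shuffle #2
    counts.collect()                                                 // Stage 2 (ResultStage), triggers the job

    // Submission order driven by submitStage's recursion:
    //   submitStage(Stage 2) -> missing = [Stage 1] -> submitStage(Stage 1)
    //   submitStage(Stage 1) -> missing = [Stage 0] -> submitStage(Stage 0)
    //   submitStage(Stage 0) -> missing = []        -> submitMissingTasks(Stage 0)
    // Stages 1 and 2 wait in waitingStages and are submitted as their parents finish.
    sc.stop()
  }
}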

That covers DAG division and stage submission in the DAGScheduler. A follow-up article will look at how these stages are packaged into TaskSets, handed to the TaskScheduler, and how the TaskScheduler schedules them.
