Spark (52): The DAGScheduler Flow in the Spark Scheduler Module


Introduction

Looking at how a job runs, the DAGScheduler runs on the Driver side. Its workflow is shown in the figure below:

Terms used in the figure:

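1. RDD: Resilient Distributed Dataset.

2. Operation: the operations applied to RDDs, divided into transformations and actions.

3. Job: a job contains multiple RDDs and the operations applied to them.

4. Stage: a job is divided into multiple stages.

5. Partition: a data partition; the data in an RDD can be split into multiple partitions.

6. DAG: Directed Acyclic Graph, which represents the dependencies between RDDs.

7. Narrow dependency: each partition of the child RDD depends on a fixed set of partitions of the parent RDD (each parent partition is used by at most one child partition).

8. Wide dependency: a partition of the child RDD may depend on all partitions of the parent RDD, which requires a shuffle.

9. Caching Management: caches the intermediate results of RDD computations to speed up overall processing.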

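On the Driver side, the DAGScheduler-related flow when a job runs is as follows:

1) The entry point of application_jar.jar (the application's main function) is invoked. The application depends on SparkContext, so it must first initialize a SparkContext sc. RDDs are then created through sc, because RDDs are created by calling methods on sc.

2) During SparkContext initialization, SparkContext.createTaskScheduler(this, master, deployMode) is called to create the TaskScheduler and SchedulerBackend. The DAGScheduler is created after that.

3) What the application actually executes are RDD transformations and actions. When an RDD action is triggered, it internally calls sc.runJob(...), which hands the job to DAGScheduler#runJob and DAGScheduler#submitJob. The DAGScheduler then creates a ResultStage for the action and finds all the ShuffleMapStages it depends on. A stage can run only after all of its parent stages have completed successfully. The job is complete only when all of its stages have succeeded.

4) A stage is submitted as a TaskSet, i.e. a set of independent tasks. The DAGScheduler calls the TaskScheduler to schedule that TaskSet.

5) To schedule a task, the TaskScheduler serializes it and sends it over RPC to an Executor, where a TaskRunner runs it.

6) If a task fails, the TaskScheduler retries it. Likewise, if a stage fails, the DAGScheduler triggers a stage retry. Note that a failed stage is recomputed on its own rather than restarting from earlier stages. This is also one reason the DAG is divided into stages.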
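Step 3) says a stage may run only once all of its parent stages have finished. The minimal sketch below illustrates that ordering rule in plain Scala, with no Spark dependency.

- `SimpleStage` and `StageOrder.runOrder` are hypothetical names used only for illustration.
- The real DAGScheduler tracks readiness with its waitingStages and runningStages sets rather than computing an order up front.
- The sketch assumes the stage graph is acyclic, as an RDD DAG is.

```scala
import scala.collection.mutable

// Hypothetical model of a stage: its id plus the ids of its parent stages.
final case class SimpleStage(id: Int, parents: List[Int])

object StageOrder {
  // Returns stage ids so that every stage comes after all of its parents,
  // i.e. "a stage may run only once its parent stages have succeeded".
  // Assumes the stage graph is acyclic.
  def runOrder(stages: Seq[SimpleStage]): List[Int] = {
    val byId = stages.map(s => s.id -> s).toMap
    val done = mutable.LinkedHashSet[Int]() // keeps insertion (= run) order
    def visit(id: Int): Unit =
      if (!done.contains(id)) {
        byId(id).parents.foreach(visit) // run the parents first
        done += id
      }
    stages.foreach(s => visit(s.id))
    done.toList
  }
}
```

In the test below, stage 2 is a final stage that depends on stages 0 and 1, and stage 1 also depends on stage 0. Stage 0 therefore comes first even though it is listed second.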

Spark Task Scheduler Implementation

The main classes that implement Spark task scheduling play three roles:

1)org.apache.spark.scheduler.DAGScheduler

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

2)org.apache.spark.scheduler.SchedulerBackend

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

3)org.apache.spark.scheduler.TaskScheduler

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

TaskScheduler is a trait. It receives the tasks of each stage from the DAGScheduler and submits them to Executors. It also launches backup copies of tasks that run particularly slowly. TaskScheduler is the common interface for task schedulers, and TaskSchedulerImpl is its only implementation.

Before a TaskSchedulerImpl can be used, TaskSchedulerImpl#initialize(backend: SchedulerBackend) and TaskSchedulerImpl#start() must be called. Only then can TaskSchedulerImpl#submitTasks(taskSet: TaskSet) be called to submit tasks. The parameter of initialize(backend: SchedulerBackend) shows that TaskSchedulerImpl depends on a SchedulerBackend.

In fact, the TaskScheduler relies on the SchedulerBackend to submit tasks to Executors, as mentioned above.

SchedulerBackend is also a trait, and each SchedulerBackend corresponds to exactly one TaskScheduler. The SchedulerBackend manages the currently available resources: it assigns compute resources (Executors) to tasks and launches the tasks on those Executors.

The basic implementation of SchedulerBackend is CoarseGrainedSchedulerBackend. Its subclasses include:

- StandaloneSchedulerBackend
- YarnSchedulerBackend, which in turn has the subclasses YarnClientSchedulerBackend and YarnClusterSchedulerBackend
- MesosCoarseGrainedSchedulerBackend (Mesos) and KubernetesClusterSchedulerBackend (Kubernetes), among others

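TaskSchedulerImpl asks its SchedulerBackend for a new round of resource offers (SchedulerBackend#reviveOffers) in the following situations:

1) when new tasks are submitted;

2) when a task fails;

3) when a compute node (Executor) becomes unavailable;

4) when some tasks run too slowly and resources need to be reassigned (speculative execution).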
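The required call order (initialize, then start, then submitTasks) and the reviveOffers triggers can be sketched with toy classes.

- `MiniBackend`, `MiniTaskScheduler` and `CountingBackend` are hypothetical stand-ins, not Spark classes.
- The sketch assumes only that submitting tasks and handling a task failure both ask the backend for new offers.

```scala
// Hypothetical, heavily simplified stand-ins for SchedulerBackend / TaskSchedulerImpl.
trait MiniBackend {
  def start(): Unit
  def reviveOffers(): Unit
}

final class MiniTaskScheduler {
  private var backend: Option[MiniBackend] = None
  private var started = false

  // Must be called first: the scheduler cannot do anything without a backend.
  def initialize(b: MiniBackend): Unit = backend = Some(b)

  def start(): Unit = {
    val b = backend.getOrElse(throw new IllegalStateException("initialize() must be called first"))
    b.start()
    started = true
  }

  // Submitting tasks asks the backend for a new round of resource offers.
  def submitTasks(taskIds: Seq[Int]): Unit = {
    require(started, "start() must be called before submitTasks()")
    backend.get.reviveOffers()
  }

  // A failed task goes back into the queue, which also triggers new offers.
  def taskFailed(taskId: Int): Unit = backend.foreach(_.reviveOffers())
}

// Test double that just counts how often offers were revived.
final class CountingBackend extends MiniBackend {
  var revives = 0
  var started = false
  def start(): Unit = started = true
  def reviveOffers(): Unit = revives += 1
}
```

Calling submitTasks before initialize and start fails fast, mirroring the ordering requirement described above.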

DAGScheduler Source Code Analysis

What the DAGScheduler does:

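  1) It is the high-level scheduling layer that implements stage-oriented scheduling.
  - It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job.
  - It then submits the stages as TaskSets to the underlying TaskScheduler implementation, which runs them on the cluster. TaskSchedulerImpl is the only TaskScheduler implementation.
  - A TaskSet contains fully independent tasks that can run right away, based on data already on the cluster (for example, map output files from previous stages). A TaskSet may fail if this data becomes unavailable.

  2) Spark stages are created by breaking the RDD graph at shuffle boundaries.
  - RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage.
  - Operations with shuffle dependencies require multiple stages: one writes a set of map output files, and another reads those files after a barrier.
  - In the end, every stage has only shuffle dependencies on other stages and may compute multiple operations inside it. The actual pipelining of these operations happens in the rdd.compute() functions of the various RDDs.

  3) Besides building the DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these locations to the low-level TaskScheduler.
  - It also handles failures caused by lost shuffle output files, in which case old stages may need to be resubmitted.
  - Failures within a stage that are not caused by lost shuffle files are handled by the TaskScheduler. It retries each task a small number of times before cancelling the whole stage.

  4) To recover from failures, the same stage may need to run multiple times; these runs are called "attempts".
  - If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or through an ExecutorLost event.
  - The DAGScheduler waits a short time to see whether other nodes or tasks also fail. It then resubmits TaskSets for any lost stages that compute the missing tasks.
  - As part of this process, we may also have to create Stage objects for old (finished) stages whose Stage objects were previously cleaned up.
  - Tasks from an old attempt of a stage may still be running, so care must be taken to map every received event to the correct Stage object.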
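The rough sketch below shows how consecutive failed attempts of one stage could be counted. It is loosely modelled on the maxConsecutiveStageAttempts attribute described later (`spark.stage.maxConsecutiveAttempts`, which to my knowledge defaults to 4 in Spark 2.4).

- `AttemptTracker` is a hypothetical class, not Spark's Stage bookkeeping.
- Failed attempts are kept in a set keyed by attempt id. Several fetch failures reported by tasks of the same attempt therefore count only once.
- A successful attempt resets the count.

```scala
import scala.collection.mutable

// Hypothetical per-stage bookkeeping for fetch-failure retries.
final class AttemptTracker(maxConsecutiveAttempts: Int = 4) {
  private val failedAttemptIds = mutable.HashSet[Int]()
  private var nextAttemptId = 0

  // Each (re)submission of the stage gets a new attempt id.
  def newAttempt(): Int = {
    val id = nextAttemptId
    nextAttemptId += 1
    id
  }

  // Records a fetch failure from the given attempt; returns true if the stage should be aborted.
  // A set is used so that several failed tasks of one attempt count as one failed attempt.
  def fetchFailed(attemptId: Int): Boolean = {
    failedAttemptIds += attemptId
    failedAttemptIds.size >= maxConsecutiveAttempts
  }

  // A successful attempt resets the consecutive-failure count.
  def succeeded(): Unit = failedAttemptIds.clear()
}
```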

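Several key concepts are useful when reading this code:

- Jobs: represented by [ActiveJob], jobs are the top-level work items submitted to the scheduler. For example, when the user calls an action such as count(), a job is submitted through submitJob(). Each job may require the execution of multiple stages to build intermediate data.

- Stages: a stage is a set of tasks (TaskSet) that computes intermediate results in a job. Each task computes the same function on partitions of the same RDD.

       Stages are separated at shuffle boundaries, which introduce a barrier: we must wait for the previous stage to finish before fetching its outputs.

       There are two types of stages: [ResultStage], the final stage that executes an action, and [ShuffleMapStage], which writes map output files for a shuffle.

       Stages are often shared across multiple jobs if those jobs reuse the same RDDs.

- Tasks: individual units of work, each sent to one machine.

- Cache tracking: the DAGScheduler figures out which RDDs are cached, to avoid recomputing them. Likewise, it remembers which shuffle map stages have already produced output files, to avoid redoing the map side of a shuffle.

- Preferred locations: the DAGScheduler also computes where to run each task in a stage. It bases this on the preferred locations of the stage's underlying RDDs, or on the locations of cached or shuffle data.

- Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in long-running applications.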
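Point 2) above says that stages are cut at shuffle boundaries and that narrow dependencies are pipelined. A toy lineage model can illustrate this.

- `Dep`, `Narrow`, `Shuffle`, `Node` and `StageSplitter` are hypothetical.
- Real Spark walks RDD.dependencies and distinguishes NarrowDependency from ShuffleDependency.
- Real Spark also deduplicates shared shuffle stages. This sketch does not; it assumes tree-shaped lineage.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical lineage model: each node lists its parent dependencies.
sealed trait Dep { def parent: Node }
final case class Narrow(parent: Node) extends Dep  // e.g. map, filter, flatMap
final case class Shuffle(parent: Node) extends Dep // e.g. reduceByKey, groupByKey
final case class Node(name: String, deps: List[Dep] = Nil)

object StageSplitter {
  // Returns the stages needed to compute `last`, parent stages first. Each stage is the list
  // of node names pipelined into it. Assumes tree-shaped lineage (no shared parents).
  def stages(last: Node): List[List[String]] = {
    val result = ListBuffer[List[String]]()
    def build(root: Node): Unit = {
      val inStage = ListBuffer[String]()
      val shuffleParents = ListBuffer[Node]()
      def walk(n: Node): Unit = {
        inStage.prepend(n.name) // prepend, so upstream nodes end up first
        n.deps.foreach {
          case Narrow(p)  => walk(p)             // narrow: pipeline into the same stage
          case Shuffle(p) => shuffleParents += p // shuffle: cut here, parent gets its own stage
        }
      }
      walk(root)
      shuffleParents.foreach(build) // map-side (parent) stages are emitted before this one
      result += inStage.toList
    }
    build(last)
    result.toList
  }
}
```

With a WordCount-like lineage, textFile, flatMap and map end up in one map-side stage, and reduceByKey starts the final stage.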

DAGScheduler constructor

private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

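Explanation of the DAGScheduler constructor parameters:

√)sc: SparkContext: the current SparkContext object, i.e. the SparkContext initialized when the main function of application_jar.jar is called. The DAGScheduler is itself a SparkContext attribute, initialized during SparkContext initialization.

√)taskScheduler: TaskScheduler: the TaskScheduler, DAGScheduler, and SchedulerBackend are all SparkContext attributes initialized during SparkContext initialization. This parameter is therefore taken from the taskScheduler held by the current sc.

√)listenerBus: LiveListenerBus: an object that processes events asynchronously, taken from sc. https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

√)mapOutputTracker: MapOutputTrackerMaster: runs on the Driver and manages the outputs of shuffle map tasks. It is taken from the mapOutputTracker attribute of sc's env: SparkEnv. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/MapOutputTracker.scala

1) It is a driver-side class that tracks the map output locations of stages.
2) The DAGScheduler uses this class to (un)register map output statuses and to look up statistics for locality-aware reduce task scheduling.
3) ShuffleMapStage uses MapOutputTrackerMaster to track available and missing outputs, in order to determine which tasks need to run.

√)blockManagerMaster: BlockManagerMaster: runs on the Driver and manages the block information of the whole application. It is taken from sc's env.blockManager.master. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

√)env: SparkEnv: Spark's runtime environment, taken from sc's env attribute.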
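The toy reproduction below shows how `new DAGScheduler(this)` in SparkContext ends up filling all of these parameters.

- The one-argument constructor delegates to the two-argument one.
- The two-argument constructor pulls the collaborators out of the context.
- It then relies on the primary constructor's default value for the clock.

All class names are hypothetical, and Strings stand in for the real collaborator types.

```scala
// Hypothetical stand-ins for SparkEnv and SparkContext.
final class FakeEnv(val mapOutputTracker: String, val blockManagerMaster: String)
final class FakeContext(val taskScheduler: String, val listenerBus: String, val env: FakeEnv)

final class FakeDagScheduler(
    val sc: FakeContext,
    val taskScheduler: String,
    val listenerBus: String,
    val mapOutputTracker: String,
    val blockManagerMaster: String,
    val clockName: String = "SystemClock") { // default, like `clock: Clock = new SystemClock()`

  // Mirrors the two-argument constructor: collaborators are read from the context.
  def this(sc: FakeContext, taskScheduler: String) =
    this(sc, taskScheduler, sc.listenerBus, sc.env.mapOutputTracker, sc.env.blockManagerMaster)

  // Mirrors `def this(sc: SparkContext) = this(sc, sc.taskScheduler)`.
  def this(sc: FakeContext) = this(sc, sc.taskScheduler)
}
```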

DAGScheduler attributes

The DAGScheduler source code defines many attributes. They are initialized when the DAGScheduler is initialized.

  private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)

  private[scheduler] val nextJobId = new AtomicInteger(0)
  private[scheduler] def numTotalJobs: Int = nextJobId.get()
  private val nextStageId = new AtomicInteger(0)

  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
  /**
   * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
   * that dependency. Only includes stages that are part of currently running job (when the job(s)
   * that require the shuffle stage complete, the mapping will be removed, and the only record of
   * the shuffle data will be in the MapOutputTracker).
   */
  private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

  private[scheduler] val activeJobs = new HashSet[ActiveJob]

  /**
   * Contains the locations that each RDD's partitions are cached on.  This map's keys are RDD ids
   * and its values are arrays indexed by partition numbers. Each array value is the set of
   * locations where that RDD partition is cached.
   *
   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
   */
  private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
  // every task. When we detect a node failing, we note the current epoch number and failed
  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
  //
  // TODO: Garbage collect information about failure epochs when we know there are no more
  //       stray messages to detect.
  private val failedEpoch = new HashMap[String, Long]

  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

  // A closure serializer that we reuse.
  // This is only safe because DAGScheduler runs in a single thread.
  private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

  /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

  /**
   * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
   * this is set default to false, which means, we only unregister the outputs related to the exact
   * executor(instead of the host) on a FetchFailure.
   */
  private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
    sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE)

  /**
   * Number of consecutive stage attempts allowed before a stage is aborted.
   */
  private[scheduler] val maxConsecutiveStageAttempts =
    sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
      DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)

  /**
   * Number of max concurrent tasks check failures for each barrier job.
   */
  private[scheduler] val barrierJobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]

  /**
   * Time in seconds to wait between a max concurrent tasks check failure and the next check.
   */
  private val timeIntervalNumTasksCheck = sc.getConf
    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL)

  /**
   * Max number of max concurrent tasks check failures allowed for a job before fail the job
   * submission.
   */
  private val maxFailureNumTasksCheck = sc.getConf
    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)

  private val messageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

  private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

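√)metricsSource: DAGSchedulerSource: the Source in the metrics system. It registers metrics for failedStages, runningStages, waitingStages, allJobs, and activeJobs. https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
√)nextJobId: AtomicInteger: generates job ids.
√)numTotalJobs: Int: the total number of jobs submitted so far.
√)nextStageId: AtomicInteger: generates the next stage id.
√)jobIdToStageIds: HashMap[Int, HashSet[Int]]: maps each job id to the ids of all stages the job contains.
√)stageIdToStage: HashMap[Int, Stage]: maps stage ids to Stage objects.
√)shuffleIdToMapStage: HashMap[Int, ShuffleMapStage]: maps each shuffle to the ShuffleMapStage that produces its data. The key is the shuffleId.
√)jobIdToActiveJob: HashMap[Int, ActiveJob]: records the active jobs. The key is the jobId and the value is an ActiveJob object.
√)waitingStages: HashSet[Stage]: stages waiting to run. These usually cannot start until their parent stages have finished.
√)runningStages: HashSet[Stage]: stages that are currently running.
√)failedStages: HashSet[Stage]: stages that failed because of fetch failures and are waiting to be resubmitted.
√)activeJobs: HashSet[ActiveJob]: the set of active jobs.
√)cacheLocs: HashMap[Int, IndexedSeq[Seq[TaskLocation]]]: the locations where each RDD's partitions are cached. The map's key is the RDD id, and its value is an array indexed by partition number. Each array element is the set of locations where that RDD partition is cached.
√)failedEpoch: HashMap[String, Long]: used to track failed nodes, based on the MapOutputTracker's epoch number, which is sent with every task. When a node failure is detected, the current epoch number and the failed executor are recorded, and the epoch is incremented for new tasks. This is used to ignore stray ShuffleMapTask results.
√)outputCommitCoordinator: env.outputCommitCoordinator: the output commit coordinator.
√)closureSerializer: SerializerInstance: a reused instance of the closure serializer (JavaSerializer). Reusing it is only safe because the DAGScheduler runs in a single thread.
√)disallowStageRetryForTest: Boolean: the setting "spark.test.noStageRetry". If enabled, FetchFailed does not cause a stage retry, so that the problem surfaces.
√)unRegisterOutputOnHostOnFetchFailure: Boolean: whether to unregister all outputs on the host when a FetchFailure is received. It defaults to false, which means that on a FetchFailure only the outputs of the exact executor (rather than the whole host) are unregistered.
√)maxConsecutiveStageAttempts: Int: the setting "spark.stage.maxConsecutiveAttempts", the number of consecutive stage attempts allowed before a stage is aborted.
√)barrierJobIdToNumTasksCheckFailures: ConcurrentHashMap[Int, Int]: the number of max-concurrent-tasks check failures for each barrier job.
√)timeIntervalNumTasksCheck: Int: the time in seconds to wait between a max-concurrent-tasks check failure and the next check.
√)maxFailureNumTasksCheck: Int: the maximum number of max-concurrent-tasks check failures allowed for a job before its submission fails.
√)messageScheduler: ScheduledExecutorService: a single-threaded scheduler named dag-scheduler-message. The tasks it schedules post events such as ResubmitFailedStages and JobSubmitted to eventProcessLoop.
√)eventProcessLoop: DAGSchedulerEventProcessLoop: the DAGSchedulerEventProcessLoop class is defined in the DAGScheduler source file and extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop").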

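  • EventLoop holds an internal queue for storing messages and exposes a post method that puts incoming messages into the queue. A consumer thread takes messages from the queue in an endless loop and calls onReceive(msg) for each message it gets.
  • The DAGSchedulerEventProcessLoop constructor takes dagScheduler: DAGScheduler. Its onReceive method calls different dagScheduler methods depending on the message type.
  • Diagram of how DAGScheduler and EventLoop work together:

The DAGScheduler class defines a member variable eventProcessLoop: DAGSchedulerEventProcessLoop. During DAGScheduler initialization, eventProcessLoop = new DAGSchedulerEventProcessLoop(this) is created. The last step of initialization calls eventProcessLoop.start() to start the event loop.
Next, let's look at how eventProcessLoop is defined and how it works: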

EventLoop definition:

EventLoop is an abstract class for asynchronous message processing:

/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  // Exposed for testing.
  private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread when polling events from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
   * and cannot process events in time. If you want to call some blocking actions, run them in
   * another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * will be ignored.
   */
  protected def onError(e: Throwable): Unit

}

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/util/EventLoop.scala

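1) It holds a message queue, eventQueue: BlockingQueue[E], used to store messages until they are consumed.
2) It exposes a post method for receiving messages. Incoming messages are put into the queue to wait for consumption.
3) It contains a consumer thread, eventThread, which consumes messages from the queue in a blocking loop. Messages are handled by onReceive(event: E), and exceptions by onError(e: Throwable).
4) It provides a start method to start the consumer thread. Before eventThread.start() is called, the onStart() hook is invoked for any preparation.
5) It provides a stop method to stop the consumer thread. After eventThread.interrupt() and eventThread.join(), the onStop() hook is invoked for cleanup.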

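The five points above can be condensed into a small, self-contained event loop.

- `MiniEventLoop` and `RecordingLoop` are hypothetical names.
- The sketch keeps the essentials of Spark's EventLoop: a LinkedBlockingQueue, one daemon consumer thread, onReceive/onError hooks, and a stop via interrupt plus join.
- It omits onStart/onStop.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// A stripped-down event loop in the spirit of Spark's EventLoop (not Spark's class).
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks until an event is posted
          try onReceive(event)
          catch {
            case NonFatal(e) =>
              try onError(e) catch { case NonFatal(_) => () } // errors from onError are ignored
          }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocked take()
      }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event) // producer side: just enqueue
  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) {
      thread.interrupt()
      thread.join()
    }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// Example subclass: records events in arrival order; the latch lets callers wait for processing.
final class RecordingLoop(expected: Int) extends MiniEventLoop[String]("recording-loop") {
  val seen = new CopyOnWriteArrayList[String]()
  val done = new CountDownLatch(expected)
  override protected def onReceive(event: String): Unit = { seen.add(event); done.countDown() }
  override protected def onError(e: Throwable): Unit = ()
}
```

Because a single thread drains the queue, events are handled strictly in the order they were posted. This is why onReceive must not block for long.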
DAGSchedulerEventProcessLoop definition:

As the name suggests, this is the event loop that processes DAGSchedulerEvents.

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

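1) DAGSchedulerEventProcessLoop extends the abstract class EventLoop and receives the DAGScheduler object through its constructor. EventLoop is generic; here the type parameter is DAGSchedulerEvent.
2) Through the inherited post method, it accepts DAGSchedulerEvent events from outside and stores them in the queue.
3) Internally, it takes DAGSchedulerEvent events from the queue in a blocking loop and processes each one by calling onReceive(event: DAGSchedulerEvent).
4) onReceive calls the private method doOnReceive(event). doOnReceive dispatches on the event type to the corresponding handleXxx method of dagScheduler, so the actual event handling is done by the DAGScheduler.
5) DAGSchedulerEvent is a sealed trait. Its implementations are listed below (source: https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala):

JobSubmitted: a job (on an RDD) was submitted

MapStageSubmitted: a map stage was submitted

StageCancelled: a stage was cancelled

JobCancelled: a job was cancelled

JobGroupCancelled: a job group was cancelled

AllJobsCancelled: all jobs were cancelled

BeginEvent: a task has started

GettingResultEvent: a task's result is being fetched

CompletionEvent: a task has completed

ExecutorAdded: an executor was added

ExecutorLost: an executor was lost

WorkerRemoved: a worker was removed

TaskSetFailed: a TaskSet has failed

ResubmitFailedStages: resubmit the failed stages

SpeculativeTaskSubmitted: a speculative task was submitted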
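The dispatch in doOnReceive is ordinary pattern matching over a sealed trait. The reduced sketch below shows the same shape.

- It uses three hypothetical event types, suffixed `Ev` so they are not mistaken for Spark's classes.
- `MiniDag` is a stand-in whose handleXxx methods only append to a log.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical subset of the events, modelled as a sealed trait like DAGSchedulerEvent.
sealed trait SchedEvent
final case class JobSubmittedEv(jobId: Int, partitions: Array[Int]) extends SchedEvent
final case class JobCancelledEv(jobId: Int, reason: Option[String]) extends SchedEvent
case object ResubmitFailedStagesEv extends SchedEvent

// Stand-in for the DAGScheduler side: the handleXxx methods would do the real work.
final class MiniDag {
  val log = ArrayBuffer[String]()
  def handleJobSubmitted(jobId: Int, parts: Array[Int]): Unit =
    log += s"submit $jobId (${parts.length} parts)"
  def handleJobCancellation(jobId: Int, reason: Option[String]): Unit =
    log += s"cancel $jobId: ${reason.getOrElse("unknown")}"
  def resubmitFailedStages(): Unit = log += "resubmit"
}

object Dispatcher {
  // Same shape as doOnReceive: route each event type to the matching handler.
  // Because SchedEvent is sealed, the compiler can warn when a case is missing.
  def doOnReceive(dag: MiniDag, event: SchedEvent): Unit = event match {
    case JobSubmittedEv(id, parts)  => dag.handleJobSubmitted(id, parts)
    case JobCancelledEv(id, reason) => dag.handleJobCancellation(id, reason)
    case ResubmitFailedStagesEv     => dag.resubmitFailedStages()
  }
}
```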

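DAGScheduler lifecycle

Below, we walk through the whole lifecycle of the DAGScheduler with reference to the code. The DAGScheduler lifecycle consists of:

1) Initializing the DAGScheduler

2) Dividing the RDD DAG into stages

3) Scheduling stages and handling stage failures

1) DAGScheduler initialization

When a Spark application is submitted to YARN, for example in yarn-cluster mode, the flow is as follows:

- SparkSubmit launches YarnClusterApplication, which runs Client#run().
- The ApplicationMaster then starts userClassThread, the thread that executes the application's main function.
- When the main function runs, it first initializes the SparkContext object, and the DAGScheduler is initialized during SparkContext initialization: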

  @volatile private var _dagScheduler: DAGScheduler = _
  // ...
  private[spark] def dagScheduler: DAGScheduler = _dagScheduler
  private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
    _dagScheduler = ds
  }
  // ...
  // Create and start the scheduler
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  _taskScheduler.start()

During DAGScheduler initialization, the variable eventProcessLoop = new DAGSchedulerEventProcessLoop(this) is created. The last step of initialization calls eventProcessLoop.start() to start the event loop.

2) DAGScheduler: dividing the RDD DAG into stages

The code in the application jar ultimately boils down to RDD computation. RDD is the Spark Core API; Dataset, DataFrame, and sparkSession.sql("select ...") are translated into RDDs by Catalyst, so Spark SQL is still RDD-based underneath. Each RDD supports two kinds of parallel operations: transformations and actions.

(1) Transformation: lazily returns a new RDD. For example, map(f) returns an RDD[U] by applying the function f to each element.

map(func)

filter(func)

flatMap(func)

mapPartitions(func)

mapPartitionsWithIndex(func)

sample(withReplacement, fraction, seed)

union(otherDataset)

intersection(otherDataset)

distinct([numTasks])

groupByKey([numTasks])

reduceByKey(func, [numTasks])

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

sortByKey([ascending], [numTasks])

join(otherDataset, [numTasks])

cogroup(otherDataset, [numTasks])

cartesian(otherDataset)

pipe(command, [envVars])

coalesce(numPartitions)

repartition(numPartitions)

repartitionAndSortWithinPartitions(partitioner)

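(2) Action: triggers a job and returns a result to the driver (or writes it to storage). For example, reduce(func) returns T by combining the elements with the specified commutative and associative binary operator.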

reduce(func)

collect()

count()

first()

take(n)

takeOrdered(n, [ordering])

saveAsTextFile(path)

saveAsSequenceFile(path)

saveAsObjectFile(path)

countByKey()

foreach(func)

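The toy below illustrates the split between the two kinds of operations: transformations only record a step, and nothing runs until an action is called.

- `LazySeq` is hypothetical and runs inside a single JVM.
- It recomputes the whole chain for every action. For real RDDs, this repeated recomputation is what `cache()` (used in the WordCount example below) avoids.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical lazy dataset: transformations record a step, actions evaluate the chain.
final class LazySeq[A](compute: () => Seq[A], val lineage: List[String]) {
  // Transformations: build a new LazySeq without touching the data.
  def map[B](f: A => B): LazySeq[B] = new LazySeq(() => compute().map(f), lineage :+ "map")
  def filter(p: A => Boolean): LazySeq[A] = new LazySeq(() => compute().filter(p), lineage :+ "filter")

  // Actions: evaluate the full recorded chain, starting from the source.
  def count(): Int = compute().size
  def collect(): Seq[A] = compute()
}

object LazySeq {
  // `reads` counts how often the source is actually read, making laziness observable.
  def source[A](data: Seq[A], reads: AtomicInteger): LazySeq[A] =
    new LazySeq(() => { reads.incrementAndGet(); data }, List("source"))
}
```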
When the application jar code runs and reaches an action, sc.runJob(...) is called. Using WordCount as an example, let's walk through how the RDD DAG is divided into stages:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val wordFile = "E:\\personOrder.csv"
    val conf = new SparkConf().setMaster("local[1,1]").setAppName("wordcount")
    val sc = new SparkContext(conf)
    val input = sc.textFile(wordFile, 2).cache()
    val lines = input.flatMap(line => line.split("[,|-]"))
    val count = lines.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
    // Action: calls sc.runJob(...), which hands the job to DAGScheduler#runJob
    count.foreach(println)
    sc.stop()
  }
}
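In this job, the stages split as follows:

- textFile, flatMap and map are connected by narrow dependencies, so they are pipelined into one ShuffleMapStage.
- reduceByKey introduces a shuffle, so foreach runs in a separate ResultStage after it.

The plain-Scala sketch below mimics the two stages on hypothetical in-memory "partitions", with no Spark needed.

- groupBy stands in for the shuffle.
- The map-side combining that reduceByKey performs by default is ignored.
- `LocalWordCount` is a hypothetical name.

```scala
object LocalWordCount {
  // Stage 0 (ShuffleMapStage): per input partition, pipeline flatMap + map into (word, 1) pairs.
  def mapSide(partition: Seq[String]): Seq[(String, Int)] =
    partition.flatMap(_.split("[,|-]").toSeq).map(word => (word, 1))

  // Shuffle boundary, then Stage 1 (ResultStage): gather pairs by key and reduce with x + y.
  def reduceSide(mapOutputs: Seq[Seq[(String, Int)]]): Map[String, Int] =
    mapOutputs.flatten.groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2).reduce(_ + _) }
}
```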

When the application jar's main function is called and execution reaches count.foreach(println), RDD#foreach is implemented as follows:

  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

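SparkContext#runJob, the method that submits a job:
It runs a function on a given set of partitions in an RDD and passes the results to the given handler function. This is the main entry point for all actions in Spark.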

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

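Parameters:
1) rdd: the target RDD to run tasks on.
2) func: the function to run on each partition of the RDD.
3) partitions: the set of partitions to run on. Some jobs may not want to compute on all partitions of the target RDD, e.g. for operations like `first()`. When every partition is needed, it is generated as 0 until rdd.partitions.length.

Implementation of RDD#first: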

  /**
   * Return the first element in this RDD.
   */
  def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }

The code above shows that RDD#first calls RDD#take(num) internally, so let's look at RDD#take:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   *
   * @note Due to complications in the internal implementation, this method will raise
   * an exception if called on an RDD of `Nothing` or `Null`.
   */
  def take(num: Int): Array[T] = withScope {
    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        val left = num - buf.size
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.isEmpty) {
            numPartsToTry = partsScanned * scaleUpFactor
          } else {
            // As left > 0, numPartsToTry is always >= 1
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
          }
        }

        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }

      buf.toArray
    }
  }

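RDD#take returns the first num elements of the RDD. It first scans a single partition and checks whether that yields num elements. If not, it uses the results so far to estimate how many more partitions to scan in the next round. It repeats until it has num elements or has scanned every partition.
@note: This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
@note: Due to complications in the internal implementation, this method raises an exception if called on an RDD of `Nothing` or `Null`.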
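The partition-growth heuristic in RDD#take is easiest to follow by replaying it on known partition sizes. `TakeEstimator.scanRounds` is a hypothetical helper that copies the loop's arithmetic:

- scaleUpFactor is clamped to at least 2;
- the scan grows by scaleUpFactor when nothing has been found yet;
- otherwise it uses the 1.5x interpolation, capped by the same factor.

Instead of calling sc.runJob, it returns the partition range each round would scan.

```scala
import scala.collection.mutable.ListBuffer

object TakeEstimator {
  // Replays RDD#take's scanning loop on known partition sizes (hypothetical helper).
  // Returns the partition range that each round would pass to sc.runJob.
  def scanRounds(partitionSizes: IndexedSeq[Int], num: Int, scaleUpFactor: Int = 4): List[Range] = {
    val factor = math.max(scaleUpFactor, 2) // take() clamps the factor to at least 2
    val totalParts = partitionSizes.length
    val rounds = ListBuffer[Range]()
    var collected = 0
    var partsScanned = 0
    while (collected < num && partsScanned < totalParts) {
      var numPartsToTry = 1L
      val left = num - collected
      if (partsScanned > 0) {
        if (collected == 0) {
          numPartsToTry = partsScanned.toLong * factor // nothing found yet: grow by the factor
        } else {
          // interpolate, overestimating by 50%, then cap the growth
          numPartsToTry = math.ceil(1.5 * left * partsScanned / collected).toLong
          numPartsToTry = math.min(numPartsToTry, partsScanned.toLong * factor)
        }
      }
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts.toLong).toInt
      rounds += p
      // each task returns at most `left` rows, and the driver keeps at most `left` in total
      collected += math.min(p.map(i => math.min(partitionSizes(i), left)).sum, left)
      partsScanned += p.size
    }
    rounds.toList
  }
}
```

Example: with partition sizes Vector(0, 0, 1, 5, 5, 5, 5, 5, 5, 5) and num = 3, the first round scans partition 0 only. It found nothing, so the second round scans partitions 1 until 5 (1 × 4 partitions), which is enough to finish.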

4) resultHandler: the callback to which each partition's result is passed.

For example:

  /**
   * Run a function on a given set of partitions in an RDD and return the results as an array.
   * The function that is run against each partition additionally takes `TaskContext` argument.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @return in-memory collection with a result of the job (each collection element will contain
   * a result from one partition)
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }

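In the code above, resultHandler is (index, res) => results(index) = res. It stores the result computed for each partition in results(index), and the method finally returns the results array. Here index is the position of the partition within partitions; it equals the partition id only when partitions lists all partitions in order.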
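The way the handler fills the results array can be reproduced without Spark.

- `ResultHandlerDemo.runJob` is hypothetical.
- Partitions are modelled as an IndexedSeq of Seqs.
- `index` is the position of a partition within `partitions`, so running on partitions Seq(2, 0) puts partition 2's result first.

```scala
import scala.reflect.ClassTag

object ResultHandlerDemo {
  // Mimics the 3-argument SparkContext#runJob overload: run `func` on the chosen partitions
  // and let the handler store each result at its position in `partitions`.
  def runJob[T, U: ClassTag](
      data: IndexedSeq[Seq[T]], // hypothetical: partition id -> partition contents
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    val resultHandler: (Int, U) => Unit = (index, res) => results(index) = res
    partitions.zipWithIndex.foreach { case (partitionId, index) =>
      resultHandler(index, func(data(partitionId).iterator))
    }
    results
  }
}
```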

DAGScheduler#runJob:

Inside SparkContext#runJob, dagScheduler.runJob(...) is called. In other words, stage division and task submission are delegated to the DAGScheduler. Here is DAGScheduler#runJob:

  /**
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @note Throws `Exception` when the job fails
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

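What runJob does: it runs an action job on the given RDD and passes all results to the resultHandler function as they arrive.
Parameters:
1) rdd: the target RDD to run tasks on.
2) func: the function to run on each partition of the RDD.
3) partitions: the set of partitions to run on. Some jobs may not want to compute on all partitions of the target RDD, e.g. for operations like first().
4) callSite: where in the user program this job was called.
5) resultHandler: the callback to which each partition's result is passed.
6) properties: scheduler properties to attach to this job, e.g. the fair scheduler pool name.
Note: throws an `Exception` when the job fails.
Internals of DAGScheduler#runJob:
1) It calls DAGScheduler#submitJob(...) to submit the job and receives the returned waiter.
2) It calls ThreadUtils.awaitReady(...) on waiter.completionFuture, i.e. it blocks until the job ends.
3) It reacts to the waiter's outcome:
- on Success, it logs 'Job x finished: xxx';
- on Failure, it logs 'Job x failed: xxx', appends the caller's stack trace to the exception, and rethrows it.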
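The blocking pattern in DAGScheduler#runJob can be sketched with the Scala standard library alone: submit asynchronously, wait on a future, then branch on Success or Failure.

- `BlockingRunJob.runJob` is hypothetical.
- It uses scala.concurrent.Await.ready in place of Spark's ThreadUtils.awaitReady.
- It returns the message instead of logging it.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

object BlockingRunJob {
  // `submit` is a hypothetical stand-in for submitJob(...).completionFuture.
  def runJob(jobId: Int, submit: () => Future[Unit]): String = {
    val start = System.nanoTime
    val completion = submit()
    Await.ready(completion, Duration.Inf) // block the calling thread until the job has ended
    val secs = (System.nanoTime - start) / 1e9
    completion.value.get match {
      case Success(_) => f"Job $jobId finished, took $secs%f s"
      case Failure(e) => throw e // the failure surfaces in the thread that called the action
    }
  }
}
```

Usage: the job can be completed from another thread (playing the role of the event loop). The caller simply stays blocked until that happens.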

The DAGScheduler#submitJob(...) method

  /**
   * Submit an action job to the scheduler.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   *
   * @throws IllegalArgumentException when partitions ids are illegal
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

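Internal implementation of DAGScheduler#submitJob(...):
Step 1: check that every requested partition id lies in [0, rdd.partitions.length) (otherwise throw IllegalArgumentException) and allocate a new jobId; if no partitions are requested, return a JobWaiter for 0 tasks immediately. Otherwise, create a JobWaiter object.
Step 2: pass the JobWaiter as the listener field of a JobSubmitted event (a DAGSchedulerEvent) and post the event to the eventProcessLoop event loop. The event-handling thread inside eventProcessLoop receives the JobSubmitted event and calls dagScheduler.handleJobSubmitted(...) to process it.
Step 3: return the JobWaiter object.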
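The hand-off in Step 2 (the caller posts an event and returns at once, while a dedicated thread later runs the handler) can be sketched as a single-threaded event loop. This is a simplified model in the spirit of `org.apache.spark.util.EventLoop`, not its actual implementation: `ToyEvent`, `ToyJobSubmitted`, `ToyEventLoop` and the `ToyStop` poison pill are hypothetical, and the thread name used below is chosen only for illustration.

```scala
import java.util.concurrent.LinkedBlockingDeque

// Hypothetical events standing in for DAGSchedulerEvent / JobSubmitted.
sealed trait ToyEvent
final case class ToyJobSubmitted(jobId: Int) extends ToyEvent
case object ToyStop extends ToyEvent

// One daemon thread drains a blocking queue and calls onReceive for every event.
final class ToyEventLoop(name: String)(onReceive: ToyEvent => Unit) {
  private val eventQueue = new LinkedBlockingDeque[ToyEvent]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        eventQueue.take() match {          // blocks until an event is posted
          case ToyStop => running = false  // poison pill (an assumption of this sketch)
          case event   => onReceive(event) // runs on the loop thread, not the poster's
        }
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: ToyEvent): Unit = eventQueue.put(event) // returns immediately
  def stop(): Unit = { eventQueue.put(ToyStop); eventThread.join() }
}
```

Because only one thread drains the queue, events are handled one at a time in posting order, and the handler never runs on the thread that called post(...).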

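DAGScheduler#handleJobSubmitted(...) method:

Note: this method is called by the event-handling thread of eventProcessLoop: DAGSchedulerEventProcessLoop (after it receives a JobSubmitted event), so it does not run on the same thread as the main thread that submitted the job.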

 private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: BarrierJobSlotsNumberCheckFailed =>
        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
          "than the total number of slots in the cluster currently.")
        // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
          new BiFunction[Int, Int, Int] {
            override def apply(key: Int, value: Int): Int = value + 1
          })
        if (numCheckFailures <= maxFailureNumTasksCheck) {
          messageScheduler.schedule(
            new Runnable {
              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
                partitions, callSite, listener, properties))
            },
            timeIntervalNumTasksCheck,
            TimeUnit.SECONDS
          )
          return
        } else {
          // Job failed, clear internal data.
          barrierJobIdToNumTasksCheckFailures.remove(jobId)
          listener.jobFailed(e)
          return
        }

      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    // Job submitted, clear internal data.
    barrierJobIdToNumTasksCheckFailures.remove(jobId)

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

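After the job is submitted, the JobSubmitted event is picked up by eventProcessLoop and control enters this method, which starts processing the job and divides it into stages.

Stage division:

Stage division relies on the notions of wide and narrow dependencies: a wide dependency is a stage boundary, and a run of consecutive narrow dependencies belongs to the same stage.

For example, in the figure above an action is called on RDD G. Stage division works backwards from G: G depends on B and F, with a narrow dependency on B and a wide dependency on F, so F and G cannot be in the same stage, i.e. there is a stage boundary between F and G. The figure has another wide dependency, between A and B, which splits off another stage. This gives 3 stages in total. Stage1 and Stage2 are independent of each other and can run concurrently; Stage3 can start only after both Stage1 and Stage2 have completed.

There are two types of stage: the last stage is a ResultStage, and every other stage is a ShuffleMapStage.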
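The backward walk described above can be modelled in a few lines. The sketch below is a hypothetical toy (`LNode`, `LNarrow`, `LShuffle` and `ToyStageSplit` are not Spark classes) that uses only the four RDDs the text names: G has a narrow dependency on B and a wide dependency on F, and B has a wide dependency on A. Narrow dependencies keep the walk inside the current stage; each wide dependency ends the walk there and starts a parent stage at the parent RDD.

```scala
import scala.collection.mutable

// Hypothetical lineage model: a dependency is either narrow or shuffle (wide).
sealed trait LDep { def parent: LNode }
final case class LNarrow(parent: LNode) extends LDep
final case class LShuffle(parent: LNode) extends LDep
final case class LNode(name: String, deps: List[LDep] = Nil)

object ToyStageSplit {
  /** Groups RDD names into stages, walking backwards from the RDD the action ran on.
   *  The first stage in the result is the final (result) stage. */
  def split(finalRdd: LNode): List[Set[String]] = {
    val stages = mutable.ListBuffer.empty[Set[String]]
    val stageRoots = mutable.Queue(finalRdd) // last RDD of each stage still to expand
    val seenRoots = mutable.HashSet.empty[String]
    while (stageRoots.nonEmpty) {
      val root = stageRoots.dequeue()
      if (seenRoots.add(root.name)) {
        val members = mutable.LinkedHashSet.empty[String]
        var toVisit = List(root) // a List used as a stack
        while (toVisit.nonEmpty) {
          val rdd = toVisit.head
          toVisit = toVisit.tail
          if (members.add(rdd.name)) {
            rdd.deps.foreach {
              case LNarrow(p)  => toVisit = p :: toVisit // same stage: keep walking
              case LShuffle(p) => stageRoots.enqueue(p)  // boundary: p ends a parent stage
            }
          }
        }
        stages += members.toSet
      }
    }
    stages.toList
  }
}
```

For the example this yields three stages, {B, G} (the result stage), {F} and {A}, matching the three stages in the text.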

Stage class definition:

private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to. */
  val jobIds = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage. */
  private var nextAttemptId: Int = 0

  val name: String = callSite.shortForm
  val details: String = callSite.longForm

  /**
   * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   */
  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

  /**
   * Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
   * endless retries if a stage keeps failing.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   */
  val failedAttemptIds = new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    failedAttemptIds.clear()
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
    nextAttemptId += 1
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  def findMissingPartitions(): Seq[Int]
}

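A Stage takes a single RDD parameter, the final RDD of the stage, rather than a list of RDDs.
All RDDs inside one stage are linked by narrow dependencies, so no shuffle happens within the stage: each task applies the chained functions (map, filter, etc.) to the data of one partition in a pipeline. The tasks of the stage are therefore defined by the partitions of that last RDD, and for the TaskScheduler the state of this one RDD is enough to represent the whole stage.
A stage is a set of parallel tasks all computing the same function that need to run as part of a Spark job, where all the tasks have the same shuffle dependencies.
Each DAG of tasks run by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the DAGScheduler runs these stages in topological order.
Each stage can either be a shuffle map stage, in which case its tasks' results are input for other stage(s), or a result stage, in which case its tasks directly compute a Spark action (e.g. count(), save(), etc.) by running a function on an RDD. For shuffle map stages, we also track the nodes that each output partition is on.
Each stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO scheduling is used, this allows stages from earlier jobs to be computed first or recovered faster on failure.
Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that case, the Stage object tracks multiple StageInfo objects to pass to listeners or the web UI.
The latest one is accessible through latestInfo.
@id unique stage ID
@rdd the RDD that this stage runs on: for a shuffle map stage, the RDD we run map tasks on; for a result stage, the target RDD we ran an action on
@numTasks total number of tasks in the stage; result stages in particular may not need to compute all partitions, e.g. for first(), lookup() and take()
@parents list of stages that this stage depends on (through shuffle dependencies)
@firstJobId ID of the first job this stage was part of, used for FIFO scheduling
@callSite location in the user program associated with this stage: where the target RDD was created, for a shuffle map stage, or where the action for a result stage was called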
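The attempt bookkeeping described above (a fresh attempt ID for every retry, and a failed attempt recorded only once even if several of its tasks fail) can be illustrated with a trimmed-down, hypothetical `ToyStage`. It keeps only an attempt counter and a `failedAttemptIds` set, modelled on the fields of the Stage class shown above, and is not Spark's `Stage`.

```scala
import scala.collection.mutable

// Hypothetical, trimmed-down model of Stage's attempt bookkeeping.
final class ToyStage(val id: Int) {
  private var nextAttemptId = 0
  // Like Stage._latestInfo, a placeholder for attempt 0 exists before any attempt is made.
  private var latestAttemptId = nextAttemptId
  val failedAttemptIds = mutable.HashSet.empty[Int]

  /** Starts a new attempt, as makeNewStageAttempt does with a new StageInfo. */
  def makeNewStageAttempt(): Unit = {
    latestAttemptId = nextAttemptId
    nextAttemptId += 1
  }

  def latestAttempt: Int = latestAttemptId

  /** Several tasks of one attempt may fail; the set records that attempt only once. */
  def recordFailedAttempt(attemptId: Int): Unit = failedAttemptIds += attemptId
}
```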

1) createResultStage inside DAGScheduler#handleJobSubmitted(...)

    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: BarrierJobSlotsNumberCheckFailed =>
        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
          "than the total number of slots in the cluster currently.")
        // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
          new BiFunction[Int, Int, Int] {
            override def apply(key: Int, value: Int): Int = value + 1
          })
        if (numCheckFailures <= maxFailureNumTasksCheck) {
          messageScheduler.schedule(
            new Runnable {
              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
                partitions, callSite, listener, properties))
            },
            timeIntervalNumTasksCheck,
            TimeUnit.SECONDS
          )
          return
        } else {
          // Job failed, clear internal data.
          barrierJobIdToNumTasksCheckFailures.remove(jobId)
          listener.jobFailed(e)
          return
        }

      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

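The code above is the main stage-division code in DAGScheduler#handleJobSubmitted. As mentioned earlier, stage division works backwards from the last stage: each time a wide dependency is met, a new stage is split off, and so on until all stages have been created. Only the last stage is of type ResultStage.

Hence the type of finalStage is ResultStage.

2) DAGScheduler#createResultStage(...)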

  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    // Get the parent stages of the current stage; this method is the core of stage division
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    // Create the final ResultStage
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    // Associate the ResultStage with its stageId
    stageIdToStage(id) = stage
    // Update the set of stages that belong to this job
    updateJobIdStageIdMaps(jobId, stage)
    // Return the ResultStage
    stage
  }

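The three checks above:

1) checkBarrierStageWithDynamicAllocation(rdd): running a barrier stage with dynamic resource allocation enabled is not supported, because it would lead to confusing behavior (for example, with dynamic allocation enabled we might acquire some executors, but not enough to launch all the tasks of the barrier stage, then release them when the executor idle timeout expires, and later acquire them again). The check runs at job submission time and fails fast if a barrier stage is run with dynamic allocation enabled.

2) checkBarrierStageWithNumSlots(rdd): checks whether the barrier stage requires more slots (to be able to launch all of its tasks together) than the total number of currently active slots. If a barrier stage that needs more slots than are currently available is submitted, the check fails early, and handleJobSubmitted resubmits the job after timeIntervalNumTasksCheck seconds. If the check fails more times in a row than the configured limit, the current job submission fails.

3) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size): ensures we don't launch a barrier stage with an unsupported RDD chain pattern. The following patterns are not supported:

   1. An ancestor RDD that has a different number of partitions from the resulting RDD (e.g. union()/coalesce()/first()/take()/PartitionPruningRDD);

   2. An RDD that depends on multiple barrier RDDs (e.g. barrierRdd1.zip(barrierRdd2)).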
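The retry rule in check 2 lives in the `BarrierJobSlotsNumberCheckFailed` branch of handleJobSubmitted(...) shown earlier: a counter per jobId is incremented, the job is resubmitted while the counter is at most `maxFailureNumTasksCheck`, and it fails (clearing the counter) once the limit is exceeded. A minimal sketch of just that counting logic, using a hypothetical `BarrierSlotsRetryPolicy` class and a plain Scala map instead of Spark's concurrent map and `messageScheduler`:

```scala
import scala.collection.mutable

// Hypothetical policy modelled on the BarrierJobSlotsNumberCheckFailed branch.
final class BarrierSlotsRetryPolicy(maxFailureNumTasksCheck: Int) {
  private val failuresByJob = mutable.HashMap.empty[Int, Int]

  /** Returns true to resubmit the job later, false to fail it. */
  def onSlotsCheckFailed(jobId: Int): Boolean = {
    val numCheckFailures = failuresByJob.getOrElse(jobId, 0) + 1 // a missing key counts as 0
    failuresByJob(jobId) = numCheckFailures
    if (numCheckFailures <= maxFailureNumTasksCheck) true
    else { failuresByJob.remove(jobId); false } // job failed: clear internal data
  }

  /** Stage creation succeeded: the counter for this job is no longer needed. */
  def onStageCreated(jobId: Int): Unit = failuresByJob.remove(jobId)
}
```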

3) DAGScheduler#getOrCreateParentStages(...)

Gets or creates the list of parent stages for a given RDD. New stages are created with the provided firstJobId.

  /**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

This method explores backwards from the current RDD and splits off a parent stage (a ShuffleMapStage) at each wide dependency it finds.

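4) DAGScheduler#getShuffleDependencies(...)

It uses a depth-first traversal to find the wide (shuffle) dependencies among the ancestors of the given RDD (for the result stage, the RDD the action was called on).

This is the most important method; once it is understood, the rest is easy to follow. It helps to read it alongside the RDD logical dependency graph of the example given earlier: for that graph, the returned ShuffleDependencies are the dependency between RDD2 and RDD1 and the dependency between RDD7 and RDD6.

If A <- B <- C where both are shuffle dependencies, then for C only the B <- C shuffle dependency is returned, not A's.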

  /**
   * Returns shuffle dependencies that are immediate parents of the given RDD.
   *
   * This function will not return more distant ancestors.  
   * For example, if C has a shuffle dependency on B which has a shuffle dependency on A: 
   *     A <-- B <-- C calling this function with rdd C will only return the B <-- C dependency.
   *
   * This function is scheduler-visible for the purpose of unit testing.
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    // Holds the shuffle dependencies found
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    // RDDs that have already been visited
    val visited = new HashSet[RDD[_]]

    // Stack of RDDs waiting to be visited
    val waitingForVisit = new ArrayStack[RDD[_]]
    // Push the final RDD (RDD9 in the logical graph)
    waitingForVisit.push(rdd)

    // Loop until the stack is empty
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()

      // Only process RDDs that have not been visited yet
      if (!visited(toVisit)) {
        // Mark the RDD as visited
        visited += toVisit
        // Walk its dependencies; each dependency holds a reference to the parent RDD
        toVisit.dependencies.foreach {
          // A shuffle dependency is added to the result and not traversed further
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          // Otherwise (narrow dependency), push the parent RDD onto the stack to keep walking
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }
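To see why only the immediate shuffle dependencies come back, here is a stand-alone sketch of the same stack-based traversal over hypothetical classes (`SRddNode`, `SNarrowDep`, `SShuffleDependency`, `ImmediateShuffleDeps`). It mirrors the structure of the code above but is not Spark code. The walk stops at every shuffle dependency, so for A <- B <- C it returns only the B <- C dependency.

```scala
import scala.collection.mutable

// Hypothetical lineage classes; plain classes give reference equality, like RDD objects.
sealed trait SDep { def rdd: SRddNode }
final class SNarrowDep(val rdd: SRddNode) extends SDep
final class SShuffleDependency(val rdd: SRddNode, val shuffleId: Int) extends SDep
final class SRddNode(val name: String, val dependencies: Seq[SDep])

object ImmediateShuffleDeps {
  def getShuffleDependencies(rdd: SRddNode): Set[SShuffleDependency] = {
    val parents = mutable.HashSet.empty[SShuffleDependency]
    val visited = mutable.HashSet.empty[SRddNode]
    var waitingForVisit: List[SRddNode] = List(rdd) // a List used as a stack
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.head
      waitingForVisit = waitingForVisit.tail
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: SShuffleDependency => parents += shuffleDep // stop here
          case narrow => waitingForVisit = narrow.rdd :: waitingForVisit // keep walking
        }
      }
    }
    parents.toSet
  }
}
```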

5) DAGScheduler#getOrCreateShuffleMapStage(...)

Gets the shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the shuffle map stage does not exist yet, this method creates it along with any missing ancestor shuffle map stages.

  /**
   * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
   * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
   * addition to any missing ancestor shuffle map stages.
   */
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
    
      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

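6) DAGScheduler#createShuffleMapStage(...)

Creates a ShuffleMapStage that generates the partitions of the given shuffle dependency. If a previously run stage generated the same shuffle data, this function copies the output locations that are still available from the previous shuffle to avoid regenerating the data unnecessarily.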

  /**
   * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
   * previously run stage generated the same shuffle data, this function will copy the output
   * locations that are still available from the previous shuffle to avoid unnecessarily
   * regenerating data.
   */
  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

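From the source analysis above, combined with the RDD logical execution graph, we can see that this job has three stages: one ResultStage and two ShuffleMapStages, whose RDDs are RDD1 and RDD6 respectively. This completes the division of RDDs into stages. Once the division is done, handleJobSubmitted ends by calling the method that submits the stages.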
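The recursion between getOrCreateParentStages, getOrCreateShuffleMapStage and createShuffleMapStage can be summarized with a hypothetical model (`PRdd`, `PShuffleDep`, `PShuffleMapStage`, `PResultStage` and `ToyStageFactory` are invented names). Assumptions: each RDD directly lists its immediate shuffle dependencies (what getShuffleDependencies would return), and the separate pass over missing ancestor shuffle dependencies is folded into the recursion. Two properties of the real code carry over: one map stage per shuffleId is cached and reused, and parent stages are created, and therefore receive IDs, before their child.

```scala
import scala.collection.mutable

// Hypothetical model: each RDD lists only its immediate shuffle dependencies.
final class PRdd(val name: String, val shuffleDeps: Seq[PShuffleDep])
final class PShuffleDep(val rdd: PRdd, val shuffleId: Int)

sealed trait PStage { def id: Int; def parents: List[PStage] }
final case class PShuffleMapStage(id: Int, shuffleId: Int, parents: List[PStage]) extends PStage
final case class PResultStage(id: Int, parents: List[PStage]) extends PStage

final class ToyStageFactory {
  private var nextStageId = 0
  val shuffleIdToMapStage = mutable.HashMap.empty[Int, PShuffleMapStage]

  private def getOrCreateParentStages(rdd: PRdd): List[PStage] =
    rdd.shuffleDeps.map(getOrCreateShuffleMapStage).toList

  def getOrCreateShuffleMapStage(dep: PShuffleDep): PShuffleMapStage =
    shuffleIdToMapStage.get(dep.shuffleId) match {
      case Some(stage) => stage // reuse: one map stage per shuffleId
      case None =>
        val parents = getOrCreateParentStages(dep.rdd) // ancestors are created first
        val stage = PShuffleMapStage(nextStageId, dep.shuffleId, parents)
        nextStageId += 1
        shuffleIdToMapStage(dep.shuffleId) = stage
        stage
    }

  def createResultStage(finalRdd: PRdd): PResultStage = {
    val parents = getOrCreateParentStages(finalRdd) // parents get IDs before this stage
    val stage = PResultStage(nextStageId, parents)
    nextStageId += 1
    stage
  }
}
```

For the A/B/F/G example from the stage-division section (with B folded into G's stage through its narrow dependency), the two map stages receive IDs 0 and 1 and the result stage ID 2. In a chain of shuffles the upstream stage always gets the smaller ID, which is what lets `sortBy(_.id)` in submitStage submit upstream stages first.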

3) DAGScheduler: scheduling stages and handling failures

Let us review the job submission flow to see when, during the execution of an application.jar, stage submission begins:

1) DAGScheduler initialization: when a Spark application is submitted to YARN, for example in yarn-cluster mode, SparkSubmit launches YarnClusterApplication, which runs Client#run(); the application's main method is then executed by ApplicationMaster#userClassThread. When main runs, it first initializes the SparkContext object, and initializing SparkContext initializes the DAGScheduler.
2) When the application's RDD (Spark Core) code reaches an RDD action, the following are called:
--> SparkContext#runJob, the method that submits the job
--> DAGScheduler#runJob
--> DAGScheduler#submitJob(...), which posts the job to eventProcessLoop
--> the event handler inside eventProcessLoop calls DAGScheduler#handleJobSubmitted(...);
3) DAGScheduler#handleJobSubmitted(...) calls "finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)" to create the stages, and submitStage(finalStage) to submit them.
Next, let's look at how stage submission is executed in the code:
1) submitStage(finalStage) inside DAGScheduler#handleJobSubmitted(...)

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    // ......

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

2) DAGScheduler#submitStage method

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // Get this stage's parent stages that are not yet available, sorted by stage id (ascending)
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // No missing parent stages: submit the tasks of this stage
          submitMissingTasks(stage, jobId.get)
        } else {
          // Missing parent stages exist: submit each of them first (recursively, so their own
          // missing parents go first), then add this stage to waitingStages
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

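Submits a stage, but first recursively submits any missing parent stages.

1) If the stage has no missing parent stages, its tasks are submitted;

2) If it has missing parent stages, each of them is submitted in turn (and if a parent itself has missing parents, those are submitted first, and so on), and the stage is added to the waitingStages set.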
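The submit-parents-first recursion, and the later resubmission of waiting stages, can be simulated with a hypothetical `ToyStageSubmitter` over bare `QStage(id, parents)` values. Assumptions: a parent counts as missing whenever its id is not in `availableOutputs` (standing in for `ShuffleMapStage.isAvailable`), recording the stage id stands in for submitMissingTasks, and `onStageFinished` is an invented hook approximating what Spark does when a stage completes (resubmitting waiting child stages). Failed stages and the active-job check are omitted.

```scala
import scala.collection.mutable

// Hypothetical stage: just an id and its parent stages.
final case class QStage(id: Int, parents: List[QStage])

final class ToyStageSubmitter(availableOutputs: mutable.Set[Int]) {
  val waitingStages = mutable.LinkedHashSet.empty[QStage]
  val runningStages = mutable.LinkedHashSet.empty[QStage]
  val submittedTaskSets = mutable.ArrayBuffer.empty[Int] // stage ids, in submission order

  private def getMissingParentStages(stage: QStage): List[QStage] =
    stage.parents.filterNot(p => availableOutputs(p.id)).sortBy(_.id)

  def submitStage(stage: QStage): Unit =
    if (!waitingStages(stage) && !runningStages(stage)) {
      val missing = getMissingParentStages(stage)
      if (missing.isEmpty) {
        runningStages += stage
        submittedTaskSets += stage.id // stands in for submitMissingTasks(stage, jobId)
      } else {
        missing.foreach(submitStage) // parents first, recursively
        waitingStages += stage
      }
    }

  // Hypothetical completion hook: mark outputs available, then submit ready children.
  def onStageFinished(stage: QStage): Unit = {
    runningStages -= stage
    availableOutputs += stage.id
    val ready = waitingStages.filter(s => getMissingParentStages(s).isEmpty).toList.sortBy(_.id)
    waitingStages --= ready
    ready.foreach(submitStage)
  }
}
```

For a result stage 2 with parents 0 and 1, submitStage(stage2) submits 0 and 1 and parks 2 in waitingStages; stage 2 is submitted only after both parents have finished.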

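3) DAGScheduler#getMissingParentStages method

getMissingParentStages is somewhat similar to getOrCreateParentStages, introduced in the stage-division section, but it no longer needs to divide stages; instead it checks the state of each parent stage. The source code with comments: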

  // Starting from the given stage, walk backwards through its lineage and collect every parent
  // stage whose output is not yet available into `missing`
  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]  // parent stages whose output is not yet available
    val visited = new HashSet[RDD[_]] // RDDs that have already been visited
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ArrayStack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        // If every partition of this RDD is cached, its ancestors need not be examined
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              // Wide dependency: the parent is a ShuffleMapStage
              case shufDep: ShuffleDependency[_, _, _] =>
                // Unlike during stage division, this stage normally already exists, so it is
                // simply looked up by shufDep.shuffleId
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) { // outputs not all available: a missing parent stage
                  missing += mapStage
                }
              // Narrow dependency: same stage. Push the parent RDD onto waitingForVisit so the
              // while loop below keeps walking up until the stage's whole lineage is visited
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        } 
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

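As described for submitStage above, submitStage first calls getMissingParentStages to obtain missing, the list of parent stages whose output is not yet available. If there are any, they are submitted recursively one by one (sorted by stage id in ascending order; because parent stages are created before their children, they generally have smaller ids, so upstream stages are submitted first), and the current stage is added to waitingStages: HashSet[Stage]. Each parent stage submitted this way is handled in the same manner.

So if missing is non-empty, the parent stages are submitted first. What if a stage has no missing parent stages (for example, the stage containing the HadoopRDD that reads data from HDFS has no parent stage at all)?

In that case submitMissingTasks(stage, jobId.get) is called with the stage itself and its job id. This function maps the stage to a TaskSet; it is where the DAGScheduler actually hands the TaskSet to the TaskScheduler for execution.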
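The cache short-circuit in getMissingParentStages (an RDD whose partitions are all cached stops the walk) is easy to overlook. Below is a hypothetical sketch with invented classes `MRdd`, `MNarrow` and `MShuffle`: `fullyCached = true` stands in for `getCacheLocs(rdd).contains(Nil)` being false, and a set of available map-stage ids stands in for `mapStage.isAvailable`.

```scala
import scala.collection.mutable

// Hypothetical lineage model; mapStageId identifies the map stage behind a shuffle.
sealed trait MDep
final case class MNarrow(rdd: MRdd) extends MDep
final case class MShuffle(rdd: MRdd, mapStageId: Int) extends MDep
final case class MRdd(name: String, deps: List[MDep], fullyCached: Boolean = false)

object MissingParents {
  def missingParentStageIds(stageRdd: MRdd, availableMapStages: Set[Int]): List[Int] = {
    val missing = mutable.HashSet.empty[Int]
    val visited = mutable.HashSet.empty[MRdd]
    var waitingForVisit = List(stageRdd) // a List used as a stack
    while (waitingForVisit.nonEmpty) {
      val rdd = waitingForVisit.head
      waitingForVisit = waitingForVisit.tail
      if (!visited(rdd)) {
        visited += rdd
        if (!rdd.fullyCached) { // cached RDDs need no parent output
          rdd.deps.foreach {
            case MShuffle(_, id) => if (!availableMapStages(id)) missing += id
            case MNarrow(parent) => waitingForVisit = parent :: waitingForVisit
          }
        }
      }
    }
    missing.toList.sorted
  }
}
```

If B is fully cached, the map stage behind B's shuffle dependency is not reported as missing even though its output is unavailable, because B's partitions can be read from the cache instead of being recomputed.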

4) DAGScheduler#submitMissingTasks method

  /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    
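    // Step1: find the partitions of the RDD that need to be computed
    //< For a ShuffleMapStage, check which partitions' map outputs are not yet available;
    //< for the final ResultStage, check which partitions of the job have not finished yet.
    //< Reason: when some tasks of a stage fail while others succeed, only the partitions of
    //< the failed tasks need to be recomputed, not all partitions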
    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties

    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    // If there are tasks to execute, record the submission time of the stage. Otherwise,
    // post the even without the submission time, which indicates that this stage was
    // skipped.
    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // Step2: serialize the task binary
    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      var taskBinaryBytes: Array[Byte] = null
      // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
      // consistent view of both variables.
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          //< For a ShuffleMapTask, serialize the RDD and its shuffle dependency; the executor
          //< deserializes them before running the task
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          //< For a ResultTask, serialize the RDD and the func to run on each partition
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Abort execution
        return
      case e: Throwable =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage

        // Abort execution
        return
    }
    // Step3: create one task for each partition that needs to be computed
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            //< the RDD partition this task computes
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    // Step4: submit the tasks
    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      stage match {
        case stage: ShuffleMapStage =>
          logDebug(s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})")
          markMapStageJobsAsFinished(stage)
        case stage : ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      submitWaitingChildStages(stage)
    }
  }
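  • Step1: find the partitions of the RDD that need to be computed

For a ShuffleMapStage, check which partitions' map outputs are not yet available; for the final ResultStage, check which partitions of the job have not finished yet. The reason for doing this (rather than always submitting tasks for all partitions) is that when some tasks of a stage fail while others succeed, only the partitions of the failed tasks need to be recomputed, not all partitions.

  • Step2: serialize the task binary

The serialized (rdd, shuffleDep) or (rdd, func) is broadcast, so executors obtain it through the broadcast variable; each task deserializes it first when it runs, which gives every task its own copy of the RDD.

  • Step3: create one task for each partition that needs to be computed

A ShuffleMapStage produces only ShuffleMapTasks, and a ResultStage produces only ResultTasks. Task extends Serializable, and every task must actually be serializable.

  • Step4: submit the tasks

The tasks are first wrapped in a TaskSet object, which is then submitted by calling TaskScheduler.submitTasks. If there are no tasks to run, the stage is marked as finished immediately and its waiting child stages are submitted.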
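Steps 1, 3 and 4 reduce to: compute the missing partitions, build one task per partition, and submit a TaskSet only if there is something to run. The sketch below uses hypothetical names (`ToyTask`, `ToyTaskSet`, `ToySubmitMissingTasks.build`) and leaves out Step 2 (serialization and broadcast), locality preferences, and the ResultStage mapping from task index to RDD partition.

```scala
// Hypothetical, simplified task and task-set types.
final case class ToyTask(stageId: Int, attempt: Int, partitionId: Int, isShuffleMap: Boolean)
final case class ToyTaskSet(tasks: Seq[ToyTask], stageId: Int, attempt: Int)

object ToySubmitMissingTasks {
  /** numPartitions: partitions of the stage's RDD; finished: partitions whose output exists.
   *  Returns Some(TaskSet) when there is work, None when the stage is already complete. */
  def build(stageId: Int, attempt: Int, numPartitions: Int,
            finished: Set[Int], isShuffleMap: Boolean): Option[ToyTaskSet] = {
    // Step 1: only partitions without output are computed
    val partitionsToCompute = (0 until numPartitions).filterNot(finished)
    // Step 3: one task per missing partition
    val tasks = partitionsToCompute.map(p => ToyTask(stageId, attempt, p, isShuffleMap))
    // Step 4: a TaskSet only if there is something to run
    if (tasks.nonEmpty) Some(ToyTaskSet(tasks, stageId, attempt)) else None
  }
}
```

`None` corresponds to the `else` branch of submitMissingTasks, where the stage is marked finished without launching any tasks.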

 

 


 

