Spark Executor Tasks: Execution and Count


Basic principles (YARN mode)

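Each stage consists of multiple partitions, and each partition is processed by one Task running on an Executor.

The default number of partitions for a stage is set by spark.default.parallelism; if that parameter is not set, the number is inherited from the parent stage.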

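The maximum number of Tasks that can run at the same time is determined by three parameters:

  • The number of Executors, set by spark.executor.instances or --num-executors; on YARN the default is 2
  • The number of cores per Executor, set by spark.executor.cores or --executor-cores; on YARN the default is 1
  • The number of cores each Task requires, set by spark.task.cpus; the default is 1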
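
The three parameters combine into a simple upper bound. The sketch below is plain arithmetic and calls no Spark API; the object and method names (TaskSlots, slotsPerExecutor, maxConcurrentTasks, waves) are made up for illustration, and it assumes every Executor has the same core count.

```scala
// Toy arithmetic only; TaskSlots is a hypothetical helper, not part of Spark.
object TaskSlots {
  // Tasks one Executor can run at once: whole tasks that fit in its cores.
  def slotsPerExecutor(executorCores: Int, taskCpus: Int): Int = {
    require(executorCores > 0 && taskCpus > 0, "core counts must be positive")
    executorCores / taskCpus // integer division drops leftover cores
  }

  // Upper bound on Tasks running at once across the whole application.
  def maxConcurrentTasks(numExecutors: Int, executorCores: Int, taskCpus: Int): Int =
    numExecutors * slotsPerExecutor(executorCores, taskCpus)

  // Number of scheduling "waves" a stage with this many partitions needs.
  def waves(partitions: Int, slots: Int): Int = {
    require(slots > 0, "need at least one slot")
    (partitions + slots - 1) / slots // ceiling division
  }
}
```

For example, 10 Executors with 4 cores each and 1 core per Task give 40 slots, so a stage with 200 partitions needs 5 waves. With 3 cores per Task, the same 4-core Executor holds only one Task and one core stays idle.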

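Choose the number of Executors and the cores per Executor with the actual number of machines and CPUs in mind; both values can, however, be configured higher than the machine and CPU counts.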

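Detailed flow (YARN mode)

In DAGScheduler

  • When a Job is submitted, handleJobSubmitted runs and calls submitStage on the final stage
  • submitStage is recursive: starting from the final stage (the one containing the action), it walks backwards through the parent stages, adds every stage that still has unfinished parent stages to waitingStages, and calls submitMissingTasks on each stage that has none
  • submitMissingTasks collects the partitions of the stage that still need computing, creates one task per partition, and submits them all as a TaskSet through taskScheduler.submitTasks
  • When a stage completes, submitStage is called again for the waitingStages that depend on it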
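
The recursion is easier to follow on a toy DAG. The following is a minimal sketch of the idea, not Spark's implementation: ToyStage, ToyDagScheduler and stageFinished are hypothetical names, "submitting tasks" is reduced to recording the stage id, and failure handling is left out.

```scala
import scala.collection.mutable

// Toy model of the recursion; these are not Spark classes.
final case class ToyStage(id: Int, parents: List[ToyStage])

class ToyDagScheduler {
  val finished = mutable.Set[Int]()             // ids of completed stages
  val waitingStages = mutable.Set[ToyStage]()   // stages blocked on a parent
  val submitted = mutable.ArrayBuffer[Int]()    // order in which stages had tasks submitted

  private def missingParents(stage: ToyStage): List[ToyStage] =
    stage.parents.filterNot(p => finished.contains(p.id)).sortBy(_.id)

  def submitStage(stage: ToyStage): Unit = {
    // Skip stages that were already submitted or are already waiting.
    if (!submitted.contains(stage.id) && !waitingStages.contains(stage)) {
      val missing = missingParents(stage)
      if (missing.isEmpty) {
        submitted += stage.id            // stands in for submitMissingTasks
      } else {
        missing.foreach(submitStage)     // recurse into unfinished parents first
        waitingStages += stage
      }
    }
  }

  // Stands in for the completion path: mark the stage as finished and
  // resubmit the waiting stages that list it as a parent.
  def stageFinished(stage: ToyStage): Unit = {
    finished += stage.id
    val children = waitingStages.filter(_.parents.contains(stage)).toList.sortBy(_.id)
    waitingStages --= children
    children.foreach(submitStage)
  }
}
```

With stages 0 and 1 feeding stage 2, and stage 2 feeding the final stage 3, calling submitStage on stage 3 submits only stages 0 and 1 and parks stages 2 and 3 in waitingStages. Stage 2 is submitted once both of its parents have finished, and stage 3 after that.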

In TaskSchedulerImpl

  • submitTasks registers all the tasks and then calls backend.reviveOffers()

In CoarseGrainedSchedulerBackend

  • reviveOffers calls driverEndpoint.send(ReviveOffers)
  • DriverEndpoint, an inner class of the backend, receives the message and calls makeOffers()
  • makeOffers calls scheduler.resourceOffers(workOffers) to allocate resources
  • launchTasks(taskDescs) is then called for the tasks that were allocated resources
  • launchTasks sends a LaunchTask message to the corresponding CoarseGrainedExecutorBackend
  • CoarseGrainedExecutorBackend is the main class of the actual Executor JVM process

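Back in TaskSchedulerImpl

  • resourceOffers calls resourceOfferSingleTaskSet
  • resourceOfferSingleTaskSet checks whether an Executor has enough free CPU cores (at least spark.task.cpus) before assigning a task to it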
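
The CPU check can be illustrated with a toy offer round. This is a simplified sketch under stated assumptions: ToyOffer and ToyOfferRound.assign are hypothetical names, and locality levels, offer shuffling and barrier tasks are all ignored. It only shows the rule that an Executor gets another task while it still has at least cpusPerTask free cores, one task per Executor per pass.

```scala
// Toy model of one offer round; these are not Spark classes.
final case class ToyOffer(executorId: String, freeCores: Int)

object ToyOfferRound {
  // Hands pending task ids to executors, at most one task per executor per
  // pass, and keeps making passes while any task was launched.
  def assign(
      offers: Seq[ToyOffer],
      pendingTasks: Seq[Int],
      cpusPerTask: Int): Map[String, List[Int]] = {
    require(cpusPerTask > 0, "cpusPerTask must be positive")
    val available = offers.map(_.freeCores).toArray
    val assigned = Array.fill(offers.length)(List.empty[Int])
    var pending = pendingTasks.toList
    var launchedTask = true
    while (launchedTask && pending.nonEmpty) {
      launchedTask = false
      for (i <- offers.indices) {
        // Same shape as the check in the excerpt: enough free cores?
        if (pending.nonEmpty && available(i) >= cpusPerTask) {
          assigned(i) = assigned(i) :+ pending.head
          pending = pending.tail
          available(i) -= cpusPerTask
          launchedTask = true
        }
      }
    }
    offers.map(_.executorId).zip(assigned.toSeq).toMap
  }
}
```

Given an Executor "e1" with 4 free cores, "e2" with 2 free cores, five pending tasks and 2 cores per task, "e1" receives two tasks and "e2" one; the remaining two tasks stay pending until cores are freed.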

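In CoarseGrainedExecutorBackend

  • On receiving LaunchTask, it calls executor.launchTask to start the task
  • Executor.launchTask wraps the task in a TaskRunner and runs it on a thread pool

The maximum number of Tasks that one Executor runs concurrently is therefore the Executor's core count (default 1) divided by the cores required per Task (default 1), rounded down.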
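
The excerpts suggest that this limit comes from the driver's accounting of free cores rather than from the thread pool call itself. Below is a minimal sketch of that accounting for a single Executor. ToyExecutorData, tryLaunch and finish are hypothetical names, and the step that returns cores when a task ends is an assumption, since the completion path is not part of the excerpts quoted in this article.

```scala
// Toy model of driver-side bookkeeping for one executor; not a Spark class.
class ToyExecutorData(val totalCores: Int, cpusPerTask: Int) {
  require(totalCores > 0 && cpusPerTask > 0, "core counts must be positive")
  private var free = totalCores
  private var running = Set.empty[Long]

  def freeCores: Int = free
  def runningTasks: Int = running.size

  // Check-then-decrement, as done around launching a task.
  def tryLaunch(taskId: Long): Boolean =
    if (free >= cpusPerTask && !running.contains(taskId)) {
      free -= cpusPerTask
      running += taskId
      true
    } else {
      false
    }

  // Assumed completion path: the task's cores go back to the pool.
  def finish(taskId: Long): Unit =
    if (running.contains(taskId)) {
      running -= taskId
      free += cpusPerTask
    }
}
```

With 4 cores and 2 cores per task, the third tryLaunch is refused until one of the first two tasks finishes, so at most 4 / 2 = 2 tasks run at once.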

Code

// DAGScheduler.scala

  private[scheduler] def handleJobSubmitted(......) {
    ......
    submitStage(finalStage)
    ......
  }

  private def submitStage(stage: Stage) {
    ......
        val missing = getMissingParentStages(stage).sortBy(_.id)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
    ......
  }

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    ......
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    val tasks: Seq[Task[_]] = try {
        ......
    }

    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      ......
      submitWaitingChildStages(stage)
    }
  }

// TaskSchedulerImpl.scala

  override def submitTasks(taskSet: TaskSet) {
    ......
    backend.reviveOffers()
  }

// CoarseGrainedSchedulerBackend.scala

  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) {
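    // Single-executor overload; the ReviveOffers message is handled by the
    // no-argument makeOffers(), which makes offers for every live executor
    private def makeOffers(executorId: String) {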
      // Make sure no executor is killed while some task is launching on it
      val taskDescs = withLock {
        // Filter out executors under killing
        if (executorIsAlive(executorId)) {
          val executorData = executorDataMap(executorId)
          val workOffers = IndexedSeq(
            new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort)))
          scheduler.resourceOffers(workOffers)
        } else {
          Seq.empty
        }
      }
      if (!taskDescs.isEmpty) {
        launchTasks(taskDescs)
      }
    }
  }

// TaskSchedulerImpl.scala

  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    ......
            launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
              currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
    ......
  }

  private def resourceOfferSingleTaskSet(
    ......
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager.put(tid, taskSet)
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            // Only update hosts for a barrier task.
            if (taskSet.isBarrier) {
              // The executor address is expected to be non empty.
              addressesWithDescs += (shuffledOffers(i).address.get -> task)
            }
            launchedTask = true
          }
        }
      }
    ......
  }

// CoarseGrainedSchedulerBackend.scala

    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      ......
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
      ......
    }

// CoarseGrainedExecutorBackend.scala

  override def receive: PartialFunction[Any, Unit] = {
    ......
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc)
      }
    ......

// Executor.scala

  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val tr = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, tr)
    threadPool.execute(tr)
  }



