Basic principles (YARN mode)
Each stage consists of multiple partitions, and each partition is processed by a single Task running on an Executor.
The default number of partitions in a stage is controlled by spark.default.parallelism; when that parameter is not set, the partition count is inherited from the parent stage.
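A small illustration of those two rules (not from the original note; the local master, app name, and parallelism value are arbitrary choices) using the public RDD API:
// Illustration only: spark.default.parallelism sets the initial partition count,
// and a narrow transformation inherits the count from its parent RDD.
import org.apache.spark.{SparkConf, SparkContext}
object PartitionCountExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("partition-count-example")
      .set("spark.default.parallelism", "8")   // used when no partition count is given
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 100)         // no explicit count -> 8 partitions
    println(rdd.getNumPartitions)              // 8
    val mapped = rdd.map(_ * 2)                // narrow dependency -> inherits parent's count
    println(mapped.getNumPartitions)           // 8
    sc.stop()
  }
}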
The maximum number of Tasks that can run at the same time is determined by three parameters (see the sketch after this list):
- the number of Executors, set by spark.executor.instances or --num-executors (default 1)
- the number of cores per Executor, set by spark.executor.cores or --executor-cores (default 1)
- the number of cores required by each Task, set by spark.task.cpus (default 1)
The Executor count and the cores per Executor should be chosen with the actual number of machines and CPUs in mind, but both may be configured higher than the physical machine and CPU counts.
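A minimal sketch of the resulting arithmetic; the object and method names below are made up for illustration, the formula is simply instances × (executor cores ÷ task cpus):
// Hypothetical helper, not Spark code: estimate cluster-wide task concurrency
// from spark.executor.instances, spark.executor.cores and spark.task.cpus.
object MaxConcurrentTasks {
  def estimate(executorInstances: Int, executorCores: Int, taskCpus: Int): Int =
    executorInstances * (executorCores / taskCpus)   // each Executor offers cores/taskCpus slots
  def main(args: Array[String]): Unit = {
    // e.g. --num-executors 4 --executor-cores 2 with the default spark.task.cpus = 1
    println(estimate(4, 2, 1))   // 8 tasks can run at the same time
  }
}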
Detailed flow (YARN mode)
In DAGScheduler
- When a Job is submitted, handleJobSubmitted runs and calls submitStage.
- submitStage is recursive (see the sketch after this list): starting from the final stage produced by the action, it keeps walking backwards through parent stages, adds every stage that still has missing parents to waitingStages, and once it reaches the first stage (one with no missing parents) it calls submitMissingTasks on it.
- submitMissingTasks looks up the stage's missing partitions, creates one task per partition, and calls taskScheduler.submitTasks to submit all of the stage's tasks.
- Once a stage finishes, submitStage is called again on the corresponding stages in waitingStages (via submitWaitingChildStages).
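A simplified toy model of that recursion (not the real DAGScheduler; the actual excerpt is in the Code section below):
// Simplified model of submitStage: parentless stages are submitted immediately,
// everything else is parked in waitingStages until its parents have finished.
import scala.collection.mutable
case class ToyStage(id: Int, parents: Seq[ToyStage])
object SubmitStageSketch {
  val waitingStages = mutable.Set[ToyStage]()
  def submitStage(stage: ToyStage): Unit = {
    val missing = stage.parents              // Spark only keeps parents whose output is missing
    if (missing.isEmpty) {
      println(s"submitMissingTasks(stage ${stage.id})")
    } else {
      missing.foreach(submitStage)           // walk towards the front of the DAG
      waitingStages += stage                 // revisit this stage once its parents complete
    }
  }
  def main(args: Array[String]): Unit = {
    val s0 = ToyStage(0, Nil)
    val s1 = ToyStage(1, Seq(s0))
    submitStage(ToyStage(2, Seq(s1)))        // only stage 0 is submitted; stages 1 and 2 wait
  }
}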
In TaskSchedulerImpl
- submitTasks registers all of the tasks and then calls backend.reviveOffers().
In CoarseGrainedSchedulerBackend
- reviveOffers calls driverEndpoint.send(ReviveOffers).
- The backend's inner class DriverEndpoint receives the message and calls makeOffers().
- makeOffers calls scheduler.resourceOffers(workOffers) to allocate resources.
- For the tasks that were granted resources it calls launchTasks(taskDescs).
- launchTasks sends a LaunchTask message to the corresponding CoarseGrainedExecutorBackend.
- CoarseGrainedExecutorBackend is the actual Executor JVM process.
In TaskSchedulerImpl
- resourceOffers calls resourceOfferSingleTaskSet.
- resourceOfferSingleTaskSet checks whether enough CPU cores are available for each offer.
In CoarseGrainedExecutorBackend
- When LaunchTask is received, executor.launchTask is called to start the task.
- Executor.launchTask wraps the task in a TaskRunner and executes it on a thread pool.
So the maximum number of Tasks running concurrently on each Executor is determined by the cores assigned to the Executor (default 1) and the cores required per Task (default 1).
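For example, with a hypothetical configuration of 4 Executor cores and spark.task.cpus = 2, at most 4 / 2 = 2 tasks run concurrently on each Executor:
// Hypothetical configuration, for illustration only.
import org.apache.spark.SparkConf
object PerExecutorSlots {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.executor.cores", "4")
      .set("spark.task.cpus", "2")
    // slots per Executor = executor cores / cpus per task
    println(conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1))   // 2
  }
}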
Code
// DAGScheduler.scala
private[scheduler] def handleJobSubmitted(......) {
  ......
  submitStage(finalStage)
  ......
}

private def submitStage(stage: Stage) {
  ......
  val missing = getMissingParentStages(stage).sortBy(_.id)
  if (missing.isEmpty) {
    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
    submitMissingTasks(stage, jobId.get)
  } else {
    for (parent <- missing) {
      submitStage(parent)
    }
    waitingStages += stage
  }
  ......
}

private def submitMissingTasks(stage: Stage, jobId: Int) {
  ......
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  val tasks: Seq[Task[_]] = try {
    ......
  }
  if (tasks.size > 0) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    ......
    submitWaitingChildStages(stage)
  }
}
// TaskSchedulerImpl.scala
override def submitTasks(taskSet: TaskSet) {
  ......
  backend.reviveOffers()
}

// CoarseGrainedSchedulerBackend.scala
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) {
  private def makeOffers(executorId: String) {
    // Make sure no executor is killed while some task is launching on it
    val taskDescs = withLock {
      // Filter out executors under killing
      if (executorIsAlive(executorId)) {
        val executorData = executorDataMap(executorId)
        val workOffers = IndexedSeq(
          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
            Some(executorData.executorAddress.hostPort)))
        scheduler.resourceOffers(workOffers)
      } else {
        Seq.empty
      }
    }
    if (!taskDescs.isEmpty) {
      launchTasks(taskDescs)
    }
  }
}
// TaskSchedulerImpl.scala
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  ......
  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
    currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
  ......
}

private def resourceOfferSingleTaskSet(
  ......
  if (availableCpus(i) >= CPUS_PER_TASK) {
    try {
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        val tid = task.taskId
        taskIdToTaskSetManager.put(tid, taskSet)
        taskIdToExecutorId(tid) = execId
        executorIdToRunningTaskIds(execId).add(tid)
        availableCpus(i) -= CPUS_PER_TASK
        assert(availableCpus(i) >= 0)
        // Only update hosts for a barrier task.
        if (taskSet.isBarrier) {
          // The executor address is expected to be non empty.
          addressesWithDescs += (shuffledOffers(i).address.get -> task)
        }
        launchedTask = true
      }
    }
  }
  ......
}
// CoarseGrainedSchedulerBackend.scala
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  ......
  val executorData = executorDataMap(task.executorId)
  executorData.freeCores -= scheduler.CPUS_PER_TASK
  logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
    s"${executorData.executorHost}.")
  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  ......
}
// CoarseGrainedExecutorBackend.scala
override def receive: PartialFunction[Any, Unit] = {
  ......
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc)
    }
  ......
}

// Executor.scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
