Spark Source Code Analysis: DAG Partitioning and Submission in DAGScheduler
The previous article introduced DAGScheduler's Stage partitioning algorithm.
This article continues the analysis: how a Stage is wrapped into a TaskSet, and how that TaskSet is submitted to the cluster's Executors for execution.
In DAGScheduler's submitStage method, once the stages have been divided and the stage topology built, a stage with no missing parent stages is submitted by calling DAGScheduler's submitMissingTasks method, which submits the tasks contained in that stage.
Let's start by analyzing DAGScheduler's submitMissingTasks method.
1. Get the preferred compute locations for each task:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}
The core here is the getPreferredLocs method, which uses the RDD's data placement information to determine each task's preferred compute locations and thereby achieve better data locality. We skip its details here; they will be analyzed in a later article.
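Still, to give a rough sense of the lookup order without going into those details, here is a minimal sketch, not the actual DAGScheduler code; lookupCacheLocs is a hypothetical stand-in for the DAGScheduler's internal cache-location table.

import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Sketch only: mirrors the lookup order of DAGScheduler.getPreferredLocs,
// returning candidate host names for one partition of an RDD.
def preferredHostsSketch(
    rdd: RDD[_],
    partition: Int,
    lookupCacheLocs: (RDD[_], Int) => Seq[String]): Seq[String] = {
  // 1. If the partition is cached, prefer the hosts holding the cached blocks.
  val cached = lookupCacheLocs(rdd, partition)
  if (cached.nonEmpty) return cached

  // 2. Otherwise ask the RDD itself (e.g. a HadoopRDD reports HDFS block hosts).
  val own = rdd.preferredLocations(rdd.partitions(partition))
  if (own.nonEmpty) return own

  // 3. Otherwise walk up a narrow dependency and reuse a parent partition's locations.
  rdd.dependencies.collectFirst { case narrow: NarrowDependency[_] =>
    narrow.getParents(partition)
      .flatMap(p => preferredHostsSketch(narrow.rdd, p, lookupCacheLocs))
  }.getOrElse(Nil).distinct
}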
2. Serialize the task binary and broadcast it; when executing a task, the Executor deserializes the Task from this broadcast.
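Concretely, the broadcast payload for this step is the serialized RDD together with either the shuffle dependency or the user function; the fragment below paraphrases that part of submitMissingTasks (it is not a standalone program).

// Paraphrased from submitMissingTasks: the "task binary" is the serialized
// (rdd, shuffleDep) pair for a ShuffleMapStage, or the serialized (rdd, func)
// pair for a ResultStage. It is broadcast once per stage, and every task of
// the stage deserializes it on the executor inside runTask.
val taskBinaryBytes: Array[Byte] = stage match {
  case s: ShuffleMapStage =>
    closureSerializer.serialize((s.rdd, s.shuffleDep): AnyRef).array()
  case s: ResultStage =>
    closureSerializer.serialize((s.rdd, s.func): AnyRef).array()
}
taskBinary = sc.broadcast(taskBinaryBytes)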
3. Create a task for each partition of the stage according to the stage's type, and wrap them into a TaskSet. There are two stage types: a ShuffleMapStage produces ShuffleMapTasks, and a ResultStage produces ResultTasks (a small example follows the code excerpt below).
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
}
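As a quick illustration of the two task types (assuming an existing SparkContext named sc), the small job below produces one ShuffleMapStage, whose partitions become ShuffleMapTasks writing shuffle output, and one ResultStage, whose partitions become ResultTasks that return their results to the driver:

// Assumes `sc` is an already-created SparkContext.
val words = sc.parallelize(Seq("a", "b", "a", "c"), 2)
val counts = words
  .map(w => (w, 1))
  .reduceByKey(_ + _)   // shuffle boundary: everything before it runs as ShuffleMapTasks
  .collect()            // the final stage is a ResultStage; one ResultTask per partition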
4. Call TaskScheduler's submitTasks to submit the TaskSet:
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
The submitTasks method is implemented in TaskSchedulerImpl, the implementation class of TaskScheduler.
4.1 TaskSchedulerImpl's submitTasks method first creates a TaskSetManager:
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
  taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
A TaskSetManager manages a single TaskSet within TaskSchedulerImpl: it tracks every task, and when a task fails it retries that task until the maximum number of retries is reached. It also performs locality-aware scheduling of tasks through delay scheduling (the relevant configuration is illustrated after the class signature below).
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,          // the TaskSchedulerImpl this manager is bound to
    val taskSet: TaskSet,
    val maxTaskFailures: Int,          // maximum number of retries after a task failure
    clock: Clock = new SystemClock())
  extends Schedulable with Logging
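Delay scheduling works off per-level wait times: if no task of this TaskSet has been launched at its best locality level for longer than the configured wait, the TaskSetManager falls back to the next (less local) level. The waits come from configuration; the values below are just the usual defaults, shown for illustration:

import org.apache.spark.SparkConf

// Locality-wait settings consulted by TaskSetManager's delay scheduling.
// spark.locality.wait is the fallback for all levels; the per-level keys
// override it for the corresponding downgrade step.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // general wait before downgrading a level
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL -> RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL -> ANY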
4.2 Add the TaskSetManager to the schedulableBuilder:
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // add the TaskSetManager to the rootPool; the schedulableBuilder decides the scheduling order
The schedulableBuilder field is of type SchedulableBuilder, a trait with two implementations, FIFOSchedulableBuilder and FairSchedulableBuilder; by default the FIFO mode is used:
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
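For example, fair scheduling can be selected by setting spark.scheduler.mode before the SparkContext is created (pool definitions can optionally be supplied through spark.scheduler.allocation.file); the app name and master below are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Switch the TaskScheduler from the default FIFO mode to FAIR mode.
val conf = new SparkConf()
  .setAppName("fair-scheduling-example")   // illustrative app name
  .setMaster("local[4]")                   // illustrative master
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)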
The schedulableBuilder itself is created when SparkContext, after creating the SchedulerBackend and the TaskScheduler, calls TaskSchedulerImpl's initialize method:
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
The schedulableBuilder is an important member of TaskScheduler: according to the scheduling policy, it decides the order in which TaskSetManagers are scheduled.
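Under the default FIFO policy, the order produced by rootPool.getSortedTaskSetQueue is essentially "earlier job first, and within a job, earlier stage first". A minimal sketch of that comparison (paraphrasing FIFOSchedulingAlgorithm, where a TaskSetManager's priority is its job id):

// Sketch of the FIFO ordering between two TaskSetManagers:
// compare job ids (priority) first, then stage ids.
def fifoComesFirst(priorityA: Int, stageIdA: Int, priorityB: Int, stageIdB: Int): Boolean = {
  if (priorityA != priorityB) priorityA < priorityB
  else stageIdA < stageIdB
}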
4.3 Next, SchedulerBackend's reviveOffers method is called to schedule the tasks, i.e. to decide which Executor each task will run on.
CoarseGrainedSchedulerBackend's reviveOffers implementation simply sends a ReviveOffers message to driverEndpoint:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
On receiving the ReviveOffers message, driverEndpoint calls the makeOffers method:
// Make fake resource offers on all executors
private def makeOffers() {
  // filter out the executors that are still alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // wrap each executor as a WorkerOffer (executor id, host and free cores)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
Note: regarding the executorDataMap used above, when the client registered the Application with the Master, the Master had already allocated and launched Executors for the Application; those Executors then registered themselves with CoarseGrainedSchedulerBackend, and their registration information is stored in the executorDataMap data structure.
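A WorkerOffer itself is only a small value object describing one executor's spare capacity for the current scheduling round; paraphrased, it looks like this, and makeOffers simply projects each executorDataMap entry down to these three fields:

// Paraphrased shape of org.apache.spark.scheduler.WorkerOffer.
case class WorkerOffer(executorId: String, host: String, cores: Int)

// Hypothetical offers for one round of scheduling, one per alive executor.
val offers = Seq(
  WorkerOffer("executor-1", "host-a", cores = 4),
  WorkerOffer("executor-2", "host-b", cores = 4)
)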
With the compute resources prepared, TaskSchedulerImpl now assigns Executors to tasks based on these resources.
Let's look at TaskSchedulerImpl's resourceOffers method:
// randomly shuffle the offers so that tasks are spread evenly across executors
val shuffledOffers = Random.shuffle(offers)
// build one buffer per executor, holding the tasks that will be assigned to it
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// sort the TaskSetManagers according to the SchedulableBuilder's scheduling algorithm
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}
// nested loop: for each TaskSet in scheduling order, try to launch its tasks
// one locality level at a time
// data locality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
      taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}
if (tasks.size > 0) {
  hasLaunchedTask = true
}
return tasks
Now let's look at the resourceOfferSingleTaskSet method. Using the current data-locality level, it calls TaskSetManager's resourceOffer method to place tasks on the given executors:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // only offer this executor if it has at least CPUS_PER_TASK (default 1) free cores
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // resourceOffer returns a TaskDescription if the TaskSetManager can launch
        // a task on this executor at the given locality level
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          return launchedTask
      }
    }
  }
  launchedTask
}
After resources have been assigned to the tasks, DriverEndpoint calls the launchTasks method to start the tasks on their Executors. How a task is launched and executed on the Executor side will be analyzed in a later article, so stay tuned.
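For reference, launchTasks roughly does the following for every TaskDescription handed back by resourceOffers: serialize it, decrement the chosen executor's free-core count, and send a LaunchTask message to that executor's RPC endpoint. The fragment below is a condensed paraphrase of that loop; the real method also aborts the task set if a serialized task exceeds the RPC frame size limit.

// Condensed sketch of CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks;
// over-sized-task handling is omitted here.
for (task <- tasks.flatten) {
  val serializedTask = ser.serialize(task)
  val executorData = executorDataMap(task.executorId)
  executorData.freeCores -= scheduler.CPUS_PER_TASK
  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}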
To summarize the call chain:
TaskSchedulerImpl#submitTasks
CoarseGrainedSchedulerBackend#reviveOffers
CoarseGrainedSchedulerBackend$DriverEndpoint#makeOffers
  |- TaskSchedulerImpl#resourceOffers(offers)   // assign tasks to the offers
     |- TaskSchedulerImpl#resourceOfferSingleTaskSet
CoarseGrainedSchedulerBackend$DriverEndpoint#launchTasks