Spark任務調度
TaskScheduler調度入口:
(1) CoarseGrainedSchedulerBackend 在啟動時會創建DriverEndPoint. 而DriverEndPoint中存在一定時任務,每隔一定時間(spark.scheduler.revive.interval, 默認為1s)進行一次調度(給自身發送ReviveOffers消息, 進行調用makeOffers進行調度)。代碼如下所示
override def onStart() { // Periodically revive offers to allow delay scheduling to work val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s") reviveThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReviveOffers)) } }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS) }
(2)當Executor執行完成已分配任務時,會向Driver發送StatusUpdate消息,當Driver接收到消后會調用 makeOffers(executorId)方法,進行任務調度, CoarseGrainedExecutorBackend 狀態變化時向Driver (DriverEndPoint)向送StatusUpdate消息
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") } }
Dirver接收到StatusUpdate消息時將會觸發設調度(makeOffers),為完成任務的Executor分配任務。
override def receive: PartialFunction[Any, Unit] = { case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId) case None => // Ignoring the update since we don't know about the executor. logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } } case ReviveOffers => makeOffers() case KillTask(taskId, executorId, interruptThread) => executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread)) case None => // Ignoring the task kill since the executor is not registered. logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } }
其中makeOffers方法,會調用TaskSchedulerImpl中的resourceOffers方法,依其的調度策略為Executor分配適合的任務。具體代碼如下:
a、為所有資源分配任務
// Make fake resource offers on all executors private def makeOffers() { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_)) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq launchTasks(scheduler.resourceOffers(workOffers)) }
b、為單個executor分配任務
// Make fake resource offers on just one executor private def makeOffers(executorId: String) { // Filter out executors under killing if (!executorsPendingToRemove.contains(executorId)) { val executorData = executorDataMap(executorId) val workOffers = Seq( new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) launchTasks(scheduler.resourceOffers(workOffers)) } }
分配完任務后,向Executor發送LaunchTask指令,啟動任務,執行用戶邏輯代碼

// Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + "spark.akka.frameSize or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes) taskSetMgr.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) } } } else { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } }
Spark任務調度策略
Ò FIFO
FIFO(先進先出)方式調度Job,如下圖所示,每個Job被切分成多個Stage.第一個Job優先獲取所有可用資源,接下來第二個Job再獲取剩余可用資源。(每個Stage對應一個TaskSetManager)
Ò FAIR
FAIR共享模式調度下,Spark以在多Job之間輪詢方式為任務分配資源,所有的任務擁有大致相當的優先級來共享集群的資源。FAIR調度模型如下圖:
下面從源碼的角度對調度策略進行說明:
當觸發調度時,會調用TaskSchedulerImpl的resourceOffers方法,方法中會依照調度策略選出要執行的TaskSet, 然后取出適合(考慮本地性)的task交由Executor執行, 其代碼如下:

/** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host activeExecutorIds += o.executorId if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) newExecAvail = true } for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } // Randomly shuffle offers to avoid always placing tasks on the same set of workers. val shuffledOffers = Random.shuffle(offers) // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) if (newExecAvail) { taskSet.executorAdded() } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) } if (tasks.size > 0) { hasLaunchedTask = true } return tasks }
經過分析可知,通過rootPool.getSortedTaskSetQueue對隊列中的TaskSet進行排序,getSortedTaskSetQueue的具體實現如下:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = { var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) for (schedulable <- sortedSchedulableQueue) { sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue } sortedTaskSetQueue }
由上述代碼可知,其通過算法做為比較器對taskSet進行排序, 其中調度算法有FIFO和FAIR兩種,下面分別進行介紹。
FIFO
優先級(Priority): 在DAGscheduler創建TaskSet時使用JobId做為優先級的值。
FIFO調度算法實現如下所示

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val priority1 = s1.priority val priority2 = s2.priority var res = math.signum(priority1 - priority2) if (res == 0) { val stageId1 = s1.stageId val stageId2 = s2.stageId res = math.signum(stageId1 - stageId2) } if (res < 0) { true } else { false } } }
由源碼可知,FIFO依據JobId進行挑選較小值。因為越早提交的作業,JobId越小。
對同一個作業(Job)來說越先生成的Stage,其StageId越小,有依賴關系的多個Stage之間,DAGScheduler會控制Stage是否會被提交到調度隊列中(若其依賴的Stage未執行完前,此Stage不會被提交),其調度順序可通過此來保證。但若某Job中有兩個無入度的Stage的話,則先調度StageId小的Stage.
Fair
Fair調度隊列相比FIFO較復雜,其可存在多個調度隊列,且隊列呈樹型結構(現階段Spark的Fair調度只支持兩層樹結構),每用戶可以使用sc.setLocalProperty(“spark.scheduler.pool”, “poolName”)來指定要加入的隊列,默認情況下會加入到buildDefaultPool。每個隊列中還可指定自己內部的調度策略,且Fair還存在一些特殊的屬性:
schedulingMode: 設置調度池的調度模式FIFO或FAIR, 默認為FIFO.
minShare:最少資源保證量,當一個隊列最少資源未滿足時,它將優先於其它同級隊列獲取資源。
weight: 在一個隊列內部分配資源時,默認情況下,采用公平輪詢的方法將資源分配給各個應用程序,而該參數則將打破這種平衡。例如,如果用戶配置一個指定調度池權重為2, 那么這個調度池將會獲得相對於權重為1的調度池2倍的資源。
以上參數,可通過conf/fairscheduler.xml文件配置調度池的屬性。
Fair調度算法實現如下所示:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val minShare1 = s1.minShare val minShare2 = s2.minShare val runningTasks1 = s1.runningTasks val runningTasks2 = s2.runningTasks val s1Needy = runningTasks1 < minShare1 val s2Needy = runningTasks2 < minShare2 val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble var compare: Int = 0 if (s1Needy && !s2Needy) { return true } else if (!s1Needy && s2Needy) { return false } else if (s1Needy && s2Needy) { compare = minShareRatio1.compareTo(minShareRatio2) } else { compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) } if (compare < 0) { true } else if (compare > 0) { false } else { s1.name < s2.name } } }
由原碼可知,未滿足minShare規定份額的資源的隊列或任務集先執行;如果所有均不滿足minShare的話,則選擇缺失比率小的先調度;如果均不滿足,則按執行權重比進行選擇,先調度執行權重比小的。如果執行權重也相同的話則會選擇StageId小的進行調度(name=“TaskSet_”+ taskSet.stageId.toString)。
以此為標准將所有TaskSet進行排序, 然后選出優先級最高的進行調度。
Spark 任務調度之任務本地性
當選出TaskSet后,將按本地性從中挑選適合Executor的任務,在Executor上執行。
(詳細見http://www.cnblogs.com/barrenlake/p/4550800.html一小節相關內容)
文章地址: http://www.cnblogs.com/barrenlake/p/4891589.html