-
Spark 資源調度與任務調度的流程(Standalone):
-
啟動集群后, Worker 節點會向 Master 節點匯報資源情況, Master掌握了集群資源狀況。
-
當 Spark 提交一個 Application 后, 根據 RDD 之間的依賴關系將 Application 形成一個 DAG 有向無環圖。
-
任務提交后, Spark 會在任務端創建兩個對象: DAGSchedular 和 TaskScheduler
-
DAGSchedular 是任務調度的高層調度器, 是一個對象
-
DAGScheduler 的主要作用是 將 DAG 根據 RDD 之間的寬窄依賴關系划分為一個個的Stage, 然后將 stage 以 TaskSet 的形式 提交給 TaskScheduler
-
TaskScheduler 是任務調度的底層調度器
-
TaskSet 其實就是一個集合, 里面封裝的就是一個個task任務, 也就是stage中的並行度 task 任務
package org.apache.spark.scheduler import java.util.Properties /** * A set of tasks submitted together to the low-level TaskScheduler, * usually representing missing partitions of a particular stage. * 一同被提交到低等級的任務調度器的 一組任務集, 通常代表了一個特定的 stage(階 * 段) 的 缺失的分區 */ private[spark] class TaskSet( // 任務數組 val tasks: Array[Task[_]], // 階段Id val stageId: Int, // 嘗試的階段Id(也就是下級Stage?) val stageAttemptId: Int, // 優先級 val priority: Int, // 是個封裝過的Hashtable val properties: Properties) { // 拼接 階段Id 和 嘗試的階段Id val id: String = stageId + "." + stageAttemptId // 重寫 toString override def toString: String = "TaskSet " + id }
-
TaskScheduler 會遍歷 TaskSet 集合, 拿到每個 task 后將 task發送到 Executor 中執行
-
其實就是發送到Executor中的線程池ThreadPool去執行
-
當 task 執行失敗時, 則由TaskSchedular負責重試, 將 task重新發送給 Executor 去執行, 默認重試 3 次
//提交task,最后一行 backend.reviveOffers() 調用的是CoarseGrainedSchedulerBackend對象中的方法 override def submitTasks(taskSet: TaskSet) { // 獲取任務數組 val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") // 加同步鎖 this.synchronized { // 創建任務集管理器 參數: 任務集, 最大容忍任務失敗次數 val manager = createTaskSetManager(taskSet, maxTaskFailures) // 階段Id val stage = taskSet.stageId // taskSetsByStageIdAndAttempt 是一個 HashMap[Int, TaskSetManager] /* getOrElseUpdate(key: A, op: => B): B= * 如果 key 已經在這個 map 中, 就返回其對應的value * 否則就根據已知的表達式 'op' 計算其對應的value 並將其存儲到 map中, 並返回該 value */ val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) // 將階段任務集合設置為任務管理器 stageTaskSets(taskSet.stageAttemptId) = manager // 獲取沖突的任務集 如果 stageTaskSets 的任務集 不是傳入的任務集 並且stageTaskSets的任務集不是僵屍進程 那么它就是沖突的任務集 val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } // 如果有沖突的任務集 if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } // 通過可調度的構造器創建一個任務集管理器 schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // 如故不是本地提交 或者 沒有接收到任務 if (!isLocal && !hasReceivedTask) { // 通過飢餓的計時器 來 根據 固定的比例進行調度 // scheduleAtFIxedRate 方法的三個參數: 時間任務, 延遲時間, 周期 如果延遲時間或周期值為父會拋出異常 starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { // 如果沒有發送任務 if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { // 如果發送了任務, 就取消 this.cancel() } } // 默認的飢餓超時臨界值: 15s }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } // 調用 CoarseGrainedSchedulerBackend對象中的方法 backend.reviveOffers() }
-
如果重試 3 次 依然失敗, 那么這個 task 所在的 stage 就失敗了
-
如果 stage 失敗了則由 DAGScheduler 來負責重試, 重新發送 TaskSet 到 TaskScheduler, Stage 默認重試 4 次。
-
如果重試 4 次 以后依然失敗, 那么 該 job 就失敗了。
-
一個 job 失敗了, Application 就失敗了。
-
-
TaskScheduler 不僅能重試失敗的 task, 還會重試 straggling(直譯是掙扎的, 這邊可以意譯為緩慢的) task(執行速度比其他task慢太多的task)
/** * TaskScheduler 啟動 */ override def start() { //StandaloneSchedulerBackend 啟動 backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") // 啟動定期執行推測任務線程 speculationScheduler.scheduleWithFixedDelay(new Runnable { override def run(): Unit = Utils.tryOrStopSparkContext(sc) { // 檢查所有活躍的jon中是否有可推測的任務 checkSpeculatableTasks() } }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS) } }
// Check for speculatable tasks in all our active jobs. // 檢查是否有可推測的任務 def checkSpeculatableTasks() { // 是否應該重新激活 var shouldRevive = false // 加同步鎖 synchronized { // 檢查是否有可推測的任務(傳入執行推測所需的最小時間) shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION) } // 如果需要重新激活 if (shouldRevive) { // 就嘗試運行推測任務 backend.reviveOffers() } }
-
Spark 推測執行機制:
-
如果有運行緩慢的 task, 那么 TaskScheduler 會啟動一個新的 task 來與該運行緩慢的 task 執行相同的處理邏輯。
-
兩個 task 哪個先執行完, 就以哪個 task 的執行結果為准。
-
在 Spark 中推測執行默認是關閉的。
-
推測執行可以通過 spark.speculation 屬性來配置
/** * Return a speculative task for a given executor if any are available * 如果有卡殼的進程,就向已知的executor進程返回一個推測任務 * The task should not have an attempt running on this host, in case * the host is slow. * 該任務不應有任何嘗試任務在該主機上運行, 以防止該主機是有延遲的 * In addition, the task should meet the given locality constraint. * 此外, 該任務需要滿足已知的本地約束 */ // Labeled as protected to allow tests to override providing speculative tasks if necessary // 標注為 protected 以允許測試 來重寫 提供的推測任務(如果需要的話) protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { // 從推測式執行任務列表中移除已經成功完成的task speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set // 1.判斷 task 是否可以在該executor對應的Host上執行, 判斷條件為: // 2.沒有taskAttempt在該Host上運行 // 3. 該 executor 沒有在 task 的黑名單中(task 在該executor上失敗過, 並且仍在‘黑暗’時間中) def canRunOnHost(index: Int): Boolean = { !hasAttemptOnHost(index, host) && !isTaskBlacklistedOnExecOrNode(index, execId, host) } // 判斷推測執行任務集合是否為空 if (!speculatableTasks.isEmpty) { // Check for process-local tasks; // 檢查 本地進程任務 // note that tasks can be process-local on multiple nodes when we replicate cached blocks, as in Spark Streaming // 需要注意的是: 當我們備份緩存塊時, 任務可以以本地進程 或者 多節點的形式運行 (就像spark流那樣) for (index <- speculatableTasks if canRunOnHost(index)) { val prefs = tasks(index).preferredLocations val executors = prefs.flatMap(_ match { case e: ExecutorCacheTaskLocation => Some(e.executorId) case _ => None }); // 如果 executor 進程包含該任務Id if (executors.contains(execId)) { // 就不推測該任務 speculatableTasks -= index // 返回某個本地進程 return Some((index, TaskLocality.PROCESS_LOCAL)) } } // Check for node-local tasks 檢查本地節點的任務 if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { for (index <- speculatableTasks if canRunOnHost(index)) { val locations = tasks(index).preferredLocations.map(_.host) if (locations.contains(host)) { speculatableTasks -= index return Some((index, TaskLocality.NODE_LOCAL)) } } } // Check for no-preference tasks 檢查非優先級的任務 if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) { // 遍歷 speculatableTasks, 如果有任務能夠在主機上運行 for (index <- speculatableTasks if canRunOnHost(index)) { // 獲取該task的優先級位置 val locations = tasks(index).preferredLocations if (locations.size == 0) { speculatableTasks -= index return Some((index, TaskLocality.PROCESS_LOCAL)) } } } // Check for rack-local tasks 監察本地構建的任務 if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { for (rack <- sched.getRackForHost(host)) { for (index <- speculatableTasks if canRunOnHost(index)) { val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost) if (racks.contains(rack)) { speculatableTasks -= index return Some((index, TaskLocality.RACK_LOCAL)) } } } } // Check for non-local tasks 檢查非本地性的任務 if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { for (index <- speculatableTasks if canRunOnHost(index)) { speculatableTasks -= index return Some((index, TaskLocality.ANY)) } } } None }
- 需要注意的是:
- 對於 ETL(Extract Transformation Load) 類型的需要導入數據庫的業務需要關閉推測執行機制, 否則會有重復的數據導入數據庫。
- 如果遇到數據傾斜的情況, 開啟推測執行則有可能導致一直會有 task 重新啟動處理相同的邏輯, 任務可能一直處於處理不完的狀態。
- 需要注意的是:
-
-
-
粗粒度資源申請 和 細粒度資源申請
-
粗粒度資源申請(Spark)
-
在 Application 執行之前, 將所有的資源申請完畢, 當資源申請成功后, 才會進行任務的調度, 當所有的 task 執行完成后才會釋放這部分資源
-
優點
- 在 Application 執行之前, 所有的資源都申請完畢, 每一個 task 直接使用資源就可以了, 不需要 task 在執行前自己去申請資源。
- task 執行快了 => stage 執行就快了 => job 執行就快了 => application 執行就快了
-
缺點
- 直到最后一個 task 執行完成才會釋放資源, 集群的資源無法充分利用。(俗稱: 占着M坑不拉S)
-
-
細粒度資源申請(MR)
-
Application 執行之前不需要先去申請資源, 而是直接執行, 讓 job 中的每一個 task 在執行前自己去申請資源, task執行完成就釋放資源。
-
優點
- 集群的資源可以充分利用
-
缺點
- task自己去申請資源,task啟動變慢,Application的運行就響應的變慢了。
-
-