Spark 資源調度 與 任務調度


1571883328592

  • Spark 資源調度與任務調度的流程(Standalone):

    • 啟動集群后, Worker 節點會向 Master 節點匯報資源情況, Master掌握了集群資源狀況。

    • 當 Spark 提交一個 Application 后, 根據 RDD 之間的依賴關系將 Application 形成一個 DAG 有向無環圖。

    • 任務提交后, Spark 會在任務端創建兩個對象: DAGSchedular 和 TaskScheduler

    • DAGSchedular 是任務調度的高層調度器, 是一個對象

    • DAGScheduler 的主要作用是 將 DAG 根據 RDD 之間的寬窄依賴關系划分為一個個的Stage, 然后將 stage 以 TaskSet 的形式 提交給 TaskScheduler

      • TaskScheduler 是任務調度的底層調度器

      • TaskSet 其實就是一個集合, 里面封裝的就是一個個task任務, 也就是stage中的並行度 task 任務

        package org.apache.spark.scheduler
        
        import java.util.Properties
        
        /**
         * A set of tasks submitted together to the low-level TaskScheduler, 
         * usually representing missing partitions of a particular stage.
         * 一同被提交到低等級的任務調度器的 一組任務集, 通常代表了一個特定的 stage(階
         * 段) 的 缺失的分區
         */
        private[spark] class TaskSet(
            // 任務數組
            val tasks: Array[Task[_]],
            // 階段Id
            val stageId: Int,
            // 嘗試的階段Id(也就是下級Stage?)
            val stageAttemptId: Int,
            // 優先級
            val priority: Int,
            // 是個封裝過的Hashtable
            val properties: Properties) {
            // 拼接 階段Id 和 嘗試的階段Id 
          val id: String = stageId + "." + stageAttemptId
          // 重寫 toString 
          override def toString: String = "TaskSet " + id
        }
        
      • TaskScheduler 會遍歷 TaskSet 集合, 拿到每個 task 后將 task發送到 Executor 中執行

      • 其實就是發送到Executor中的線程池ThreadPool去執行

      • 當 task 執行失敗時, 則由TaskSchedular負責重試, 將 task重新發送給 Executor 去執行, 默認重試 3 次

          //提交task,最后一行  backend.reviveOffers()  調用的是CoarseGrainedSchedulerBackend對象中的方法
          override def submitTasks(taskSet: TaskSet) {
            // 獲取任務數組  
            val tasks = taskSet.tasks
            logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
            // 加同步鎖  
            this.synchronized {
               // 創建任務集管理器 參數: 任務集, 最大容忍任務失敗次數
              val manager = createTaskSetManager(taskSet, maxTaskFailures)
               // 階段Id
              val stage = taskSet.stageId
              
                // taskSetsByStageIdAndAttempt 是一個 HashMap[Int, TaskSetManager]
                /* getOrElseUpdate(key: A, op: => B): B= 
                * 如果 key 已經在這個 map 中, 就返回其對應的value
                * 否則就根據已知的表達式 'op' 計算其對應的value 並將其存儲到 map中, 並返回該 value
                */
        val stageTaskSets =
                taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
                // 將階段任務集合設置為任務管理器
              stageTaskSets(taskSet.stageAttemptId) = manager
                // 獲取沖突的任務集 如果 stageTaskSets 的任務集 不是傳入的任務集 並且stageTaskSets的任務集不是僵屍進程 那么它就是沖突的任務集
              val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
                ts.taskSet != taskSet && !ts.isZombie
              }
                // 如果有沖突的任務集
              if (conflictingTaskSet) {
                throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
                  s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
              }
                // 通過可調度的構造器創建一個任務集管理器 
              schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
              // 如故不是本地提交 或者 沒有接收到任務
              if (!isLocal && !hasReceivedTask) {
              	// 通過飢餓的計時器 來 根據 固定的比例進行調度
              	// scheduleAtFIxedRate 方法的三個參數: 時間任務, 延遲時間, 周期 如果延遲時間或周期值為父會拋出異常
                starvationTimer.scheduleAtFixedRate(new TimerTask() {
                  override def run() {
                      // 如果沒有發送任務
                    if (!hasLaunchedTask) {
                      logWarning("Initial job has not accepted any resources; " +
                        "check your cluster UI to ensure that workers are registered " +
                        "and have sufficient resources")
                    } else {
                        // 如果發送了任務, 就取消
                      this.cancel()
                    }
                  }
                  // 默認的飢餓超時臨界值: 15s
                }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
              }
              hasReceivedTask = true
            }
            // 調用 CoarseGrainedSchedulerBackend對象中的方法
            backend.reviveOffers()
          }
        
      • 如果重試 3 次 依然失敗, 那么這個 task 所在的 stage 就失敗了

      • 如果 stage 失敗了則由 DAGScheduler 來負責重試, 重新發送 TaskSet 到 TaskScheduler, Stage 默認重試 4 次。

      • 如果重試 4 次 以后依然失敗, 那么 該 job 就失敗了。

      • 一個 job 失敗了, Application 就失敗了。

    • TaskScheduler 不僅能重試失敗的 task, 還會重試 straggling(直譯是掙扎的, 這邊可以意譯為緩慢的) task(執行速度比其他task慢太多的task)

      /**
          * TaskScheduler 啟動
          */
        override def start() {
          //StandaloneSchedulerBackend 啟動
          backend.start()
      
          if (!isLocal && conf.getBoolean("spark.speculation", false)) {
            logInfo("Starting speculative execution thread")
              // 啟動定期執行推測任務線程
            speculationScheduler.scheduleWithFixedDelay(new Runnable {
              override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
                  // 檢查所有活躍的jon中是否有可推測的任務
                checkSpeculatableTasks()
              }
            }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
          }
        }
      
      // Check for speculatable tasks in all our active jobs.
      // 檢查是否有可推測的任務
        def checkSpeculatableTasks() {
          // 是否應該重新激活  
          var shouldRevive = false
          // 加同步鎖  
          synchronized {
             // 檢查是否有可推測的任務(傳入執行推測所需的最小時間) 
            shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
          }
            // 如果需要重新激活
          if (shouldRevive) {
            // 就嘗試運行推測任務   
            backend.reviveOffers()
          }
        }
      
    • Spark 推測執行機制:

      • 如果有運行緩慢的 task, 那么 TaskScheduler 會啟動一個新的 task 來與該運行緩慢的 task 執行相同的處理邏輯。

      • 兩個 task 哪個先執行完, 就以哪個 task 的執行結果為准。

      • 在 Spark 中推測執行默認是關閉的。

      • 推測執行可以通過 spark.speculation 屬性來配置

          /**
           * Return a speculative task for a given executor if any are available
           * 如果有卡殼的進程,就向已知的executor進程返回一個推測任務 
           * The task should not have an attempt running on this host, in case 
           * the host is slow. 
           * 該任務不應有任何嘗試任務在該主機上運行, 以防止該主機是有延遲的
           * In addition, the task should meet the given locality constraint.
           * 此外, 該任務需要滿足已知的本地約束
           */
          // Labeled as protected to allow tests to override providing speculative tasks if necessary
          // 標注為 protected 以允許測試 來重寫 提供的推測任務(如果需要的話)
          protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
            : Option[(Int, TaskLocality.Value)] =
          {
            // 從推測式執行任務列表中移除已經成功完成的task	
            speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
        
            // 1.判斷 task 是否可以在該executor對應的Host上執行, 判斷條件為:
            // 2.沒有taskAttempt在該Host上運行
            // 3. 該 executor 沒有在 task 的黑名單中(task 在該executor上失敗過, 並且仍在‘黑暗’時間中)
            def canRunOnHost(index: Int): Boolean = {
              !hasAttemptOnHost(index, host) &&
                !isTaskBlacklistedOnExecOrNode(index, execId, host)
            }
            // 判斷推測執行任務集合是否為空
            if (!speculatableTasks.isEmpty) {
              // Check for process-local tasks; 
              // 檢查 本地進程任務
              // note that tasks can be process-local on multiple nodes when we replicate cached blocks, as in Spark Streaming
              // 需要注意的是: 當我們備份緩存塊時, 任務可以以本地進程 或者 多節點的形式運行 (就像spark流那樣)
              for (index <- speculatableTasks if canRunOnHost(index)) {
                val prefs = tasks(index).preferredLocations
                val executors = prefs.flatMap(_ match {
                  case e: ExecutorCacheTaskLocation => Some(e.executorId)
                  case _ => None
                });
                // 如果 executor 進程包含該任務Id
                if (executors.contains(execId)) {
                	// 就不推測該任務
                  speculatableTasks -= index
                  // 返回某個本地進程
                  return Some((index, TaskLocality.PROCESS_LOCAL))
                }
              }
        
              // Check for node-local tasks 檢查本地節點的任務
              if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
                for (index <- speculatableTasks if canRunOnHost(index)) {
                  val locations = tasks(index).preferredLocations.map(_.host)
                  if (locations.contains(host)) {
                    speculatableTasks -= index
                    return Some((index, TaskLocality.NODE_LOCAL))
                  }
                }
              }
        
              // Check for no-preference tasks 檢查非優先級的任務
              if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
              	// 遍歷 speculatableTasks, 如果有任務能夠在主機上運行
                for (index <- speculatableTasks if canRunOnHost(index)) {
                	// 獲取該task的優先級位置
                  val locations = tasks(index).preferredLocations
                  if (locations.size == 0) {
                    speculatableTasks -= index
                    return Some((index, TaskLocality.PROCESS_LOCAL))
                  }
                }
              }
        
              // Check for rack-local tasks 監察本地構建的任務
              if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
                for (rack <- sched.getRackForHost(host)) {
                  for (index <- speculatableTasks if canRunOnHost(index)) {
                    val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
                    if (racks.contains(rack)) {
                      speculatableTasks -= index
                      return Some((index, TaskLocality.RACK_LOCAL))
                    }
                  }
                }
              }
        
              // Check for non-local tasks 檢查非本地性的任務
              if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
                for (index <- speculatableTasks if canRunOnHost(index)) {
                  speculatableTasks -= index
                  return Some((index, TaskLocality.ANY))
                }
              }
            }
        
            None
          }
        
        • 需要注意的是:
          • 對於 ETL(Extract Transformation Load) 類型的需要導入數據庫的業務需要關閉推測執行機制, 否則會有重復的數據導入數據庫。
          • 如果遇到數據傾斜的情況, 開啟推測執行則有可能導致一直會有 task 重新啟動處理相同的邏輯, 任務可能一直處於處理不完的狀態。
  • 粗粒度資源申請 和 細粒度資源申請

    • 粗粒度資源申請(Spark)

      • 在 Application 執行之前, 將所有的資源申請完畢, 當資源申請成功后, 才會進行任務的調度, 當所有的 task 執行完成后才會釋放這部分資源

      • 優點

        • 在 Application 執行之前, 所有的資源都申請完畢, 每一個 task 直接使用資源就可以了, 不需要 task 在執行前自己去申請資源。
        • task 執行快了 => stage 執行就快了 => job 執行就快了 => application 執行就快了
      • 缺點

        • 直到最后一個 task 執行完成才會釋放資源, 集群的資源無法充分利用。(俗稱: 占着M坑不拉S)
    • 細粒度資源申請(MR)

      • Application 執行之前不需要先去申請資源, 而是直接執行, 讓 job 中的每一個 task 在執行前自己去申請資源, task執行完成就釋放資源。

      • 優點

        • 集群的資源可以充分利用
      • 缺點

        • task自己去申請資源,task啟動變慢,Application的運行就響應的變慢了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM