DAGScheduler概述:是一個面向Stage層面的調度器;
主要入參有:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)
rdd: final RDD;
cleanedFunc: 計算每個分區的函數;
resultHander: 結果偵聽器;
主要功能如下:
1、接收用戶提交的job;
2、將job根據類型划分為不同的stage,記錄哪些RDD、Stage被物化,並在每一個stage內產生一系列的task,並封裝成TaskSet;
3、決定每個Task的最佳位置(任務在數據所在的節點上運行),並結合當前的緩存情況;將TaskSet提交給TaskScheduler;
4、重新提交Shuffle輸出丟失的Stage給TaskScheduler;
注:一個Stage內部的錯誤不是由shuffle輸出丟失造成的,DAGScheduler是不管的,由TaskScheduler負責嘗試重新提交task執行;
以如下示例描述Job提交過程:
val sc = new SparkContext("local[2]", "WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile("xxx") val result = textFile.flatMap(line => line.split("\t")).map(word => (word, 1)).reduceByKey(_ + _) result.collect
RDD.collect
==>sc.runJob #####至此完成了將RDD提交DAGScheduler#####
val results = new Array[U](partitions.size) //result存放的是返回值,數組大小為最后一個RDD的partition的個數
==>dagScheduler.runJob(rdd, func, partitions, resultHandler......) //DAGScheduler的輸入:RDD and partitions to compute
==>dagScheduler.submitJob
==>eventProcessActor ! JobSubmitted
def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal...) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal...) } //完成job到stage的轉換,生成finalStage並提交 private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], allowLocal: Boolean...){ //注意:該RDD是final RDD,而不是一系列的RDD,用finalRDD來創建finalStage //newStage操作對應會生成新的result stage或者shuffle stage:內部有一個isShuffleMap變量來標識該stage是shuffle or result var finalStage: Stage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) //使用finalStage來構建job val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //對於簡單的job,沒有依賴關系並且只有一個partition,該類job會使用local thread處理而並非提交到TaskScheduler上處理 if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { runLocally(job) } else { submitStage(finalStage) //提交finalStage } }
handleJobSubmitted方法完成了job到stage的轉換,生成finalStage;每個job都有一個finalStage。
newStage()方法分析:根據finalRDD生成finalStage
private def newStage( rdd: RDD[_], numTasks: Int, //task個數就是partitions個數 shuffleDep: Option[ShuffleDependency[_,_]], jobId: Int, callSite: Option[String] = None) : Stage = { val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) ...... } private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => parents += getShuffleMapStage(shufDep, jobId) case _ => visit(dep.rdd) } } } } visit(rdd) parents.toList } private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId) shuffleToMapStage(shuffleDep.shuffleId) = stage stage }
newStage()后產生的finalStage中已經包含了該stage的所有依賴的父Stage;
通過getParentStages()方法構建該stage的依賴關系;反向visit RDD DAG圖,遇到窄依賴就將依賴的RDD加入到stage,遇到寬依賴就切開並遞歸寬依賴的stage;
生成stage實例,stage的id通過nextStageId的值加一得到,task的個數就是partitions的個數;
有兩種類型的Stage:ShuffleStage和ResultStage;
Stage內部有一個isShuffleMap變量標識該Stage是shuffle還是result類型;
Spark對stage的划分是按照寬依賴來進行區分的:根據RDD的依賴關系,如果遇到寬依賴則創建ShuffleStage;
submitStage()方法分析:計算stage之間的依賴關系(Stage DAG)並對依賴關系進行處理
private def submitStage(stage: Stage) { if (!waiting(stage) && !running(stage) && !failed(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) //根據final stage發現是否有parent stage if (missing == Nil) { // 如果計算中發現當前的stage沒有任何依賴或者所有的依賴都已經准備完畢,則提交task submitMissingTasks(stage, jobId.get) running += stage //設置當前的stage為running,因為當前的stage沒有未處理完的依賴的stage } else { //如果有parent stage,需要先submit parent, 因為stage之間需要順序執行 for (parent <- missing) { submitStage(parent) } waiting += stage //當前stage放入到waiting列表中,表示該stage需要等待parent先執行完成 } } } //根據final stage的parents找出所有的parent stage private def getMissingParentStages(stage: Stage): List[Stage] = { ...... dep match { //如果是ShuffleDependency,則新建一個shuffle map stage,且該stage是可用的話則加入missing中 case shufDep: ShuffleDependency[_,_] => //ShuffleDependecy val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => //NarrowDependecy visit(narrowDep.rdd) } }
getMissParentStages(stage)處理步驟:
1、根據該stage得到該stage的parent,也就是RDD的依賴關系,生成parentStage是通過RDD的dependencies;
2、如果依賴關系是寬依賴,則生成一個mapStage來作為finalStage的parent;也就是說對於需要shuffle操作的job,會生成mapStage和finalStage進行處理
3、如果依賴關系是窄依賴,不會生成新的stage。也就是說對於不需要shuffle的job只需要一個finalStage;
注意:getMissParentStages(stage)得到的結果集是按照stageid的降序排列的
submitStage()處理步驟:
1、計算該stage的getMissParentStages(),如果當前stage沒有任何依賴或者所有的依賴都已執行完,則提交該stage;
2、如果發現該stage有依賴的stage未執行,則先執行完所有依賴的父stage(根據getMissParentStages()方法得到的結果集降序來執行stage);
submitMissingTasks()方法分析:把stage根據parition拆分成task(決定每個Task的最佳位置)生成TaskSet,並提交到TaskScheduler
private def submitMissingTasks(stage: Stage, jobId: Int) { //首先根據stage所依賴的RDD的partition的分布,會產生出與partition數量相等的task var tasks = ArrayBuffer[Task[_]]() //對於finalStage或是mapStage會產生不同的task。 //檢查該stage時是否ShuffleMap,如果是則生成ShuffleMapTask if (stage.isShuffleMap) { //mapStage:表示還有其他stage依賴此stage for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { //task根據partition的locality進行分布 val locs = getPreferredLocs(stage.rdd, p) tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs) } } else { //finalStage:該類型stage直接輸出結果生成ResultTask val job = resultStageToJob(stage) for (id <- 0 until job.numPartitions if !job.finished(id)) { val partition = job.partitions(id) val locs = getPreferredLocs(stage.rdd, partition) //由於是ResultTask,因此需要傳入定義的func,也就是如果處理結果返回 tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) } } //向TaskSchuduler提交任務,以stage為單位,一個stage對應一個TaskSet taskSched.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) }
submitMissingTask()方法的處理步驟:
1、通過stage.isShuffleMap來決定生成的是ShuffleMapTask還是ResultTask;
2、如果是ShuffleMapTask則根據stage所依賴的RDD的partition分布,產生和partition數量相同的task,這些task根據partition的locality進行分布’
3、把stage對應生成所有的task封裝到一個TaskSet中,提交給TaskScheduler的submitTasks()方法進行調度;
重新提交shuffle輸出丟失的stage
case ResubmitFailedStages => dagScheduler.resubmitFailedStages() private[scheduler] def resubmitFailedStages() { if (failedStages.size > 0) { logInfo("Resubmitting failed stages") clearCacheLocs() val failedStagesCopy = failedStages.toArray failedStages.clear() for (stage <- failedStagesCopy.sortBy(_.jobId)) { submitStage(stage) } } submitWaitingStages() }
####至此完成了DAGScheduler提交TaskSet到TaskSchuduler#####
Job的生成:
一旦driver程序中出現action,就會生成一個job,比如:count等,向DAGScheduler提交job;如果driver程序后面還有action,那么其他action也會對應生成相應的job;
所以:driver有多少個action就會生成多少個job。為什么spark將driver程序稱為application而不是job的原因,估計就是這吧。
每一個job可能會包含多個stage,最后一個stage產生result。在提交job過程中,DAGScheduler會首先划分stage,然后先提交無parent stage的stages,並在提交過程中計算該stage的task數目和類型,並提交具體的task;無parent stage的stage提交完后,依賴該stage的stage才能提交。