Preface: I wrote this article while studying the Spark source code and its internals, and I hope it can also be of some help to newcomers. I am still new to this myself, so if you find omissions or mistakes, please comment on the original post or email me at tongzhenguotongzhenguo@gmail.com
Abstract:
1. The core of job scheduling: DAGScheduler
2. The DAGScheduler class and related classes
2.1 DAGScheduler
2.2 ActiveJob
2.3 Stage
2.4 Task
3. Workflow
3.1 Splitting Stages
3.2 Creating a Job and submitting Stages
3.3 Submitting TaskSets
3.4 Monitoring job and task completion status
3.5 Retrieving task results
Content:
1. The core of job scheduling: DAGScheduler
User code is a series of computations on RDDs. At run time these computations are lazy: not every RDD operation makes Spark submit an actual job to the cluster. Roughly speaking, only operations that return data to the driver or write output externally (actions) trigger real computation, while the remaining operations (transformations) merely create new RDDs that record their dependencies.
RDD actions such as count and collect call runJob internally to submit the job automatically, so the user never submits a job explicitly (see my experiment on how the Spark DAGScheduler generates Stages for this part).
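As a quick illustration, here is a minimal sketch (not from the original post; the input path and names are made up): the transformations only record lineage, and nothing reaches the cluster until the collect action, which internally calls SparkContext.runJob and hands the job to the DAGScheduler.

import org.apache.spark.{SparkConf, SparkContext}

object LazyEvalExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy-eval").setMaster("local[*]"))

    // Transformations: each one only wraps its parent RDD and records the dependency,
    // no task is launched yet.
    val words = sc.textFile("hdfs:///tmp/input.txt")      // hypothetical input path
                  .flatMap(_.split("\\s+"))
    val pairs = words.map(w => (w, 1))

    // Action: collect() calls sc.runJob(...) under the hood, which submits the job
    // to the DAGScheduler.
    val counts = pairs.reduceByKey(_ + _).collect()

    counts.take(10).foreach(println)
    sc.stop()
  }
}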
The two main entry points for job scheduling are submitJob and runJob. The difference is that submitJob returns a JobWaiter object, which can be used asynchronously to check whether the job has finished or to cancel it, whereas runJob calls submitJob internally and blocks until the job completes (or fails). The source of both is shown below:
submitJob
/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
runJob
/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @throws Exception when the job fails
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
  waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
One of the DAGScheduler's most important responsibilities is this stage-oriented logical scheduling: it first builds the DAG of Stages and then submits the Stages to the TaskScheduler. Its class comment summarizes this:
/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster.
 * ...
 */
2. The DAGScheduler class and related classes
Q: When is the DAGScheduler created?
A: The DAGScheduler is instantiated while the SparkContext is being initialized; each SparkContext owns exactly one DAGScheduler.
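For reference, here is a simplified excerpt (assuming Spark 2.x field names; the surrounding initialization code differs between versions):

// Simplified excerpt from SparkContext's initialization (Spark 2.x style, surrounding code
// omitted): one DAGScheduler per SparkContext, created before the TaskScheduler is started.
_dagScheduler = new DAGScheduler(this)
_taskScheduler.start()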
Some related concepts first:
ActiveJob: jobs are represented by the ActiveJob class. Depending on its finalStage, an ActiveJob is one of two kinds: a result job (whose final stage is a ResultStage) or a map-stage job (whose final stage is a ShuffleMapStage, used mainly in query planning). The ActiveJob class:
/*
 * Jobs are represented by the ActiveJob class. Depending on its finalStage, an ActiveJob is
 * either a result job (ResultStage) or a map-stage job (ShuffleMapStage, used mainly in
 * query planning).
 */
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {

  /**
   * Number of partitions we need to compute for this job. Note that result stages may not need
   * to compute all partitions in their target RDD, for actions like first() and lookup().
   */
  val numPartitions = finalStage match {
    case r: ResultStage => r.partitions.length
    case m: ShuffleMapStage => m.rdd.partitions.length
  }

  /** Which partitions of the stage have finished */
  val finished = Array.fill[Boolean](numPartitions)(false)

  var numFinished = 0
}
Stage: a Stage is a set of parallel tasks; stages are split from each other at shuffle boundaries. Accordingly there are two kinds of Stage: a shuffle map stage and a result stage. The Stage class:
/*
 * A Stage is a set of parallel tasks; stages are split from each other at shuffle boundaries.
 * Accordingly there are two kinds of Stage:
 *   - a shuffle map stage
 *   - a result stage
 */
private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to. */
  val jobIds = new HashSet[Int]

  val pendingPartitions = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage. */
  private var nextAttemptId: Int = 0

  val name: String = callSite.shortForm
  val details: String = callSite.longForm

  /**
   * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   */
  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

  /**
   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   */
  private val fetchFailedAttemptIds = new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    fetchFailedAttemptIds.clear()
  }

  /**
   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
   *
   * This method updates the running set of failed stage attempts and returns
   * true if the number of failures exceeds the allowable number of failures.
   */
  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
    nextAttemptId += 1
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  def findMissingPartitions(): Seq[Int]
}

private[scheduler] object Stage {
  // The number of consecutive failures allowed before a stage is aborted
  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
}
Task: there are likewise two task classes, ShuffleMapTask and ResultTask. The former runs the task and writes its output into shuffle partitions, while the latter runs the task and sends its output back to the driver program (I will analyze the task-execution source code another time).
Other notes from the DAGScheduler class comment:
/**
 * ...
 * - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
 *   and likewise remembers which shuffle map stages have already produced output files to avoid
 *   redoing the map side of a shuffle.
 *
 * - Preferred locations: the DAGScheduler also computes where to run each task in a stage based
 *   on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
 *
 * - Cleanup: all data structures are cleared when the running jobs that depend on them finish,
 *   to prevent memory leaks in a long-running application.
 * ...
 */
The DAGScheduler internally maintains a set of maps between tasks, stages and jobs. Note in particular the sets that group stages by their execution state; keeping them in mind helps when reading the submitStage method later.
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]
3. Workflow
Workflow diagram:

3.1 Splitting Stages
Spark creates stages by cutting the RDD graph at shuffle boundaries. Operations with narrow dependencies (e.g. map(), filter()) are pipelined together into one set of tasks within a single stage, while operations with wide dependencies require multiple stages. In the end the only dependencies left between stages are shuffle dependencies.
The actual computation happens in RDD.compute(), implemented by each concrete RDD, e.g. MappedRDD, FilteredRDD, etc.
When an action triggers a computation and submits a job to the DAGScheduler, the DAGScheduler starts from the final RDD at the end of the dependency chain and walks the whole chain backwards, splitting it into stages and working out the dependencies between them. Stages are split on ShuffleDependency: whenever computing an RDD requires its input data to be shuffled, the RDD that carries the shuffle dependency is used as the input for building a new stage. Splitting stages by this rule guarantees that data with dependencies is processed and computed in the correct order. I ran a small experiment on this part: see my post analyzing how the Spark DAGScheduler generates Stages.
Take groupByKey as an example. The operation actually returns a ShuffledRDD. When the DAGScheduler walks back to this ShuffledRDD, its dependency is a ShuffleDependency, so the ShuffledRDD's parent RDD together with the ShuffleDependency and related objects are used to build a new stage, and the partitioning of that stage's output is decided by the Partitioner object carried in the ShuffleDependency.
Notice that although the basis for splitting and building the stage is the ShuffleDependency, whose RDD is the ShuffledRDD here, the data the stage processes is computed starting from the ShuffledRDD's parent RDD; only the placement of the final output follows what the ShuffleDependency obtained from the ShuffledRDD describes. The computation of the ShuffledRDD itself, which is really just fetching the shuffle output, happens in the next stage.
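As a concrete illustration, here is a minimal sketch (not from the original post; the toy input is made up). The chain up to and including the (word, 1) pairs is pipelined into a ShuffleMapStage, while the ShuffledRDD produced by groupByKey and the collect action form the ResultStage that fetches the shuffle output:

import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeyStages {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-split").setMaster("local[2]"))

    val lines = sc.parallelize(Seq("a b", "b c", "a c"))   // toy input
    val pairs = lines.flatMap(_.split(" ")).map(w => (w, 1))

    // groupByKey introduces a ShuffleDependency, so the DAG is cut here:
    //   Stage 0 (ShuffleMapStage): parallelize -> flatMap -> map, writes shuffle output
    //   Stage 1 (ResultStage):     ShuffledRDD (fetches shuffle output) -> collect
    val grouped = pairs.groupByKey()
    grouped.collect().foreach { case (k, vs) => println(s"$k -> ${vs.size}") }

    println(grouped.toDebugString)   // prints the lineage, showing the shuffle boundary
    sc.stop()
  }
}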
Here is a diagram:

3.2 Creating a Job and submitting Stages
The previous step produces one or more stages with dependencies between them. The stage associated with the RDD that directly triggered the job becomes the finalStage, a Job instance is created for it, and the relationship between the two is stored in the resultStageToJob map so that follow-up work can be done when that stage fully completes, such as reporting status and cleaning up the job's data. When a concrete stage is submitted, the scheduler first checks whether the results of all its parent stages are available. If they all are, the stage is submitted; if any parent's result is unavailable, the scheduler recursively tries to submit the parent stages instead. Every stage that could not be submitted during this recursion, because some stage it depends on is not ready, is put into the waitingStages set to be submitted later.
So when are the stages in waitingStages submitted again? When a task belonging to an intermediate stage (such tasks are ShuffleMapTasks) finishes, the DAGScheduler checks whether all tasks of that stage have completed. If they have, the DAGScheduler rescans every stage in waitingStages, checks whether it still depends on any unfinished stage, and submits it if it does not.
In addition, after each round of the DAGScheduler event loop, the scheduler also scans the waiting (waitingStages) and failed (failedStages) sets and submits whichever stages have become ready.
Below is the submitStage code:
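(The original post shows submitStage as a screenshot; the sketch below is paraphrased from the Spark source of roughly the same era, so details may vary between versions.)

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        // All parent results are available: hand the stage's tasks to the TaskScheduler.
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        // Some parents are not ready: submit them first and park this stage in waitingStages.
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}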

3.3 Submitting TaskSets
Submitting a stage ultimately turns into submitting a TaskSet. The DAGScheduler submits the TaskSet through the TaskScheduler interface, which causes the TaskScheduler to build a TaskSetManager instance to manage the lifecycle of that TaskSet; as far as the DAGScheduler is concerned, the work of submitting the stage ends there. The concrete TaskScheduler implementation will, once it obtains compute resources, use the TaskSetManager to schedule the individual tasks onto the corresponding executor nodes.
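The hand-off happens at the end of DAGScheduler.submitMissingTasks; roughly like this (a trimmed, paraphrased sketch: task-binary broadcasting, error handling and listener events are omitted, and the exact code varies between versions):

if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  // One TaskSet per stage attempt; from here on the TaskScheduler / TaskSetManager take over.
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
} else {
  // Nothing left to compute for this stage, so mark it as finished right away.
  markStageAsFinished(stage, None)
}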
3.4 Monitoring job and task completion status
To guarantee that interdependent jobs and stages are scheduled and executed in the right order, the DAGScheduler must monitor the completion status of the current jobs and stages, and even of individual tasks. It does this by exposing a set of callback functions, mainly to the TaskScheduler. For the TaskScheduler these callbacks cover task start, completion and failure as well as TaskSet failure, and from these task lifecycle events the DAGScheduler keeps the state of the corresponding jobs and stages up to date.
private val messageScheduler =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

/**
 * Called by the TaskSetManager to report task's starting.
 */
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
  eventProcessLoop.post(BeginEvent(task, taskInfo))
}
Q: How does the DAGScheduler run internally? What drives its loop?
A: The DAGScheduler's event handling follows a message-passing model (in early Spark versions it was built on an Akka actor; in the version shown here it is a DAGSchedulerEventProcessLoop, an EventLoop running on its own daemon thread). The DAGScheduler creates this eventProcessLoop when it is constructed, and callbacks such as taskStarted above simply post DAGSchedulerEvents to it; the events cover job submission, task state changes, monitoring and so on.
Let's walk through DAGSchedulerEventProcessLoop to see how it handles these message events (DAGSchedulerEvent):
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stop()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
The TaskScheduler also uses callbacks to notify the DAGScheduler about the lifecycle of executors. If an executor crashes, or loses contact with the driver for any reason, the shuffle map output of the corresponding stages is marked as unavailable, which changes the state of those stages, which in turn affects the state of the related jobs and may trigger resubmission of those stages so that the lost data is recomputed.
3.5 Retrieving task results
Once a task has finished executing in an executor, its result has to be returned to the DAGScheduler in some form, and that form depends on the type of task.
For the tasks of the finalStage (class ResultTask), what is returned to the DAGScheduler is the computation result itself. For a ShuffleMapTask, what is returned is a MapStatus object, which records where in the BlockManager the task's output is stored rather than the result itself; these location details are what the tasks of the next stage use to fetch their input data.
Results returned by a ResultTask are further split into two cases by size. If the result is small enough, it is placed directly inside a DirectTaskResult. If it exceeds a certain size (about 10MB by default), the executor serializes the DirectTaskResult, stores the serialized bytes as a block in the BlockManager, and hands the BlockId returned by the BlockManager back to the TaskScheduler wrapped in an IndirectTaskResult; the TaskScheduler then calls the TaskResultGetter, which takes the BlockId out of the IndirectTaskResult and fetches the corresponding DirectTaskResult through the BlockManager. From the DAGScheduler's point of view all of this is transparent: what it receives is always the actual computation result of the task.
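The choice is made on the executor side when the task finishes. Roughly (a paraphrased sketch of the logic in Executor's TaskRunner; the helper names, the putBytes signature and the exact thresholds vary across Spark versions, the limit being the ~10MB mentioned above in older releases):

val resultSize = serializedDirectResult.limit()
val serializedResult: ByteBuffer =
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    // Larger than spark.driver.maxResultSize: drop the payload and only report its size.
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize > maxDirectResultSize) {
    // Too big to send directly: store the serialized DirectTaskResult in the BlockManager
    // and return only the BlockId, wrapped in an IndirectTaskResult.
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    // Small enough: ship the DirectTaskResult itself back to the driver.
    serializedDirectResult
  }
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)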
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
The enqueueSuccessfulTask method of TaskResultGetter:
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          /*
           * Depending on its size, the result of a ResultTask comes back in one of two forms:
           * DirectTaskResult or IndirectTaskResult.
           * 1. If the result is small enough, it is carried directly inside a DirectTaskResult.
           */
          case directResult: DirectTaskResult[_] =>
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            // deserialize "value" without holding any lock so that it won't block other threads.
            // We should call it here, so that when it's called again in
            // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
            directResult.value()
            (directResult, serializedData.limit())

          /*
           * 2. If the result exceeds a certain size (about 10MB by default), the executor first
           *    serializes the DirectTaskResult, stores the serialized bytes as a block in the
           *    BlockManager, and returns the BlockId wrapped in an IndirectTaskResult to the
           *    TaskScheduler. The TaskScheduler then has the TaskResultGetter take the BlockId
           *    out of the IndirectTaskResult and fetch the corresponding DirectTaskResult
           *    through the BlockManager.
           */
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get.toByteBuffer)
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }

        // Set the task result size in the accumulator updates received from the executors.
        // We need to do this here on the driver because if we did this on the executors then
        // we would have to serialize the result again after updating the size.
        result.accumUpdates = result.accumUpdates.map { a =>
          if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
            val acc = a.asInstanceOf[LongAccumulator]
            assert(acc.sum == 0L, "task result size should not have been set on the executors")
            acc.setValue(size.toLong)
            acc
          } else {
            a
          }
        }

        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
        case NonFatal(ex) =>
          logError("Exception while getting task result", ex)
          taskSetManager.abort("Exception while getting task result: %s".format(ex))
      }
    }
  })
}
