回顧
上一篇,我們分析了了任務在executor端的運行流程,任務運行結束后,在Executor.launchTask方法最后,通過調用execBackend.statusUpdate方法將任務結果以及任務狀態發送給driver。回到driver端,我們在driver的rpc服務端DriverEndPoint的receive方法中尋找對StatusUpdate消息的處理邏輯。
DriverEndpoint.receive
case StatusUpdate(executorId, taskId, state, data) =>
// 通知TaskScheduler任務已完成
scheduler.statusUpdate(taskId, state, data.value)
// 如果任務已經運行結束了,包括FINISHED, FAILED, KILLED, LOST這幾種狀態
// 那么說明任務占用的資源已經釋放了,此時就可以回收這部分資源並重新分配任務
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
所以重點是scheduler.statusUpdate調用
TaskSchedulerImpl.statusUpdate
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
// 這個狀態不明,沒看什么地方會產生這個狀態
if (state == TaskState.LOST) {
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
// 任務運行結束,包括這幾種狀態FINISHED, FAILED, KILLED, LOST
if (TaskState.isFinished(state)) {
// 清除關於這個task的一些簿記量
cleanupTaskState(tid)
// 將這個task從正在運行的task集合中移除
taskSet.removeRunningTask(tid)
if (state == TaskState.FINISHED) {
// 啟動一個線程,用來異步地處理任務成功的情況
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates) or its " +
"executor has been marked as failed.")
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
assert(reason.isDefined)
dagScheduler.executorLost(failedExecutor.get, reason.get)
backend.reviveOffers()
}
}
這里,啟動了一個異步任務,用來處理任務成功的情況,所以我們分析一下異步任務的處理邏輯。
TaskResultGetter.enqueueSuccessfulTask
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager,
tid: Long,
serializedData: ByteBuffer): Unit = {
// 啟動一個異步任務
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
// 對傳回的結果進行反序列化
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
// 如果是直接傳回的結果,那么直接從反序列化的對象中取數據即可
case directResult: DirectTaskResult[_] =>
// 首先檢查結果大小是否超過閾值,默認是1g,
// 也即最多能夠允許多大的結果放到driver端
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
return
}
// deserialize "value" without holding any lock so that it won't block other threads.
// We should call it here, so that when it's called again in
// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
// 對任務結果進行反序列化,調用該方法不會引起其他線程阻塞,
directResult.value(taskResultSerializer.get())
(directResult, serializedData.limit())
case IndirectTaskResult(blockId, size) =>
// 檢查結果大小是否超過限制
if (!taskSetManager.canFetchMoreResults(size)) {
// dropped by executor if size is larger than maxResultSize
// 如果放棄了該任務,那么需要將該任務在blockmanager中對應的block移除掉
sparkEnv.blockManager.master.removeBlock(blockId)
return
}
logDebug("Fetching indirect task result for TID %s".format(tid))
// 這句話最終會通過DAGScheduler給事件總線投遞一條TaskGetting的事件
scheduler.handleTaskGettingResult(taskSetManager, tid)
// 通過blockManager遠程拉取結果數據
// 而這個blockId對應的塊的位置信息已經在之前由executor端傳回
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
if (!serializedTaskResult.isDefined) {
/* We won't be able to get the task result if the machine that ran the task failed
* between when the task ended and when we tried to fetch the result, or if the
* block manager had to flush the result. */
// 這里拉取數據失敗分為兩種情況:一種是由於任務序列化后體積太大主動丟棄
// 另一種是executor節點網絡異常,導致拉取失敗
// 這兩種情況都算作任務失敗
// 這個方法主要是對失敗的任務重新運行
scheduler.handleFailedTask(
taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
return
}
// 將從blockManager拉取到的數據進行反序列化
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get.toByteBuffer)
// force deserialization of referenced value
// 對任務結果進行反序列化
deserializedResult.value(taskResultSerializer.get())
// 將block移除,因為數據已經拉取到了
sparkEnv.blockManager.master.removeBlock(blockId)
(deserializedResult, size)
}
// Set the task result size in the accumulator updates received from the executors.
// We need to do this here on the driver because if we did this on the executors then
// we would have to serialize the result again after updating the size.
// 處理累加器,主要是對任務結果大小的統計量需要做特殊處理
result.accumUpdates = result.accumUpdates.map { a =>
// 對於任務結果大小的統計量需要做特殊處理
if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
val acc = a.asInstanceOf[LongAccumulator]
assert(acc.sum == 0L, "task result size should not have been set on the executors")
acc.setValue(size.toLong)
acc
} else {
a
}
}
// 將反序列化好的結果數據告訴TaskSchedulerImpl做進一步處理
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
// Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
case NonFatal(ex) =>
logError("Exception while getting task result", ex)
taskSetManager.abort("Exception while getting task result: %s".format(ex))
}
}
})
}
這里會有好幾次反序列化,這時因為在executor端對任務結果數據處理時就是經過了好幾次序列化,
- 首先會把任務運行的結果進行序列化,和累加器一起包裝成DirectTaskResult對象
- 然后對DirectTaskResult對象進行序列化
- 對於結果太大通過blockManager傳輸的情況,需要封裝一個IndirectTaskResult對象
- 最后還有對IndirectTaskResult對象進行序列化
可以看到在結果傳回driver端后,是按照與上面相反的順序進行反序列化的。
最后拿到任務運行的結果數據以后,將結果數據交給TaskSchedulerImpl做進一步處理。
TaskSchedulerImpl.handleSuccessfulTask
def handleSuccessfulTask(
taskSetManager: TaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]): Unit = synchronized {
taskSetManager.handleSuccessfulTask(tid, taskResult)
}
TaskSetManager.handleSuccessfulTask
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
// 更新一些簿記量
val info = taskInfos(tid)
val index = info.index
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
if (speculationEnabled) {
successfulTaskDurations.insert(info.duration)
}
removeRunningTask(tid)
// Kill any other attempts for the same task (since those are unnecessary now that one
// attempt completed successfully).
// 對於這個任務的其他運行中的副本,全部都要殺掉,主要是推測執行機制會對同一個任務同時運行多個副本
for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
killedByOtherAttempt(index) = true
// 通過調度后端發送殺死任務的信息
sched.backend.killTask(
attemptInfo.taskId,
attemptInfo.executorId,
interruptThread = true,
reason = "another attempt succeeded")
}
// 檢查是不是第一次,如果是第一次才會更新這些簿記量
// 這么做主要是為了防止多個任務副本多次更新造成不一致
if (!successful(index)) {
tasksSuccessful += 1
logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
s" ($tasksSuccessful/$numTasks)")
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
// 如果全部的任務都完成了,就說明這個任務集(stage)完成了
if (tasksSuccessful == numTasks) {
isZombie = true
}
} else {
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
// 進一步通知DAG調度器做進一步處理,
// 這里可見在任務提交運行是的處理順序是從DAGScheduler -> TaskScheduler -> SchedulerBackend -> executor
// 而任務運行結束后結果返回處理的順序則與上面的順正好反過來。
// 此外,也能看出TaskScheduler也充當了DAGScheduler和SchedulerBackend中間人的角色,傳遞消息
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
// 更新一些簿記量
maybeFinishTaskSet()
}
這個方法的主要工作是更新一些簿記量;殺掉其他的任務副本;
然后通知DAGScheduler做進一步處理。
DAGScheduler.handleTaskCompletion
這個方法很長,所以我們把這個方法的主要邏輯做一個總結:
-
處理累加器。對於ResultTask類型的任務不會進行重復累加,而對於ShuffleMapTask類型的任務則會進行重復累加(推測執行)
-
首先,向事件總線中投遞一個任務結束的事件
-
針對任務運行成功的情況做處理。如果是ResultTask類型的任務,需要更新一些簿記量,並在整個stage的所有任務完成時將stage標記為完成,並且通知作業監聽器;對於ShuffleMapTask類型的任務處理要復雜一些,同樣要更新一些簿記量,並且在mapOutputTracker組件中注冊這個任務的輸出block信息,如果所有的分區全部完成,那么還要將這個stage標記為完成。
-
處理拉取數據失敗的情況。除了更新一些簿記量,主要做的事就是判斷是否要再次提交stage,如果不能再次提交(沖提交次數超過閾值)那么就需要將關聯的job取消掉,否則再次提交這個stage。這里需要注意的是,再次提交stage並不會把所有的任務全部再重新運行一遍,只會把那些因失敗而導致沒有完成的任務重新提交,通過mapOutputTrackerMaster組件追蹤mShuffleMap任務的輸出情況。
private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task val taskId = event.taskInfo.id val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) // 通知outputCommitCoordinator組件對任務完成的事件做一些處理 // outputCommitCoordinator組件需要對失敗的任務 outputCommitCoordinator.taskCompleted( stageId, task.partitionId, event.taskInfo.attemptNumber, // this is a task attempt number event.reason) if (!stageIdToStage.contains(task.stageId)) { // The stage may have already finished when we get this event -- eg. maybe it was a // speculative task. It is important that we send the TaskEnd event in any case, so listeners // are properly notified and can chose to handle it. For instance, some listeners are // doing their own accounting and if they don't get the task end event they think // tasks are still running when they really aren't. // 在獲取這個事件時對應的stage可能已經完成了。比如,當前完成的task可能是一個推測執行的task。 // 但是,無論如何,我們都有必要向事件總線中投遞一個任務結束的事件, // 這樣才能正確第通知監聽器,以使得監聽器能夠做出正確的處理。 // 例如有的監聽器會對所有完成的任務(包括推測執行)進行計數,如果監聽器獲取不到任務完成的事件 // 他們就會認為任務還在運行。 postTaskEnd(event) // Skip all the actions if the stage has been cancelled. // 由於stage在之前已經被處理過了,所以這里直接返回 return } val stage = stageIdToStage(task.stageId) // Make sure the task's accumulators are updated before any other processing happens, so that // we can post a task end event before any jobs or stages are updated. The accumulators are // only updated in certain cases. // 這里應該思考一個問題:既然任務的多個副本可能會同時完成, // 那么也就有可能會同時發送任務結束事件, // 也就說這個方法可能因為任務的多個副本在同一段時間內完成而被同時執行 // 那么這里沒有加鎖,也沒有CAS或其他的一些同步措施,這樣不會嘗試線程不安全問題嗎?? // 答案在於EventLoop類中,這個類處理事件的線程只有一個, // 所以實際上所有的事件都是串行執行的,自然也就不會有線程不安全的問題了 // 這一步主要是處理累加器 event.reason match {case Success =>
task match {
case rt: ResultTask[_, _] =>
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
// Only update the accumulator once for each result task.
// 對於ResultTask的累加器只計算一次,不會重復計算
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
}
case None => // Ignore update if task's job has finished.
}
case _ =>
// 對於ShuffleMapTask則不會考慮累加器的重復計數,
// 也就意味着ShufleMapTask中執行的累加器會重復計數
updateAccumulators(event)
}
case _: ExceptionFailure => updateAccumulators(event)
case _ =>
}
// 向事件總線投遞一個任務完成的事件
postTaskEnd(event)// 這一步主要是對作業的一些簿記量的更新維護 // 如果作業的全部分區都已完成,那么移除掉這個作業 // 並移除作業內不被其他作業依賴的stage的信息 event.reason match { case Success => task match { case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask // TODO Refactor this out to a function that accepts a ResultStage val resultStage = stage.asInstanceOf[ResultStage] resultStage.activeJob match { case Some(job) => if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 // If the whole job has finished, remove it // 如果作業的全部分區都已完成,那么移除掉這個作業 // 並移除作業內不被其他作業依賴的stage的信息 if (job.numFinished == job.numPartitions) { // 把這個stage標記為已完成 markStageAsFinished(resultStage) // 移除作業內不被其他作業依賴的stage的信息 cleanupStateForJobAndIndependentStages(job) // 向事件總線追蹤投遞一個作業結束的事件 listenerBus.post( SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) } // taskSucceeded runs some user code that might throw an exception. Make sure // we are resilient against that. // 最后,需要調用作業監聽器的回調函數,以通知作業監聽器 try { job.listener.taskSucceeded(rt.outputId, event.result) } catch { case e: Exception => // TODO: Perhaps we want to mark the resultStage as failed? job.listener.jobFailed(new SparkDriverExecutionException(e)) } } case None => logInfo("Ignoring result from " + rt + " because its job has finished") } // 處理shuffleMapTask的情況 case smt: ShuffleMapTask => val shuffleStage = stage.asInstanceOf[ShuffleMapStage] val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) { // This task was for the currently running attempt of the stage. Since the task // completed successfully from the perspective of the TaskSetManager, mark it as // no longer pending (the TaskSetManager may consider the task complete even // when the output needs to be ignored because the task's epoch is too small below. // In this case, when pending partitions is empty, there will still be missing // output locations, which will cause the DAGScheduler to resubmit the stage below.) // 如果如果task的stageAttemptId與當前最新的stage信息相同, // 說明該任務已經完成 shuffleStage.pendingPartitions -= task.partitionId } // 如果這個任務的epoch比被標記為失敗的epoch要早,那么忽略這次運行結果 if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { // The epoch of the task is acceptable (i.e., the task was launched after the most // recent failure we're aware of for the executor), so mark the task's output as // available. // 這個任務的epoch被接收,那么在mapOutputTracker組件中將這個任務標記為成功 // 然后就能通過mapOutputTracker組件獲取到這個分區的結果狀態了 mapOutputTracker.registerMapOutput( shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) // Remove the task's partition from pending partitions. This may have already been // done above, but will not have been done yet in cases where the task attempt was // from an earlier attempt of the stage (i.e., not the attempt that's currently // running). This allows the DAGScheduler to mark the stage as complete when one // copy of each task has finished successfully, even if the currently active stage // still has tasks running. // 同樣將這個分區標記為已完成 shuffleStage.pendingPartitions -= task.partitionId } // 如果stage的所有分區都已完成,那么將這個stage標記為已完成 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) logInfo("failed: " + failedStages) // This call to increment the epoch may not be strictly necessary, but it is retained // for now in order to minimize the changes in behavior from an earlier version of the // code. This existing behavior of always incrementing the epoch following any // successful shuffle map stage completion may have benefits by causing unneeded // cached map outputs to be cleaned up earlier on executors. In the future we can // consider removing this call, but this will require some extra investigation. // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details. mapOutputTracker.incrementEpoch() // 清除RDD的分區結果位置緩存 // 以便在訪問緩存是重新從blockManager中或rdd分區結果的位置信息 clearCacheLocs() if (!shuffleStage.isAvailable) { // Some tasks had failed; let's resubmit this shuffleStage. // 如果有部分任務失敗,那么需要重新提交這個stage // TODO: Lower-level scheduler should also deal with this logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + ") because some of its tasks had failed: " + shuffleStage.findMissingPartitions().mkString(", ")) submitStage(shuffleStage) } else { // Mark any map-stage jobs waiting on this stage as finished // 將所有依賴於這個stage的job標記為運行結束 if (shuffleStage.mapStageJobs.nonEmpty) { val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep) for (job <- shuffleStage.mapStageJobs) { markMapStageJobAsFinished(job, stats) } } // 提價下游的子stage submitWaitingChildStages(shuffleStage) } } } //處理重復提交的情況 case Resubmitted => logInfo("Resubmitted " + task + ", so marking it as still running") stage match { case sms: ShuffleMapStage => sms.pendingPartitions += task.partitionId case _ => assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + "tasks in ShuffleMapStages.") } // 處理拉取數據失敗的情況 case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) // 如果這個任務的attempId與stage最近一次的attemptId不同, // 那么忽略這個異常,因為又一次更新的stage的嘗試正在運行中 if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is // possible the fetch failure has already been handled by the scheduler. // 將這個stage標記為已結束 if (runningStages.contains(failedStage)) { logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + s"due to a fetch failure from $mapStage (${mapStage.name})") markStageAsFinished(failedStage, Some(failureMessage)) } else { logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " + s"longer running") } // 把拉取失敗的stage的attemptId記錄下來 failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) // 如果stage的嘗試次數已經超過最大允許值,那么將直接將取消該stage val shouldAbortStage = failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || disallowStageRetryForTest if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { "Fetch failure will not retry stage due to testing config" } else { s"""$failedStage (${failedStage.name}) |has failed the maximum allowable number of |times: $maxConsecutiveStageAttempts. |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } // 取消這個stage, 做一些處理 abortStage(failedStage, abortMessage, None) } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064 val noResubmitEnqueued = !failedStages.contains(failedStage) // 將這個stage添加到失敗的stage隊列中, // 這個隊列是等待重新提交的stage隊列 failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { // We expect one executor failure to trigger many FetchFailures in rapid succession, // but all of those task failures can typically be handled by a single resubmission of // the failed stage. We avoid flooding the scheduler's event queue with resubmit // messages by checking whether a resubmit is already in the event queue for the // failed stage. If there is already a resubmit enqueued for a different failed // stage, that event would also be sufficient to handle the current failed stage, but // producing a resubmit for each failed stage makes debugging and logging a little // simpler while not producing an overwhelming number of scheduler events. logInfo( s"Resubmitting $mapStage (${mapStage.name}) and " + s"$failedStage (${failedStage.name}) due to fetch failure" ) // 200毫秒之后給內部的事件處理線程發送一個重新提交stage的事件 // 以通知DAGSchedduler重新提交失敗的stage messageScheduler.schedule( new Runnable { override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS ) } } // Mark the map whose fetch failed as broken in the map stage // 從mapOutputTracker中將這個任務的map輸出信息移除掉 if (mapId != -1) { mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) } // TODO: mark the executor as failed only if there were lots of fetch failures on it // 將拉取失敗的block所在的executor移除掉,通知DriverEndpoint移除 // 並且在blockManagerMaster中將對應的executor上的所有block信息全部移除 if (bmAddress != null) { val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && unRegisterOutputOnHostOnFetchFailure) { // We had a fetch failure with the external shuffle service, so we // assume all shuffle data on the node is bad. Some(bmAddress.host) } else { // Unregister shuffle data just for one executor (we don't have any // reason to believe shuffle data has been lost for the entire host). None } removeExecutorAndUnregisterOutputs( execId = bmAddress.executorId, fileLost = true, hostToUnregisterOutputs = hostToUnregisterOutputs, maybeEpoch = Some(task.epoch)) } } case commitDenied: TaskCommitDenied => // Do nothing here, left up to the TaskScheduler to decide how to handle denied commit case exceptionFailure: ExceptionFailure => // Nothing left to do, already handled above for accumulator updates. case TaskResultLost => // Do nothing here; the TaskScheduler handles these failures and resubmits the task. case _: ExecutorLostFailure | _: TaskKilled | UnknownReason => // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler // will abort the job. } }
