一、Spark RDD容錯原理
RDD不同的依賴關系導致Spark對不同的依賴關系有不同的處理方式。
對於寬依賴而言,由於寬依賴實質是指父RDD的一個分區會對應一個子RDD的多個分區,在此情況下出現部分計算結果丟失,單一計算丟失的數據無法達到效果,便采用重新計算該步驟中的所有數據,從而會導致計算數據重復;對於窄依賴而言,由於窄依賴實質是指父RDD的分區最多被一個子RDD使用,在此情況下出現部分計算的錯誤,由於計算結果的數據只與依賴的父RDD的相關數據有關,所以不需要重新計算所有數據,只重新計算出錯部分的數據即可。
二、RDD容錯的四大核心要點
Spark框架層面的容錯機制,主要分為三大層面(調度層、RDD血統層、Checkpoint層),在這三大層面中包括Spark RDD容錯四大核心要點。
(1)Stage輸出失敗,上層調度器DAGScheduler重試。
(2)Spark計算中,Task內部任務失敗,底層調度器重試。
(3)RDD Lineage血統中窄依賴、寬依賴計算。
(4)Checkpoint緩存。
1.調度層(包含DAG生成和Task重算兩大核心)
從調度層面講,錯誤主要出現在兩個方面,分別是在Stage輸出時出錯和在計算時出錯。
1)DAG生成層
Stage輸出失敗,上層調度器DAGScheduler會進行重試,DAGScheduler.scala的resubmitFailedStages的源碼如下。
/**
* Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
* the last fetch failure.
*/
private[scheduler] def resubmitFailedStages() {
// 判斷是否存在失敗的Stages
if (failedStages.size > 0) {
// Failed stages may be removed by job cancellation, so failed might be empty even if
// the ResubmitFailedStages event has been scheduled.
// 失敗的階段可以通過作業取消刪除,如果ResubmitFailedStages事件已調度,失敗將是空值
logInfo("Resubmitting failed stages")
clearCacheLocs()
// 獲取所有失敗Stage的列表
val failedStagesCopy = failedStages.toArray
// 清空failedStages
failedStages.clear()
// 對之前獲取所有失敗的Stage,根據jobId排序后逐一重試
for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
submitWaitingStages()
}
2)Task計算層
Spark計算過程中,計算內部某個Task任務出現失敗,底層調度器會對此Task進行若干次重試(默認4次)。TaskSetManager.scala的handleFailedTask的源碼如下。
/**
* Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
* DAG Scheduler.
*/
def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
......
if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
&& reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
assert (null != failureReason)
// 對失敗的Task的numFailures進行計數加1
numFailures(index) += 1
// 判斷失敗的Task計數是否大於設定的最大失敗次數,如果大於,則輸出日志,並不再重試
if (numFailures(index) >= maxTaskFailures) {
logError("Task %d in stage %s failed %d times; aborting job".format(
index, taskSet.id, maxTaskFailures))
abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
.format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
return
}
}
// 如果運行的Task為0時,則完成Task步驟
maybeFinishTaskSet()
}
2.RDD Lineage血統層容錯
Spark中RDD采用高度受限的分布式共享內存,且新的RDD的產生只能夠通過其他RDD上的批量操作來創建,依賴於以RDD的Lineage為核心的容錯處理,在迭代計算方面比Hadoop快20多倍,同時還可以在5~7s內交互式地查詢TB級別的數據集。
Spark RDD實現基於Lineage的容錯機制,基於RDD的各項transformation構成了compute chain,在部分計算結果丟失的時候可以根據Lineage重新恢復計算。
(1)在窄依賴中,在子RDD的分區丟失,要重算父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,並不存在冗余計算。
(2)在寬依賴情況下,丟失一個子RDD分區,重算的每個父RDD的每個分區的所有數據並不是都給丟失的子RDD分區用的,會有一部分數據相當於對應的是未丟失的子RDD分區中需要的數據,這樣就會產生冗余計算開銷和巨大的性能浪費。
3.checkpoint層容錯
Spark checkpoint通過將RDD寫入Disk作檢查點,是Spark lineage容錯的輔助,lineage過長會造成容錯成本過高,這時在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。
checkpoint主要適用於以下兩種情況:
(1)DAG中的Lineage過長,如果重算,開銷太大,如PageRank、ALS等。
(2)尤其適合在寬依賴上作checkpoint,這個時候就可以避免為Lineage重新計算而帶來的冗余計算。
