Spark之RDD容錯原理及四大核心要點


一、Spark RDD容錯原理

  RDD不同的依賴關系導致Spark對不同的依賴關系有不同的處理方式。

  對於寬依賴而言,由於寬依賴實質是指父RDD的一個分區會對應一個子RDD的多個分區,在此情況下出現部分計算結果丟失,單一計算丟失的數據無法達到效果,便采用重新計算該步驟中的所有數據,從而會導致計算數據重復;對於窄依賴而言,由於窄依賴實質是指父RDD的分區最多被一個子RDD使用,在此情況下出現部分計算的錯誤,由於計算結果的數據只與依賴的父RDD的相關數據有關,所以不需要重新計算所有數據,只重新計算出錯部分的數據即可。

二、RDD容錯的四大核心要點

  Spark框架層面的容錯機制,主要分為三大層面(調度層、RDD血統層、Checkpoint層),在這三大層面中包括Spark RDD容錯四大核心要點。

  (1)Stage輸出失敗,上層調度器DAGScheduler重試。
  (2)Spark計算中,Task內部任務失敗,底層調度器重試。
  (3)RDD Lineage血統中窄依賴、寬依賴計算。
  (4)Checkpoint緩存。

1.調度層(包含DAG生成和Task重算兩大核心)

  從調度層面講,錯誤主要出現在兩個方面,分別是在Stage輸出時出錯和在計算時出錯。

  1)DAG生成層

  Stage輸出失敗,上層調度器DAGScheduler會進行重試,DAGScheduler.scala的resubmitFailedStages的源碼如下。

  /**
   * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
   * the last fetch failure.
   */
  private[scheduler] def resubmitFailedStages() {
    // 判斷是否存在失敗的Stages
    if (failedStages.size > 0) {
      // Failed stages may be removed by job cancellation, so failed might be empty even if
      // the ResubmitFailedStages event has been scheduled.
      // 失敗的階段可以通過作業取消刪除,如果ResubmitFailedStages事件已調度,失敗將是空值
      logInfo("Resubmitting failed stages")
      clearCacheLocs()
      // 獲取所有失敗Stage的列表
      val failedStagesCopy = failedStages.toArray
      // 清空failedStages
      failedStages.clear()
      // 對之前獲取所有失敗的Stage,根據jobId排序后逐一重試
      for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
        submitStage(stage)
      }
    }
    submitWaitingStages()
  }

  2)Task計算層

  Spark計算過程中,計算內部某個Task任務出現失敗,底層調度器會對此Task進行若干次重試(默認4次)。TaskSetManager.scala的handleFailedTask的源碼如下。

/**
   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
   * DAG Scheduler.
   */
  def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {

    ......

    if (!isZombie && state != TaskState.KILLED
        && reason.isInstanceOf[TaskFailedReason]
        && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
      assert (null != failureReason)
      // 對失敗的Task的numFailures進行計數加1
      numFailures(index) += 1
      // 判斷失敗的Task計數是否大於設定的最大失敗次數,如果大於,則輸出日志,並不再重試
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %d in stage %s failed %d times; aborting job".format(
          index, taskSet.id, maxTaskFailures))
        abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
          .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
        return
      }
    }
    // 如果運行的Task為0時,則完成Task步驟
    maybeFinishTaskSet()
  }

2.RDD Lineage血統層容錯

  Spark中RDD采用高度受限的分布式共享內存,且新的RDD的產生只能夠通過其他RDD上的批量操作來創建,依賴於以RDD的Lineage為核心的容錯處理,在迭代計算方面比Hadoop快20多倍,同時還可以在5~7s內交互式地查詢TB級別的數據集。

  Spark RDD實現基於Lineage的容錯機制,基於RDD的各項transformation構成了compute chain,在部分計算結果丟失的時候可以根據Lineage重新恢復計算。

  (1)在窄依賴中,在子RDD的分區丟失,要重算父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,並不存在冗余計算。
  (2)在寬依賴情況下,丟失一個子RDD分區,重算的每個父RDD的每個分區的所有數據並不是都給丟失的子RDD分區用的,會有一部分數據相當於對應的是未丟失的子RDD分區中需要的數據,這樣就會產生冗余計算開銷和巨大的性能浪費。

3.checkpoint層容錯

  Spark checkpoint通過將RDD寫入Disk作檢查點,是Spark lineage容錯的輔助,lineage過長會造成容錯成本過高,這時在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。

  checkpoint主要適用於以下兩種情況:

  (1)DAG中的Lineage過長,如果重算,開銷太大,如PageRank、ALS等。
  (2)尤其適合在寬依賴上作checkpoint,這個時候就可以避免為Lineage重新計算而帶來的冗余計算。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM