深入淺出Spark的Checkpoint機制


1 Overview

當第一次碰到 Spark,尤其是 Checkpoint 的時候難免有點一臉懵逼,不禁要問,Checkpoint 到底是什么。所以,當我們在說 Checkpoint 的時候,我們到底是指什么?

網上找到一篇文章,說到 Checkpoint,大概意思是檢查點創建一個已知的節點,SQL Server 數據庫引擎可以在意外關閉或崩潰后從恢復期間開始應用日志中包含的更改。所以你可以簡單理解成 Checkpoint 是用來容錯的,當錯誤發生的時候,可以迅速恢復的一種機制,這里就不展開講了。

A checkpoint creates a known good point from which the SQL Server Database Engine can start applying changes contained in the log during recovery after an unexpected shutdown or crash.

回到 Spark 上,尤其在流式計算里,需要高容錯的機制來確保程序的穩定和健壯。從源碼中看看,在 Spark 中,Checkpoint 到底做了什么。在源碼中搜索,可以在 Streaming 包中的 Checkpoint

作為 Spark 程序的入口,我們首先關注一下 SparkContext 里關於 Checkpoint 是怎么寫的。SparkContext 我們知道,定義了很多 Spark 內部的對象的引用。可以找到 Checkpoint 的文件夾路徑是這么定義的。

// 定義 checkpointDir
private[spark] var checkpointDir: Option[String] = None

/**
 * Set the directory under which RDDs are going to be checkpointed. The directory must
 * be a HDFS path if running on a cluster.
 */
def setCheckpointDir(directory: String) {

  // If we are running on a cluster, log a warning if the directory is local.
  // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
  // its own local file system, which is incorrect because the checkpoint files
  // are actually on the executor machines.
  // 如果運行的是 cluster 模式,當設置本地文件夾的時候,會報 warning
  // 道理很簡單,被創建出來的文件夾路徑實際上是 executor 本地的文件夾路徑,不是不行,
  // 只是有點不合理,Checkpoint 的東西最好還是放在分布式的文件系統中
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
      s"must not be on the local filesystem. Directory '$directory' " +
      "appears to be on the local filesystem.")
  }

  checkpointDir = Option(directory).map { dir =>
    // 顯然文件夾名就是 UUID.randoUUID() 生成的
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}

 

 

關於 setCheckpointDir 被那些類調用了,可以看以下截圖。除了常見的 StreamingContext 中需要使用(以為容錯性是流式計算的基本保證),另外的就是一些需要反復迭代計算使用 RDD 的場景,包括各種機器學習算法的時候,圖中可以看到像 ALS, Decision Tree 等等算法,這些算法往往需要反復使用 RDD,遇到大的數據集用 Cache 就沒有什么意義了,所以一般會用 Checkpoint。

此處我只計划深挖一下 spark core 里的代碼。推薦大家一個 IDEA 的功能,下圖右下方可以將你搜索的關鍵詞的代碼輸出到外部文件中,到時候可以打開自己看看 spark core 中關於 Checkpoint 的代碼是怎么組織的。
在這里向大家推薦一個學習資料分享群:894951460

繼續找找 Checkpoint 的相關信息,可以看到 runJob 方法的最后是一個 rdd.toCheckPoint() 的使用。runJob 我們知道是觸發 action 的一個方法,那么我們進入 doCheckpoint() 看看。

/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

 

然后基本就發現了 Checkpoint 的核心方法了。而 doCheckpoint()RDD 的私有方法,所以這里基本可以回答最開始提出的問題,我們在說 Checkpoint 的時候,到底是 Checkpoint 什么。答案就是 RDD。

/**
 * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
 * has completed (therefore the RDD has been materialized and potentially stored in memory).
 * doCheckpoint() is called recursively on the parent RDDs.
 */
// 顯然 checkpoint 是在使用完前一個 RDD 之后才會被執行的操作
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
          // them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}

 

上面代碼可以看到,需要判斷一下一個變量 checkpointData 是否為空。那么它是這么被定義的。

private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None 

然后看看 RDDCheckPointData 是個什么樣的數據結構。

/**
 * This class contains all the information related to RDD checkpointing. Each instance of this
 * class is associated with an RDD. It manages process of checkpointing of the associated RDD,
 * as well as, manages the post-checkpoint state by providing the updated partitions,
 * iterator and preferred locations of the checkpointed RDD.
 */
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {
  import CheckpointState._
  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized
  // The RDD that contains our checkpointed data
  // 顯然,這個就是被 Checkpoint 的 RDD 的數據
  private var cpRDD: Option[CheckpointRDD[T]] = None
  // TODO: are we sure we need to use a global lock in the following methods?
  /**
   * Return whether the checkpoint data for this RDD is already persisted.
   */
  def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
    cpState == Checkpointed
  }
  /**
   * Materialize this RDD and persist its content.
   * This is called immediately after the first action invoked on this RDD has completed.
   */
  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }
    val newRDD = doCheckpoint()
    // Update our state and truncate the RDD lineage
    // 可以看到 cpRDD 在此處被賦值,通過 newRDD 來生成,而生成的方法是 doCheckpointa()
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }
  /**
   * Materialize this RDD and persist its content.
   *
   * Subclasses should override this method to define custom checkpointing behavior.
   * @return the checkpoint RDD created in the process.
   */
   // 這個是 Checkpoint RDD 的抽象方法
  protected def doCheckpoint(): CheckpointRDD[T]
  /**
   * Return the RDD that contains our checkpointed data.
   * This is only defined if the checkpoint state is `Checkpointed`.
   */
  def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData.synchronized { cpRDD }
  /**
   * Return the partitions of the resulting checkpoint RDD.
   * For tests only.
   */
  def getPartitions: Array[Partition] = RDDCheckpointData.synchronized {
    cpRDD.map(_.partitions).getOrElse { Array.empty }
  }
}

 

根據注釋,可以知道這個類涵蓋了 RDD Checkpoint 的所有信息。除了控制 Checkpoint 的過程,還會處理之后的狀態變更。說到 Checkpoint 的狀態變更,我們看看是如何定義的。

/**
 * Enumeration to manage state transitions of an RDD through checkpointing
 *
 * [ Initialized --{@literal >} checkpointing in progress --{@literal >} checkpointed ]
 */
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

 

顯然 Checkpoint 的過程分為初始化[Initialized] -> 正在 Checkpoint[CheckpointingInProgress] -> 結束 Checkpoint[Checkpointed] 三種狀態。

圖片.png

關於 RDDCheckpointData 有兩個實現,分別分析一下。

  1. LocalRDDCheckpointData: RDD 會被保存到 Executor 本地文件系統中,以減少保存到分布式容錯性文件系統的巨額開銷,因此 Local 形式的 Checkpoint 是基於持久化來做的,沒有寫到外部分布式文件系統。
  2. ReliableRDDCheckpointData: Reliable 很好理解,就是把 RDD Checkpoint 到可依賴的文件系統,言下之意就是 Driver 重啟的時候也可以從失敗的時間點進行恢復,無需再走一次 RDD 的轉換過程。

1.1 LocalRDDCheckpointData

LocalRDDCheckpointData 中的核心方法 doCheckpoint()。需要保證 RDD 用了 useDisk 級別的持久化。需要運行一個 Spark 任務來重新構建這個 RDD。最終 new 一個 LocalCheckpointRDD 實例。

/**
 * Ensure the RDD is fully cached so the partitions can be recovered later.
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val level = rdd.getStorageLevel

  // Assume storage level uses disk; otherwise memory eviction may cause data loss
  assume(level.useDisk, s"Storage level $level is not appropriate for local checkpointing")

  // Not all actions compute all partitions of the RDD (e.g. take). For correctness, we
  // must cache any missing partitions. TODO: avoid running another job here (SPARK-8582).
  val action = (tc: TaskContext, iterator: Iterator[T]) => Utils.getIteratorSize(iterator)
  val missingPartitionIndices = rdd.partitions.map(_.index).filter { i =>
    !SparkEnv.get.blockManager.master.contains(RDDBlockId(rdd.id, i))
  }
  if (missingPartitionIndices.nonEmpty) {
    rdd.sparkContext.runJob(rdd, action, missingPartitionIndices)
  }

  new LocalCheckpointRDD[T](rdd)
}

 

1.2 ReliableRDDCheckpointData

這個是寫外部文件系統的 Checkpoint 類。

/**
 * Materialize this RDD and write its content to a reliable DFS.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }

  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}

 

可以看到核心方法是通過 ReliableCheckpointRDD.writeRDDToCheckpointDirectory() 來寫 newRDD。這個方法就不進去看了,代碼邏輯非常清晰,同樣是起一個 Spark 任務把 RDD 生成之后按 Partition 來寫到文件系統中。

2 Checkpoint嘗試

Spark 的 Checkpoint 機制通過上文在源碼上分析了一下,那么也可以在 Local 模式下實踐一下。利用 spark-shell 來簡單嘗試一下就好了。

scala> val data = sc.parallelize(List(1, 2, 3))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> sc.setCheckpointDir("/tmp")
scala> data.checkpoint
scala> data.count
res2: Long = 3

 

從以上代碼示例上可以看到,首先構建一個 rdd,並且設置 Checkpoint 文件夾,因為是 Local 模式,所以可以設定本地文件夾做嘗試。

# list 一下 /tmp 目錄,發現 Checkpoint 的文件夾 ➜ /tmp ls 73d8442e-a375-401c-b1fc-84284e25b89c # tree 一下 Checkpoint 文件夾看看是什么結構的,可以看到默認構建的 rdd 四個分區都被 checkpoint 了 ➜ /tmp tree 73d8442e-a375-401c-b1fc-84284e25b89c 73d8442e-a375-401c-b1fc-84284e25b89c └── rdd-0 ├── part-00000 ├── part-00001 ├── part-00002 └── part-00003 1 directory, 4 files 

3 Summary

至此,Spark 的 Checkpoint 機制已經說得差不多了,順便提一下 這個SPARK-8582 已經提出很久時間了,Spark 社區似乎一直都在嘗試解決而又未有解決。大意就是每次 Checkpoint 實際上是對同一個 RDD 進行了兩次計算,第一次是在程序運行的時候,第二次則是 Checkpoint 的時候就需要把這個 RDD 的轉換關系重新計算一次。那么很顯然,能否在第一次計算的時候就 Checkpoint 呢?這是社區幾個大神的 PR 的趨勢,具體大家可以參照一下 JIRA 上提及到的 PR。
如何學習大數據?學習沒有資料?

想學習大數據開發技術,Hadoop,spark,雲計算,數據分析、爬蟲等技術,在這里向大家推薦一個學習資料分享群:894951460,里面有大牛已經整理好的相關學習資料,希望對你們有所幫助。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM