Besides persistence (cache/persist), Spark provides a checkpoint mechanism for saving data. A checkpoint (essentially, writing an RDD to disk) is an aid to lineage-based fault tolerance: when the lineage grows too long, recovering from it becomes too expensive, so it is cheaper to checkpoint at an intermediate stage. If a node later fails and partitions are lost, Spark replays the lineage starting from the checkpointed RDD, which reduces the recovery cost. Checkpointing implements this by writing the RDD's data to a file system such as HDFS.
The difference between cache and checkpoint:
Caching (cache) computes an RDD and keeps it in memory, but the RDD's dependency chain (analogous to a database redo log) cannot be discarded: if an executor holding cached partitions goes down, those partitions must be recomputed by replaying the dependency chain. Checkpoint, by contrast, saves the RDD to HDFS, which is reliable multi-replica storage, so the dependency chain can be dropped entirely; it achieves fault tolerance through replication rather than recomputation.
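To make the contrast concrete, here is a minimal sketch (the input path and checkpoint directory are placeholders, not from any real deployment): cache keeps the data on executors but retains the lineage, while checkpoint truncates it, which you can observe via toDebugString.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheVsCheckpoint {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("cacheVsCheckpoint").setMaster("local[*]"))
    sc.setCheckpointDir("file:///tmp/spark-ckpt")     // placeholder directory

    val counts = sc.textFile("file:///tmp/words.txt") // placeholder input
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    counts.cache()       // keeps data on executors, but the lineage is retained
    counts.checkpoint()  // marks the RDD; the write happens on the next action

    counts.count()       // runs the job, then a second job writes the checkpoint

    // After checkpointing, the debug string is rooted at a CheckpointRDD
    // instead of the original text file: the dependency chain was truncated.
    println(counts.toDebugString)
    sc.stop()
  }
}
```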
Checkpointing is a good fit in the following scenarios:
1) The lineage in the DAG is long and recomputing it would be too expensive (e.g. in PageRank).
2) Checkpointing on top of a wide dependency yields a bigger payoff.
rdd.checkpoint() sets a checkpoint for the current RDD. It causes the RDD to be saved as binary files under the checkpoint directory configured via SparkContext.setCheckpointDir(). During checkpointing, all of the RDD's references to its parent RDDs are removed. Note that checkpoint() does not execute immediately: it only takes effect once an Action triggers a job on the RDD.
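A quick way to see this lazy behavior (a minimal sketch, assuming a SparkContext `sc` whose checkpoint directory has already been set):

```scala
val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint()              // only marks the RDD; no job runs here
println(rdd.isCheckpointed)   // false -- nothing has been written yet
rdd.count()                   // the action triggers the checkpoint job
println(rdd.isCheckpointed)   // true -- data now lives in the checkpoint dir
```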
Checkpoint write flow
An RDD passes through the following states before it is checkpointed:

[ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
Initialized: the driver program calls rdd.checkpoint() on the RDDs it wants to checkpoint; from then on the rdd is managed by RDDCheckpointData. The user must also set the checkpoint storage path, usually on HDFS.
marked for checkpointing: after initialization, RDDCheckpointData marks the rdd as MarkedForCheckpoint (in current Spark versions this stage is folded into the Initialized state).
checkpointing in progress: after each job finishes, finalRdd.doCheckpoint() is called. It walks back along the computing chain, and every RDD marked for checkpointing is flipped to CheckpointingInProgress. The configuration files needed for writing to disk (e.g. core-site.xml for HDFS) are broadcast to the blockManager on the other worker nodes, and then a dedicated job is launched to perform the checkpoint (via rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf))).
checkpointed: once the checkpoint job finishes, all of the rdd's dependencies are cleared. How are they cleared? The RDD's strong references are set to null so the objects can be garbage collected; this fires a listener in ContextCleaner, which evicts the corresponding data from the BlockManager cache. The rdd's state is set to checkpointed. Finally, a new dependency is forced onto the rdd: its parent is set to a CheckpointRDD, which is responsible for later reading the checkpoint files from the file system and producing the rdd's partitions.
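For reference, this is roughly how the lineage is dropped in the Spark 2.x source (abridged from RDD.scala; details may vary across versions): markCheckpointed() nulls out the old references, and dependencies() afterwards resolves to the CheckpointRDD.

```scala
// Abridged from RDD.scala (Spark 2.x).
// Forget the old dependencies and partitions so they can be garbage collected:
private[spark] def markCheckpointed(): Unit = {
  clearDependencies()
  partitions_ = null
  deps = null    // Forget the constructor argument for dependencies too
}

// Afterwards, dependencies resolves to a single OneToOneDependency on the
// CheckpointRDD, which becomes the rdd's only parent:
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}
```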
Checkpoint read flow
When runJob() is invoked, it first calls the finalRDD's partitions() to determine how many tasks there will be. rdd.partitions() checks (through RDDCheckpointData, which manages checkpointed rdds) whether the rdd has already been checkpointed; if it has, it directly returns the rdd's partitions, i.e. an Array[Partition].
When rdd.iterator() is called to compute a partition of the rdd, computeOrReadCheckpoint(split: Partition) checks whether the rdd has been checkpointed. If so, it calls the parent rdd's iterator(), i.e. CheckpointRDD.iterator(); CheckpointRDD reads the files from the file system and produces the rdd's partitions. This explains the tricky step of attaching a parent CheckpointRDD to a checkpointed rdd.
To summarize, here is a small example that uses checkpoint:
```scala
import org.apache.spark.{SparkConf, SparkContext}

object testCheckpoint {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("testCheckpoint").setMaster("local[*]"))
    // Set the checkpoint directory (use an HDFS path on a real cluster).
    sc.setCheckpointDir("file:///f:/spark/checkpoint")
    val rdd = sc.textFile("file:///F:/spark/b.txt")
      .flatMap { line => line.split(" ") }
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    rdd.checkpoint()
    //rdd.count()
    // The action below triggers both the job and the checkpoint write.
    rdd.groupBy(x => x._2).collect().foreach(println)
  }
}
```
Checkpoint flow analysis
Checkpoint initialization
As we saw, the first call is to SparkContext's setCheckpointDir, which sets up a checkpoint directory. Let's step into this method:

```scala
/**
 * Set the directory under which RDDs are going to be checkpointed. The directory must
 * be a HDFS path if running on a cluster.
 */
def setCheckpointDir(directory: String) {
  // If we are running on a cluster, log a warning if the directory is local.
  // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
  // its own local file system, which is incorrect because the checkpoint files
  // are actually on the executor machines.
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
      s"must not be on the local filesystem. Directory '$directory' " +
      "appears to be on the local filesystem.")
  }
  checkpointDir = Option(directory).map { dir =>
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}
```
The method is quite simple: it just creates a directory. Next, let's look at the RDD's core checkpoint method and step into it:

```scala
/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
```
This method returns nothing and contains only one check: checkpointDir was set a moment ago and is non-empty, so a ReliableRDDCheckpointData is created. Let's look at ReliableRDDCheckpointData:

```scala
/**
 * An implementation of checkpointing that writes the RDD data to reliable storage.
 * This allows drivers to be restarted on failure with previously computed state.
 */
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {
  // ...
}
```
ReliableRDDCheckpointData extends RDDCheckpointData; let's continue into the parent class:

```scala
/**
 * An RDD needs to go through
 * [ Initialized --> CheckpointingInProgress --> Checkpointed ]
 * before it is checkpointed.
 */
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends Serializable {
  import CheckpointState._

  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized
  // ...
}
```
So an RDD moves through [ Initialized --> CheckpointingInProgress --> Checkpointed ] before it is fully checkpointed. The class contains an enumeration that tracks the checkpoint state; on first initialization it is Initialized.
The checkpoint() step is now complete: back in the RDD, the checkpointData member variable points to the RDDCheckpointData instance we just created.
When checkpoint writes its data
We know that running a Spark job ultimately calls SparkContext's runJob method to submit tasks to the executors. Let's look at runJob:

```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
```
The last line calls doCheckpoint, after the dagScheduler has submitted the job to the cluster. Let's look at this doCheckpoint method:

```scala
/**
 * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
 * has completed (therefore the RDD has been materialized and potentially stored in memory).
 * doCheckpoint() is called recursively on the parent RDDs.
 */
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
          // them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
```
This is recursive: it walks the RDD dependency chain, and whenever an rdd's checkpointData is non-empty it calls checkpointData's checkpoint() method. Remember what type checkpointData is? It is RDDCheckpointData, so let's look at its checkpoint method:

```scala
/**
 * Materialize this RDD and persist its content.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) {
      // Mark the current state as CheckpointingInProgress
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }

  // This calls the subclass's doCheckpoint()
  val newRDD = doCheckpoint()

  // Update our state and truncate the RDD lineage
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}
```

This method is where the actual checkpoint operation takes place.
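The doCheckpoint() invoked here is implemented by the subclass. Abridged from the Spark 2.x ReliableRDDCheckpointData (details may differ across versions), it launches a job that writes every partition to the checkpoint directory and returns the ReliableCheckpointRDD that becomes the new parent:

```scala
/**
 * Materialize this RDD and write its content to a reliable DFS.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }

  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}
```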
When checkpoint reads its data
A Task is the smallest unit of execution in Spark, and when a Task fails Spark recomputes it; the place where a Task performs its computation is the entry point for reading a checkpoint. Let's look at the runTask method of ShuffleMapTask:

```scala
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
```
This is where Spark actually triggers the computation: runTask calls rdd.iterator() to compute the rdd's partition. Let's look at RDD.iterator():

```scala
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
```
This continues into computeOrReadCheckpoint; let's look at that method:

```scala
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
```
When rdd.iterator() is asked to compute a partition of the rdd, computeOrReadCheckpoint(split: Partition) checks whether the rdd has been checkpointed. If it has, it calls the parent rdd's iterator(), i.e. CheckpointRDD.iterator(); otherwise it calls the RDD's own compute. So let's follow CheckpointRDD's compute:

```scala
/**
 * Read the content of the checkpoint file associated with the given partition.
 */
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
```
Just two lines here: read our checkpoint data from the given Path. Let's take a look at readCheckpointFile:

```scala
/**
 * Read the content of the specified checkpoint file.
 */
def readCheckpointFile[T](
    path: Path,
    broadcastedConf: Broadcast[SerializableConfiguration],
    context: TaskContext): Iterator[T] = {
  val env = SparkEnv.get
  val fs = path.getFileSystem(broadcastedConf.value.value)
  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
  val fileInputStream = fs.open(path, bufferSize)
  val serializer = env.serializer.newInstance()
  val deserializeStream = serializer.deserializeStream(fileInputStream)

  // Register an on-task-completion callback to close the input stream.
  context.addTaskCompletionListener(context => deserializeStream.close())

  deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
```
CheckpointRDD reads the files on the file system and produces the rdd's partitions. This is exactly why an RDD that called checkpoint gets a parent CheckpointRDD attached to it.
At this point, the whole checkpoint flow is complete.
References:
- https://www.coderfei.com/2018/02/11/spark-6-spark-rdd-cache-checkpoint.html
- https://www.jianshu.com/p/653ebabc8f87