spark checkpoint詳解


checkpoint在spark中主要有兩塊應用:一塊是在spark core中對RDD做checkpoint,可以切斷做checkpoint RDD的依賴關系,將RDD數據保存到可靠存儲(如HDFS)以便數據恢復;另外一塊是應用在spark streaming中,使用checkpoint用來保存DStreamGraph以及相關配置信息,以便在Driver崩潰重啟的時候能夠接着之前進度繼續進行處理(如之前waiting batch的job會在重啟后繼續處理)。

本文主要將詳細分析checkpoint在以上兩種場景的讀寫過程。

1,spark core中checkpoint分析

1.1,checkpoint的使用方法

使用checkpoint對RDD做快照大體如下:

sc.setCheckpointDir(checkpointDir.toString)
val rdd = sc.makeRDD(1 to 20, numSlices = 1)
rdd.checkpoint()

首先,設置checkpoint的目錄(一般是hdfs目錄),這個目錄用來將RDD相關的數據(包括每個partition實際數據,以及partitioner(如果有的話))。然后在RDD上調用checkpoint的方法即可。

1.2,checkpoint寫流程

可以看到checkpoint使用非常簡單,設置checkpoint目錄,然后調用RDD的checkpoint方法。針對checkpoint的寫入流程,主要有以下四個問題:

Q1:RDD中的數據是什么時候寫入的?是在rdd調用checkpoint方法時候嗎?

Q2:在做checkpoint的時候,具體寫入了哪些數據到HDFS了?

Q3:在對RDD做完checkpoint以后,對做RDD的本省又做了哪些收尾工作?

Q4:實際過程中,使用RDD做checkpoint的時候需要注意什么問題?

弄清楚了以上四個問題,我想對checkpoint的寫過程也就基本清楚了。接下來將一一回答上面提出的問題。

A1:首先看一下RDD中checkpoint方法,可以看到在該方法中是只是新建了一個ReliableRDDCheckpintData的對象,並沒有做實際的寫入工作。實際觸發寫入的時機是在runJob生成改RDD后,調用RDD的doCheckpoint方法來做的。

A2:在經歷調用RDD.doCheckpoint → RDDCheckpintData.checkpoint → ReliableRDDCheckpintData.doCheckpoint → ReliableRDDCheckpintData.writeRDDToCheckpointDirectory后,在writeRDDToCheckpointDirectory方法中可以看到:將作為一個單獨的任務(RunJob)將RDD中每個parition的數據依次寫入到checkpoint目錄(writePartitionToCheckpointFile),此外如果該RDD中的partitioner如果不為空,則也會將該對象序列化后存儲到checkpoint目錄。所以,在做checkpoint的時候,寫入的hdfs中的數據主要包括:RDD中每個parition的實際數據,以及可能的partitioner對象(writePartitionerToCheckpointDir)。

A3:在寫完checkpoint數據到hdfs以后,將會調用rdd的markCheckpoined方法,主要斬斷該rdd的對上游的依賴,以及將paritions置空等操作。

A4:通過A1,A2可以知道,在RDD計算完畢后,會再次通過RunJob將每個partition數據保存到HDFS。這樣RDD將會計算兩次,所以為了避免此類情況,最好將RDD進行cache。即1.1中rdd的推薦使用方法如下:

sc.setCheckpointDir(checkpointDir.toString)
val rdd = sc.makeRDD(1 to 20, numSlices = 1)
rdd.cache()
rdd.checkpoint()

1.3,checkpoint 讀流程

在做完checkpoint后,獲取原來RDD的依賴以及partitions數據都將從CheckpointRDD中獲取。也就是說獲取原來rdd中每個partition數據以及partitioner等對象,都將轉移到CheckPointRDD中。

在CheckPointRDD的一個具體實現ReliableRDDCheckpintRDD中的compute方法中可以看到,將會從hdfs的checkpoint目錄中恢復之前寫入的partition數據。而partitioner對象(如果有)也會從之前寫入hdfs的paritioner對象恢復。

總的來說,checkpoint讀取過程是比較簡單的。

2,spark streaming中checkpoint分析

2.1,streaming中checkpoint的使用方法

在streaming中使用checkpoint主要包含以下兩點:設置checkpoint目錄,初始化StreamingContext時調用getOrCreate方法,即當checkpoint目錄沒有數據時,則新建streamingContext實例,並且設置checkpoint目錄,否則從checkpoint目錄中讀取相關配置和數據創建streamingcontext。

// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc }
 
         
// Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

2.2,streaming中checkpoint寫流程

同樣,針對streaming中checkpoint的寫流程,主要有以下三個問題,並對此做相關解釋。

Q1:streaming中checkpoint是在何時做的?

A1:在spark streaming中,jobGenerator會定期生成任務(jobGenerator.generateJobs)。在任務生成后將會調用doCheckpoint方法對系統做checkpoint。此外,在當前批次任務結束,清理metadata(jobGenerator.clearMetadata)時,也會調用doCheckpoint方法。

Q2:在streaming checkpoint過程中,具體都寫入了哪些數據到checkpoint目錄?

A2: 做checkpoint的主要邏輯基本都在JobGenerator.doCheckpoint方法中。

在該方法中,首先更新當前時間段需要做checkpoint RDD的相關信息,如在DirectKafkaInputDStream中,將已經生成的RDD信息的時間,topic,partition,offset等相關信息進行更新。

其次,通過checkpointWriter將Checkpoint對象寫入到checkpoint目錄中(CheckPoint.write → CheckpointWriteHandle)。至此,我們清楚了,寫入到checkpoint目錄的數據其實就是Checkpoint對象。

Checkpoint主要包含的信息如下:

val master = ssc.sc.master
val framework = ssc.sc.appName
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val sparkConfPairs = ssc.conf.getAll

具體包括相關配置信息,checkpoint目錄,DStreamGraph等。對於DStreamGraph,主要包含InputDstream以及outputStream等相關信息,從而我們可以看出定義應用相關的計算函數也被序列化保存到checkpoint目錄中了。

Q3:  streaming checkpoint都有哪些坑?

A3:

從A2中可以看到,應用定義的計算函數也被序列化到checkpoint目錄,當應用代碼發生改變時,此時就沒法從checkpoint恢復。個人感覺這是checkpoint在生產環境使用中碰到的最大障礙。

另外,當從checkpoint目錄恢復streamingContext時,配置信息啥的也都是從checkpoint讀取的(只有很少的一部分配置是reload的,具體見讀流程),當重啟任務時,新改變的配置就可能不生效,導致很奇怪的問題。

此外,broadcast變量在checkpoint中使用也受到限制(SPARK-5206)。

2.3,streaming中checkpoint讀流程

在spark streaming任務從checkpoint恢復streamingContext時,將會觸發對之前保存的checkpoint對象的讀取動作。在StreamingContext的getOrCreate方法中,通過checkpoint.read方法從checkpoint目錄中恢復之前保存的Checkpoint對象。一旦該對象存在,將使用Checkpoint創建streamingContext。於此同時,在StreamingContext中DStreamGraph的恢復借助之前保存的對象,並且調用restoreCheckpointData恢復之前生成而未計算的RDD,從而接着之前的進度進行數據處理。

另外需要注意的時,以下配置信息在使用checkpoint創建streamingContext時,這些配置信息是重新加載的。

val propertiesToReload = List(
"spark.yarn.app.id",
"spark.yarn.app.attemptId",
"spark.driver.host",
"spark.driver.bindAddress",
"spark.driver.port",
"spark.master",
"spark.yarn.jars",
"spark.yarn.keytab",
"spark.yarn.principal",
"spark.yarn.credentials.file",
"spark.yarn.credentials.renewalTime",
"spark.yarn.credentials.updateTime",
"spark.ui.filters",
"spark.mesos.driver.frameworkId")

3,小結

本文主要分析了checkpoint在spark core和streaming讀寫的基本過程,並且指出了在checkpoint使用中碰到一些坑。對於spark streaming,個人認為checkpoint在生產環境不適宜使用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM