Spark Streaming揭秘 Day33 checkpoint的使用


Spark Streaming揭秘 Day33

checkpoint的使用

今天談下sparkstreaming中,另外一個至關重要的內容Checkpoint。
首先,我們會看下checkpoint的使用。另外,會看下在應用程序重新啟動時,是如何處理checkpoint的。

Checkpoint保存什么

checkpoint作為容錯的設計,基本思路是把當前運行的狀態,保存在容錯的存儲系統中(一般是hdfs)。對於容錯的處理,肯定是圍繞作業緊密相關的,保存內容包括元數據和數據兩部分。

從元數據角度講,SparkStreaming中會有哪些內容需要保存呢,主要有三類:

  1. 程序的配置
  2. 應用程序的業務邏輯,保存在DStreamGraph里。
  3. 還有哪些沒有處理的數據,即沒有完成的batch。

從數據角度講下,Checkpoint是對於狀態(state)操作生效。
首先,一般情況下在接收數據並保存時,是使用WAL來容錯,這個昨天討論過,並不放在checkpoint里。
對狀態(state)的DStream操作(updateStateByKey),操作會跨多個batch duration,后面數據對前面的有依賴,隨着時間的推移,依賴鏈條會越來越長,這個時候需要使用checkpoint,把這個長鏈條持久化,成為短鏈條。

在官方例子RecoverableNetworkCount中,我們看到可以用如下方法創建一個可恢復的streamingContext。
Snip20160712_68

那關鍵就是getOrCreate方法

Snip20160712_72

里面提供了兩種創建方式,第一次是創建新的streaming context,否則會從checkpoint數據中創建出上下文。第三個輸入參數是Hadoop的配置,一般來說checkpoint存放在hdfs中。ignoreReadError,可以控制是否拋出異常。

讓我們進入checkpoint的read方法。這里面是循環checkpoint目錄中的文件,讀取並反序列化,之后返回。
Snip20160712_71

我們看下反序列化方法,生成一個Checkpoint類型。
Snip20160712_73

Checkpoint里面包含了所有我們進行保存的內容。
Snip20160712_74

使用checkpoint恢復

下面我們來看下使用checkpoint構建環境的過程。

首先,調用sparkContext.getOrCreate 來構建sparkContext。
Snip20160712_75

之后是恢復DStreamGraph。
Snip20160712_76

DStreamGraph的恢復過程中,關鍵是恢復所有的outputStream。
Snip20160712_77

在恢復的過程中,默認是根據checkpoint文件重建RDD。
Snip20160712_79

checkpoint的數據是保存在ReliableCheckpointRDD中,我們看下compute方法。可以看到,就是從checkpoint文件讀取數據。
Snip20160712_80

最后,還有恢復checkpointDuration。
Snip20160712_81

Checkpoint的生成

生成是在JobGenerator中觸發。

在每次生成Job后,都會觸發checkpoint的寫入事件。

Snip20160712_83

doCheckpoint會寫入一個Checkpoint對象,其核心就是采用序列化技術把對象寫入磁盤。
Snip20160712_86

今天對checkpoint的介紹就到這里,對於整個機制來看,還是有些漏洞,如果目錄數據存在,但是代碼變化了,有可能出現不能讀取checkpoint里的內容,希望后續版本能改進。

欲知后事如何,且聽下回分解!

DT大數據每天晚上20:00YY頻道現場授課頻道68917580


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM