Spark Streaming源碼分析 – Checkpoint


Persistence
Streaming沒有做特別的事情,DStream最終還是以其中的每個RDD作為job進行調度的,所以persistence就以RDD為單位按照原先Spark的方式去做就可以了,不同的是Streaming是無限,需要考慮Clear的問題
在clearMetadata時,在刪除過期的RDD的同時,也會做相應的unpersist
比較特別的是,NetworkInputDStream,是一定會做persistence的,因為會事先將流數據轉化為persist block,然后NetworkInputDStream直接從block中讀到數據
在design中看到NetworkInputDStream會將source data存兩份,防止丟失,但在代碼中沒有找到這段邏輯,只看到往blockManager寫入一份

Checkpoint
在Streaming中Checkpoint有特殊的意義
對於普通的Spark,沒有cp不會影響正確性,因為任何數據都是可以從source replay出來的,而source data往往在HDFS上,所以cp只是一種優化。
並且Spark也只在worker級別做了failover,worker掛了,沒事把上面的tasks換個worker重新replay出來即可, 但是並沒有做driver的failover,driver掛了就失敗了
因為Spark本身就看成是個query engine,query失敗了沒什么損失,again就ok

但是對於SparkStreaming,這個問題就沒有那么簡單了,如果driver掛掉,不做任何處理,恢復以后到底從哪里開始做?
首先一定會丟數據,影響正確性,因為流數據是無限的,你不可能像Spark一樣把所有數據replay一遍,即使source支持replay,比如kafka

所以對於Streaming的checkpoint分為兩部分,RDD的cp和DStreamGraph的cp
對於RDD的cp和Spark是一致的,沒有區別
下面談談對於DStreamGraph的cp,目的就是在StreamingContext被重啟后,可以從cp中恢復出之前Graph的執行時狀況
a. Graph對象是會整個被序列化到文件,而其中最關鍵的是outputStreams,看似這里只會persist最終的outputStreams,其實會persist整個graph上所有的DStream
因為在def dependencies: List[DStream[_]]會包含所有的上一層DStream,依次遞歸,就會包含所有的DStream對象
在恢復出DStream對象后,如何恢復當時的RDD狀況,可以看到generatedRDDs是@transient的,並不會被persist
答案在DStream.DStreamCheckpointData中,通過currentCheckpointFiles可以記錄下cp時,generatedRDDs中所有完成cp的RDD的(times,cpfilename)
所以在恢復時只需要將RDD從cpfile中讀出來,並加入到generatedRDDs即可
並且cpfile是需要清理的,當每次完成DStreamGraph的cp時,在該graph中的最老的RDD之前的所有RDD的cpfile都可以刪掉,因為這些老的RDD不可能再被用到
b. 在Checkpoint對象中除了graph對象,還有該比較重要的是pendingTimes,這個記錄在cp時,有多少的jobs沒有被提交
這樣當JobScheduler重新啟動的時候會重新提交這些jobs,這里是at-least once邏輯,因為不知道在cp完多久后crash,所以其中某些job有可能已經被成功執行

創建cp的過程,
1. 在JobGenerator中,每次提交一組jobs到Spark后,會執行對DoCheckpoint將Checkpoint對象序列化寫入文件(其中Checkpoint對象包含graph對象等信息)
2. 在完成DoCheckpoint后,會調用ClearCheckpointData清除過期的RDD的checkpoint文件

使用cp的過程,
1. 調用StreamingContext.getOrCreate,使用CheckpointReader.read從文件中反序列化出Checkpoint對象, 並使用Checkpoint對象去初始化StreamingContext對象
2. 在StreamingContext中調用cp_.graph.restoreCheckpointData來恢復每個DStream.generatedRDDs
3. 在JobGenerator中調用Restart,重新提交哪些在cp中未被提交的jobs

 

DStreamGraph

DStreamCheckpointData

DStream

JobGenerator
1. 在每次runJobs結束,即每次新提交一組jobs后,會執行對DoCheckpoint將Checkpoint對象寫入文件
2. 在restart的時候,會重新run pendingTimes + downTimes的jobs,保證at-least once邏輯

StreamingContext
在有checkpoint文件時,需要先讀出Checkpoint對象,然后去初始化StreamingContext
從而使用Checkpoint去恢復graph中所有的DStream

Checkpoint (org.apache.spark.streaming)
Checkpoint主要是為了cp DStreamGraph對象,通過CheckpointWriter將Checkpoint序列化到文件

CheckpointWriter,用於將CP對象寫入文件

CheckpointReader



 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM