引入
一般來說,分布式數據集的容錯性有兩種方式:數據檢查點和記錄數據的更新。
面向大規模數據分析,數據檢查點操作成本很高,需要通過數據中心的網絡連接在機器之間復制龐大的數據集,而網絡帶寬往往比內存帶寬低得多,同時還需要消耗更多的存儲資源。
因此,Spark選擇記錄更新的方式。但是,如果更新粒度太細太多,那么記錄更新成本也不低。因此,RDD只支持粗粒度轉換,即只記錄單個塊上執行的單個操作,然后將創建RDD的一系列變換序列(每個RDD都包含了他是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統(Lineage)”容錯)記錄下來,以便恢復丟失的分區。
Lineage本質上很類似於數據庫中的重做日志(Redo Log),只不過這個重做日志粒度很大,是對全局數據做同樣的重做進而恢復數據。
Lineage機制
Lineage簡介
相比其他系統的細顆粒度的內存數據更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數據Transformation操作(如filter、map、join等)行為。當這個RDD的部分分區數據丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數據分區。因為這種粗顆粒的數據模型,限制了Spark的運用場合,所以Spark並不適用於所有高性能要求的場景,但同時相比細顆粒度的數據模型,也帶來了性能的提升。
兩種依賴關系
RDD在Lineage依賴方面分為兩種:窄依賴(Narrow Dependencies)與寬依賴(Wide Dependencies,源碼中稱為Shuffle
Dependencies),用來解決數據容錯的高效性。
- 窄依賴是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應於一個子RDD的分區
或多個父RDD的分區對應於一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。
1個父RDD分區對應1個子RDD分區,這其中又分兩種情況:1個子RDD分區對應1個父RDD分區(如map、filter等算子),1個子RDD分區對應N個父RDD分區(如co-paritioned(協同划分)過的Join)。- 寬依賴是指子RDD的分區依賴於父RDD的多個分區或所有分區,即存在一個父RDD的一個分區對應一個子RDD的多個分區。
1個父RDD分區對應多個子RDD分區,這其中又分兩種情況:1個父RDD對應所有子RDD分區(未經協同划分的Join)或者1個父RDD對應非全部的多個RDD分區(如groupByKey)。
本質理解:根據父RDD分區是對應1個還是多個子RDD分區來區分窄依賴(父分區對應一個子分區)和寬依賴(父分區對應多個子分
區)。如果對應多個,則當容錯重算分區時,因為父分區數據只有一部分是需要重算子分區的,其余數據重算就造成了冗余計算。
對於寬依賴,Stage計算的輸入和輸出在不同的節點上,對於輸入節點完好,而輸出節點死機的情況,通過重新計算恢復數據這種情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上追溯其祖先看是否可以重試(這就是lineage,血統的意思),窄依賴對於數據的重算開銷要遠小於寬依賴的數據重算開銷。
窄依賴和寬依賴的概念主要用在兩個地方:一個是容錯中相當於Redo日志的功能;另一個是在調度中構建DAG作為不同Stage的划分點。
依賴關系的特性
第一,窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據;寬依賴則要等到父RDD所有數據都計算完成之后,並且父RDD的計算結果進行hash並傳到對應節點上之后才能計算子RDD。
第二,數據丟失時,對於窄依賴只需要重新計算丟失的那一塊數據來恢復;對於寬依賴則要將祖先RDD中的所有數據塊全部重新計算來恢復。所以在長“血統”鏈特別是有寬依賴的時候,需要在適當的時機設置數據檢查點。也是這兩個特性要求對於不同依賴關系要采取不同的任務調度機制和容錯恢復機制。
容錯原理
在容錯機制中,如果一個節點死機了,而且運算窄依賴,則只要把丟失的父RDD分區重算即可,不依賴於其他節點。而寬依賴需要父RDD的所有分區都存在,重算就很昂貴了。可以這樣理解開銷的經濟與否:在窄依賴中,在子RDD的分區丟失、重算父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,並不存在冗余計算。在寬依賴情況下,丟失一個子RDD分區重算的每個父RDD的每個分區的所有數據並不是都給丟失的子RDD分區用的,會有一部分數據相當於對應的是未丟失的子RDD分區中需要的數據,這樣就會產生冗余計算開銷,這也是寬依賴開銷更大的原因。因此如果使用Checkpoint算子來做檢查點,不僅要考慮Lineage是否足夠長,也要考慮是否有寬依賴,對寬依賴加Checkpoint是最物有所值的。
Checkpoint機制
通過上述分析可以看出在以下兩種情況下,RDD需要加檢查點。
- DAG中的Lineage過長,如果重算,則開銷太大(如在PageRank中)。
- 在寬依賴上做Checkpoint獲得的收益更大。
由於RDD是只讀的,所以Spark的RDD計算中一致性不是主要關心的內容,內存相對容易管理,這也是設計者很有遠見的地方,這樣減少了框架的復雜性,提升了性能和可擴展性,為以后上層框架的豐富奠定了強有力的基礎。
在RDD計算中,通過檢查點機制進行容錯,傳統做檢查點有兩種方式:通過冗余數據和日志記錄更新操作。在RDD中的doCheckPoint方法相當於通過冗余數據來緩存數據,而之前介紹的血統就是通過相當粗粒度的記錄更新操作來實現容錯的。
檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷。
一個 Streaming Application 往往需要7*24不間斷的跑,所以需要有抵御意外的能力(比如機器或者系統掛掉,JVM crash等)。為了讓這成為可能,Spark Streaming需要 checkpoint 足夠多信息至一個具有容錯設計的存儲系統才能讓 Application 從失敗中恢復。Spark Streaming 會 checkpoint 兩種類型的數據。
- Metadata(元數據) checkpointing - 保存定義了 Streaming 計算邏輯至類似 HDFS 的支持容錯的存儲系統。用來恢復 driver,元數據包括:
- 配置 - 用於創建該 streaming application 的所有配置
- DStream 操作 - DStream 一些列的操作
- 未完成的 batches - 那些提交了 job 但尚未執行或未完成的 batches
- Data checkpointing - 保存已生成的RDDs至可靠的存儲。這在某些 stateful 轉換中是需要的,在這種轉換中,生成 RDD 需要依賴前面的 batches,會導致依賴鏈隨着時間而變長。為了避免這種沒有盡頭的變長,要定期將中間生成的 RDDs 保存到可靠存儲來切斷依賴鏈
具體來說,metadata checkpointing主要還是從drvier失敗中恢復,而Data Checkpoing用於對有狀態的transformation操作進行checkpointing
Checkpointing具體的使用方式時通過下列方法:
//checkpointDirectory為checkpoint文件保存目錄 streamingContext.checkpoint(checkpointDirectory)
什么時候需要啟用 checkpoint?
什么時候該啟用 checkpoint 呢?滿足以下任一條件:
- 使用了 stateful 轉換 - 如果 application 中使用了
updateStateByKey或reduceByKeyAndWindow等 stateful 操作,必須提供 checkpoint 目錄來允許定時的 RDD checkpoint - 希望能從意外中恢復 driver
如果 streaming app 沒有 stateful 操作,也允許 driver 掛掉后再次重啟的進度丟失,就沒有啟用 checkpoint的必要了。
如何使用 checkpoint?
啟用 checkpoint,需要設置一個支持容錯 的、可靠的文件系統(如 HDFS、s3 等)目錄來保存 checkpoint 數據。通過調用 streamingContext.checkpoint(checkpointDirectory) 來完成。另外,如果你想讓你的 application 能從 driver 失敗中恢復,你的 application 要滿足:
- 若 application 為首次重啟,將創建一個新的 StreamContext 實例
- 如果 application 是從失敗中重啟,將會從 checkpoint 目錄導入 checkpoint 數據來重新創建 StreamingContext 實例
通過 StreamingContext.getOrCreate 可以達到目的:
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
如果 checkpointDirectory 存在,那么 context 將導入 checkpoint 數據。如果目錄不存在,函數 functionToCreateContext 將被調用並創建新的 context
除調用 getOrCreate 外,還需要你的集群模式支持 driver 掛掉之后重啟之。例如,在 yarn 模式下,driver 是運行在 ApplicationMaster 中,若 ApplicationMaster 掛掉,yarn 會自動在另一個節點上啟動一個新的 ApplicationMaster。
需要注意的是,隨着 streaming application 的持續運行,checkpoint 數據占用的存儲空間會不斷變大。因此,需要小心設置checkpoint 的時間間隔。設置得越小,checkpoint 次數會越多,占用空間會越大;如果設置越大,會導致恢復時丟失的數據和進度越多。一般推薦設置為 batch duration 的5~10倍。
導出 checkpoint 數據
上文提到,checkpoint 數據會定時導出到可靠的存儲系統,那么
- 在什么時機進行 checkpoint
- checkpoint 的形式是怎么樣的
checkpoint 的時機
在 Spark Streaming 中,JobGenerator 用於生成每個 batch 對應的 jobs,它有一個定時器,定時器的周期即初始化 StreamingContext 時設置的 batchDuration。這個周期一到,JobGenerator 將調用generateJobs方法來生成並提交 jobs,這之后調用 doCheckpoint 方法來進行 checkpoint。doCheckpoint 方法中,會判斷當前時間與 streaming application start 的時間之差是否是 checkpoint duration 的倍數,只有在是的情況下才進行 checkpoint。
checkpoint 的形式
最終 checkpoint 的形式是將類 Checkpoint的實例序列化后寫入外部存儲,值得一提的是,有專門的一條線程來做將序列化后的 checkpoint 寫入外部存儲。類 Checkpoint 包含以下數據
除了 Checkpoint 類,還有 CheckpointWriter 類用來導出 checkpoint,CheckpointReader 用來導入 checkpoint
Checkpoint 的局限
Spark Streaming 的 checkpoint 機制看起來很美好,卻有一個硬傷。上文提到最終刷到外部存儲的是類 Checkpoint 對象序列化后的數據。那么在 Spark Streaming application 重新編譯后,再去反序列化 checkpoint 數據就會失敗。這個時候就必須新建 StreamingContext。
針對這種情況,在我們結合 Spark Streaming + kafka 的應用中,我們自行維護了消費的 offsets,這樣一來及時重新編譯 application,還是可以從需要的 offsets 來消費數據,這里只是舉個例子,不詳細展開了。
轉載http://blog.csdn.net/jasonding1354/article/details/46882585

