實時的流式處理系統必須是7*24運行的,同時可以從各種各樣的系統錯誤中恢復,在設計之處,Spark Streaing就支持driver和worker節點的錯誤恢復。然后,在使用某些數據源的時候,錯誤恢復時輸入數據可能會丟失。在spark 1.2中,加入write ahead logs(日志)這個初步方案用來改進恢復機制,保證數據的無丟失。
背景
spark和rdd的設計保證了集群中worker節點的容錯性。spark streaming構建在spark之上,所以它的worker節點也是同樣的容錯機制,然后Spark Streaming應用的高可用性要求應用的driver進程也要有容錯性,它是應用的主要進程用於協調所有的worker節點,因為用戶應用的計算模式是可變的導致driver的容錯性非常棘手,然后Spark Streaming會對實時流中的每一批數據進行運行同樣的Spark計算,這樣就可以定期的保存應用的狀態到一個可靠的存儲中,driver重啟的時候恢復這些狀態。
對於一些文件的數據源,driver的恢復機制可以保證數據無丟失,因為所有的數據都保存在HDFS或S3上面,對於一些像kafka,flume等數據源,接收的數據保存在內存中將有可能丟失,這是因為spark應用分布式運行的,如果driver進程掛了,所有的executor進程將團滅,保存在這些進程所持有內存中的數據將會丟失。為了避免這些數據的丟失,在spark streaming 1.2中 引入了一個write ahead logs。
Write Ahead Logs
WAL使用在文件系統和數據庫中用於數據操作的持久性,先把數據寫到一個持久化的日志中,然后對數據做操作,如果操作過程中系統掛了,恢復的時候可以重新讀取日志文件再次進行操作。
對於像kafka和flume這些使用接收器來接收數據的數據源。接收器作為一個長時間的任務運行在executor中,負責從數據源接收數據,如果數據源支持的話,向數據源確認接收到數據,然后把數據存儲在executor的內存中,然后driver在exector上運行任務處理這些數據。
如果wal啟用了,所有接收到的數據會保存到一個日志文件中去(HDFS), 這樣保存接收數據的持久性,此外,如果只有在數據 寫入到log中之后接收器才向數據源確認,這樣drive重啟后那些保存在內存中但是沒有寫入到log中的數據將會重新發送,這兩點保證的數據的無丟失。
配置
啟動WAL需要做如下的配置
1:給streamingContext設置checkpoint的目錄,該目錄必須是HADOOP支持的文件系統,用來保存WAL和做Streaming的checkpoint
2:spark.streaming.receiver.writeAheadLog.enable 設置為true
當WAL被啟動了以后,所有的接收器接收的數據可以很穩定的恢復,推薦的內存備份可以關閉了(給輸入流設置合適的持久化級別),因為WAL保存在可容錯的文件系統上,數據已經備份了。
此外,如果想要恢復緩沖的數據,必須使用支持應答的數據源(flume,kafka,kinses)。 當數據存儲到日志以后那些支持應答接收器可以向數據源確認。內置的flume和kafka接收器已經實現了這些功能
最后,值得注意的是WAL開啟了以后會減少Spark Streaming處理數據的吞吐,因為所有接收的數據會被寫到到容錯的文件系統上,這樣文件系統的吞吐和網絡帶寬將成為瓶頸。可以通過添加更多接收器或使用更好的軟件。
實現細節
下面講解下WAL的工作原理。過一下Spark Streaming的架構
當一個Spark Streaming應用啟動了(例如driver啟動), 相應的StreamingContext使用SparkContet去啟動receiver,receiver是一個長時間執行的作業,這些接收器接收並保存這些數據到Spark的executor進程的內存中,這些數據的生命周期如下圖所示

1:藍色的箭頭表示接收的數據,接收器把數據流打包成塊,存儲在executor的內存中,如果開啟了WAL,將會把數據寫入到存在容錯文件系統的日志文件中
2:青色的箭頭表示提醒driver, 接收到的數據塊的元信息發送給driver中的StreamingContext, 這些元數據包括:executor內存中數據塊的引用ID和日志文件中數據塊的偏移信息
3:紅色箭頭表示處理數據,每一個批處理間隔,StreamingContext使用塊信息用來生成RDD和jobs. SparkContext執行這些job用於處理executor內存中的數據塊
4:黃色箭頭表示checkpoint這些計算,以便於恢復。流式處理會周期的被checkpoint到文件中
當一個失敗的driver重啟以后,恢復流程如下

1:黃色的箭頭用於恢復計算,checkpointed的信息是用於重啟driver,重新構造上下文和重啟所有的receiver
2: 青色箭頭恢復塊元數據信息,所有的塊信息對已恢復計算很重要
3:重新生成未完成的job(紅色箭頭),會使用到2恢復的元數據信息
4:讀取保存在日志中的塊(藍色箭頭),當job重新執行的時候,塊數據將會直接從日志中讀取,
5:重發沒有確認的數據(紫色的箭頭)。緩沖的數據沒有寫到WAL中去將會被重新發送。
