一.簡介
流應用程序必須保證7*24全天候運行,因此必須能夠適應與程序邏輯無關的故障【例如:系統故障、JVM崩潰等】。為了實現這一點,SparkStreaming需要將足夠的信息保存到容錯存儲系統中,以便它可以從故障中恢復。
檢查點有兩種類型。
1.元數據檢查點
將定義流式計算的信息保存到容錯存儲系統【如HDFS等】。這用於從運行流應用程序所在的節點的故障中恢復。
元數據包括:
1.配置
用於創建流應用程序的配置。
2.DStream操作
定義流應用程序的DStream操作集。
3.不完整的批次
在任務隊列中而尚未完成的批次。
2.數據檢查點
將生成的RDD保存到可靠的存儲系統。在一些跨多個批次組合數據的有狀態轉換中,這是必須的。在這種轉換中,生成的RDD依賴於先前批次的RDD,這導致依賴關系鏈的長度隨着時間而增加。為了避免恢復時間的這種無限增加【與依賴鏈成正比】,有狀態變換的中間RDD周期性地檢查以存儲到可靠的存儲系統中,以切斷依賴鏈。
總而言之,元數據檢查點主要用於從節點故障中恢復,而如果使用狀態轉換,即使對於基本功能也需要數據或RDD檢查點。
二.需要設置檢查點的情況
1.有狀態轉換的使用,如果在應用程序中使用了updateStateByKey或reduceByKeyAndWindow,則必須提供檢查點以緩存之前批次的中間結果。
2.從運行應用程序的節點故障中恢復,元數據檢查點用於使用進度信息進行恢復。
備注:在沒有上述狀態轉換的簡單流應用程序中可以不使用檢查點。在這種情況下,節點故障的恢復將是部分性的【某些以接收但未處理的數據可能會丟失】。
三.配置檢查點
可以通過在容錯,可靠的文件系統【例如:HDFS、S3或Windows文件系統】中設置目錄來啟用檢查點,檢查點信息將保存到該文件系統中。使用:streamingContext.checkpoint(checkpointDirectory)來設置的。這將允許使用上述狀態轉換。此外,如果要使應用程序從節點故障中恢復,則應重寫流應用程序以使其具有以下行為。
1.當程序首次啟動時,它將創建一個新的StreamingContext,設置所有流后調用start()。
2.當程序在失敗后重新啟動時,它將從檢查點目錄中的檢查點數據重新創建StreamingContext。
四.代碼實現
1 package big.data.analyse.streaming 2 3 import org.apache.log4j.{Level, Logger} 4 import org.apache.spark.SparkConf 5 import org.apache.spark.streaming.{Seconds, StreamingContext} 6 7 /** 8 * Created by zhen on 2019/8/15. 9 */ 10 object Checkpoint { 11 def functionToCreateContext():StreamingContext = { 12 val conf = new SparkConf().setMaster("local[2]").setAppName("StreaingTest") 13 val ssc = new StreamingContext(conf, Seconds(10)) 14 val lines = ssc.socketTextStream("192.168.245.137", 9999) 15 16 val words = lines.flatMap(_.split(" ")) 17 val pairs = words.map(word=>(word,1)).reduceByKey(_+_) 18 pairs.foreachRDD(row => row.foreach(println)) 19 ssc.checkpoint("D:\\checkpoint") 20 ssc 21 } 22 def main(args: Array[String]) { 23 /** 24 * 設置日志級別 25 */ 26 Logger.getLogger("org").setLevel(Level.WARN) // 設置日志級別 27 28 /** 29 * 獲取入口及設置checkpoint檢查點 30 */ 31 val ssc = StreamingContext.getOrCreate("D:\\checkpoint", functionToCreateContext _) 32 33 ssc.start() 34 ssc.awaitTermination() 35 ssc.stop() 36 } 37 }
五.結果
入參:

結果:

六.總結
1.需要確保節點進程在失敗時會自動重啟,這只能通過部署基礎結構來完成。
2.檢查點的默認間隔是批處理間隔的倍數,且至少為10秒。通常DStream的5~10個滑動間隔為檢查點間隔是一個很好的設置。
