SparkStreaming之checkpoint檢查點


一.簡介

  流應用程序必須保證7*24全天候運行,因此必須能夠適應與程序邏輯無關的故障【例如:系統故障、JVM崩潰等】。為了實現這一點,SparkStreaming需要將足夠的信息保存到容錯存儲系統中,以便它可以從故障中恢復。

  檢查點有兩種類型。

    1.元數據檢查點

      將定義流式計算的信息保存到容錯存儲系統【如HDFS等】。這用於從運行流應用程序所在的節點的故障中恢復。

      元數據包括:

        1.配置

          用於創建流應用程序的配置。

        2.DStream操作

          定義流應用程序的DStream操作集。

        3.不完整的批次

          在任務隊列中而尚未完成的批次。

    2.數據檢查點

      將生成的RDD保存到可靠的存儲系統。在一些跨多個批次組合數據的有狀態轉換中,這是必須的。在這種轉換中,生成的RDD依賴於先前批次的RDD,這導致依賴關系鏈的長度隨着時間而增加。為了避免恢復時間的這種無限增加【與依賴鏈成正比】,有狀態變換的中間RDD周期性地檢查以存儲到可靠的存儲系統中,以切斷依賴鏈。

  總而言之,元數據檢查點主要用於從節點故障中恢復,而如果使用狀態轉換,即使對於基本功能也需要數據或RDD檢查點。

二.需要設置檢查點的情況

  1.有狀態轉換的使用,如果在應用程序中使用了updateStateByKey或reduceByKeyAndWindow,則必須提供檢查點以緩存之前批次的中間結果。

  2.從運行應用程序的節點故障中恢復,元數據檢查點用於使用進度信息進行恢復。

  備注:在沒有上述狀態轉換的簡單流應用程序中可以不使用檢查點。在這種情況下,節點故障的恢復將是部分性的【某些以接收但未處理的數據可能會丟失】。

三.配置檢查點

  可以通過在容錯,可靠的文件系統【例如:HDFS、S3或Windows文件系統】中設置目錄來啟用檢查點,檢查點信息將保存到該文件系統中。使用:streamingContext.checkpoint(checkpointDirectory)來設置的。這將允許使用上述狀態轉換。此外,如果要使應用程序從節點故障中恢復,則應重寫流應用程序以使其具有以下行為。

  1.當程序首次啟動時,它將創建一個新的StreamingContext,設置所有流后調用start()。

  2.當程序在失敗后重新啟動時,它將從檢查點目錄中的檢查點數據重新創建StreamingContext。

四.代碼實現

 1 package big.data.analyse.streaming
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.SparkConf
 5 import org.apache.spark.streaming.{Seconds, StreamingContext}
 6 
 7 /**
 8   * Created by zhen on 2019/8/15.
 9   */
10 object Checkpoint {
11   def functionToCreateContext():StreamingContext = {
12     val conf = new SparkConf().setMaster("local[2]").setAppName("StreaingTest")
13     val ssc = new StreamingContext(conf, Seconds(10))
14     val lines = ssc.socketTextStream("192.168.245.137", 9999)
15 
16     val words = lines.flatMap(_.split(" "))
17     val pairs = words.map(word=>(word,1)).reduceByKey(_+_)
18     pairs.foreachRDD(row => row.foreach(println))
19     ssc.checkpoint("D:\\checkpoint")
20     ssc
21   }
22   def main(args: Array[String]) {
23     /**
24       * 設置日志級別
25       */
26     Logger.getLogger("org").setLevel(Level.WARN) // 設置日志級別
27 
28     /**
29       * 獲取入口及設置checkpoint檢查點
30       */
31     val ssc = StreamingContext.getOrCreate("D:\\checkpoint", functionToCreateContext _)
32 
33     ssc.start()
34     ssc.awaitTermination()
35     ssc.stop()
36   }
37 }

五.結果

  入參:

    

  結果:

    

六.總結

  1.需要確保節點進程在失敗時會自動重啟,這只能通過部署基礎結構來完成。

  2.檢查點的默認間隔是批處理間隔的倍數,且至少為10秒。通常DStream的5~10個滑動間隔為檢查點間隔是一個很好的設置。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM