Flink CheckPoint狀態點恢復與savePoint機制


1 Flink 應用程序啟動

./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 
-c streaming.SoetWindowWordCountJavaCheckPoint(入口類)
/usr/local/install/testJar/FlinkExample-1.0-SNAPSHOT-jar-with-dependencies.jar (jar路徑)
--port 9010

 

 

 

2 Checkpoint 保存與恢復

2.1 Checkpoin設置與保存

  • 默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。但是,如果我們希望保留多個Checkpoint,並能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前

  • Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數。

      state.checkpoints.num-retained: 20
  • 這樣設置以后就查看對應的Checkpoint在HDFS上存儲的文件目錄 hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints 如果希望回退到某個Checkpoint點,只需要指定對應的某個Checkpoint路徑即可實現

2.2 Checkpoint恢復

  • 如果Flink程序異常失敗,或者最近一段時間內數據處理錯誤,我們可以將程序從某一個Checkpoint點進行恢復

 

 

  • -s 后面接的就是待恢復checkpoint的路徑。

    bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar

 

程序正常運行后,還會按照Checkpoint配置進行運行,繼續生成Checkpoint數據

 

 

 

3 SavePoint 剖析

3.1 全局一致性快照

  • Flink通過Savepoint功能可以做到程序升級后,繼續從升級前的那個點開始執行計算,保證數據不中斷
  • 全局,一致性快照。可以保存數據源offset,operator操作狀態等信息
  • 可以從應用在過去任意做了savepoint的時刻開始繼續消費

3.2 checkpoint理論

  • 應用定時觸發,用於保存狀態,會過期
  • 內部應用失敗重啟的時候使用

3.3 savePoint 理論

  • 用戶手動執行,是指向Checkpoint的指針,不會過期,
  • 在升級的情況下使用

  注意:為了能夠在作業的不同版本之間以及 Flink 的不同版本之間順利升級,強烈推薦通過 uid(String) 方法手動的給算子賦予 ID,這些 ID 將用於確定每一個算子的狀態范圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID。只要這些 ID 沒有改變就能從保存點(savepoint)將程序恢復回來。而這些自動生成的 ID 依賴於程序的結構,並且對代碼的更改是很敏感的。因此,強烈建議用戶手動的設置 ID。

3.4 savePoint的使用

  • 1:在flink-conf.yaml中配置Savepoint存儲位置

    不是必須設置,但是設置后,后面創建指定Job的Savepoint時,可以不用在手動執行命令時指定Savepoint的位置:

    state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
  • 2:觸發一個savepoint【直接觸發或者在cancel的時候觸發】

     bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】
      
     bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】

 

 

 

 

 

 

 

 

  • 3:從指定的savepoint啟動job

    bin/flink run -s savepointPath [runArgs]

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM