1 Flink 應用程序啟動
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c streaming.SoetWindowWordCountJavaCheckPoint(入口類) /usr/local/install/testJar/FlinkExample-1.0-SNAPSHOT-jar-with-dependencies.jar (jar路徑) --port 9010
2 Checkpoint 保存與恢復
2.1 Checkpoin設置與保存
-
默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。但是,如果我們希望保留多個Checkpoint,並能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前
-
Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數。
state.checkpoints.num-retained: 20
-
這樣設置以后就查看對應的Checkpoint在HDFS上存儲的文件目錄 hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints 如果希望回退到某個Checkpoint點,只需要指定對應的某個Checkpoint路徑即可實現
2.2 Checkpoint恢復
- 如果Flink程序異常失敗,或者最近一段時間內數據處理錯誤,我們可以將程序從某一個Checkpoint點進行恢復
-
-s 后面接的就是待恢復checkpoint的路徑。
bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
程序正常運行后,還會按照Checkpoint配置進行運行,繼續生成Checkpoint數據
3 SavePoint 剖析
3.1 全局一致性快照
- Flink通過Savepoint功能可以做到程序升級后,繼續從升級前的那個點開始執行計算,保證數據不中斷
- 全局,一致性快照。可以保存數據源offset,operator操作狀態等信息
- 可以從應用在過去任意做了savepoint的時刻開始繼續消費
3.2 checkpoint理論
- 應用定時觸發,用於保存狀態,會過期
- 內部應用失敗重啟的時候使用
3.3 savePoint 理論
- 用戶手動執行,是指向Checkpoint的指針,不會過期,
- 在升級的情況下使用
注意:為了能夠在作業的不同版本之間以及 Flink 的不同版本之間順利升級,強烈推薦通過 uid(String) 方法手動的給算子賦予 ID,這些 ID 將用於確定每一個算子的狀態范圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID。只要這些 ID 沒有改變就能從保存點(savepoint)將程序恢復回來。而這些自動生成的 ID 依賴於程序的結構,並且對代碼的更改是很敏感的。因此,強烈建議用戶手動的設置 ID。
3.4 savePoint的使用
-
1:在flink-conf.yaml中配置Savepoint存儲位置
不是必須設置,但是設置后,后面創建指定Job的Savepoint時,可以不用在手動執行命令時指定Savepoint的位置:
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
-
2:觸發一個savepoint【直接觸發或者在cancel的時候觸發】
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】 bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】
-
3:從指定的savepoint啟動job
bin/flink run -s savepointPath [runArgs]