一、概述
Savepoint 是檢查點的一種特殊實現,底層實現其實也是使用 Checkpoint 的機制。
Savepoint 是用戶以手工命令的方式觸發 Checkpoint,並將結果持久化到指定的存儲路徑
中,其主要目的是幫助用戶在升級和維護集群過程中保存系統中的狀態數據,避免因為停機運維
或者升級應用等正常終止應用的操作而導致系統無法恢復到原有的計算狀態的情況,從而無法實
現從端到端的 Exactly-Once 語義保證。
1)配置 Savepoints 的存儲路徑
在 flink-conf.yaml 中配置 SavePoint 存儲的位置,設置后,如果要創建指定 Job 的 SavePoint,
可以不用在手動執行命令時指定 SavePoint 的位置。
state.savepoints.dir: hdfs:/hadoop101:9000/savepoints
2)在代碼中設置算子ID
為了能夠在作業的不同版本之間以及 Flink 的不同版本之間順利升級,強烈推薦通過手動給算子賦予 ID,
這些 ID 將用於確定每一個算子的狀態范圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID。
而這些自動生成的 ID 依賴於程序的結構,並且對代碼的更改時很敏感的。因此,強烈建議手動設置 ID。
package com.apple.flink.savepoints import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object TestSavepoints { def main(args: Array[String]): Unit = { val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) import org.apache.flink.streaming.api.scala._ //讀取數據到DataStream val stream = streamEnv.socketTextStream("hadoop101", 8888).uid("mySource-001") stream.flatMap(_.split(" ")) .uid("flapMap-001") .map((_, 1)) .uid("map=001") .keyBy(0) .sum(1) .uid("sum-001") .print() //啟動流計算 streamEnv.execute("wc") } }
3)觸發 SavePoint
//先啟動Job [root@hadoop101 bin]# ./flink run -c com.bjsxt.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar //再取消Job ,觸發SavePoint [root@hadoop101 bin]# ./flink savepoint 6ecb8cfda5a5200016ca6b01260b94ce
[root@hadoop101 bin]# ./flink cancel 6ecb8cfda5a5200016ca6b01260b94ce
4)從 SavePoint 啟動 Job
[root@hadoop101 bin]# ./flink run -s \
hdfs://hadoop101:9000/savepoints/savepoint-6ecb8c-e56ccb88576a \
-c com.bjsxt.flink.state.TestSavepoints \
-d /home/Flink-Demo-1.0-SNAPSHOT.jar
也可以通過 Web UI 啟動 Job: