Flink通過Savepoint功能可以做到程序升級后,繼續從升級前的那個點開始執行計算,保證數據不中斷。
Flink中Checkpoint用於保存狀態,是自動執行的,會過期,Savepoint是指向Checkpoint的指針,需要手動執行,並且不會過期。
據Flink路線圖,后面Savepoint會和Checkpoint合並成一個,不像現在這樣分成兩個,而且一個自動、一個手動了。
1.flink-conf.yaml中配置Savepoint存儲位置
不是必須設置,但是設置后,后面創建指定Job的Savepoint時,可以不用在手動執行命令時指定Savepoint的位置
state.savepoints.dir: hdfs://t-sha1-flk-01:9000/flink-savepoints
2.列出當前Job
[teld@T-SHA1-FLK-01 log]$ flink list ------------------ Running/Restarting Jobs ------------------- aaaaaaaaaaaa : 8eaee3ed045c14337568c1cf3a272a45 : MonitorEngine_V1.0_SH.A1_Minute (RUNNING) bbbbbbbbbbbb : ca1f3ac0081711ee6a0767fe1fd5b31c : MonitorEngine_V1.0_SH.A1_Second (RUNNING) -------------------------------------------------------------- No scheduled jobs.
3.停止Job,並將狀態寫入Savepoint
[teld@T-SHA1-FLK-01 log]$ flink cancel -s ca1f3ac0081711ee6a0767fe1fd5b31c Cancelling job ca1f3ac0081711ee6a0767fe1fd5b31c with savepoint to default savepoint directory. Cancelled job ca1f3ac0081711ee6a0767fe1fd5b31c. Savepoint stored in
hdfs://t-sha1-flk-01:9000/flink-savepoints/savepoint-ca1f3a-9f86a020ee76.
4.從指定的Savepoint啟動Job
[teld@T-SHA1-FLK-01 log]$ flink run -s hdfs://t-sha1-flk-01:9000/flink-savepoints/savepoint-ca1f3a-9f86a020ee76
-p 6 -c cn.teld.monitor.MonitorEngine monitorengine_flink_sec-1.0-jar-with-dependencies.jar
5.建議為Flink程序中的每個操作設置uid以及name
6.從界面提交升級包
前面是通過命令行的方式進行升級,也可以直接通過界面方式進行提交,提交時指定Savepoint路徑即可。