使用Flink的Savepoint功能


     Flink通過Savepoint功能可以做到程序升級后,繼續從升級前的那個點開始執行計算,保證數據不中斷。

     Flink中Checkpoint用於保存狀態,是自動執行的,會過期,Savepoint是指向Checkpoint的指針,需要手動執行,並且不會過期。

     據Flink路線圖,后面Savepoint會和Checkpoint合並成一個,不像現在這樣分成兩個,而且一個自動、一個手動了。

1.flink-conf.yaml中配置Savepoint存儲位置

   不是必須設置,但是設置后,后面創建指定Job的Savepoint時,可以不用在手動執行命令時指定Savepoint的位置

state.savepoints.dir: hdfs://t-sha1-flk-01:9000/flink-savepoints

 

2.列出當前Job

[teld@T-SHA1-FLK-01 log]$ flink list
------------------ Running/Restarting Jobs -------------------
aaaaaaaaaaaa : 8eaee3ed045c14337568c1cf3a272a45 : MonitorEngine_V1.0_SH.A1_Minute (RUNNING)
bbbbbbbbbbbb : ca1f3ac0081711ee6a0767fe1fd5b31c : MonitorEngine_V1.0_SH.A1_Second (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

 

3.停止Job,並將狀態寫入Savepoint

 

[teld@T-SHA1-FLK-01 log]$ flink cancel -s ca1f3ac0081711ee6a0767fe1fd5b31c
Cancelling job ca1f3ac0081711ee6a0767fe1fd5b31c with savepoint to default savepoint directory.
Cancelled job ca1f3ac0081711ee6a0767fe1fd5b31c. Savepoint stored in
hdfs://t-sha1-flk-01:9000/flink-savepoints/savepoint-ca1f3a-9f86a020ee76.

 

4.從指定的Savepoint啟動Job

[teld@T-SHA1-FLK-01 log]$ flink run -s hdfs://t-sha1-flk-01:9000/flink-savepoints/savepoint-ca1f3a-9f86a020ee76

-p 6 -c cn.teld.monitor.MonitorEngine monitorengine_flink_sec-1.0-jar-with-dependencies.jar

 

5.建議為Flink程序中的每個操作設置uid以及name

 

6.從界面提交升級包

   前面是通過命令行的方式進行升級,也可以直接通過界面方式進行提交,提交時指定Savepoint路徑即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM