FLINK-狀態管理-配置checkpoint


 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
// 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】
env.enableCheckpointing(1000); 
// 設置模式為exactly-once (這是默認值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
// 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); 
// 檢查點必須在一分鍾內完成,或者被丟棄【checkpoint的超時時間】
env.getCheckpointConfig().setCheckpointTimeout(60000); 
// 同一時間只允許進行一個檢查點
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
// 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint

//設置statebackend
//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://zzy:9000/flink/checkpoints"));
//rocksDB需要引入依賴flink-statebackend-rocksdb_2.11
//env.setStateBackend(new RocksDBStateBackend("hdfs://zzy:9000/flink/checkpoints",true));
env.setStateBackend(new FsStateBackend("hdfs://192.168.5.63:9000/flink/checkpoints"));

State Backend(狀態的后端存儲)

  • 默認情況下,state會保存在taskmanager的內存中,checkpoint會存儲在JobManager的內存中。

  • state 的store和checkpoint的位置取決於State Backend的配置

    • env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints",true)) //異步checkpoint
    • env.setStateBackend(new MemoryStateBackend()) //默認存儲方式
    • env.setStateBackend(new RocksDBStateBackend(filebackend, true)) //【需要添加第三方依賴】


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM