StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】 env.enableCheckpointing(1000); // 設置模式為exactly-once (這是默認值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 檢查點必須在一分鍾內完成,或者被丟棄【checkpoint的超時時間】 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只允許進行一個檢查點 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint //設置statebackend //env.setStateBackend(new MemoryStateBackend()); //env.setStateBackend(new FsStateBackend("hdfs://zzy:9000/flink/checkpoints")); //rocksDB需要引入依賴flink-statebackend-rocksdb_2.11 //env.setStateBackend(new RocksDBStateBackend("hdfs://zzy:9000/flink/checkpoints",true)); env.setStateBackend(new FsStateBackend("hdfs://192.168.5.63:9000/flink/checkpoints"));
State Backend(狀態的后端存儲)
-
默認情況下,state會保存在taskmanager的內存中,checkpoint會存儲在JobManager的內存中。
-
state 的store和checkpoint的位置取決於State Backend的配置
- env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints",true)) //異步checkpoint
- env.setStateBackend(new MemoryStateBackend()) //默認存儲方式
- env.setStateBackend(new RocksDBStateBackend(filebackend, true)) //【需要添加第三方依賴】
