checkpoint是Flink容錯的核心機制。它可以定期地將各個Operator處理的數據進行快照存儲( Snapshot )。如果Flink程序出現宕機,可以重新從這些快照中恢復數據。
1. checkpoint coordinator(協調器)線程周期生成 barrier (柵欄),發送給每一個source
2. source將當前的狀態進行snapshot(可以保存到HDFS)
3. source向coordinator確認snapshot已經完成
4. source繼續向下游transformation operator發送 barrier
5. transformation operator重復source的操作,直到sink operator向協調器確認snapshot完成
6. coordinator確認完成本周期的snapshot
代碼設置示例:
// 5秒啟動一次checkpoint env.enableCheckpointing(5000) // 設置checkpoint只checkpoint一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 設置兩次checkpoint的最小時間間隔 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) // checkpoint超時的時長 env.getCheckpointConfig.setCheckpointTimeout(60000) // 允許的最大checkpoint並行度 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // 當程序關閉的時,觸發額外的checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpoin tCleanup.RETAIN_ON_CANCELLATION) // 設置checkpoint的地址 env.setStateBackend(new FsStateBackend("hdfs://cdh1:8020/flink-checkpoint/")) |
|