Flink容錯機制(checkpoint)


checkpoint是Flink容錯的核心機制。它可以定期地將各個Operator處理的數據進行快照存儲( Snapshot )。如果Flink程序出現宕機,可以重新從這些快照中恢復數據。

1. checkpoint coordinator(協調器)線程周期生成 barrier (柵欄),發送給每一個source

2. source將當前的狀態進行snapshot(可以保存到HDFS)

3. source向coordinator確認snapshot已經完成

4. source繼續向下游transformation operator發送 barrier

5. transformation operator重復source的操作,直到sink operator向協調器確認snapshot完成

6. coordinator確認完成本周期的snapshot

代碼設置示例:

// 5秒啟動一次checkpoint

env.enableCheckpointing(5000)

// 設置checkpointcheckpoint一次

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 設置兩次checkpoint的最小時間間隔

env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)

// checkpoint超時的時長

env.getCheckpointConfig.setCheckpointTimeout(60000)

// 允許的最大checkpoint並行度

env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// 當程序關閉的時,觸發額外的checkpoint

env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpoin

tCleanup.RETAIN_ON_CANCELLATION)

// 設置checkpoint的地址

env.setStateBackend(new FsStateBackend("hdfs://cdh1:8020/flink-checkpoint/"))

  

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM