Flink重啟策略機制


1、Failure Rate Restart Strategy 說明

故障率重啟策略,flink提供的一種比較"智能"的重啟策略;即當任務的失敗率上升到一定的程度時,flink認為本次任務最終是失敗的;

也可以理解為,在該策略中,flink關注的點是任務的失敗率,失敗率計算公式如下:

失敗率 = 失敗次數/時間區間
“失敗次數”對restart-strategy.failure-rate.max-failures-per-interval,

“時間區間”對應的配置項為restart-strategy.failure-rate.failure-rate-interval,也就是說在該時間范圍內,

運行時失敗次數超過配置的失敗次數則任務最終失敗;當然,類似於Fixed Delay Restart Strategy,在每兩次連續的重試之間也會有一個固定時間的delay。

2、文件配置說明

①失敗率配置

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3  #失敗的次數
restart-strategy.failure-rate.failure-rate-interval: 2 min  #時間段內
restart-strategy.failure-rate.delay: 5s  #重試的間隔時間
間段內任務失敗后,重啟時,每兩次連續嘗試之間間隔5秒,在2分鍾內任務重啟失敗3次,則任務最終失敗,配置默認全局生效。

②間隔時間配置
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 3  #次數
  restart-strategy.fixed-delay.delay: 10 s  #間隔

  

3、代碼配置

①失敗率配置

  env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,//最大失敗次數
  Time.of(2, TimeUnit.MINUTES), // 衡量失敗次數的是時間段
  Time.of(5, TimeUnit.SECONDS) // 間隔
  ));

 ②時間間隔配置

  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,,// 嘗試重啟的次數
    Time.of(5, TimeUnit.SECONDS) // 間隔
  ));

  

4、無重啟配置

在特殊情況下,希望處理出錯就不重復執行,直接報出錯,配置如下:

配置文件:

restart-strategy: none

代碼配置:

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setRestartStrategy(RestartStrategies.noRestart());

  

5、代碼示例

  //獲取flink的運行環境
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】
  env.enableCheckpointing(1000);
  // 高級選項:
  // 設置模式為exactly-once (這是默認值)
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  // 檢查點必須在一分鍾內完成,或者被丟棄【checkpoint的超時時間】
  env.getCheckpointConfig().setCheckpointTimeout(60000);
  // 同一時間只允許進行一個檢查點
  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  // 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】
  //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint
  //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint
  env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM