1、Failure Rate Restart Strategy 說明
故障率重啟策略,flink提供的一種比較"智能"的重啟策略;即當任務的失敗率上升到一定的程度時,flink認為本次任務最終是失敗的;
也可以理解為,在該策略中,flink關注的點是任務的失敗率,失敗率計算公式如下:
失敗率 = 失敗次數/時間區間
“失敗次數”對restart-strategy.failure-rate.max-failures-per-interval,
“時間區間”對應的配置項為restart-strategy.failure-rate.failure-rate-interval,也就是說在該時間范圍內,
運行時失敗次數超過配置的失敗次數則任務最終失敗;當然,類似於Fixed Delay Restart Strategy,在每兩次連續的重試之間也會有一個固定時間的delay。
2、文件配置說明
①失敗率配置
restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 #失敗的次數 restart-strategy.failure-rate.failure-rate-interval: 2 min #時間段內 restart-strategy.failure-rate.delay: 5s #重試的間隔時間
間段內任務失敗后,重啟時,每兩次連續嘗試之間間隔5秒,在2分鍾內任務重啟失敗3次,則任務最終失敗,配置默認全局生效。
②間隔時間配置
restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 #次數 restart-strategy.fixed-delay.delay: 10 s #間隔
3、代碼配置
①失敗率配置
env.setRestartStrategy(RestartStrategies.failureRateRestart( 3,//最大失敗次數 Time.of(2, TimeUnit.MINUTES), // 衡量失敗次數的是時間段 Time.of(5, TimeUnit.SECONDS) // 間隔 ));
②時間間隔配置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3,,// 嘗試重啟的次數 Time.of(5, TimeUnit.SECONDS) // 間隔 ));
4、無重啟配置
在特殊情況下,希望處理出錯就不重復執行,直接報出錯,配置如下:
配置文件:
restart-strategy: none
代碼配置:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart());
5、代碼示例
//獲取flink的運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】 env.enableCheckpointing(1000); // 高級選項: // 設置模式為exactly-once (這是默認值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 檢查點必須在一分鍾內完成,或者被丟棄【checkpoint的超時時間】 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只允許進行一個檢查點 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】 //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);