具體實現代碼如下所示:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) /** * 配置狀態后端,知道checkpoint的存儲路徑(在最新的flink中 需要使用 StreamExecutionEnvironment 來直接設置,但在1.10.2中還不能使用該方法) */ // StreamExecutionEnvironment.setStateBackend(new FsStateBackend("")) env.setStateBackend(new FsStateBackend(checkpointPath)) /** * checkpoint的相關設置 */ // 啟用檢查點,指定觸發checkpoint的時間間隔(單位:毫秒,默認500毫秒),默認情況是不開啟的 env.enableCheckpointing(1000L) // 設定語義模式,默認情況是exactly_once env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 設定Checkpoint超時時間,默認為10分鍾 env.getCheckpointConfig.setCheckpointTimeout(60000) // 設定兩個Checkpoint之間的最小時間間隔,防止出現例如狀態數據過大而導致Checkpoint執行時間過長,從而導致Checkpoint積壓過多, // 最終Flink應用密切觸發Checkpoint操作,會占用了大量計算資源而影響到整個應用的性能(單位:毫秒) env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // 默認情況下,只有一個檢查點可以運行 // 根據用戶指定的數量可以同時觸發多個Checkpoint,進而提升Checkpoint整體的效率 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // 外部檢查點 // 不會在任務正常停止的過程中清理掉檢查點數據,而是會一直保存在外部系統介質中,另外也可以通過從外部檢查點中對任務進行恢復 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // 如果有更近的保存點時,是否將作業回退到該檢查點 env.getCheckpointConfig.setPreferCheckpointForRecovery(true) // 設置可以允許的checkpoint失敗數 env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3) /** * 重啟策略的配置 */ // 重啟3次,每次失敗后等待10000毫秒 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L)) // 在5分鍾內,只能重啟5次,每次失敗后最少需要等待10秒 env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))) val resultStream: DataStream[SensorReading] = env .readTextFile(sensorPath) .map(new MyMapToSensorReading) .map(data => { Thread.sleep(1000) data }) .keyBy(_.id) .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature)) resultStream.print() env.execute("CheckPointDemo")