Flink之Checkpoint的設置和使用


具體實現代碼如下所示:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

/**
 * 配置狀態后端,知道checkpoint的存儲路徑(在最新的flink中 需要使用 StreamExecutionEnvironment 來直接設置,但在1.10.2中還不能使用該方法)
 */
// StreamExecutionEnvironment.setStateBackend(new FsStateBackend(""))
env.setStateBackend(new FsStateBackend(checkpointPath))

/**
 * checkpoint的相關設置
 */
// 啟用檢查點,指定觸發checkpoint的時間間隔(單位:毫秒,默認500毫秒),默認情況是不開啟的
env.enableCheckpointing(1000L)
// 設定語義模式,默認情況是exactly_once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 設定Checkpoint超時時間,默認為10分鍾
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 設定兩個Checkpoint之間的最小時間間隔,防止出現例如狀態數據過大而導致Checkpoint執行時間過長,從而導致Checkpoint積壓過多,
// 最終Flink應用密切觸發Checkpoint操作,會占用了大量計算資源而影響到整個應用的性能(單位:毫秒)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// 默認情況下,只有一個檢查點可以運行
// 根據用戶指定的數量可以同時觸發多個Checkpoint,進而提升Checkpoint整體的效率
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 外部檢查點
// 不會在任務正常停止的過程中清理掉檢查點數據,而是會一直保存在外部系統介質中,另外也可以通過從外部檢查點中對任務進行恢復
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 如果有更近的保存點時,是否將作業回退到該檢查點
env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
// 設置可以允許的checkpoint失敗數
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)

/**
 * 重啟策略的配置
 */
// 重啟3次,每次失敗后等待10000毫秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
// 在5分鍾內,只能重啟5次,每次失敗后最少需要等待10秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))


val resultStream: DataStream[SensorReading] = env
    .readTextFile(sensorPath)
    .map(new MyMapToSensorReading)
    .map(data => {
        Thread.sleep(1000)
        data
    })
    .keyBy(_.id)
    .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))

resultStream.print()

env.execute("CheckPointDemo")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM