Flink中State管理與恢復之CheckPoint原理及三種checkpoint使用方式對比


 CheckPoint

當程序出現問題需要恢復 Sate 數據的時候,只有程序提供支持才可以實現 State 的容錯。State 的容錯需要依靠 CheckPoint 機制,這樣才可以保證 Exactly-once 這種語義,但是注意,它只能保證 Flink 系統內的 Exactly-once,比如 Flink 內置支持的算子。針對 Source和 Sink 組件,如果想要保證 Exactly-once 的話,則這些組件本身應支持這種語義。

1) CheckPoint 原理

Flink 中基於異步輕量級的分布式快照技術提供了 Checkpoints 容錯機制,分布式快照可以將同一時間點 Task/Operator 的狀態數據全局統一快照處理,包括前面提到的 KeyedState 和 Operator State。Flink 會在輸入的數據集上間隔性地生成 checkpoint barrier,通過柵欄(barrier)將間隔時間段內的數據划分到相應的 checkpoint 中。每個算子都會進行checkpoint 操作。如下圖:

 

 

 從檢查點(CheckPoint)恢復如下圖:

假如我們設置了三分鍾進行一次CheckPoint,保存了上述所說的 chk-100 的CheckPoint狀態后,過了十秒鍾,offset已經消費到 (0,1100),pv統計結果變成了(app1,50080)(app2,10020),但是突然任務掛了,怎么辦?
莫慌,其實很簡單,flink只需要從最近一次成功的CheckPoint保存的offset(0,1000)處接着消費即可,當然pv值也要按照狀態里的pv值(app1,50000)(app2,10000)進行累加,不能從(app1,50080)(app2,10020)處進行累加,因為 partition 0 offset消費到 1000時,pv統計結果為(app1,50000)(app2,10000)當然如果你想從offset (0,1100)pv(app1,50080)(app2,10020)這個狀態恢復,也是做不到的,因為那個時刻程序突然掛了,這個狀態根本沒有保存下來。我們能做的最高效方式就是從最近一次成功的CheckPoint處恢復,也就是我一直所說的chk-100;

2) CheckPoint 參數和設置

默認情況下 Flink 不開啟檢查點的,用戶需要在程序中通過調用方法配置和開啟檢查點,另外還可以調整其他相關參數:

  •  Checkpoint 開啟和時間間隔指定:

開啟檢查點並且指定檢查點時間間隔為 1000ms,根據實際情況自行選擇,如果狀態比較大,則建議適當增加該值。
streamEnv.enableCheckpointing(1000);

  •  exactly-ance 和 at-least-once 語義選擇:

選擇 exactly-once 語義保證整個應用內端到端的數據一致性,這種情況比較適合於數據要求比較高,不允許出現丟數據或者數據重復,與此同時,Flink 的性能也相對較弱,而at-least-once 語義更適合於時廷和吞吐量要求非常高但對數據的一致性要求不高的場景。
如 下 通 過 setCheckpointingMode() 方 法 來 設 定 語 義 模 式 , 默 認 情 況 下 使 用 的 是exactly-once 模式。

streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//或者
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
  •  Checkpoint 超時時間:

超時時間指定了每次 Checkpoint 執行過程中的上限時間范圍,一旦 Checkpoint 執行時間超過該閾值,Flink 將會中斷 Checkpoint 過程,並按照超時處理。該指標可以通過setCheckpointTimeout 方法設定,默認為 10 分鍾。

streamEnv.getCheckpointConfig.setCheckpointTimeout(50000)
  •  檢查點之間最小時間間隔:

該參數主要目的是設定兩個 Checkpoint 之間的最小時間間隔,防止出現例如狀態數據過大而導致 Checkpoint 執行時間過長,從而導致 Checkpoint 積壓過多,最終 Flink 應用密集地觸發 Checkpoint 操作,會占用了大量計算資源而影響到整個應用的性能。

streamEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
  •  最大並行執行的檢查點數量:

通過 setMaxConcurrentCheckpoints()方法設定能夠最大同時執行的 Checkpoint 數量。在默認 情況下只 有一個檢查 點可以運行 ,根據用 戶指定的數 量可以同時 觸發多個Checkpoint,進而提升 Checkpoint 整體的效率。

streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

 

  •  是否刪除 Checkpoint 中保存的數據:

設置為 RETAIN_ON_CANCELLATION:表示一旦 Flink 處理程序被 cancel 后,會保留CheckPoint 數據,以便根據實際需要恢復到指定的 CheckPoint。
設置為 DELETE_ON_CANCELLATION:表示一旦 Flink 處理程序被 cancel 后,會刪除CheckPoint 數據,只有 Job 執行失敗的時候才會保存 CheckPoint

//刪除
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
//保留64
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  •  TolerableCheckpointFailureNumber:

設置可以容忍的檢查的失敗數,超過這個數量則系統自動關閉和停止任務。

streamEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)

  

保存機制 StateBackend(狀態后端)

默認情況下,State 會保存在 TaskManager 的內存中,CheckPoint 會存儲在 JobManager的內存中。State 和 CheckPoint 的存儲位置取決於 StateBackend 的配置。Flink 一共提供了 3 種 StateBackend 。 包 括 基 於 內 存 的 MemoryStateBackend 、 基 於 文 件 系 統 的FsStateBackend,以及基於 RockDB 作為存儲介質的 RocksDBState-Backend。

1) MemoryStateBackend


基於內存的狀態管理具有非常快速和高效的特點,但也具有非常多的限制,最主要的就是內存的容量限制,一旦存儲的狀態數據過多就會導致系統內存溢出等問題,從而影響整個應用的正常運行。同時如果機器出現問題,整個主機內存中的狀態數據都會丟失,進而無法恢復任務中的狀態數據。因此從數據安全的角度建議用戶盡可能地避免在生產環境中使用MemoryStateBackend。

streamEnv.setStateBackend(new MemoryStateBackend(10*1024*1024))

  

 

 

 

2) FsStateBackend


和 MemoryStateBackend 有所不同,FsStateBackend 是基於文件系統的一種狀態管理器,這里的文件系統可以是本地文件系統,也可以是 HDFS 分布式文件系統。FsStateBackend 更適合任務狀態非常大的情況,可以使checkpoint數據大量存儲於HDFS或本地文件,例如應用中含有時間范圍非常長的窗口計算,或 Key/valueState 狀態數據量非常大的場景。

缺點:跟MemoryStateBackend一樣,內存中保存的狀態數據不宜過大

streamEnv.setStateBackend(new FsStateBackend("hdfs://mycluster/checkpoint/cp1"))

 

 

3) RocksDBStateBackend

RocksDBStateBackend 是 Flink 中內置的第三方狀態管理器,和前面的狀態管理器不同,RocksDBStateBackend 需要單獨引入相關的依賴包到工程中。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.11.2</version>
</dependency>

 

 

 

RocksDBStateBackend 采用異步的方式進行狀態數據的 Snapshot,任務中的狀態數據首先被寫入本地 RockDB 中,這樣在 RockDB 僅會存儲正在進行計算的熱數據,而需要進行CheckPoint 的時候,會把本地的數據直接復制到遠端的 FileSystem 中。RocksDB 同時在內存及磁盤中存儲數據
與 FsStateBackend 相比,RocksDBStateBackend 在性能上要比 FsStateBackend 高一些,65主要是因為借助於 RocksDB 在本地存儲了最新熱數據,然后通過異步的方式再同步到文件系統中,但 RocksDBStateBackend 和 MemoryStateBackend 相比性能就會較弱一些。RocksDB克服了 State 受內存限制的缺點,同時又能夠持久化到遠端文件系統中,推薦在生產中使用。

streamEnv.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/checkpoint/cp2"))

4) 全局配置 StateBackend

以 上 的 代 碼 都 是 單 job 配 置 狀 態 后 端 , 也 可 以 全 局 配 置 狀 態 后 端 , 需 要 修 改flink-conf.yaml 配置文件:

state.backend: filesystem
其中:
filesystem 表示使用 FsStateBackend,
jobmanager 表示使用 MemoryStateBackend
rocksdb 表示使用 RocksDBStateBackend。
state.checkpoints.dir: hdfs://hadoop101:9000/checkpoints

默認情況下,如果設置了 CheckPoint 選項,則 Flink 只保留最近成功生成的 1 個CheckPoint,而當 Flink 程序失敗時,可以通過最近的 CheckPoint 來進行恢復。但是,如果希望保留多個 CheckPoint,並能夠根據實際需要選擇其中一個進行恢復,就會更加靈活。
添加如下配置,指定最多可以保存的 CheckPoint 的個數。

state.checkpoints.num-retained: 2

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM