一、概述
保存機制 StateBackend ,默認情況下,State 會保存在 TaskManager 的內存中,CheckPoint 會存儲在 JobManager 的內存中。
State 和 CheckPoint 的存儲位置取決於 StateBackend 的配置。
Flink 一共提供了 3 中 StateBackend,包括
基於內存的 MemoryStateBackend、基於文件系統的 FsStateBackend、基於RockDB存儲介質的 RocksDBState-Backend
1)MemoryStateBackend
基於內存的狀態管理,具有非常快速和高效的特點,但也具有非常多的限制。最主要的就是內存的容量限制,
一旦存儲的狀態數據過多就會導致系統內存溢出等問題,從而影響整個應用的正常運行。同時如果機器出現問題,
整個主機內存中的狀態數據都會丟失,進而無法恢復任務中的狀態數據。因此從數據安全的角度建議用戶盡可能地避免
在生產環境中使用 MemoryStateBackend
streamEnv.setStateBackend(new MemoryStateBackend(10*1024*1024))
2)FsStateBackend
和MemoryStateBackend有所不同,FsStateBackend是基於文件系統的一種狀態管理器,這里的文件系統可以是本地文件系統,
也可以是HDFS分布式文件 系統。FsStateBackend更適合任務狀態非常大的情況,例如:應用中含有時間范圍非常長的窗口計算,
或 Key/Value State 狀態數據量非常大的場景。
streamEnv.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/checkpoint/cp1"))
3)RocksDBStatusBackend
RocksDBStatusBackend是Flink中內置的第三方狀態管理器,和前面的狀態管理器不同,RocksDBStateBackend 需要單獨引入
相關的依賴包到工程中。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.9.1</version> </dependency>
RocksDBStateBackend 采用異步的方式進行狀態數據的 Snapshot,任務中的狀態數據首先被寫入本地 RockDB 中,這樣在
RockDB 僅會存儲正在計算的熱數據,而需要進行 CheckPoint 的時候,會把本地的數據直接復制到遠端的 FileSystem 中。
與 FsStateBackend 相比,RocksDBStateBackend 在性能上要比 FsStateBackend 高一些,主要是因為借助於 RocksDB 在
本地存儲了最新熱數據,然后通過異步的方式再同步到文件系統中,但 RocksDBStateBackend 和 MemoryStateBackend 相比性能
就會較弱一些。RocksDB 克服了 State 受內存限制的缺點,同時又能夠持久化到遠端文件系統中,推薦在生產中使用。
streamEnv.setStateBackend(new RocksDBStateBackend("hdfs://hadoop101:9000/checkpoint/cp2"))
4)全局配置 StateBackend
前面的幾個代碼都是單 job 配置狀態后端,也可以全局配置狀態后端,需要修改 flink-conf.yaml配置文件:
state.backend:filesystem
其中:
filesystem 表示使用 FsStateBackend
jobmanager 表示使用 MemoryStateBackend
rocksdb 表示使用 RocksDBStateBackend
state.checkpoint.dir:hdfs://hadoop101:9000/checkpoints
默認情況下,如果設置了 Checkpoint 選項,則 Flink 只保留最近成功生成的 1 個 Checkpoint,而當 Flink 程序失敗時,可以通過最近的 CheckPoint 來進行恢復。但是希望保留多個 CheclPoint,並能夠根據實際需要選擇其中一個進行恢復,就會更加靈活。添加如下配置,指定最多可以保存的 CheckPoint 的個數
state.checkpoints.num-retained: 2
Checkpoint 案例
案例:設置 HDFS 文件系統的狀態后端,取消 Job 之后再次恢復 Job
package com.apple.flink.point import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object CheckpointOnFsBackend { def main(args: Array[String]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //開啟檢查點且指定檢查點時間間隔 5000ms streamEnv.enableCheckpointing(5000) //保存機制 StateBackend:目前為 FsStateBackend streamEnv.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/checkpoint/cp1")) //exactly-once 語義保證整個應用內端到端的數據一致性 streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //Checkpoint 超時時間 streamEnv.getCheckpointConfig.setCheckpointTimeout(50000) //setMaxConcurrentCheckpoints()方法設定能夠最大同時執行的 Checkpoint 數量,默認為1 streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //是否刪除 Checkpoint 中保存的數據 streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //可以容忍的檢查的失敗數,超過這個數量則系統自動關閉和停止任務 streamEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(1) //設置上下文並行度 streamEnv.setParallelism(1) import org.apache.flink.streaming.api.scala._ //讀取數據得到 DataStream val stream = streamEnv.socketTextStream("hadoop101", 8888) stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print() //啟動流計算 streamEnv.execute("wc") } }