默認情況下,state 會保存在TaskManager的內存中,checkpoint會存儲在JobManager的內存中。
state 的存儲和 checkpoint的位置取決於StateBackend的配置。
Flink一共提供了三種StateBackend
1.MemoryStateBackend (基於內存存儲)
2.FsStateBackend (基於文件系統存儲)
3.RocksDBStateBackend (基於RocksDB數據庫存儲)
一、MemoryStateBackend
在這種方式下,數據持久化狀態存儲在內存中,state數據保存在Java堆內存中,執行checkpoint時會把快照數據保存到JobManager的內存中。
基於內存的StateBackend在生產環境下不建議使用,適用於測試開發環境。
val senv = StreamExecutionEnvironment.getExecutionEnvironment senv.setStateBackend(new MemoryStateBackend())
二、FsStateBackend
在這種方式下,state數據保存在TaskManager的內存中,執行checkpoint時會把state的快照數據保存到配置的文件系統中。
可以使用HDSF分布式文件系統。基於HDFS數據有備份、很安全。
val senv = StreamExecutionEnvironment.getExecutionEnvironment senv.setStateBackend(new FsStateBackend("hdfs://flink/checkpoint")
三、RocksDBStateBackend
RocksDB使用一套日志結構的數據庫引擎,它是Flink中內置的第三方狀態管理器。
它會在本地文件系統中維護狀態,state會直接寫入本地RocksDB中。
同時,它需要配置一個遠程的文件系統URI(一般是HDFS)
在做 checkpoint時會把本地的數據直接復制到文件系統中。
Failover(失效轉移)的時候從文件系統中恢復到本地RocksDB,克服了state受內存限制的缺點,同時又能夠持久化到遠程文件系統中。
比較適合在生產環境中使用。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.12</artifactId> <version>1.10.1</version> </dependency>
val senv = StreamExecutionEnvironment.getExecutionEnvironment //org.apache.flink.contrib.streaming.state senv.setStateBackend(new RocksDBStateBackend("hdfs://flink/checkpoint", true)) //state.backend: rocksdb //state.backend.incremental: true