默认情况下,state 会保存在TaskManager的内存中,checkpoint会存储在JobManager的内存中。
state 的存储和 checkpoint的位置取决于StateBackend的配置。
Flink一共提供了三种StateBackend
1.MemoryStateBackend (基于内存存储)
2.FsStateBackend (基于文件系统存储)
3.RocksDBStateBackend (基于RocksDB数据库存储)
一、MemoryStateBackend
在这种方式下,数据持久化状态存储在内存中,state数据保存在Java堆内存中,执行checkpoint时会把快照数据保存到JobManager的内存中。
基于内存的StateBackend在生产环境下不建议使用,适用于测试开发环境。
val senv = StreamExecutionEnvironment.getExecutionEnvironment senv.setStateBackend(new MemoryStateBackend())
二、FsStateBackend
在这种方式下,state数据保存在TaskManager的内存中,执行checkpoint时会把state的快照数据保存到配置的文件系统中。
可以使用HDSF分布式文件系统。基于HDFS数据有备份、很安全。
val senv = StreamExecutionEnvironment.getExecutionEnvironment senv.setStateBackend(new FsStateBackend("hdfs://flink/checkpoint")
三、RocksDBStateBackend
RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器。
它会在本地文件系统中维护状态,state会直接写入本地RocksDB中。
同时,它需要配置一个远程的文件系统URI(一般是HDFS)
在做 checkpoint时会把本地的数据直接复制到文件系统中。
Failover(失效转移)的时候从文件系统中恢复到本地RocksDB,克服了state受内存限制的缺点,同时又能够持久化到远程文件系统中。
比较适合在生产环境中使用。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.12</artifactId> <version>1.10.1</version> </dependency>
val senv = StreamExecutionEnvironment.getExecutionEnvironment //org.apache.flink.contrib.streaming.state senv.setStateBackend(new RocksDBStateBackend("hdfs://flink/checkpoint", true)) //state.backend: rocksdb //state.backend.incremental: true