Flink状态管理之 StateBackend


默认情况下,state 会保存在TaskManager的内存中,checkpoint会存储在JobManager的内存中。

state 的存储和 checkpoint的位置取决于StateBackend的配置。

 

 Flink一共提供了三种StateBackend

1.MemoryStateBackend    (基于内存存储)

2.FsStateBackend               (基于文件系统存储)

3.RocksDBStateBackend   (基于RocksDB数据库存储)

 

一、MemoryStateBackend

在这种方式下,数据持久化状态存储在内存中,state数据保存在Java堆内存中,执行checkpoint时会把快照数据保存到JobManager的内存中。

基于内存的StateBackend在生产环境下不建议使用,适用于测试开发环境。

val senv = StreamExecutionEnvironment.getExecutionEnvironment

senv.setStateBackend(new MemoryStateBackend())

 

二、FsStateBackend

在这种方式下,state数据保存在TaskManager的内存中,执行checkpoint时会把state的快照数据保存到配置的文件系统中。

可以使用HDSF分布式文件系统。基于HDFS数据有备份、很安全。

val senv = StreamExecutionEnvironment.getExecutionEnvironment

senv.setStateBackend(new FsStateBackend("hdfs://flink/checkpoint")

 

三、RocksDBStateBackend

RocksDB使用一套日志结构的数据库引擎,它是Flink中内置的第三方状态管理器。

它会在本地文件系统中维护状态,state会直接写入本地RocksDB中。

同时,它需要配置一个远程的文件系统URI(一般是HDFS)

在做 checkpoint时会把本地的数据直接复制到文件系统中。

Failover(失效转移)的时候从文件系统中恢复到本地RocksDB,克服了state受内存限制的缺点,同时又能够持久化到远程文件系统中。

比较适合在生产环境中使用。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
val senv = StreamExecutionEnvironment.getExecutionEnvironment

//org.apache.flink.contrib.streaming.state
senv.setStateBackend(new RocksDBStateBackend("hdfs://flink/checkpoint", true))
//state.backend: rocksdb
//state.backend.incremental: true    

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM