Flink狀態管理之 StateBackend


默認情況下,state 會保存在TaskManager的內存中,checkpoint會存儲在JobManager的內存中。

state 的存儲和 checkpoint的位置取決於StateBackend的配置。

 

 Flink一共提供了三種StateBackend

1.MemoryStateBackend    (基於內存存儲)

2.FsStateBackend               (基於文件系統存儲)

3.RocksDBStateBackend   (基於RocksDB數據庫存儲)

 

一、MemoryStateBackend

在這種方式下,數據持久化狀態存儲在內存中,state數據保存在Java堆內存中,執行checkpoint時會把快照數據保存到JobManager的內存中。

基於內存的StateBackend在生產環境下不建議使用,適用於測試開發環境。

val senv = StreamExecutionEnvironment.getExecutionEnvironment

senv.setStateBackend(new MemoryStateBackend())

 

二、FsStateBackend

在這種方式下,state數據保存在TaskManager的內存中,執行checkpoint時會把state的快照數據保存到配置的文件系統中。

可以使用HDSF分布式文件系統。基於HDFS數據有備份、很安全。

val senv = StreamExecutionEnvironment.getExecutionEnvironment

senv.setStateBackend(new FsStateBackend("hdfs://flink/checkpoint")

 

三、RocksDBStateBackend

RocksDB使用一套日志結構的數據庫引擎,它是Flink中內置的第三方狀態管理器。

它會在本地文件系統中維護狀態,state會直接寫入本地RocksDB中。

同時,它需要配置一個遠程的文件系統URI(一般是HDFS)

在做 checkpoint時會把本地的數據直接復制到文件系統中。

Failover(失效轉移)的時候從文件系統中恢復到本地RocksDB,克服了state受內存限制的缺點,同時又能夠持久化到遠程文件系統中。

比較適合在生產環境中使用。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
val senv = StreamExecutionEnvironment.getExecutionEnvironment

//org.apache.flink.contrib.streaming.state
senv.setStateBackend(new RocksDBStateBackend("hdfs://flink/checkpoint", true))
//state.backend: rocksdb
//state.backend.incremental: true    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM