State Backends
本文翻譯自文檔Streaming Guide / Fault Tolerance / StateBackend
-----------------------------------------------------------------------------------------
使用Data Stream API編寫的程序通常以多種形式維護狀態:
· 窗口將收集element或在它被觸發后聚合element
· Transformation方法可能會使用key/value狀態接口來存儲值
· Transformation方法也可能會實現Checkpointed接口來使其本地變量進入容錯機制
相關信息請見Streaming API Guide的文檔Working with State。
當檢查點機制啟動時,上述的狀態將在檢查點中持久化來應對數據丟失以及恢復。而狀態在內部是如何表示的、狀態是如何持久化到檢查點中以及持久化到哪里都取決於選定的State Backend。
一、可用的State Backends
Flink自帶了以下幾種開箱即用的state backend:
· MemoryStateBackend
· FsStateBackend
· RocksDBStateBackend
在沒有配置的情況下,系統默認使用MemoryStateBackend
1.1 MemoryStateBackend
MemoryStateBackend在內部以Java堆中的對象形式持有數據。Key/Value狀態和窗口Operator則持有一個hash表來存儲值、trigger等。
在檢查點中,該StateBackend將對狀態進行快照並將該快照作為檢查點接受完成消息(checkpoint acknowledgement message)發送到JobManager(Master)處,在那里它們也是存儲在JobManager的Java堆中。
MemoryStateBackend的局限:
· 單個狀態的大小默認地被限制到5MB,該限制值可以在MemoryStateBackend的構造函數周增加。
· 不論配置的最大狀態大小是多少,狀態大小無法大於akka的frame大小(見於Configuration)
· 聚合的狀態必須能放入JobManager的內存
MemoryStateBackend適用於以下情景:
· 本地開發以及debug時使用
· Job只持有很小的狀態時,如job只包含那些擁有某時刻數據的方法(Map,FlatMap,Filter…)。此外,Kafka Consumer也只需要很少的狀態
1.2 FsStateBackend
FsStateBackend需要使用一個文件系統的URL來配置(type, address, path),如"hdfs://namenode:40010/flink/checkpoint"或者"file:///data/flink/checkpoints" 。
FsStateBackend在TaskManager的內存中持有in-flight的數據。當進行檢查點時,它像狀態的快照寫入配置好的文件系統及目錄下的文件中。而極少的元數據則存儲在JobManager的內存中(或者在高可用性模式(high-availability mode)下,存儲在元數據檢查點中(metadata checkpoint))。
FsStateBackend適用於以下情景:
· 擁有大狀態、長窗口、打key/Value狀態的Job。
· 所有高可用性部署中
1.3 RocksDBStateBackend
RocksDBStateBackend使用一個文件系統URL來配置(type, address, path),例如"hdfs://namenode:40010/flink/checkpoint"或者"file:///data/flink/checkpoints" 。
RocksDBStateBackend在RocksDB數據庫中持有in-flight數據,該數據庫默認存儲在每個TaskManager的數據目錄下。當進行檢查點時,整個RocksDB數據庫將會被檢查點到配置的文件系統及目錄中去。而極少的元數據則存儲在JobManager的內存掣肘(或者在高可用性模式下,存儲在元數據檢查點中)。
RocksDBStateBackend適用於以下情景:
· 擁有非常大的狀態、長窗口、大key/Value狀態的Job。
· 所有高可用性部署中。
注意:要使用RocksDBStateBackend,你必須添加正確的maven dependency到你的項目中:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.10</artifactId> <version>1.2-SNAPSHOT</version> </dependency>
現在backend不在binary發行版本當中,要將它引入到集群執行中,請見文檔Linking with modules not contained in the binary distribution
二、配置一個StateBackend
StateBackend可以每個job單獨配置。此外,你可以定義一個默認的StateBackend,它將在Job沒有定義一個StateBackend時啟用。
2.1 設置單個job的StateBackend
單個job的StateBackend可以在Job的StreamExecutionEnvrionment中設置,代碼如下所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
2.2 設置默認StateBackend
默認StateBackend可以在flink-conf.yaml中配置,使用配置關鍵字state.backend。
配置entry的可能的值為jobmanager(MemoryStateBackend),filesystem(FsStateBackend),或者是實現StateBackend工廠接口FsStateBackendFactory的類的完全限定類名(full qualified class name)
在默認StateBackend設置為filesystem時,配置項state.backend.fs.checkpointdir定義了檢查點數據存儲的目錄。
配置文件實例如下所示:
# The backend that will be used to store operator state checkpoints state.backend: filesystem # Directory for storing checkpoints state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
