Flink Program Guide (9) -- StateBackend : Fault Tolerance(Basic API Concepts -- For Java)


State Backends

本文翻譯自文檔Streaming Guide / Fault Tolerance / StateBackend

-----------------------------------------------------------------------------------------

使用Data Stream API編寫的程序通常以多種形式維護狀態:

·  窗口將收集element或在它被觸發后聚合element

·  Transformation方法可能會使用key/value狀態接口來存儲值

·  Transformation方法也可能會實現Checkpointed接口來使其本地變量進入容錯機制

相關信息請見Streaming API Guide的文檔Working with State

 

當檢查點機制啟動時,上述的狀態將在檢查點中持久化來應對數據丟失以及恢復。而狀態在內部是如何表示的、狀態是如何持久化到檢查點中以及持久化到哪里都取決於選定的State Backend

 

一、可用的State Backends

Flink自帶了以下幾種開箱即用的state backend

·  MemoryStateBackend

·  FsStateBackend

·  RocksDBStateBackend

在沒有配置的情況下,系統默認使用MemoryStateBackend

 

1.1 MemoryStateBackend

MemoryStateBackend在內部以Java堆中的對象形式持有數據。Key/Value狀態和窗口Operator則持有一個hash表來存儲值、trigger等。

 

在檢查點中,該StateBackend將對狀態進行快照並將該快照作為檢查點接受完成消息(checkpoint acknowledgement message)發送到JobManager(Master)處,在那里它們也是存儲在JobManagerJava堆中。

 

MemoryStateBackend的局限:

·  單個狀態的大小默認地被限制到5MB,該限制值可以在MemoryStateBackend的構造函數周增加。

·  不論配置的最大狀態大小是多少,狀態大小無法大於akkaframe大小(見於Configuration

·  聚合的狀態必須能放入JobManager的內存

 

MemoryStateBackend適用於以下情景:

·  本地開發以及debug時使用

·  Job只持有很小的狀態時,如job只包含那些擁有某時刻數據的方法(Map,FlatMap,Filter…)。此外,Kafka Consumer也只需要很少的狀態

 

1.2 FsStateBackend

FsStateBackend需要使用一個文件系統的URL來配置(type, address, path),如"hdfs://namenode:40010/flink/checkpoint"或者"file:///data/flink/checkpoints"

 

FsStateBackendTaskManager的內存中持有in-flight的數據。當進行檢查點時,它像狀態的快照寫入配置好的文件系統及目錄下的文件中。而極少的元數據則存儲在JobManager的內存中(或者在高可用性模式(high-availability mode)下,存儲在元數據檢查點中(metadata checkpoint))。

 

FsStateBackend適用於以下情景:

·  擁有大狀態、長窗口、打key/Value狀態的Job

·  所有高可用性部署中

 

1.3 RocksDBStateBackend

RocksDBStateBackend使用一個文件系統URL來配置(type, address, path),例如"hdfs://namenode:40010/flink/checkpoint"或者"file:///data/flink/checkpoints"

 

RocksDBStateBackendRocksDB數據庫中持有in-flight數據,該數據庫默認存儲在每個TaskManager的數據目錄下。當進行檢查點時,整個RocksDB數據庫將會被檢查點到配置的文件系統及目錄中去。而極少的元數據則存儲在JobManager的內存掣肘(或者在高可用性模式下,存儲在元數據檢查點中)。

 

RocksDBStateBackend適用於以下情景:

·  擁有非常大的狀態、長窗口、大key/Value狀態的Job

·  所有高可用性部署中。

 

注意:要使用RocksDBStateBackend,你必須添加正確的maven dependency到你的項目中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
    <version>1.2-SNAPSHOT</version>
</dependency>

 

現在backend不在binary發行版本當中,要將它引入到集群執行中,請見文檔Linking with modules not contained in the binary distribution

 

二、配置一個StateBackend

StateBackend可以每個job單獨配置。此外,你可以定義一個默認的StateBackend,它將在Job沒有定義一個StateBackend時啟用。

 

2.1 設置單個jobStateBackend

單個jobStateBackend可以在JobStreamExecutionEnvrionment中設置,代碼如下所示:

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.
setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

 

2.2 設置默認StateBackend

默認StateBackend可以在flink-conf.yaml中配置,使用配置關鍵字state.backend

 

配置entry的可能的值為jobmanager(MemoryStateBackend)filesystem(FsStateBackend),或者是實現StateBackend工廠接口FsStateBackendFactory的類的完全限定類名(full qualified class name)

 

在默認StateBackend設置為filesystem時,配置項state.backend.fs.checkpointdir定義了檢查點數據存儲的目錄。

 

配置文件實例如下所示:

 

# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints

 

 

 

 

  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM