流計算中可能有各種方式來保存狀態:
- 窗口操作
- 使用 了KV操作的函數
- 繼承了
CheckpointedFunction的函數
當開始做checkpointing的時候,狀態會被持久化到checkpoints里來規避數據丟失和狀態恢復。選擇的狀態存儲策略不同,會導致狀態持久化如何和checkpoints交互。
1.可用的狀態持久化策略
Flink提供了三種持久化策略,如果沒有顯式指定,則默認使用MemoryStateBackend。
The MemoryStateBackend
將數據保存在java的堆里,kv狀態或者window operator用hash table來保存values,triggers等等。
當進行checkpoints的時候,這種策略會對狀態做快照,然后將快照作為checkpoint acknowledgement的一部分發送給JobManager,JM也將其保存在堆中。
MemoryStateBackend可以使用異步的方式進行快照,我們也鼓勵使用異步的方式,避免阻塞,現在默認就是異步。如果不希望異步,可以在構造的時候傳入false,如下:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
限制:
- 單次狀態大小最大默認被限制為5MB,這個值可以通過構造函數來更改。
- 無論單次狀態大小最大被限制為多少,都不可用大過akka的frame大小。
- 聚合的狀態都會寫入JM的內存。
適合:
- 本地開發和調試。
- 狀態比較少的作業
The FsStateBackend
FsStateBackend 通過文件系統的URL來設置,比如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。
保持數據在TM的內存中,當做checkpointing的時候,會將狀態快照寫入文件,保存在文件系統或本地目錄。少量的元數據會保存在JM的內存中。
默認使用異步的方式進行快照,同樣,取消異步需要傳遞false:
new FsStateBackend(path, false);
適用:
- 狀態比較大,窗口比較長,大的KV狀態
- 需要做HA的場景
The RocksDBStateBackend
RocksDBStateBackend 通過文件系統的URL來設置,例如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。
保存數據在一個叫做RocksDB的數據庫中,這個數據庫保存在TM的數據目錄中。當做checkpointing時,整個數據庫會被寫入文件系統和目錄。少量的元信息會保存在JM的內存中。
這種策略只支持異步快照。
限制:
- 由於依賴於字節數組,支持的key和value的大小最大為2^31字節。對於使用Merge操作的狀態,大小很可能就默默的超過了這個限制,下次獲取就會失敗。
適合:
- 非常大的狀態,長窗口,大的KV狀態
- 需要HA的場景
能夠持有的狀態的多少只取決於可使用的磁盤大小,這會允許使用非常大的狀態,相比較FsStateBackend將狀態保存在內存中。但這也同時意味着,這個策略的吞吐量會受限。
RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略。
2.配置狀態持久化策略
如果你沒有指定任何策略,默認使用JM作為存儲策略。如果你想更改,可以在flink-conf.yaml中變更,存儲策略也可以在作業中單獨設定。
Setting the Per-job State Backend
可以在StreamExecutionEnvironment中指定:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
Setting Default State Backend
默認的狀態存儲策略通過在flink-conf.yaml中通過state.backend來指定,有如下一些可選:
- jobmanager (MemoryStateBackend)
- filesystem (FsStateBackend)
- rocksdb (RocksDBStateBackend)
也可以以全路徑來指定,比如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
來代替 RocksDBStateBackend,不過,何必了。
state.checkpoints.dir這個參數來指定所有的checkpoints數據和元數據存儲的位置。示例如下:
# The backend that will be used to store operator state checkpoints state.backend: filesystem # Directory for storing checkpoints state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints