Flink之狀態之狀態存儲 state backends


流計算中可能有各種方式來保存狀態:

  • 窗口操作
  • 使用 了KV操作的函數
  • 繼承了CheckpointedFunction的函數

當開始做checkpointing的時候,狀態會被持久化到checkpoints里來規避數據丟失和狀態恢復。選擇的狀態存儲策略不同,會導致狀態持久化如何和checkpoints交互。

1.可用的狀態持久化策略

Flink提供了三種持久化策略,如果沒有顯式指定,則默認使用MemoryStateBackend。

The MemoryStateBackend

將數據保存在java的堆里,kv狀態或者window operator用hash table來保存values,triggers等等。

當進行checkpoints的時候,這種策略會對狀態做快照,然后將快照作為checkpoint acknowledgement的一部分發送給JobManager,JM也將其保存在堆中。

MemoryStateBackend可以使用異步的方式進行快照,我們也鼓勵使用異步的方式,避免阻塞,現在默認就是異步。如果不希望異步,可以在構造的時候傳入false,如下:

new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

限制:

  • 單次狀態大小最大默認被限制為5MB,這個值可以通過構造函數來更改。
  • 無論單次狀態大小最大被限制為多少,都不可用大過akka的frame大小。
  • 聚合的狀態都會寫入JM的內存。

適合:

  • 本地開發和調試。
  • 狀態比較少的作業

The FsStateBackend

FsStateBackend 通過文件系統的URL來設置,比如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。

保持數據在TM的內存中,當做checkpointing的時候,會將狀態快照寫入文件,保存在文件系統或本地目錄。少量的元數據會保存在JM的內存中。

默認使用異步的方式進行快照,同樣,取消異步需要傳遞false:

 new FsStateBackend(path, false);

適用:

  • 狀態比較大,窗口比較長,大的KV狀態
  • 需要做HA的場景

The RocksDBStateBackend

RocksDBStateBackend 通過文件系統的URL來設置,例如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。

保存數據在一個叫做RocksDB的數據庫中,這個數據庫保存在TM的數據目錄中。當做checkpointing時,整個數據庫會被寫入文件系統和目錄。少量的元信息會保存在JM的內存中。

這種策略只支持異步快照。

限制:

  • 由於依賴於字節數組,支持的key和value的大小最大為2^31字節。對於使用Merge操作的狀態,大小很可能就默默的超過了這個限制,下次獲取就會失敗。

適合:

  • 非常大的狀態,長窗口,大的KV狀態
  • 需要HA的場景

能夠持有的狀態的多少只取決於可使用的磁盤大小,這會允許使用非常大的狀態,相比較FsStateBackend將狀態保存在內存中。但這也同時意味着,這個策略的吞吐量會受限。

RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略。

2.配置狀態持久化策略

如果你沒有指定任何策略,默認使用JM作為存儲策略。如果你想更改,可以在flink-conf.yaml中變更,存儲策略也可以在作業中單獨設定。

Setting the Per-job State Backend

可以在StreamExecutionEnvironment中指定:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

Setting Default State Backend

默認的狀態存儲策略通過在flink-conf.yaml中通過state.backend來指定,有如下一些可選:

  • jobmanager (MemoryStateBackend)
  • filesystem (FsStateBackend)
  • rocksdb (RocksDBStateBackend)

也可以以全路徑來指定,比如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory 來代替 RocksDBStateBackend,不過,何必了。

state.checkpoints.dir這個參數來指定所有的checkpoints數據和元數據存儲的位置。示例如下:

# The backend that will be used to store operator state checkpoints

state.backend: filesystem


# Directory for storing checkpoints

state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM