「Flink」Flink的狀態管理與容錯


在Flink中的每個函數和運算符都是有狀態的。在處理過程中可以用狀態來存儲數據,這樣可以利用狀態來構建復雜操作。為了讓狀態容錯,Flink需要設置checkpoint狀態。Flink程序是通過checkpoint來保證容錯,通過checkpoint機制,Flink可恢復作業的狀態和計算位置。

checkpoint檢查點

前提條件

Flink的checkpoin機制需要與流和狀態的持久化存儲交互,一般它要求:

  • 一個持久化的數據源
    • 當Flink程序出現問題時,可以通過checkpoint持久化存儲中恢復,然后從出錯的地方開始重新消費數據
    • 該數據源可以在一定時間內重跑數據,例如:Kafka、RabbitMQ或者文件系統HDFS、S3、…
  • 狀態的持久存儲
    • 狀態需要永久的保存下來,通常是分布式文件系統(例如:HDFS、S3、GFS、…)

啟用和配置檢查點

默認情況,Flink是禁用檢查點。要啟用檢查點,調用

// 啟用檢查點
// 單位:毫秒
env.enableCheckpointing(1000);

在啟用檢查點時,還可以配置檢查點的其他參數。

  • exactly-one or at-least-once(僅一次或者至少一次)
    • 大多數程序都是設置為exactly-once,只有在某些超低延遲的應用(例如:始終要求是毫秒級的應用)
    • 通過查看源碼,我們看到,Flink默認是 exactly-once
      • public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
  • checkpoint timeout(檢查點超時時間)
    • 檢查點超過規定的時間就會自動終止
  • minimum time between checkpoints
    • 檢查點之間的最小時間
    • 下一個檢查點將在上一個檢查點完成后5秒鍾啟動
    • 檢查點最小間隔時間不會受檢查點間隔更容易配置
  • number of concureent checkpoint
    • 檢查點的並發數目。默認情況一個檢查點在運行時不會觸發另一個檢查點,這樣可以確保Flink不會花太多時間在checkpoint上,並確保流可以有效進行。
    • 可以設置多個重疊的checkpoint,這對容許有一定延遲,並希望較頻繁的檢查(100ms)來重新處理故障是有用的
  • externalized checkpoint
    • 外部檢查點
    • 可以將檢查點設置為外部持久化,這樣檢查點的元數據將寫入持久存儲,並且但作業運行失敗是不會自動清理
    • 這樣可以做雙重保險
  • fail/continue task on checkpoint errors
    • 檢查點執行發生錯誤,是否執行任務。
    • 默認情況,如果checkpoint失敗,任務也將失敗
  • perfer checkpoint for recovery
    • 即時最近有更多的savepoint可用於恢復,flink依然會選擇使用最近一次的checkpoint來進行錯誤恢復

參考配置:

        // --------
        // 配置checkpoint
        // 啟用檢查點
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

選擇狀態后端存儲

Flink的checkpoint機制可以存儲計時器和有狀態operation的所有快照,包括:連接器、窗口或者用戶自定義狀態。具體checkpoint存儲在哪兒(例如:是JobManager內存、文件系統或者數據庫),依賴於狀態后端的配置。

默認情況,狀態保存在TaskManager的內存中,檢查點存儲在TM的內存中。為了適當地保存大狀態,Flink支持其他的存儲。我們可以通過:

StreamExecutionEnvironment.setStateBackend(…)

來指定存儲方式

Flink狀態管理

狀態的應用場景:

  • 當應用程序想要按照某種模式搜索某些事件時,狀態可以保存迄今所有的事件序列
  • 當每分鍾/小時/天需要對流數據進行聚合,狀態可以保存掛起的聚合
  • 當在數據流上訓練機器學習模型時,狀態可以用來保存某一類參數的版本
  • 當需要管理歷史數據時,狀態允許訪問過去歷史數據

Flink狀態可以保存在堆內、或者是堆外。Flink也可以管理應用程序的狀態,必要時也可以溢出到磁盤,如果應用要保持非常大的狀態,可以不修改程序邏輯情況下配置狀態后端存儲。

Flink狀態分類

Flink中有兩種基本的狀態:

  • Keyed State
  • Operator State

Keyed State

Keyed State通常和key相關,僅僅在KeyedStream的方法和算子中使用。可以把 Keyed State看作是分區,而且每一個key僅出現在一個分區內。邏輯上每個 keyed-state和唯一元組<算子並發實例, key>綁定,由於每個key僅屬於算子的一個並發,因此可以簡化為<算子, key>

Operator State

對於 Operator State來說,每個Operator State和一個並發實例綁定。Kafka connector是Flink中使用operator state的一個很好的示例。每個Kafka消費者的並發在Operator State中維護一個 topic partition到offset的映射關系。

Operator state在Flink作業的並發改變后,會重新分發狀態,分發的策略和keyed stated不一樣。

Raw State與Managed State

Keyed Stated和Operator State分別有兩種形式:managed 和 raw

Managed State是由Flink運行時管理的數據結構來表示的,例如:內部的Hash Table或者RocksDB。例如:ValueState、ListState等。Flink運行時會對這些狀態進行編碼並寫入Checkpoint。

Raw State則保存在自己的數據結構中。checkpoint的時候,Flink並不知道狀態里面具體的內容,僅僅寫入一串字節序列到checkpoint中。

所有的DataStream的function都可以使用managed state,但raw state只能在實現算子時使用。由於Flink可以在修改並發時更好的分發狀態數據,並且能夠更好的管理內存,因為講義使用 managed state.

使用Managed Keyed State

Managed keyed state接口提供不同類型的狀態訪問接口,這些狀態都作用在當前輸入數據的key下。這些狀態僅可在KeyedStream上使用,可以通過 stream.keyBy(…)得到KeyedStream。

所有支持的狀態類型如下:

  • ValueState<T>
    • 保存一個可以更新和獲取的值,算子接收到的每個key都可能對應一個值
    • 可以通過update(T)進行更新,通過value()獲取
  • ListState<T>
    • 保存一個元素的列表,可以往這個列表中追加數據,並在當前列表上檢索
    • 可以通過 add(T)或者addAll(List<T>)進行追加元素
    • 通過get()獲取整個列表
    • 通過 update(List<T>)覆蓋當前列表
  • ReducingState<T>
    • 保存一個單值,表示添加到狀態的所有值的聚合。接口與ListState類似
  • AggregatingState<IN, OUT>
    • 保存一個單值,表示添加到狀態的所有值的聚合
    • 與ReducingState相反的是,聚合類型可能與添加到狀態的元素類型不同。接口與ListState類似
  • FoldingState<T, ACC>(后續將過期)
    • 保存一個單值,白搜狐添加到狀態的所有值的集合
    • 與ReducingState相反的是,聚合類型可能與添加到狀態的元素類型不同。接口與ListState類似
  • MapState<UK, UV>
    • 維護一個映射列表,可以添加鍵值到狀態中,可以獲取當前映射的迭代器
    • 使用put、putAll添加映射,使用 get檢索特定key

注意:

  • 這些狀態對象僅用於狀態交互。狀態本身不一定存儲在內存中,還有可能保存在磁盤或者其他位置
  • 從狀態中獲取的值取決於輸入元素說代表的key,因此,在不同key上調用同一個接口,可能得到不同的值

使用Managed Operator State

可以通過實現 CheckpointedFunction 或者 ListCheckpointed<T extends Serialized>接口來使用Managed Operator State。

CheckpointedFunction接口:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

在Flink進行checkpoint時,會調用snapshotstate(),用戶自定義函數初始化時會調用 initializeState。初始化包括第一次自定義函數初始化和從之前的 checkpoint 回復。因此,initializeState 中應該也包括狀態恢復的邏輯。

Managed Operator State以list的形式存在,這些狀態是一個可序列化對象的集合List,彼此獨立,方便在改變並發后進行狀態的重新分派。換句話說,這些對象是重新分配 non-keyed state的最細粒度。根據狀態的不同訪問方式,有以下兩種分配模式:

  • Even-split redistribution
    • 每個算子都存儲一個列表形式的狀態集合,整個狀態由所有的列表拼接而成
    • 但作業恢復或者重新分配時,整個狀態按照算子的並行度均勻分配
  • Union redistribution
    • 每個算子保存一個列表形式的狀態集合,整個狀態由所有的列表拼接而成
    • 但作業恢復或者重新分配時,每個算子都將獲得所有的狀態數據

ListCheckpointed接口:

ListCheckpointed接口是CheckpointedFunction接口的精簡版,僅支持 even-split redistribution的list state

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;

snapshotState()需要返回一個將寫入到checkpoint的對象列表, restoreState則需要處理恢復回來的對象列表。


參考文獻:

Flink官方文檔:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/checkpointing.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/ops/state/checkpoints.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/state.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM