Flink StateBackend 狀態后端


一、概述

保存機制 StateBackend ,默認情況下,State 會保存在 TaskManager 的內存中,CheckPoint 會存儲在 JobManager 的內存中。

State 和 CheckPoint 的存儲位置取決於 StateBackend 的配置。

Flink 一共提供了 3 中 StateBackend,包括

  基於內存的 MemoryStateBackend、基於文件系統的 FsStateBackend、基於RockDB存儲介質的 RocksDBState-Backend

1)MemoryStateBackend

  基於內存的狀態管理,具有非常快速和高效的特點,但也具有非常多的限制。最主要的就是內存的容量限制,

一旦存儲的狀態數據過多就會導致系統內存溢出等問題,從而影響整個應用的正常運行。同時如果機器出現問題,

整個主機內存中的狀態數據都會丟失,進而無法恢復任務中的狀態數據。因此從數據安全的角度建議用戶盡可能地避免

在生產環境中使用 MemoryStateBackend

streamEnv.setStateBackend(new MemoryStateBackend(10*1024*1024))

2)FsStateBackend

  和MemoryStateBackend有所不同,FsStateBackend是基於文件系統的一種狀態管理器,這里的文件系統可以是本地文件系統,

也可以是HDFS分布式文件 系統。FsStateBackend更適合任務狀態非常大的情況,例如:應用中含有時間范圍非常長的窗口計算,

或 Key/Value State 狀態數據量非常大的場景。

  streamEnv.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/checkpoint/cp1"))

3)RocksDBStatusBackend

  RocksDBStatusBackend是Flink中內置的第三方狀態管理器,和前面的狀態管理器不同,RocksDBStateBackend 需要單獨引入

相關的依賴包到工程中。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>1.9.1</version>
</dependency>

  RocksDBStateBackend 采用異步的方式進行狀態數據的 Snapshot,任務中的狀態數據首先被寫入本地 RockDB 中,這樣在

RockDB 僅會存儲正在計算的熱數據,而需要進行 CheckPoint 的時候,會把本地的數據直接復制到遠端的 FileSystem 中。

  與 FsStateBackend 相比,RocksDBStateBackend 在性能上要比 FsStateBackend 高一些,主要是因為借助於 RocksDB 在

本地存儲了最新熱數據,然后通過異步的方式再同步到文件系統中,但 RocksDBStateBackend 和 MemoryStateBackend 相比性能

就會較弱一些。RocksDB 克服了 State 受內存限制的缺點,同時又能夠持久化到遠端文件系統中,推薦在生產中使用。

streamEnv.setStateBackend(new RocksDBStateBackend("hdfs://hadoop101:9000/checkpoint/cp2"))

4)全局配置 StateBackend

  前面的幾個代碼都是單 job 配置狀態后端,也可以全局配置狀態后端,需要修改 flink-conf.yaml配置文件:

state.backend:filesystem

  其中:

  filesystem 表示使用 FsStateBackend

  jobmanager 表示使用 MemoryStateBackend

  rocksdb 表示使用 RocksDBStateBackend

state.checkpoint.dir:hdfs://hadoop101:9000/checkpoints

  默認情況下,如果設置了 Checkpoint 選項,則 Flink 只保留最近成功生成的 1 個 Checkpoint,而當 Flink 程序失敗時,可以通過最近的 CheckPoint 來進行恢復。但是希望保留多個 CheclPoint,並能夠根據實際需要選擇其中一個進行恢復,就會更加靈活。添加如下配置,指定最多可以保存的 CheckPoint 的個數

 state.checkpoints.num-retained: 2

Checkpoint 案例

 案例:設置 HDFS 文件系統的狀態后端,取消 Job 之后再次恢復 Job

package com.apple.flink.point

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointOnFsBackend {

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //開啟檢查點且指定檢查點時間間隔 5000ms
    streamEnv.enableCheckpointing(5000)

    //保存機制 StateBackend:目前為 FsStateBackend
    streamEnv.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/checkpoint/cp1"))

    //exactly-once 語義保證整個應用內端到端的數據一致性
    streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    //Checkpoint 超時時間
    streamEnv.getCheckpointConfig.setCheckpointTimeout(50000)

    //setMaxConcurrentCheckpoints()方法設定能夠最大同時執行的 Checkpoint 數量,默認為1
    streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    //是否刪除 Checkpoint 中保存的數據
    streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //可以容忍的檢查的失敗數,超過這個數量則系統自動關閉和停止任務
    streamEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)

    //設置上下文並行度
    streamEnv.setParallelism(1)

    import org.apache.flink.streaming.api.scala._

    //讀取數據得到 DataStream
    val stream = streamEnv.socketTextStream("hadoop101", 8888)

    stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()

    //啟動流計算
    streamEnv.execute("wc")

  }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM