這里將介紹Flink對有狀態計算的支持,其中包括狀態計算和無狀態計算的區別,以及在Flink中支持的不同狀態類型,分別有 Keyed State 和 Operator State 。另外針對狀態數據的持久化,以及整個 Flink 任務的數據一致性保證,Flink 提供了 Checkpoint 機制處理和持久化狀態結果數據,隨后對狀態數據 Flink 提供了不同的狀態管理器來管理狀態數據,例如: MemoryStateBackend 等。
有狀態計算
在Flink架構體系中,有狀態計算可以說是Flink非常重要的特征之一。有狀態計算是指在程序計算過程中,在Flink程序內部,存儲計算產生的中間結果,並提供給Functions 或 孫子計算結果使用。如圖所示:

狀態數據可以維系在本地存儲中,這里的存儲可以是 Flink 的堆內存或者堆外內存,也可以借助第三方的存儲介質,例如:Flink中已經實現的RocksDB,當然用戶也可以自己實現相應的緩存系統去存儲狀態信息,以完成更加復雜的計算邏輯。和狀態計算不同的是,無狀態計算不會存儲計算過程中產生的結果,也不會將結果用於下一步計算過程中,程序只會在當前的計算流程中實行計算,計算完成就輸出結果,然后下一條數據接入,然后處理。
無狀態計算實現的復雜度相對較低,實現起來比較容易,但是無法完成提到的比較復雜的業務場景,例如:
- [ ] 用戶想實現CEP(復雜事件處理),獲取符合某一特定時間規則的事件,狀態計算就可以將接入的事件進行存儲,然后等待符合規則的事件觸發;
- [ ] 用戶想要按照 minutes / hour / day 等進行聚合計算,求取當前最大值、均值等聚合指標,這就需要利用狀態來維護當前計算過程中產生的結果,例如事件的總數、總和以及最大,最小值等;
- [ ] 用戶想在 Srteam 上實現機器學習的模型訓練,狀態計算可以幫助用戶維護當前版本模型使用的參數;
- [ ] 用戶想使用歷史的數據進行計算,狀態計算可以幫助用戶對數據進行緩存,使用戶可以直接從狀態中獲取相應的歷史數據。
Flink 狀態及應用
狀態類型
在 Flink 中根據數據集是否根據 Key 進行分區,將狀態分為 Keyed State 和 Operator State(Non-Keyed State) 兩種類型。
Keyed State
表示和key相關的一種state ,只能用於 KeyedStream 類型數據集對應的Functions和Operators之上。Keyed State 是 Operator State 的特例,區別在於 Keyed State 事先按照 key 對數據集進行了分區,每個 Key State 僅對應一個 Operator 和 Key 的組合。 Keyed State 可以通過 Key Group 進行管理,主要用於當算子並行度發生變化時,自動重新分布 Keyed State 數據。
Operator State
與 Keyed State 不同的是,Operator State 只和並行的算子實例綁定,和數據元素中的 Key 無關,每個算子實例中持有所有數據元素中的一部分狀態數據。 Operator State 支持當算子實例並行度發生變化時自動重新分配狀態數據。
同時在Flink中 Keyed State 和 Operator State 均具有兩種形式,其中一種為托管狀態(Managered State)形式,由Flink Runtime 中控制和管理狀態數據,並將狀態數據轉換稱為內存Hash tables 或 Recks DB 的對象存儲,然后將這些狀態數據通過內部接口持久化到 Checkpoints 中,任務異常時可以通過這些狀態數據恢復任務。另外一種是原生狀態(Row State)形式,由算子自己管理數據結構,當觸發 Checkpoints 過程中,Flink並不知道狀態數據內部的數據結構,只是將數據轉換成 bytes 數據存儲在 Checkpoints 中,當從 Checkpoints 恢復任務時,算子自己在反序列化出狀態的數據結構。
Notes: Flink中推薦用戶使用 Managered State 管理狀態數據,主要原因是:Manager State 能夠更好的支持狀態數據的重平衡以及更加完善的內存管理。
Managered Keyed State
Flink 有以下Managered Keyed State 類型可以使用,每種狀態都有相應的的使用場景,用戶可以根據實際需求選擇使用。
- [ ]
ValueState[T]: 與 Key 對應單個值的狀態,例如統計 user_id 對應的交易次數,每次用戶交易都會在 count 狀態值上進行更新。 ValueState 對應的更新方法是update(T), 取值是T value(); - [ ]
ListState[T]: 與 Key 對應元素列表的狀態,狀態中存放元素的 List 列表。例如定義 ListValue存儲用戶經常訪問的 IP 地址。在 ListState 中添加元素使用add(T) , addAll(List[T])兩個方法。獲取元素使用Iterable<T> get()方法,更新元素使用update(List[T])方法; - [ ]
ReducingState[T]: 定義與 Key 相關的數據元素單個聚合值的狀態,用戶存儲經過指定 ReduceFunction 計算之后的指標,因此,ReduceState 需要指定ReduceFunction 完成狀態數據的聚合。ReducingState 添加元素使用add(T)方法,獲取元素使用T get(); - [ ]
AggregeateState[IN,OUT]: 定義 與key相關的數據元素單個聚合值的狀態,用於維護數據經過指定 AggregateFunction 計算之后的指標。和ReducingState相比,AggregeateState 的輸入輸出類型不一定相同,但ReducingState 輸入/出 類型必須保持一致。和ListState相似,AggregatingState 需要指定AggregateFunction完成狀態數據的聚合操作。AggregatringState添加元素使用add(IN)方法, 獲取元素使用OUT get()方法; - [ ]
MapState<UK, UV>:這會保留一個映射列表。您可以將鍵值對放入狀態並檢索Iterable所有當前存儲的映射。使用put(UK, UV)或 添加映射putAll(Map[UK,UV])(Map<UK, UV>)。可以使用來檢索與用戶鍵關聯的值get(UK)。對於映射,鍵和值可迭代視圖可以使用被檢索entries(),keys()並values()分別。
Stateful Function定義
示例:
在RichFlatMapFunction 中定義 ValueState,已完成最小值的獲取:
inputStream.keyBy(_._1).flatMap(
// (String,Long,Int) 輸入類型
// (String,Long,Long) 輸出類型
new RichFlatMapFunction[(Int,Long) , (Int,Long,Long)] {
private var leastValueState:ValueState[Long] = _
// 定義狀態名稱
private var leastValueStateDesc:ValueStateDescriptor[Long] = _
override def open(parameters: Configuration): Unit = {
// 指定狀態類型
leastValueStateDesc = new ValueStateDescriptor[Long]("leastValueState" , classOf[Long])
// 通過 getRuntimeContext.getState 拿到狀態
leastValueState = getRuntimeContext.getState(leastValueStateDesc)
}
override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
// 通過 value 拿到最小值
val leastValue: Long = leastValueState.value()
// 如果前一個指標大於最小值,則直接輸出數據元素和最小值
if ( leastValue != 0L && value._2 > leastValue){
out.collect((value._1 , value._2 , leastValue))
}else{
// 如果當前指標小於最小值,則更新狀態中的最小值
leastValueState.update(value._2)
// 將當前數據中的指標作為最小值輸出
out.collect(value._1 , value._2 , value._2)
}
}
}).print()
State生命周期
對於任何類型 Keyed State 都可以設定狀態生命周期(TTL),以確保能夠在規定時間內即時清理狀態數據。狀態生命周期功能可通過 StateTtlConfig 配置然后將 StateTtlConfig 配置傳入StateDescriptor 中的 enableTimeToLive 方法中即可。Keyed State 配置實例如下所示:
val config: StateTtlConfig = StateTtlConfig
// 指定TTL時長為 5s
.newBuilder(Time.seconds(5))
// 指定TTL 刷新只對創建和寫入操作有效
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 指定狀態可見性不返回過期數據
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build()
leastValueStateDesc.enableTimeToLive(config)
在StateTtlConfig中除了通過 newBuilder() 方法中設定過期時間的參數是必須的之外,其他的參數都是可選的或使用默認值。其中 setUpdateType方法中傳入的類型有兩種:
- StateTtlConfig.UpdateType.onCreateAndWrite 僅在創建和寫入時更新 TTL ;
- StateTtlConfig.UpdateType.OnReadAndWriter 僅在讀與寫操作都更新 TTL ;
需要注意的是,過期的狀態數據根據UpdateType參數進行配置,只有被寫入或者讀取的是時間才會更新TTL,也就是說如果某個狀態指標一直不被使用活着更新,則永遠不會觸發對該狀態數據的清理操作,這種情況可能會導致系統中的狀態數據越來越大。
另外,可以通過 setStateVisibility 方法設定狀態的可見性,根據過期數據是否被清理來確定是否返回狀態數據:
- StateTtlConfig.StateVisibility.NeverReturnExpired: 狀態數據過期就不會返回(默認)
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: 狀態數據即使過期但沒有被清理依然返回
Scala DataStream API中使用狀態
直接上代碼片段:
inputStream.keyBy(_._1)
// 指定輸入參數類型和狀態參數類型
.mapWithState((in:(Int,Long) , count : Option[Int]) =>
// 判斷count 類型是否非空
count match {
// 輸出 key , count 並在原來 count 數據上累加
case Some(c) => ((in._1 , c) , Some(c + in._2))
// 如果狀態為空,則將指標填入
case None => ((in._1 , 0) , Some(in._2))
}
)
Manager Operator State
Operator State 是一種 non-keyed-state ,與並行的操作算子實例相關聯,例如在 Kafka Connector 中,每個 Kafka 消費端算子實例都對應到 Kafka 的一個分區中,維護Topic分區和 Offsets 偏移量作為算子的 Operator State. 在Flink中可以實現 CheckpointedFunction 或者 ListCheckpoint<T extends Serializable>兩個接口來定義操作 Managered Operator State 的函數。
通過 CheckpointedFunction 接口操作Operator State
CheckpointedFunction 接口定義如圖:
@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {
/**
* This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
* ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
* the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
*
* @param context the context for drawing a snapshot of the operator
* @throws Exception
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* This method is called when the parallel function instance is created during distributed
* execution. Functions typically set up their state storing data structures in this method.
*
* @param context the context for initializing the operator
* @throws Exception
*/
void initializeState(FunctionInitializationContext context) throws Exception;
}
在每個獨立的算子中,Managered Operator State 都是以 List 形式存儲的,算子和算子之間的狀態數據相互獨立,List存儲比較適合於狀態數據的重新分布,Flink目前支持Manager Operator State 兩種重要分布策略,分別是 Event-split Redistribution 和 Union Redistribution。
- [ ] Event-split Redistribution: 每個算子實例中含有部分元素的List列表,整個狀態數據是所有List列表,整個狀態數據是所有List列表的合集。當觸發 restore/redistribution 動作時,通過將狀態數據平均分配成與算子並行度相同數量的List列表,每個 task 實例中有一個 List,其可以為空或者含有多個元素。
- [ ] Union Redistribution: 每個算子實例中含有所有狀態元素的List 列表,當觸發 restore/redistribution 動作時,每個算子可以獲取到完整的狀態元素列表。
/**
* @title CheckpointCount
* @description 實現 CheckpointFunction 接口利用Operator State 統計輸入到算子的數據量
* @author Mr.Sun
* @version v.1.0
* @date 2019/12/24 9:16
*/
class CheckpointCount(val numElements: Int) extends FlatMapFunction[(Int, Long), (Int, Long, Long)] with CheckpointedFunction {
// 定義算子實例本地變量,存儲Operator數據數量
private var operatorCount: Long = _
// 定義 keyedState ,存儲和 key 相關的狀態值
private var keyedState: ValueState[Long] = _
// 定義 operatorState , 存儲算子的狀態值
private var operatorState: ListState[Long] = _
override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
val keyedCount: Long = keyedState.value()
// 更新 keyedState 數量
keyedState.update(keyedCount)
// 更新本地的算子 operatorCount
operatorCount = operatorCount + 1
// 輸出結果,包括 id , id 對應的的數量統計 keyedCount ,算子輸入數據的數量統計 operatorCount
out.collect(value._1, keyedCount, operatorCount)
}
// 當發生了 snapshotState , 將 operatorCount 添加到 operatorState 中
override def snapshotState(context: FunctionSnapshotContext): Unit = {
operatorState.clear()
operatorState.add(operatorCount)
}
// 初始化狀態數據
override def initializeState(context: FunctionInitializationContext): Unit = {
// 定義並獲取 keyedState
keyedState = context.getKeyedStateStore.getState(new ValueStateDescriptor[Long]("keye-state", classOf[Long]))
// 定義並獲取 operatorState
operatorState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long]("operator-state", classOf[Long]))
// 定義在 Restored 過程中, 從 operatorState 中恢復數據的邏輯
if (context.isRestored){
val value: util.Iterator[Long] = operatorState.get().iterator()
while (value.hasNext){
operatorCount += value.next()
}
}
}
}
通過 ListCheckpointed接口定義 Operator State
/**
* @title NumberRecordsCount
* @description 實現 ListCheckpoint接口利用Operator State 統計算子輸入數據數量
* @author Mr.Sun
* @version v.1.0
* @date 2019/12/24 10:14
*/
class NumberRecordsCount extends FlatMapFunction[(String, Long), (String, Long)] with ListCheckpointed[Long] {
// 定義算子中接入的 numberRecords 數量
private var numberRecords: Long = 0L
override def flatMap(value: (String, Long), out: Collector[(String, Long)]): Unit = {
// 介入一條計算規則進行統計,並輸出
numberRecords += 1
out.collect(value._1, numberRecords)
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = {
Collections.singletonList(numberRecords)
}
override def restoreState(state: util.List[Long]): Unit = {
numberRecords = 0L
for (count <- state) {
// 從恢復狀態中 恢復 numberRecords
numberRecords += count
}
}
}
Checkpoints 和 Savepoints
Checkpoints檢查機制
Flink 中基於異步輕量級的分布式快照技術提供了 Checkpoints 容錯機制,分布式快找可以將同一時間點 Task / Operator 的狀態數據全局統一快照處理,包括前面提到的Keyed State 和 Operator State . Flink 會在輸入的數據集上間隔性的生成checkpoint barrier ,通過柵欄(barrier)將間隔時間段內的數據划分到相應的checkpoint 中,當應用出現異常時,Operator 就能夠從上一次快照中恢復所有算子之前的狀態,從而保證數據的一致性。
舉個栗子:在 KafkaConsumer 算子維護 Offset 狀態,當系統出現問題無法從 Kafka 中消費數據時,可以將 Offset 記錄在狀態中,當系統出現問題,無法從Kafka消費數據時,可以將 Offset 記錄在狀態中,當任務重新恢復時就能夠指定偏移量消費數據。
Checkpoint 過程中狀態數據一般會被保存在一個可配置的環境中,通常在 JobManager節點或者HDFS上。

Checkpoint 開啟和時間間隔指定
開啟檢查點並且指定檢查點時間間隔為 1000ms ,根據實際情況自行選擇,如果狀態比較大,則建議適當增加該值;
environment.enableCheckpointing(1000)
exactly-ance 和 at-least-once 語義
可以選擇 exactly-once 語義保證整個應用內 端到端 的數據一致性,這種情況比較適合數據要求高,不允許出現數據丟失或重復,與此同時,Flink 的性能也相對較弱,而 at-least-once 語義更適合於時延和吞吐要求非常高但對數據一致性要求不高的場景。
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
Checkpoint 超時時間
超時時間制定了每次Checkpoint 執行過程中的上限時間范圍,一旦 Checkpoint 執行時間超過該閾值,Flink 將會中斷Checkpoint 過程,並按照超時處理。該指標可以通過 setCheckpointTimeout 方法設定,默認 10 分鍾
environment.getCheckpointConfig.setCheckpointTimeout(60000)
檢查點之間最小時間間隔
該參數主要目的是設定兩個Checkpoint 之間最小時間間隔,防止出現例如狀態數據過大導致Checkpoint 執行時間過長,導致 Checkpoint 積壓過多,最終Flink 應用密集地觸發 Checkpoint 操作,會占用大量計算資源而影響到整個應用的性能
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
最大並行度執行檢查點數量
通過 setMaxCurrentCheckpoint()方法設定能夠最大同時執行的 Checkpoint 數量。在默認情況下只有一個檢查點可以運行,根據用戶指定的數量可以同時觸發多個Checkpoint,進而提升Checkpoint整體的效率.
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
外部檢查點
設定周期性的外部檢查點,然后將狀態數據持久化到外部系統中,使用這種方式不會在任務正常停止的過程中清理檢查點數據,而是會一直保持在外部系統介質中,另外也可以通過從外部檢查點中對任務進行恢復.
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
failOnCheckpointingErrors
ailOnCheckpointingErrors 參數決定了當Checkpoint執行過程中如果出現失敗或者錯誤時,任務是否同時被關閉,默認值為True
environment.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// 上述的方式已經被棄用了,使用下面的方式
val number: Int = environment.getCheckpointConfig.getTolerableCheckpointFailureNumber
environment.getCheckpointConfig.setTolerableCheckpointFailureNumber(number)
Savepoints 機制
Savepoints 是檢查點的一種特殊實現,底層實現其實也是使用Checkpoints的機制。Savepoints是用戶以手工命令的方式觸發Checkpoint,並將結果持久化到指定的存儲路徑中,其主要目的是幫助用戶在升級和維護集群過程中保存系統中的狀態數據,避免因為停機運維或者升級應用等正常終止應用的操作而導致系統無法恢復到原有的計算狀態的情況,從而無法實現從端到端的 Excatly-Once 語義保證。
Operator ID 配置
當使用 Savepoints 對整個集群進行升級或運維操作的時候,需要停止整個 Flink 應用程序,此時用戶可能會對應用的代碼邏輯進行修改,即時 Flink 能夠通過 Savepoint 將應用中的狀態數據同步到磁盤然后恢復任務,但由於代碼邏輯發生了變化,在升級過程中有可能導致算子的狀態無法通過 Savepoints 中的數據恢復的情況,在這種情況下就需要通過唯一的 ID 標記算子。在Flink中默認支持自動生成 Operator ID, 但是這種方式不利於對代碼層面的維護和升級,建議用戶盡可能使用手工方式對算子進行唯一 ID 標記, ID 的應用范圍在每個算子內部,具體的使用方式如下:
environment.addSource(new SourceFunction[] {})
.uid("source-id")
.shuffle()
.map(new MapFunction[] {})
.uid("map-id")
.print()
Savepoints 操作
Savepoint 操作可以通過命令行的方式進行觸發,命令行提供了取消任務,從Savepoints中恢復任務,撤銷 Savepoints 等操作,在 Flink1.2 中以后也可以通過FlinkWeb頁面從 Savepoints中恢復應用。
手動觸發 Savepoints
bin/flink savepoint :jobId [:targetDirectory]
bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
取消任務並處觸發Savepoints
bin/flink cancel -s [:targetDirectory] :jobId
通過Savepoints中恢復任務
bin/flink run -s :savepointPath [:runArgs]
釋放Savepoints數據
bin/flink savepoint -d :savepointPath
通過 --dispose (-d) 命令釋放已經存儲的 Savepoint 數據,這樣存儲在指定路徑中的 savepointPath 將會被清理掉
TargetDirectory 配置
TargetDirectory配置
state.savepoints.dir: hdfs:///flink/savepoints
TargetDirectory 文件目錄
# 查看 TargetDirectory 文件目錄
hdfs dfs -ls /flink/flink-savepoints/savepoint-11bbc5-bd967f90709b
狀態管理器
在Flink 中提供了 StateBackend 來存儲和管理 Checkpoints 過程中的狀態數據。
StateBackend 類型
Flink中一共實現了三種類型的狀態管理器,包括基於內存的MemoryStateBackend、基於文件系統的 FsStateBackend , 以及基於 RockDB 作為存儲介質的 RockDBStateBackend .
MemoryStateBackend
基於內存的狀態管理器將狀態數據全部存儲在JVM堆內存中,包括用戶在使用 DataStream API 中創建 Key/Value State,窗口中緩存的狀態數據,以及觸發器等數據基於內存的狀態管理器具有非常快速和高校的特點,但也有非常多的限制,最主要的就是內存的容量限制,一旦存儲的狀態數據過多就會導致系統內存溢出,從而影響整個應用的正常運行。同時如果機器出現問題,整個主機內存中的狀態數據都會丟失,進而無法恢復任務中得玩狀態數據。因此這個玩意,避免使用。
Flink 將MemoryStateBackend 作為默認的狀態后端管理器,也可以通過如下參數配置初始化 MemoryStateBackend , 其中 "MAX_MEN_STATE_SIZE" 指定每個狀態值的內存使用大小。
new MemoryStateBackend(MAX_MEN_STATE_SIZE , false)
在Flink 中 MemoryStateBackend 具有如下特點:
- 聚合類算子的狀態會存儲在 JobManager 內存中,因此對於聚合類算子比較多的應用會對 JobManager 內存有一定的壓力,進而對整個集群會造成較大的負擔
- 創建MemoryStateBackend時可以指定狀態初始化內存大小,但狀態數據傳輸大小會受限於Akka框架通信的“akka.framesize” 大小限制(默認: 10485760 bit -> 1024 * 1024 * 10 )
- JVM內存容量受限於主機內存大小,也就是說不管是 JobManager 內存還是在 TaskManager 的內存中維護狀態數據都有內存的限制,因此對於非常大的狀態數據不適合使用 MemoryStateBackend 去存儲
important MemoryStateBackend 比較適合測試環境,並用於本地調試和驗證,不建議在生產環境中使用。
FsStateBackend
與MemoryStateBackend 有所不同,FsStateBackend 是基於文件系統的一種狀態管理器在,這里的文件系統可以是本地文件系統,也可以是HDFS分布式文件系統。
new FsStateBackend(path , false)
FsStateBackend 的 Boolean 參數類型指定是否以同步的方式記錄狀態數據,默認采用異步方式。異步方式可以盡可能避免在Checkpoint過程中影響流式計算任務
RockDBStateBackend
RockDBStateBackend 是Flink 中內置的第三方狀態管理器,和前面的狀態管理器不同,RocksDBStateBackend 需要單獨引入相關的依賴包到工程中,通過初始化 RockDBStateBackend 類,使可以得到 RockDBStateBackend 實例類。
RocksDBStateBackend 采用異步的方式進行狀態數據的 Snapshot ,任務中的狀態數據首先被寫入 RockDB中,然后再異步的將狀態數據寫入文件系統中,這樣RockDB僅會存儲在正在進行的計算的數據,對於長時間才更新的數據則寫入磁盤中進行存儲,而對於體量比較小的元數據狀態,則存儲在 JobManager 內存中。
與 FsStateBackend 相比,RockDBStateBackend性能更高,主要是因為借助了 RockDB 存儲了最新最熱的數據,然后通過異步的方式在同步到文件系統中。
