Flink狀態管理詳解:Keyed State和Operator List State深度解析


為什么要管理狀態

有狀態的計算是流處理框架要實現的重要功能,因為稍復雜的流處理場景都需要記錄狀態,然后在新流入數據的基礎上不斷更新狀態。下面的幾個場景都需要使用流處理的狀態功能:

  • 數據流中的數據有重復,我們想對重復數據去重,需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入過的數據來判斷去重。
  • 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態的形式緩存下來。比如,判斷一個溫度傳感器數據流中的溫度是否在持續上升。
  • 對一個時間窗口內的數據進行聚合分析,分析一個小時內某項指標的75分位或99分位的數值。
  • 在線機器學習場景下,需要根據新流入數據不斷更新機器學習的模型參數。

我們知道,Flink的一個算子有多個子任務,每個子任務分布在不同實例上,我們可以把狀態理解為某個算子子任務在其當前實例上的一個變量,變量記錄了數據流的歷史信息。當新數據流入時,我們可以結合歷史信息來進行計算。實際上,Flink的狀態是由算子的子任務來創建和管理的。一個狀態更新和獲取的流程如下圖所示,一個算子子任務接收輸入流,獲取對應的狀態,根據新的計算結果更新狀態。一個簡單的例子是對一個時間窗口內輸入流的某個整數字段求和,那么當算子子任務接收到新元素時,會獲取已經存儲在狀態中的數值,然后將當前輸入加到狀態上,並將狀態數據更新。

獲取和更新狀態的邏輯其實並不復雜,但流處理框架還需要解決以下幾類問題:

  • 數據的產出要保證實時性,延遲不能太高。
  • 需要保證數據不丟不重,恰好計算一次,尤其是當狀態數據非常大或者應用出現故障需要恢復時,要保證狀態的計算不出任何錯誤。
  • 一般流處理任務都是7*24小時運行的,程序的可靠性非常高。

基於上述要求,我們不能將狀態直接交由內存管理,因為內存的容量是有限制的,當狀態數據稍微大一些時,就會出現內存不夠的問題。假如我們使用一個持久化的備份系統,不斷將內存中的狀態備份起來,當流處理作業出現故障時,需要考慮如何從備份中恢復。而且,大數據應用一般是橫向分布在多個節點上,流處理框架需要保證橫向的伸縮擴展性。可見,狀態的管理並不那么容易。

作為一個計算框架,Flink提供了有狀態的計算,封裝了一些底層的實現,比如狀態的高效存儲、Checkpoint和Savepoint持久化備份機制、計算資源擴縮容等問題。因為Flink接管了這些問題,開發者只需調用Flink API,這樣可以更加專注於業務邏輯。

Flink的幾種狀態類型

Managed State和Raw State

Flink有兩種基本類型的狀態:托管狀態(Managed State)和原生狀態(Raw State)。從名稱中也能讀出兩者的區別:Managed State是由Flink管理的,Flink幫忙存儲、恢復和優化,Raw State是開發者自己管理的,需要自己序列化。

兩者的具體區別有:

  • 從狀態管理的方式上來說,Managed State由Flink Runtime托管,狀態是自動存儲、自動恢復的,Flink在存儲管理和持久化上做了一些優化。當我們橫向伸縮,或者說我們修改Flink應用的並行度時,狀態也能自動重新分布到多個並行實例上。Raw State是用戶自定義的狀態。
  • 從狀態的數據結構上來說,Managed State支持了一系列常見的數據結構,如ValueState、ListState、MapState等。Raw State只支持字節,任何上層數據結構需要序列化為字節數組。使用時,需要用戶自己序列化,以非常底層的字節數組形式存儲,Flink並不知道存儲的是什么樣的數據結構。
  • 從具體使用場景來說,絕大多數的算子都可以通過繼承Rich函數類或其他提供好的接口類,在里面使用Managed State。Raw State是在已有算子和Managed State不夠用時,用戶自定義算子時使用。

下文將重點介紹Managed State。

Keyed State和Operator State

對Managed State繼續細分,它又有兩種類型:Keyed State和Operator State。這里先簡單對比兩種狀態,后續還將展示具體的使用方法。

Keyed State是KeyedStream上的狀態。假如輸入流按照id為Key進行了keyBy分組,形成一個KeyedStream,數據流中所有id為1的數據共享一個狀態,可以訪問和更新這個狀態,以此類推,每個Key對應一個自己的狀態。下圖展示了Keyed State,因為一個算子子任務可以處理一到多個Key,算子子任務1處理了兩種Key,兩種Key分別對應自己的狀態。

Operator State可以用在所有算子上,每個算子子任務或者說每個算子實例共享一個狀態,流入這個算子子任務的數據可以訪問和更新這個狀態。下圖展示了Operator State,算子子任務1上的所有數據可以共享第一個Operator State,以此類推,每個算子子任務上的數據共享自己的狀態。

無論是Keyed State還是Operator State,Flink的狀態都是基於本地的,即每個算子子任務維護着這個算子子任務對應的狀態存儲,算子子任務之間的狀態不能相互訪問。

在之前各算子的介紹中曾提到,為了自定義Flink的算子,我們可以重寫Rich Function接口類,比如RichFlatMapFunction。使用Keyed State時,我們也可以通過重寫Rich Function接口類,在里面創建和訪問狀態。對於Operator State,我們還需進一步實現CheckpointedFunction接口。

上表總結了Keyed State和Operator State的區別。

橫向擴展問題

狀態的橫向擴展問題主要是指修改Flink應用的並行度,確切的說,每個算子的並行實例數或算子子任務數發生了變化,應用需要關停或啟動一些算子子任務,某份在原來某個算子子任務上的狀態數據需要平滑更新到新的算子子任務上。其實,Flink的Checkpoint就是一個非常好的在各算子間遷移狀態數據的機制。算子的本地狀態將數據生成快照(snapshot),保存到分布式存儲(如HDFS)上。橫向伸縮后,算子子任務個數變化,子任務重啟,相應的狀態從分布式存儲上重建(restore)。

對於Keyed State和Operator State這兩種狀態,他們的橫向伸縮機制不太相同。由於每個Keyed State總是與某個Key相對應,當橫向伸縮時,Key總會被自動分配到某個算子子任務上,因此Keyed State會自動在多個並行子任務之間遷移。對於一個非KeyedStream,流入算子子任務的數據可能會隨着並行度的改變而改變。如上圖所示,假如一個應用的並行度原來為2,那么數據會被分成兩份並行地流入兩個算子子任務,每個算子子任務有一份自己的狀態,當並行度改為3時,數據流被拆成3支,或者並行度改為1,數據流合並為1支,此時狀態的存儲也相應發生了變化。對於橫向伸縮問題,Operator State有兩種狀態分配方式:一種是均勻分配,另一種是將所有狀態合並,再分發給每個實例上。

Keyed State的使用方法

對於Keyed State,Flink提供了幾種現成的數據結構供我們使用,包括ValueStateListState等,他們的繼承關系如下圖所示。首先,State主要有三種實現,分別為ValueStateMapStateAppendingStateAppendingState又可以細分為ListStateReducingStateAggregatingState

這幾個狀態的具體區別在於:

  • ValueState[T]是單一變量的狀態,T是某種具體的數據類型,比如DoubleString,或我們自己定義的復雜數據結構。我們可以使用value()方法獲取狀態,使用update(value: T)更新狀態。
  • MapState[K, V]存儲一個Key-Value map,其功能與Java的Map幾乎相同。get(key: K)可以獲取某個key下的value,put(key: K, value: V)可以對某個key設置value,contains(key: K)判斷某個key是否存在,remove(key: K)刪除某個key以及對應的value,entries(): java.lang.Iterable[java.util.Map.Entry[K, V]]返回MapState中所有的元素,iterator(): java.util.Iterator[java.util.Map.Entry[K, V]]返回一個迭代器。需要注意的是,MapState中的key和Keyed State的key不是同一個key。
  • ListState[T]存儲了一個由T類型數據組成的列表。我們可以使用add(value: T)addAll(values: java.util.List[T])向狀態中添加元素,使用get(): java.lang.Iterable[T]獲取整個列表,使用update(values: java.util.List[T])來更新列表,新的列表將替換舊的列表。
  • ReducingState[T]AggregatingState[IN, OUT]ListState[T]同屬於MergingState[T]。與ListState[T]不同的是,ReducingState[T]只有一個元素,而不是一個列表。它的原理是新元素通過add(value: T)加入后,與已有的狀態元素使用ReduceFunction合並為一個元素,並更新到狀態里。AggregatingState[IN, OUT]ReducingState[T]類似,也只有一個元素,只不過AggregatingState[IN, OUT]的輸入和輸出類型可以不一樣。ReducingState[T]AggregatingState[IN, OUT]與窗口上進行ReduceFunctionAggregateFunction很像,都是將新元素與已有元素做聚合。

注意,Flink的核心代碼目前使用Java實現的,而Java的很多類型與Scala的類型不太相同,比如ListMap。這里不再詳細解釋Java和Scala的數據類型的異同,但是開發者在使用Scala調用這些接口,比如狀態的接口,需要注意將Java的類型轉為Scala的類型。對於ListMap的轉換,只需要需要引用import scala.collection.JavaConversions._,並在必要的地方添加后綴asScalaasJava來進行轉換。此外,Scala和Java的空對象使用習慣不太相同,Java一般使用null表示空,Scala一般使用None

之前的文章中其實已經多次使用過狀態,這里再次使用電商用戶行為分析來演示如何使用狀態。我們知道電商平台會將用戶與商品的交互行為收集記錄下來,行為數據主要包括幾個字段:userId、itemId、categoryId、behavior和timestamp。其中userId和itemId分別代表用戶和商品的唯一ID,categoryId為商品類目ID,behavior表示用戶的行為類型,包括點擊(pv)、購買(buy)、加購物車(cart)、喜歡(fav)等,timestamp記錄行為發生時間。本文采用阿里巴巴提供的一個淘寶用戶行為數據集,為了精簡需要,只節選了部分數據。下面的代碼使用MapState[String, Int]記錄某個用戶某種行為出現的次數。這里讀取了數據集文件,模擬了一個淘寶用戶行為數據流。

/**  * 用戶行為  * categoryId為商品類目ID  * behavior包括點擊(pv)、購買(buy)、加購物車(cart)、喜歡(fav)  * */ case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long) class MapStateFunction extends RichFlatMapFunction[UserBehavior, (Long, String, Int)] { // 指向MapState的句柄  private var behaviorMapState: MapState[String, Int] = _ override def open(parameters: Configuration): Unit = { // 創建StateDescriptor  val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int]) // 通過StateDescriptor獲取運行時上下文中的狀態  behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor) } override def flatMap(input: UserBehavior, collector: Collector[(Long, String, Int)]): Unit = { var behaviorCnt = 1 // behavior有可能為pv、cart、fav、buy等  // 判斷狀態中是否有該behavior  if (behaviorMapState.contains(input.behavior)) { behaviorCnt = behaviorMapState.get(input.behavior) + 1 } // 更新狀態  behaviorMapState.put(input.behavior, behaviorCnt) collector.collect((input.userId, input.behavior, behaviorCnt)) } } def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(8) // 獲取數據源  val sourceStream: DataStream[UserBehavior] = env .addSource(new UserBehaviorSource("state/UserBehavior-50.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() { override def extractAscendingTimestamp(userBehavior: UserBehavior): Long = { // 原始數據單位為秒,乘以1000轉換成毫秒  userBehavior.timestamp * 1000 } } ) // 生成一個KeyedStream  val keyedStream = sourceStream.keyBy(user => user.userId) // 在KeyedStream上進行flatMap  val behaviorCountStream = keyedStream.flatMap(new MapStateFunction) behaviorCountStream.print() env.execute("state example") } class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] { var isRunning: Boolean = true // 輸入源  var streamSource: InputStream = _ override def run(sourceContext: SourceContext[UserBehavior]): Unit = { // 從項目的resources目錄獲取輸入  streamSource = MapStateExample.getClass.getClassLoader.getResourceAsStream(path) val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines while (isRunning && lines.hasNext) { val line = lines.next() val itemStrArr = line.split(",") val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), itemStrArr(4).toLong) sourceContext.collect(userBehavior) } } override def cancel(): Unit = { streamSource.close() isRunning = false } }

Keyed State是針對KeyedStream的狀態,必須先對一個DataStream進行keyBy操作。在本例中,我們對用戶ID進行了keyBy,那么用戶ID為1的行為數據共享同一狀態數據,以此類推,每個用戶ID的行為數據共享自己的狀態數據。之后,我們需要實現Rich類函數,比如RichFlatMapFunction,或者KeyedProcessFunction等函數類。這些算子函數類都是RichFunction的一種實現,他們都有運行時上下文RuntimeContextRuntimeContext包含了狀態數據。 在實現這些算子函數類時,一般是在open方法中聲明狀態。open是算子的初始化方法,它在實際處理函數之前調用。具體到狀態的使用,我們首先要注冊一個StateDescriptor。從名字中可以看出,StateDescriptor是狀態的一種描述,它描述了狀態的名字和狀態的數據結構。狀態的名字可以用來區分不同的狀態,一個算子內可以有多個不同的狀態,每個狀態的StateDescriptor需要設置不同的名字。同時,我們也需要指定狀態的具體數據結構,指定具體的數據結構非常重要,因為Flink要對其進行序列化和反序列化,以便進行Checkpoint和必要的恢復。數據結構的類型和序列化機制可以參考我之前的文章:Flink進階教程:數據類型和序列化機制簡介。在本例中,我們使用val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int])注冊了一個MapStateStateDescriptor,key為某種行為,如pv、buy等,數據類型為String,value為該行為出現的次數,數據類型為Int。此外,每種類型的狀態都有對應的StateDescriptor,比如MapStateDescriptor對應MapStateValueStateDescriptor對應ValueState

接着我們通過StateDescriptorRuntimeContext中獲取狀態句柄。本例中對應的代碼為:behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor)。狀態句柄並不存儲狀態,它只是Flink提供的一種訪問狀態的接口,狀態數據實際存儲在State Backend中。

使用和更新狀態發生在實際的處理函數上,比如RichFlatMapFunction中的flatMap方法,在實現自己的業務邏輯時訪問和修改狀態,比如通過get方法獲取狀態。

其他類型的狀態使用方法與本例中所展示的大致相同。ReducingStateAggregatingState在注冊StateDescriptor時,還需要實現一個ReduceFunctionAggregationFunction。下面的代碼注冊ReducingStateDescriptor時實現一個YourReduceFunctionYourReduceFunction實現了ReduceFunction。我們在ReducingState中使用add(in: T)方法向狀態里增加一個元素,新元素和狀態中已有數據通過ReduceFunction兩兩聚合。AggregatingState的使用方法與之類似。

val reducingStateDescriptor = new ReducingStateDescriptor[UserBehavior]("reducing", new YourReduceFunction, classOf[UserBehavior])

必要時候,我們還需要調用Keyed State中的clear()方法來清除一個Keyed State。

Operator List State的使用方法

狀態從本質上來說,是Flink算子子任務的一種本地數據,為了保證數據可恢復性,使用Checkpoint機制來將狀態數據持久化輸出到存儲空間上。狀態相關的主要邏輯有兩項:一、將算子子任務本地內存數據在Checkpoint時snapshot寫入存儲;二、初始化或重啟應用時,以一定的邏輯從存儲中讀出並變為算子子任務的本地內存數據。Keyed State對這兩項內容做了更完善的封裝,開發者可以開箱即用。對於Operator State來說,每個算子子任務管理自己的Operator State,或者說每個算子子任務上的數據流共享同一個狀態,可以訪問和修改該狀態。Flink的算子子任務上的數據在程序重啟、橫向伸縮等場景下不能保證百分百的一致性。換句話說,重啟Flink應用后,某個數據流元素不一定會和上次一樣,還能流入該算子子任務上。因此,我們需要根據自己的業務場景來設計snapshot和restore的邏輯。為了實現這兩個步驟,Flink提供了最為基礎的CheckpointedFunction接口類。

public interface CheckpointedFunction { // Checkpoint時會調用這個方法,我們要實現具體的snapshot邏輯,比如將哪些本地狀態持久化  void snapshotState(FunctionSnapshotContext context) throws Exception; // 初始化時會調用這個方法,向本地狀態中填充數據  void initializeState(FunctionInitializationContext context) throws Exception; }

在Flink的Checkpoint機制下,當一次snapshot觸發后,snapshotState會被調用,將本地狀態持久化到存儲空間上。這里我們可以先不用關心snapshot是如何被觸發的,暫時理解成snapshot是自動觸發的,后續文章會介紹Flink的Checkpoint機制。initializeState在算子子任務初始化時被調用,初始化包括兩種場景:一、整個Flink作業第一次執行,狀態數據被初始化為一個默認值;二、Flink作業重啟,之前的作業已經將狀態輸出到存儲,通過這個方法將存儲上的狀態讀出並填充到這個本地狀態中。

目前Operator State主要有三種,其中ListState和UnionListState在數據結構上都是一種ListState,還有一種BroadcastState。這里我們主要介紹ListState這種列表形式的狀態。這種狀態以一個列表的形式序列化並存儲,以適應橫向擴展時狀態重分布的問題。每個算子子任務有零到多個狀態S,組成一個列表ListState[S]。各個算子子任務將自己狀態列表的snapshot到存儲,整個狀態邏輯上可以理解成是將這些列表連接到一起,組成了一個包含所有狀態的大列表。當作業重啟或橫向擴展時,我們需要將這個包含所有狀態的列表重新分布到各個算子子任務上。ListState和UnionListState的區別在於:ListState是將整個狀態列表按照round-ribon的模式均勻分布到各個算子子任務上,每個算子子任務得到的是整個列表的子集;UnionListState按照廣播的模式,將整個列表發送給每個算子子任務。

Operator State的實際應用場景不如Keyed State多,它經常被用在Source或Sink等算子上,用來保存流入數據的偏移量或對輸出數據做緩存,以保證Flink應用的Exactly-Once語義。這里我們來看一個Flink官方提供的Sink案例以了解CheckpointedFunction的工作原理。

// BufferingSink需要繼承SinkFunction以實現其Sink功能,同時也要繼承CheckpointedFunction接口類 class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)] with CheckpointedFunction { // Operator List State句柄  @transient private var checkpointedState: ListState[(String, Int)] = _ // 本地緩存  private val bufferedElements = ListBuffer[(String, Int)]() // Sink的核心處理邏輯,將上游數據value輸出到外部系統  override def invoke(value: (String, Int), context: Context): Unit = { // 先將上游數據緩存到本地的緩存  bufferedElements += value // 當本地緩存大小到達閾值時,將本地緩存輸出到外部系統  if (bufferedElements.size == threshold) { for (element <- bufferedElements) { // send it to the sink  } // 清空本地緩存  bufferedElements.clear() } } // 重寫CheckpointedFunction中的snapshotState  // 將本地緩存snapshot保存到存儲上  override def snapshotState(context: FunctionSnapshotContext): Unit = { // 將之前的Checkpoint清理  checkpointedState.clear() // 將最新的數據寫到狀態中  for (element <- bufferedElements) { checkpointedState.add(element) } } // 重寫CheckpointedFunction中的initializeState  // 初始化狀態  override def initializeState(context: FunctionInitializationContext): Unit = { // 注冊ListStateDescriptor  val descriptor = new ListStateDescriptor[(String, Int)]( "buffered-elements", TypeInformation.of(new TypeHint[(String, Int)]() {}) ) // 從FunctionInitializationContext中獲取OperatorStateStore,進而獲取ListState  checkpointedState = context.getOperatorStateStore.getListState(descriptor) // 如果是作業重啟,讀取存儲中的狀態數據並填充到本地緩存中  if(context.isRestored) { for(element <- checkpointedState.get()) { bufferedElements += element } } } }

上面的代碼在輸出到Sink之前,先將數據放在本地緩存中,並定期進行snapshot,這實現了批量輸出的功能,批量輸出能夠減少網絡等開銷。同時,程序能夠保證數據一定會輸出外部系統,因為即使程序崩潰,狀態中存儲着還未輸出的數據,下次啟動后還會將這些未輸出數據讀取到內存,繼續輸出到外部系統。

注冊和使用Operator State的代碼和Keyed State相似,也是先注冊一個StateDescriptor,並指定狀態名字和數據類型,然后從FunctionInitializationContext中獲取OperatorStateStore,進而獲取ListState。如果是UnionListState,那么代碼改為:context.getOperatorStateStore.getUnionListState

val descriptor = new ListStateDescriptor[(String, Long)]( "buffered-elements", TypeInformation.of(new TypeHint[(String, Long)]() {}) ) checkpointedState = context.getOperatorStateStore.getListState(descriptor)

狀態的初始化邏輯中,我們用context.isRestored來判斷是否為作業重啟,這樣可以從之前的Checkpoint中恢復並寫到本地緩存中。

注意,CheckpointedFunction接口類的initializeState方法的參數為FunctionInitializationContext,基於這個上下文參數我們不僅可以通過getOperatorStateStore獲取OperatorStateStore,也可以通過getKeyedStateStore來獲取KeyedStateStore,進而通過getStategetMapState等方法獲取Keyed State,比如:context.getKeyedStateStore().getState(valueDescriptor)。這與在Rich函數類中使用Keyed State的方式並不矛盾。CheckpointedFunction是Flink有狀態計算的最底層接口,它提供了最豐富的狀態接口。

ListCheckpointed接口類是CheckpointedFunction接口類的一種簡寫,ListCheckpointed提供的功能有限,只支持均勻分布的ListState,不支持全量廣播的UnionListState。

public interface ListCheckpointed<T extends Serializable> { // Checkpoint時會調用這個方法,我們要實現具體的snapshot邏輯,比如將哪些本地狀態持久化  List<T> snapshotState(long checkpointId, long timestamp) throws Exception; // 從上次Checkpoint中恢復數據到本地內存  void restoreState(List<T> state) throws Exception; }

CheckpointedFunction中的snapshotState方法一樣,這里的snapshotState也是在做備份,但這里的參數列表更加精簡,其中checkpointId是一個單調遞增的數字,用來表示某次Checkpoint,timestamp是Checkpoint發生的實際時間,這個方法以列表形式返回需要寫入存儲的狀態。restoreState方法用來初始化狀態,包括作業第一次啟動或者作業失敗重啟。參數是一個列表形式的狀態,是均勻分布給這個算子子任務的狀態數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM