為什么要管理狀態
有狀態的計算是流處理框架要實現的重要功能,因為稍復雜的流處理場景都需要記錄狀態,然后在新流入數據的基礎上不斷更新狀態。下面的幾個場景都需要使用流處理的狀態功能:
- 數據流中的數據有重復,我們想對重復數據去重,需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入過的數據來判斷去重。
- 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態的形式緩存下來。比如,判斷一個溫度傳感器數據流中的溫度是否在持續上升。
- 對一個時間窗口內的數據進行聚合分析,分析一個小時內某項指標的75分位或99分位的數值。
- 在線機器學習場景下,需要根據新流入數據不斷更新機器學習的模型參數。
我們知道,Flink的一個算子有多個子任務,每個子任務分布在不同實例上,我們可以把狀態理解為某個算子子任務在其當前實例上的一個變量,變量記錄了數據流的歷史信息。當新數據流入時,我們可以結合歷史信息來進行計算。實際上,Flink的狀態是由算子的子任務來創建和管理的。一個狀態更新和獲取的流程如下圖所示,一個算子子任務接收輸入流,獲取對應的狀態,根據新的計算結果更新狀態。一個簡單的例子是對一個時間窗口內輸入流的某個整數字段求和,那么當算子子任務接收到新元素時,會獲取已經存儲在狀態中的數值,然后將當前輸入加到狀態上,並將狀態數據更新。
獲取和更新狀態的邏輯其實並不復雜,但流處理框架還需要解決以下幾類問題:
- 數據的產出要保證實時性,延遲不能太高。
- 需要保證數據不丟不重,恰好計算一次,尤其是當狀態數據非常大或者應用出現故障需要恢復時,要保證狀態的計算不出任何錯誤。
- 一般流處理任務都是7*24小時運行的,程序的可靠性非常高。
基於上述要求,我們不能將狀態直接交由內存管理,因為內存的容量是有限制的,當狀態數據稍微大一些時,就會出現內存不夠的問題。假如我們使用一個持久化的備份系統,不斷將內存中的狀態備份起來,當流處理作業出現故障時,需要考慮如何從備份中恢復。而且,大數據應用一般是橫向分布在多個節點上,流處理框架需要保證橫向的伸縮擴展性。可見,狀態的管理並不那么容易。
作為一個計算框架,Flink提供了有狀態的計算,封裝了一些底層的實現,比如狀態的高效存儲、Checkpoint和Savepoint持久化備份機制、計算資源擴縮容等問題。因為Flink接管了這些問題,開發者只需調用Flink API,這樣可以更加專注於業務邏輯。
Flink的幾種狀態類型
Managed State和Raw State
Flink有兩種基本類型的狀態:托管狀態(Managed State)和原生狀態(Raw State)。從名稱中也能讀出兩者的區別:Managed State是由Flink管理的,Flink幫忙存儲、恢復和優化,Raw State是開發者自己管理的,需要自己序列化。
兩者的具體區別有:
- 從狀態管理的方式上來說,Managed State由Flink Runtime托管,狀態是自動存儲、自動恢復的,Flink在存儲管理和持久化上做了一些優化。當我們橫向伸縮,或者說我們修改Flink應用的並行度時,狀態也能自動重新分布到多個並行實例上。Raw State是用戶自定義的狀態。
- 從狀態的數據結構上來說,Managed State支持了一系列常見的數據結構,如ValueState、ListState、MapState等。Raw State只支持字節,任何上層數據結構需要序列化為字節數組。使用時,需要用戶自己序列化,以非常底層的字節數組形式存儲,Flink並不知道存儲的是什么樣的數據結構。
- 從具體使用場景來說,絕大多數的算子都可以通過繼承Rich函數類或其他提供好的接口類,在里面使用Managed State。Raw State是在已有算子和Managed State不夠用時,用戶自定義算子時使用。
下文將重點介紹Managed State。
Keyed State和Operator State
對Managed State繼續細分,它又有兩種類型:Keyed State和Operator State。這里先簡單對比兩種狀態,后續還將展示具體的使用方法。
Keyed State是KeyedStream
上的狀態。假如輸入流按照id為Key進行了keyBy
分組,形成一個KeyedStream
,數據流中所有id為1的數據共享一個狀態,可以訪問和更新這個狀態,以此類推,每個Key對應一個自己的狀態。下圖展示了Keyed State,因為一個算子子任務可以處理一到多個Key,算子子任務1處理了兩種Key,兩種Key分別對應自己的狀態。
Operator State可以用在所有算子上,每個算子子任務或者說每個算子實例共享一個狀態,流入這個算子子任務的數據可以訪問和更新這個狀態。下圖展示了Operator State,算子子任務1上的所有數據可以共享第一個Operator State,以此類推,每個算子子任務上的數據共享自己的狀態。
無論是Keyed State還是Operator State,Flink的狀態都是基於本地的,即每個算子子任務維護着這個算子子任務對應的狀態存儲,算子子任務之間的狀態不能相互訪問。
在之前各算子的介紹中曾提到,為了自定義Flink的算子,我們可以重寫Rich Function接口類,比如RichFlatMapFunction
。使用Keyed State時,我們也可以通過重寫Rich Function接口類,在里面創建和訪問狀態。對於Operator State,我們還需進一步實現CheckpointedFunction
接口。
上表總結了Keyed State和Operator State的區別。
橫向擴展問題
狀態的橫向擴展問題主要是指修改Flink應用的並行度,確切的說,每個算子的並行實例數或算子子任務數發生了變化,應用需要關停或啟動一些算子子任務,某份在原來某個算子子任務上的狀態數據需要平滑更新到新的算子子任務上。其實,Flink的Checkpoint就是一個非常好的在各算子間遷移狀態數據的機制。算子的本地狀態將數據生成快照(snapshot),保存到分布式存儲(如HDFS)上。橫向伸縮后,算子子任務個數變化,子任務重啟,相應的狀態從分布式存儲上重建(restore)。
對於Keyed State和Operator State這兩種狀態,他們的橫向伸縮機制不太相同。由於每個Keyed State總是與某個Key相對應,當橫向伸縮時,Key總會被自動分配到某個算子子任務上,因此Keyed State會自動在多個並行子任務之間遷移。對於一個非KeyedStream
,流入算子子任務的數據可能會隨着並行度的改變而改變。如上圖所示,假如一個應用的並行度原來為2,那么數據會被分成兩份並行地流入兩個算子子任務,每個算子子任務有一份自己的狀態,當並行度改為3時,數據流被拆成3支,或者並行度改為1,數據流合並為1支,此時狀態的存儲也相應發生了變化。對於橫向伸縮問題,Operator State有兩種狀態分配方式:一種是均勻分配,另一種是將所有狀態合並,再分發給每個實例上。
Keyed State的使用方法
對於Keyed State,Flink提供了幾種現成的數據結構供我們使用,包括ValueState
、ListState
等,他們的繼承關系如下圖所示。首先,State
主要有三種實現,分別為ValueState
、MapState
和AppendingState
,AppendingState
又可以細分為ListState
、ReducingState
和AggregatingState
。
這幾個狀態的具體區別在於:
ValueState[T]
是單一變量的狀態,T是某種具體的數據類型,比如Double
、String
,或我們自己定義的復雜數據結構。我們可以使用value()
方法獲取狀態,使用update(value: T)
更新狀態。MapState[K, V]
存儲一個Key-Value map,其功能與Java的Map
幾乎相同。get(key: K)
可以獲取某個key下的value,put(key: K, value: V)
可以對某個key設置value,contains(key: K)
判斷某個key是否存在,remove(key: K)
刪除某個key以及對應的value,entries(): java.lang.Iterable[java.util.Map.Entry[K, V]]
返回MapState
中所有的元素,iterator(): java.util.Iterator[java.util.Map.Entry[K, V]]
返回一個迭代器。需要注意的是,MapState
中的key和Keyed State的key不是同一個key。ListState[T]
存儲了一個由T類型數據組成的列表。我們可以使用add(value: T)
或addAll(values: java.util.List[T])
向狀態中添加元素,使用get(): java.lang.Iterable[T]
獲取整個列表,使用update(values: java.util.List[T])
來更新列表,新的列表將替換舊的列表。ReducingState[T]
和AggregatingState[IN, OUT]
與ListState[T]
同屬於MergingState[T]
。與ListState[T]
不同的是,ReducingState[T]
只有一個元素,而不是一個列表。它的原理是新元素通過add(value: T)
加入后,與已有的狀態元素使用ReduceFunction
合並為一個元素,並更新到狀態里。AggregatingState[IN, OUT]
與ReducingState[T]
類似,也只有一個元素,只不過AggregatingState[IN, OUT]
的輸入和輸出類型可以不一樣。ReducingState[T]
和AggregatingState[IN, OUT]
與窗口上進行ReduceFunction
和AggregateFunction
很像,都是將新元素與已有元素做聚合。
注意,Flink的核心代碼目前使用Java實現的,而Java的很多類型與Scala的類型不太相同,比如List
和Map
。這里不再詳細解釋Java和Scala的數據類型的異同,但是開發者在使用Scala調用這些接口,比如狀態的接口,需要注意將Java的類型轉為Scala的類型。對於List
和Map
的轉換,只需要需要引用import scala.collection.JavaConversions._
,並在必要的地方添加后綴asScala
或asJava
來進行轉換。此外,Scala和Java的空對象使用習慣不太相同,Java一般使用null
表示空,Scala一般使用None
。
之前的文章中其實已經多次使用過狀態,這里再次使用電商用戶行為分析來演示如何使用狀態。我們知道電商平台會將用戶與商品的交互行為收集記錄下來,行為數據主要包括幾個字段:userId、itemId、categoryId、behavior和timestamp。其中userId和itemId分別代表用戶和商品的唯一ID,categoryId為商品類目ID,behavior表示用戶的行為類型,包括點擊(pv)、購買(buy)、加購物車(cart)、喜歡(fav)等,timestamp記錄行為發生時間。本文采用阿里巴巴提供的一個淘寶用戶行為數據集,為了精簡需要,只節選了部分數據。下面的代碼使用MapState[String, Int]
記錄某個用戶某種行為出現的次數。這里讀取了數據集文件,模擬了一個淘寶用戶行為數據流。
/** * 用戶行為 * categoryId為商品類目ID * behavior包括點擊(pv)、購買(buy)、加購物車(cart)、喜歡(fav) * */ case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long) class MapStateFunction extends RichFlatMapFunction[UserBehavior, (Long, String, Int)] { // 指向MapState的句柄 private var behaviorMapState: MapState[String, Int] = _ override def open(parameters: Configuration): Unit = { // 創建StateDescriptor val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int]) // 通過StateDescriptor獲取運行時上下文中的狀態 behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor) } override def flatMap(input: UserBehavior, collector: Collector[(Long, String, Int)]): Unit = { var behaviorCnt = 1 // behavior有可能為pv、cart、fav、buy等 // 判斷狀態中是否有該behavior if (behaviorMapState.contains(input.behavior)) { behaviorCnt = behaviorMapState.get(input.behavior) + 1 } // 更新狀態 behaviorMapState.put(input.behavior, behaviorCnt) collector.collect((input.userId, input.behavior, behaviorCnt)) } } def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(8) // 獲取數據源 val sourceStream: DataStream[UserBehavior] = env .addSource(new UserBehaviorSource("state/UserBehavior-50.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() { override def extractAscendingTimestamp(userBehavior: