flink中的計算分為有狀態計算和無狀態計算,
無狀態計算每次只轉換一條輸入記錄,並且只根據最新的輸入記錄輸出結果。
有狀態計算維護所有已處理的記錄的狀態值,並根據每條新輸入的記錄更新狀態,因此輸出記錄反應的是綜合考慮多個事件之后的結果。
無狀態在這里不在贅述,這里來記錄一下有狀態計算。
flink中的狀態分為算子狀態(operatior state),鍵控狀態(keyed state),狀態后端(state backends)
1.算子狀態的作用范圍限定為算子任務。這意味着由同一並行任務所處理的所有數據都可以訪問到相同的狀態,狀態對於同一任務而言是共享的。算子狀態不能由相同或不同算子的另一個任務訪問。(也就是一個task下面的subtask共享這個狀態)。
Flink 為算子狀態提供三種基本數據結構:
⚫ 列表狀態(List state)
將狀態表示為一組數據的列表。
⚫ 聯合列表狀態(Union list state)
也將狀態表示為數據的列表。它與常規列表狀態的區別在於,在發生故障時,或者從保
存點(savepoint)啟動應用程序時如何恢復。
⚫ 廣播狀態(Broadcast state)
如果一個算子有多項任務,而它的每項任務狀態又都相同,那么這種特殊情況最適合應
用廣播狀態。
鍵控狀態的作用范圍
鍵控狀態是根據輸入數據流中定義的鍵(key)來維護和訪問的。Flink 為每個鍵值維護
一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個算子任務中,這個任務會維護
和處理這個 key 對應的狀態。(一般在keyby之后,keyby算子把經過hash得到的相同的key放到一個分區中,也就是內存中的一塊管道處,讓下游對應的subtask來拉取)
也就是每個key 都持有着對應的一個map結構
Flink 的 Keyed State 支持以下數據類型:
⚫ ValueState<T>保存單個的值,值的類型為 T。
o get 操作: ValueState.value()
o set 操作: ValueState.update(T value)
⚫ ListState<T>保存一個列表,列表里的元素的數據類型為 T。基本操作如下:
o ListState.add(T value)
o ListState.addAll(List<T> values)
o ListState.get()返回 Iterable<T>
o ListState.update(List<T> values)
⚫ MapState<K, V>保存 Key-Value 對。
o MapState.get(UK key)
o MapState.put(UK key, UV value)
o MapState.contains(UK key)
o MapState.remove(UK key)
⚫ ReducingState<T>
⚫ AggregatingState<I, O>
下面來看一段典型的狀態的代碼(reduce方法底層調用的)
open方法初始化拿到歷史狀態(定義一個狀態描述器,然后根據狀態描述器拿到對應Key的狀態,注意這里是不出現數據的key的,只是去管道拿對應的內存塊,也就是對應的map數據結構)
processElement方法更新歷史狀態並存儲起來。
要知道即使是經過hash之后,得到的hashkey的分區,中間也是有不同的數據key 的,
比如 spark,1和hive,1現在被分到了0號分區,那么現在再來一條spark,1 。我們是把它加到之前的spark,1還是hive,1進行累加計數呢。所以在拿到對應hashkey的內存數據結構之后,還要判斷到底是哪個組。現在主要來看一下
IN currentValue = this.values.value();
中的value方法
現在我們就看到了,在調用value方法,實際上在調用一個get方法,之所以沒有在一開始選定key,是因為底層會根據這次新進來的數據拿到對應的key和分區內組號,這樣就能唯一鎖定對應key的歷史狀態了。至於更新歷史狀態update也是同樣的道理,這里就不在贅述。
最后貼幾張圖片很典型的keystate代碼段
valuestate
spark,1
hive,1
spark,1 計算相同key的累加次數
Mapstate
遼寧 梧州 1000
遼寧 松花江 2000
東北 大連 1000
遼寧 梧州 1000 最前面的Key進行keyby分區 ,第二個字段作為map結構的key,拿到累加的第三個字段
LIstState
A LIst(a,b,c,c,c,d)
B list(a,d,s,v,x,c)
A,B是keyby 的關鍵字,后面分別跟對應的行為序列list
狀態的邏輯圖
狀態的物理圖