狀態管理
之前我們提到過大多數流應用是有狀態的。很多operators會不斷的訪問並更新某中狀態,例如一個window中收集了多少條記錄,輸入源中當前讀到的位置,亦或是用戶定義的特定operators的狀態。無論是內置的operator還是用戶定義的operators,Flink對待它們都是一致的。在這章我們會討論Flink 支持的不同的狀態類型、state是如何被存儲並由state backends管理的,以及有狀態的應用如何通過重新分發state而進行擴展。
一般來說,所有數據都由一個task維護,並被用於計算一個函數的結果,這個函數包含於此task的state。可以認為state是一個本地變量或是一個實例變量,可以由task的業務邏輯訪問。下圖展示了一個task與它的state的常規交互過程:
一個task接收一些輸入數據。當處理數據時,task會訪問state,並根據輸入數據和state的信息更新它的state。一個簡單的例子如:一個task持續計算迄今它接收到了多少條記錄。當task接收到一個新的記錄時,它會訪問state獲取當前的count數,增加count,更新state,並釋放新的count作為結果輸出。
Application讀寫state的邏輯一般較為直接並易於理解,然而高效、可靠的管理state更具有挑戰性。它包括:處理超大的state(可能會超出內存),並確保在出現故障時state不會丟失等。Flink會處理所有關於state一致性、故障處理、高效存儲並訪問等問題,開發者僅需關注在他們的應用邏輯即可。
在Flink中,state一定是與一個特定的operator關聯的。為了讓Flink的runtime可以意識到一個operator的state,operator需要注冊它的state。在Flink中有兩種類型的state:operator state和keyed state。下面我們對它們做詳細介紹。
Operator State
Operator state 被限定到一個operator task中,這個意思是:各個並行的task都有它自己的state,Operator state無法被其他task(無論是同一個還是不同的operator的task)訪問。下圖是tasks如何訪問operator state:
Flink為operator state提供了三種原型:
List state
· 以list的方式表示state
Union list state
· 同樣以list的方式表示state。但是它與常規list state的不同點在於:發生故障時恢復的方式、或一個application從檢查點開始的方式。
Broadcast state
· 被用於特殊場景,當一個operator的每個task的state都是相同時。這個屬性可以被用於檢查點,或是rescaling 一個 operator時。
Keyed State
Keyed state 的維護與訪問是根據對應記錄中的key決定的。Flink對每個key value 維護了一個state 實例,並將所有同樣key的記錄,分區到維護這些key的state的operator task中。當一個task處理一條記錄時,它會自動歸類當前record的key所要訪問的state。最終,所有具有相同key的records會訪問同一個state。下圖展示了tasks與keyed state 的交互:
可以將keyed state看做是:對一個operator所有並行tasks上的所有key做分區后的key-value map。Flink為 keyed state 提供了不同的原語,用於決定在分布式的key-value map中,每個key里存儲的value類型。
Value state
· 為每個key存一個單值(可以是任意類型)。復雜的數據結構也可以作為value state 存儲
List state
· 為每個key存一個列表值。這個列表可以是任意類型
Map state
· 為每個key存一個key-value 映射。映射中的key和value可以是任意類型
State 原語為Flink提供了state的結構,並可以更高效的對state做訪問。
狀態后端(State Backends)
在有狀態的operator中,它的task在每接受到一條記錄時,一般都會訪問、並更新state。因為高效地訪問state 對於低延時處理records至關重要,所以每個並行的task都會在本地維護它的state,以確保快速訪問state。State是如何准確的存儲、訪問、以及維護是由一個可插拔的組件決定的,這個組件成為狀態后端(State backend)。一個state backend負責兩件事:本地state管理,以及為state做檢查點並存儲到外部地址。
對於本地state 管理,state backend存儲所有keyed state,並確保所有對keyed state的訪問都符合當前key的條件。Flink提供的了state backend 將keyed state作為對象存儲管理,並將它存儲在JVM的堆內存中。另一個state backend 將state 對象序列化,並放入RocksDB中。RocksDB會將它們寫入本地磁盤。第一個state backend 提供了快速訪問state的選擇,但是它會受到內存大小的限制。訪問由RocksDB state backend存儲的state會相對較慢,但是state可以增長到非常大。
對state做檢查點非常重要,因為Flink是一個分布式系統,並且state僅僅是本地維護的。一個TaskManager進程(包括里面所有運行的task)可能會在任何時候出現故障。所以它的存儲必須被認為是不穩定的。一個state backend 會對一個task的state做檢查點,並存儲到遠端的持久性存儲中。存儲檢查點的遠端存儲可以是一個分布式文件系統,或是一個數據庫系統。不同的state backend會有不同的為state做檢查點的方式。例如,RocksDB state backend 支持增量檢查點,此方法可以大量減少對超大state做檢查點時的開銷。
擴展有狀態的operators
對於流處理程序來說,一個常見的需求是:根據輸入數據的速率,調節operators的並行度。對於無狀態的operators 來說,擴展是很簡單的。但是對於有狀態的operators,會更具挑戰性,因為他們的state需要被重新分區,並分配給更多或是更少的並行tasks。Flink支持四種模式,用於擴展不同類型的state。
對於keyed state的operators,擴展的實現方式是將keys重新分區到更少或是更多的tasks中。然而,為了提高tasks之間傳遞state的效率,Flink不會重新分布keys。它會將keys組織在一個或多個key groups中。一個key group不僅是keys的一個分區,也是Flink分配keys給tasks的方式。下圖顯示了keyed state 是如何在key groups中重新分區的:
在擴展state為list state 的operators時,列表里的條目會被重新分配。概念上,所有並行tasks的列表里的條目被收集並均分的重新分布到更少或是更多的tasks中。如果列表條目數小於operator的新並行數,則一些task會從空state開始。下圖顯示了operator list state的重新分布:
在擴展state 為union list state的operator時,列表中所有的state條目會被廣播到每個task。Task之后可以選擇使用哪些條目,丟棄哪些條目。下圖顯示了operator union list state 是如何重新分布的:
在擴展state為broadcast state 的operator時,state會被復制到新的task中。這里適用於這個操作是因為:廣播state可以確保所有task有相同的state。在縮容時,多余的tasks會被簡單地取消,因為state已經被復制了並且不會被丟失。下圖顯示的是operator broadcast state 的重新分布:
References:
Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019