3. 數據流操作
流處理引擎一般會提供一組內置的操作,用於對流做消費、轉換,以及輸出。接下來我們介紹一下最常見的流操作。
操作分為無狀態的(stateless)與有狀態的(stateful)。無狀態的操作不包含任何內部狀態。也就是說,處理此event時,並不需要任何其他歷史event的信息,也不需要保存它自己的信息。無狀態的操作易於並行,因為events可以以它們到達的順序,相互獨立的被處理。在出現錯誤時,無狀態operator可以被簡單的重新執行,從它丟失數據的點開始繼續執行即可。
有狀態的operator可能會維護之前它處理過的events的信息。它的狀態信息(state)會根據到達的events進行更新,並且被用於處理之后events的邏輯。有狀態的流處理應用對於並行來說會更為復雜,並且需要以容錯的方式運行。因為它的狀態需要高效的分區(partitioned),並能夠在發生錯誤后得到穩定的恢復。
數據消費與輸出
數據消費與輸出的操作使得流處理器可以與外部系統進行通信。數據消費是指:從外部數據源獲取raw data,然后轉換成適用於處理的格式。實現了數據消費邏輯的operator稱為data sources,它可以消費如 TCP socket 的數據、文件、Kafka topic 的數據等。數據輸出(egress)是生成output的操作,它將數據以適合外部系統處理的格式輸出。實現數據輸出邏輯的operator稱為data sink。
轉換操作
轉換操作是一個單次操作,每次單獨的處理一個event。用於將event 做某些變換后再輸出一個新的流數據。轉換邏輯可以整合在operator中,或是由用戶定義的方法提供。如下圖所示:

Operators 可以接收多個inputs並產生多個輸出流。它們也可以用於修改dataflow graph 的結構,例如將流split為多個流,亦或是將多個流整合為一個流。
滾動聚合(rolling aggregation)
Rolling aggregation 是一個聚合操作,例如sum,minimum以及maximum。它會根據輸入的event,對結果做持續的更新。聚合操作是有狀態的,它將輸入的數據與當前的狀態信息(state)進行整合,再產生一個更新后的聚合值。為了高效地與當前狀態進行整合,並輸出一個single value,聚合操作必須滿足結合律(associative)與交換律(commutative)。否則 operator 需要存儲整個流的歷史記錄。下圖是一個滾動聚合求最小值的示例,它持有當前最小值,並根據輸入的events更新當前最小值:

窗口操作
轉換與滾動聚合每次處理一個event並產生一個output event,繼而(有可能)更新狀態。然而,某些操作需要收集並緩存一些記錄后再計算它們的結果。例如求中值函數,這個操作需要對多條數據聚合做處理,但是流是無限的,對此,我們需要限制此操作維護的數據量大小。此功能由窗口(window)操作提供。
考慮這么一個場景:應用為司機提供實時的路況信息。這里,我們需要知道在某個地段,前幾分鍾內是否有有擁堵或是事故)。如果僅是對流歷史記錄做一個單聚合(single aggregate),則會損失數據隨時間變化的信息。例如,你可能想知道每5分鍾內有多少個自行車穿過某交叉路口。
Window操作會持續地從一個無限流中,創建events的有限子集(稱為buckets),使得我們可以在這些有限集合上做計算。Events 通常是根據數據的屬性或是時間,被分配到buckets中。為了更好地定義window operator,我們先了解一下events是如何分配給buckets、以及windows是如何產生一個result的。Window 的行為由一組策略定義。Window 策略決定了什么時候創建bucket、哪個event被分配到哪個bucket中、以及bucket里的內容什么時候被評估(evaluate)。對於何時評估,這個基於觸發條件。當滿足某個出發條件時,bucket里的內容會被發往一個評估函數(evaluation function),此方法會對bucket里的元素進行計算。評估函數可以是聚合操作(例如求sum、最小值)、或是用戶自定義的操作。決策可以是基於時間的(例如在最近5秒內收到的events),也可以是基於數量的(例如,最近收到的100個events),亦或是基於數據的屬性。下面我們介紹一下常見的窗口類型。
- 滾動(tumbling)窗口:分配events到不重疊的固定大小的buckets中。在超出window的邊界后,所有在window內的的事件會被發往到評估函數做處理。基於數量的(count-based )滾動窗口定義了:在收集多少個events后,開始觸發評估。圖2-6顯示了一個count-based滾動窗口,將輸入流分散到四個events一組的buckets中。

基於時間的滾動窗口定義了:以時間周期分隔,在一個時間周期內的events會被緩存到bucket 中。
圖 2-7 展示了一個基於時間的滾動窗口,將events收集到buckets中,每10分鍾觸發一次計算。

2. 滑動窗口:分配events到可重疊的固定大小的buckets中。也就是說,一個event可能屬於多個buckets。我們在定義一個滑動窗口時,需要提供兩個變量:長度(length)和步長(slide)。Slide的值決定了新bucket創建的間隔。下圖是一個基於數量(count-based)的滑動窗口,長度為4個events,slide為3個events:

3. 會話(session)窗口:會話窗口在某些實際場景中會比滾動窗口與滑動窗口更適用。考慮這樣一個場景:一個應用需要分析在線用戶的行為。這里我們需要聚合在一個session內,某個用戶的所有事件。Session 由一系列連續的事件組成,並且在連續事件之后,會有一段無事件時間。例如,用戶瀏覽新聞時,點擊不同的頁面,可以被看作一個session。因為一個session 的長度並無法預先定義,而是取決於實際的數據。所以滾動已經滑動窗口並不適用於此場景。我們需要的是一個windows操作可以將所有屬於同一個session 的事件,分發到同一個bucket中。會話窗口可以根據一個“會話間隔”(session gap)定義一個session的過期時間。在到達過期時間后,一個會話窗口即被關閉。下圖展示了一個會話窗口:

到目前為止,我們看到的所有窗口類型都是應用於整個流。但是在實際場景中,可能需要將一個流分為多個邏輯上的流,並在之上使用並行window。例如,假設我們收到的數據源來自於各個不同的傳感器,我們可能需要通過傳感器的ID先對stream做整合,然后再在之上應用窗口計算。在並行窗口(parallel windows)中,每個分區(partition)均完全獨立地應用它特定的窗口策略。下圖展示了一個基於計數的滾動窗口,長度為2,通過event 顏色分區:

在流處理中有兩個十分重要的概念:時間語義(time semantics)以及狀態管理(state management)。窗口操作與這兩個概念關系密切。時間可能是流處理中最重要的方面。盡管低延時是流處理中非常棒的一個特性,但是它的實際延遲值已經遠超出了快速分析(just fast analytics)的延遲。流處理在實際系統、網絡、以及通信信道(communication channel)中還不夠完善,並且流數據經常會延遲到達,或是亂序到達。這里很重要的一點是:在這些情況下,如何交付出精准、明確(deterministic)的結果。除此之外,流處理應用除了處理當前產生的event外,還應具備處理歷史events的能力,這可以實現流的離線分析,甚至是時間穿梭分析(time travel analyses)。當然,如果你的系統無法保證對狀態信息的容錯,則這些功能均毫無意義。到目前為止,我們提到的所有窗口類型都需要先將數據緩存,然后再應用計算並產生結果。實際上,如果你想在一個流應用上計算任何感興趣的信息,即使是一個簡單的計數(count),也需要維護狀態信息(state)。假設一個流處理應用可能會跑幾天,幾個月,甚至幾年,我們需要確保在任何故障發生后,state能可靠地被恢復,並且系統需要確保仍能提供准確的結果。下面我們會介紹在流處理中time 的概念,以及在發生錯誤情況下的state guarantees。
References:
Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019
