3. 事件-時間(Event-Time)處理
在“時間語義”中,我們強調了在流處理應用中時間語義的重要性,並解釋了處理時間與事件時間的不同點。處理時間較好理解,因為它基於本地機器的時間,它產生的是有點任意的、不一致的、以及無法復現的結果。而事件時間的語義產生的是可復現的、一致性的結果,它對於很多流處理場景是一個硬性的要求。然而,相對於處理時間語義,事件時間語義應用需要額外的配置,並且引入了更多的系統內部構件。
Flink為常見的event-time處理操作提供了直觀、並易於使用的原型。同時也提供了清晰的APIs,用於為用戶自定義的operators實現更高級的event-time 應用。有一個對Flink內部時間處理的理解,對與這類高級應用的開發與理解是很有幫助的,有時候也是必須的。前一章介紹過Flink依賴的兩個用於提供事件時間語義的概念:record時間戳和水印。下面我們會介紹Flink內部是如何實現並處理時間戳與水印,從而為流應用提供事件-時間語義。
時間戳
所有由Flink 事件-時間流應用生成的條目都必須伴隨着一個時間戳。時間戳將一個條目與一個特定的時間點關聯起來,一般這個時間點表示的是這條record發生的時間。不過application可以隨意選擇時間戳的含義,只要流中條目的時間戳是隨着流的前進而遞增即可。
當Flink以事件-時間的模式處理流數據時,它基於條目的時間戳來評估(evaluate)基於時間(time-based)的operators。例如,一個time-window operator 根據條目的時間戳,將它們分派給不同的windows。Flink將時間戳編碼為 16-byte,Long類型的值,並將它們以元數據(metadata)的方式附加到流記錄(records)中。它內置的operators將這個Long型的值解釋為Unix 時間戳,精確到毫秒,也就是自1970-01-01-00:00:00.000 開始,所經過的毫秒數。不過,用戶自定義的operators可以有它們自己的解釋方法(interpretation),例如,將精確度指定為微秒級別。
水印
除了條目時間戳,Flink 事件-時間應用必須也提供水印。在一個事件-時間應用中,水印用於從每個task中獲取當前的事件時間。Time-based operators 使用這個時間觸發計算,並取得進展。例如,一個time-window 任務在到達window的結束邊界后,會觸發計算並產生輸出。
在Flink中,水印是以特殊的records實現的,這些records會持有一個Long類型的值,作為時間戳。如下圖所示,水印流記錄穿插在正常流記錄(包含時間戳)之中:
水印有兩個基本屬性:
1. 它們必須單調遞增,以確保任務的event-time時鍾向前推進,而不是向后
2. 它們與記錄的時間戳是相關的。一個時間戳為T的水印表示的是:在它之后接下來的所有記錄的時間戳,都必須大於T
第二個屬性用於,處理有亂序時間戳的條目(例如上圖中的時間戳3和5)流。在基於時間的operator 中,它的tasks會收集並處理可能亂序的時間戳,在任務的event-time 時鍾(這里的event-time時鍾就是指收到的水印時間戳)表示沒有更多相關的記錄需要被考慮時,會觸發一個計算。當一個task收到一條與水印屬性有沖突的記錄,且此記錄的時間戳比收到的前一個水印要小時,說明此記錄可能本應屬於之前已經完成的計算批次。這種記錄稱為遲到記錄(late records)。Flink提供多不同的方法用於處理遲到的記錄,我們會在“處理延遲數據”部分介紹。
水印中一個比較有意思的屬性是,它們允許一個應用控制結果的完整度與延時。若是水印間隔十分緊湊,例如非常接近記錄的時間戳,則可達到較低的處理延時,因為一個任務在執行一個計算前,僅需要等待一小段時間,以獲取這段時間內的records。同時,結果的准確度可能會受到影響,因為相關的記錄(延遲或是遲到的記錄)可能未包括在結果中。反之,非常保守的水印則會增加處理延時,但是會提高結果的准確度。
水印傳播與事件時間
在這節,我們會討論operator 如何處理水印。Flink實現水印時,是將水印作為特殊的記錄處理,這些記錄可以由operator的任務接收或是釋放。Task有一個內部的時間服務(time service),此服務用於維護計時器,並在收到一個水印的時候被激活。Task可以在time service注冊計時器,以在未來某個時間點執行一個計算。例如,一個window operator 為每個活躍的窗口注冊一個計時器,在event time 超過窗口的終止時間后,會對窗口的狀態做清理。
當一個task收到一個水印,會發生以下行動:
1. 此任務根據水印的時間戳更新它內部的事件-時間(event-time)時鍾。
2. 任務的time service 標識出所有時間小於更新后的事件-時間的計時器。對於每個過期的計時器,task調用一個回調函數,用於執行計算並釋放結果
3. Task使用更新后的事件時間,賦值並釋放一個水印
這里需要注意的是,Flink限制了通過DataStream API 訪問時間戳或是水印的權限。DataStream中的Functions無法讀取或修改記錄的時間戳和水印,除了process functions。它們可以讀當前處理的record的時間戳,請求operator當前的event time,並注冊計時器。沒有方法提供設置emitted records的時間戳的API、也沒有提供操作一個task的event-time 時鍾的API、更沒有釋放水印的API。
我們現在詳細的解釋一下,一個task如何釋放一個水印,並在收到一個新的水印時如何更新它自身的event-time時鍾(clock)。如之前介紹過的數據並行(data parallelism),Flink會將數據流分成不同的分區(partition),對於每個分區,都會有不同的operator task 處理,這些task並行工作處理整個數據流。每個分區都是記錄(包含時間戳)與水印的數據流。對於一個operator,基於它與上游/下游 operators 連接的方式,它的tasks可以從一個或多個輸入分區接受records和水印,並釋放records和水印到一個或多個輸出分區。下面我們會詳細的介紹一個task如何釋放水印到多個output tasks,以及它如何根據(從輸入tasks)收到的水印,推進它自身的event-time時鍾。
一個task對每個輸入分區,都維護了一個分區水印。當task從一個分區收到一個水印,它會將對應分區的水印,更新為收到的水印最大值,並設置為當前值。然后,task更新它的event-time 時鍾為所有分區水印中的最小值。如果event-time 時鍾相較之前有增加,則task處理所有被觸發的計時器,並最終廣播它的新事件-時間到所有下游task,此操作通過釋放一個對應的水印到所有連接的輸出分區完成。
下圖是一個有4個輸入分區與3個輸出分區的task,在收到水印后如何更新它的分區水印以及事件-時間時鍾,並釋放水印:
對於有多個輸入流的(例如Union或CoFlatMap操作)operators,它們的tasks也會計算它們自身的event-time時鍾,並作為所有分區水印的最小值– 他們並不(從不同的輸入流中)區分partition watermarks。這樣做的結果是,兩個不同的輸入流中的數據會根據同一event-time時鍾進行處理。但是,如果一個application的各個輸入流的事件時間並不是一致的,則這個行為會導致問題。
Flink 的水印處理以及傳播算法,確保了operator task恰當地釋放一致時間戳的記錄和水印。然而它依賴的基礎是:所有分區持續提供遞增的水印。一旦一個分區的水印不再遞增,或者完全空閑(不再發送任何記錄與水印),則task的事件-時間時鍾不會再向前推進,並且task的計時器也不會被觸發。在基於時間的、依賴於向前(advancing)時鍾執行計算(並做清理)的operators中,便會造成問題。最終會導致處理延時、state大小劇增(如果沒有定期從所有的輸入任務中接收到新的水印)。
若是兩個輸入流的水印差異太大,也會造成類似的影響。在有兩個輸入流的task中,它的事件-時鍾會對應於較慢的流,並且較快的流的records或是中間結果一般會緩存到state中,直到event-time 時鍾允許處理它們。
時間戳分配與水印生成
到目前為止,我們已經解釋了什么是時間戳與水印,並且Flink 內部是如何處理它們的。然而,我們還沒有討論它們的是源頭是哪。一般來說,時間戳和水印是在一個stream被stream application 消費時產生的。因為時間戳的選擇是取決於application的,而水印是基於時間戳與stream的特點選擇,application必須明確的指定時間戳並生成水印。一個Flink DataStream 應用能以三種方式執行時間戳並生成水印:
1. At the source:當一個流被一個application消費時,通過一個SourceFunction,可以完成時間戳的分配與水印的生成。一個source 函數釋放一個記錄流。Records可以與一個關聯的時間戳一起被釋放,水印作為特殊的records,可以在任何時間點被釋放。如果一個source函數(暫時)不再釋放任何水印,它可以聲明它自己是空閑的。Flink 會將由空閑source 函數生成的流分區,從后繼的operators的水印計算中排除。Sources的空閑機制可以用於定位前面提到的“不遞增水印”的問題。
2. 定期分配者(Periodic assigner):DataStream API 提供了一個用戶定義的方法,名為AssignerWithPeriodicWatermarks。它從每個記錄中抽取一個時間戳,並定期查詢當前的水印。提取出的時間戳被分配給對應的記錄,查詢到的水印被送往流去消費。
3. Punctuated assigner:AssignerWithPunctuatedWatermarks是另一個用戶定義的方法,它從每個record抽取時間戳。它可以用於生成(被編碼到特殊的記錄中的)水印,相對於AssignerWithPeriodicWatermarks,這個方法可以(但是也不是必須的)從每個record抽取水印。
用戶定義的時間戳分配函數一般盡可能近的應用到離source operator,因為若是在records已經被一個operator處理后,將會很難推出原本的records順序。這也是為什么盡量不要在流處理程序的middle部分對時間戳與水印做覆蓋的原因,盡管這個是可以通過用戶定義函數實現的。
References:
Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019