Flink 1.8 Streaming Event Time 事件時間


 

事件時間/處理時間/進入時間(Event Time / Processing Time / Ingestion Time)

Flink在流處理程序中支持不同的時間概念。
  • 處理時間(processing time):處理時間是指執行相應操作的機器的系統時間。
當流處理程序基於處理時間運行時,所有基於時間的操作(如時間窗口)將使用運行相應運算符的機器的系統時鍾。 每小時處理時間窗口將包括在系統時鍾指示整個小時之間到達特定運算符的所有記錄。 例如,如果應用程序在上午9:15開始運行,則第一個每小時處理時間窗口將包括在上午9:15到10:00之間處理的事件,下一個窗口將包括在上午10:00到11:00之間處理的事件,以此類推。 
 
處理時間是最簡單的時間概念,不需要流和機器之間的協調。 它提供最佳性能和最低延遲。 但是,在分布式和異步環境中,處理時間不提供確定性,因為它容易受到記錄到達系統的速度(例如從消息隊列),記錄在系統內的運算符之間流動的速度的影響,以及停電(計划或其他)。
  • 事件時間(event time):事件時間是每個事件在其生產設備上發生的時間。此時間通常在進入Flink之前嵌入記錄中,並且可以從每個記錄中提取該事件時間戳。 在事件時間,時間的進展取決於數據,而不是任何時鍾。 事件時間程序必須指定如何生成事件時間水印,這是表示事件時間進度的機制。 該水印機制在下面的后面部分中描述。
在一個完美的世界中,事件時間處理將產生完全一致和確定的結果,無論事件何時到達或其它們的順序。 但是,除非事件已知按順序到達(按時間戳),否則事件時間處理會在等待無序事件時產生一些延遲。 由於只能等待一段有限的時間,因此限制了確定性事件時間應用程序的運行方式。
 
假設所有數據都已到達,事件時間操作將按預期運行,即使在處理無序或延遲事件或重新處理歷史數據時也會產生正確且一致的結果。 例如,每小時事件時間窗口將包含帶有落入該小時的事件時間戳的所有記錄,無論它們到達的順序如何,或者何時處理它們。 (有關更多信息,請參閱有關 遲到事件的部分。)
 
請注意,有時基於事件時間的程序處理實時數據時,它們將使用一些處理時間(processing time)操作,以保證它們及時進行。
  • 進入時間(Ingestion time): 進入時間是事件進入Flink的時間。 在源運算符處,每個記錄將源的當前時間作為時間戳,並且基於時間的操作(如時間窗口)引用該時間戳。
進入時間在概念上位於事件時間和處理時間之間。與處理時間相比,它代價稍高,但可以提供更可預測的結果。 因為進入時間使用穩定的時間戳(在源處分配一次),所以對記錄的不同窗口操作將引用相同的時間戳,而在處理時間中,每個窗口操作符可以將記錄分配給不同的窗口(基於本地系統時鍾和 任何傳輸延誤)。
 
與事件時間相比,進入時間程序無法處理任何無序事件或延遲數據,但程序不必指定如何生成水印。
 
在內部,攝取時間與事件時間非常相似,但具有自動分配時間戳和自動生成水印功能。

設置時間特征(Setting a Time Characteristic)

Flink DataStream程序的第一部分通常設置基本時間特性。 該設置定義了數據流源的行為方式(例如,它們是否將分配時間戳),以及像KeyedStream.timeWindow(Time.seconds(30))這樣的窗口操作應該使用什么時間概念。
 
以下示例顯示了一個Flink程序,該程序在每小時時間窗口中聚合事件。 窗戶的行為適應時間特征。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
請注意,為了基於事件時間運行此示例,程序需要使用直接定義數據事件時間的源並自己輸出水印,或者程序必須在源之后注入時間戳分配器和水印生成器。 這些函數描述了如何訪問事件時間戳,以及事件流表現出的無序程度。
 
以下部分描述了時間戳和水印背后的一般機制。 有關如何在Flink DataStream API中使用時間戳分配和水印生成的指南,請參閱 Generating Timestamps / Watermarks
 

事件時間和水印(Event Time and Watermarks)

注意:Flink實現了數據流模型中的許多技術。 有關活動時間和水印的詳細介紹,請查看以下文章。
 
支持事件時間的流處理器需要一種方法來衡量事件時間的進度。 例如,當事件時間超過一小時結束時,需要通知構建每小時窗口的窗口運算符,以便運算符可以關閉正在進行的窗口。
 
事件時間可以獨立於處理時間(由時鍾測量)進行。 例如,在一個程序中,運算符的當前事件時間可能略微落后於處理時間(考慮到接收事件的延遲),而兩者都以相同的速度進行。 另一方面,通過快速轉發已經在Kafka主題(或其它消息隊列)中緩沖的一些歷史數據,另一個流程序只需幾秒鍾處理幾周的事件時間。
 
Flink中用於衡量事件時間進度的機制是水印。 水印作為數據流的一部分流動並帶有時間戳t。 Watermark(t)聲明事件時間已到達該流中的時間t,這意味着不應該有來自流的具有時間戳t'<= t的元素(即,具有更早或等於水印的時間戳的事件)。
 
下圖顯示了帶有(邏輯)時間戳的事件流,以及內聯流動的水印。 在該示例中,事件按順序(相對於它們的時間戳),意味着水印是流中的周期性標記。
水印對於無序流是至關重要的,如下所示,其中事件不按時間戳排序。 通常,水印是一種聲明,通過流中的那一點,到達某個時間戳的所有事件都應該到達。 一旦水印到達運算符,運算符就可以將其內部事件時鍾提前到水印的值。
請注意,事件時間由一個新生成的流元素(或多個元素)繼承,這些元素來自生成它們的事件或觸發創建這些元素的水印。
 

並行流中的水印(Watermarks in Parallel Streams)

在源函數處或之后生成水印。 源函數的每個並行子任務通常獨立地生成其水印。 這些水印定義了該特定並行源的事件時間。
 
當水印流過流媒處理程序時,它們會在他們到達的運算符處提前事件時間。 每當運算符提前其事件時間時,它就為其后繼運算符生成下游的新水印。
 
一些運算符消費多個輸入流; 例如union,或者跟隨keyBy(...)或partition(...)函數的運算符。 這樣的運算符的當前事件時間是其輸入流的事件時間的最小值。 由於其輸入流更新其事件時間,運算符也是如此。
 
下圖顯示了流經並行流的事件和水印的示例,以及跟蹤事件時間的運算符。
請注意,Kafka源支持每分區水印,您可以在此處詳細了解。
 

晚到元素(Late Elements)

某些元素可能違反水印條件,這意味着即使在Watermark(t)發生之后,也會出現更多具有時間戳t'<= t的元素。 實際上,在許多現實世界設置中,某些元素可以被任意延遲,從而無法指定某個事件時間戳的所有元素將發生的時間。 此外,即使遲到可以被限制,通常也不希望延遲太多水印,因為它在事件時間窗口的計算中引起太大延遲。
 
出於這個原因,流程序可能明確地預料一些晚到元素。 晚到元素是在系統的事件時鍾(水印產生)之后到達的元素,事件時鍾已經超過了晚到元素的時間戳。 有關如何在事件時間窗口中使用延遲元素的更多信息,請參閱 Allowed Lateness
 

空閑源(Idling sources)

目前,對於純事件時間水印生成器,如果沒有要處理的元素,則水印不能產出。 這意味着在輸入數據存在間隙的情況下,事件時間將不會進行,例如窗口操作符將不會被觸發,因此現有窗口將不能產生任何輸出數據。
 
為了避免這種情況,可以使用定期水印分配器,它們不僅基於元素時間戳進行分配。 示例解決方案可以是在觀察不到新事件一段時間之后切換到使用當前處理時間作為時間基礎的分配器。
 
可以使用SourceFunction.SourceContext #markAsTemporarilyIdle將源標記為空閑。 有關詳細信息,請參閱此方法的Javadoc以及StreamStatus。
 

調試水印(Debugging Watermarks)

有關在運行時調試水印的信息,請參閱 Debugging Windows & Event Time
 

運算符如何處理水印(How operators are processing watermarks)

作為一般規則,運算符需要在向下游轉發之前完全處理給定的水印。 例如,WindowOperator將首先評估應該觸發哪些窗口,並且只有在產生由水印觸發的所有輸出之后,水印本身才會被發送到下游。 換句話說,由於出現水印而產生的所有元素將在水印之前發出。
 
同樣的規則適用於TwoInputStreamOperator。 但是,在這種情況下,運算符的當前水印被定義為其兩個輸入的最小值。
 
此行為的詳細信息由OneInputStreamOperator#processWatermark,TwoInputStreamOperator#processWatermark1和TwoInputStreamOperator#processWatermark2方法的實現定義。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM