第七章 EventTime 與 Window
7.1 EventTime 的引入
在 Flink 的 流 式 處 理中 , 絕 大 部 分 的 業務都 會 使 用 eventTime,一般只在
eventTime 無法使用時,才會被迫使用 ProcessingTime 或者 IngestionTime。
如果要使用 EventTime,那么需要引入 EventTime 的時間屬性,
引入方式如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給 env 創建的每一個 stream 追加時間特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
7.2 Watermark
7.2.1 基本概念
我們知道,流處理從事件產生,到流經 source,再到 operator,中間是有一個過
程和時間的,雖然大部分情況下,流到 operator 的數據都是按照事件產生的時間順
序來的,但是也不排除由於網絡等原因,導致亂序的產生,所謂亂序,就是指 Flink
接收到的事件的先后順序不是嚴格按照事件的 Event Time 順序排列的。
那么此時出現一個問題,一旦出現亂序,如果只根據 eventTime 決定 window 的
運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有
個機制來保證一個特定的時間后,必須觸發 window 去進行計算了,這個特別的機
制,就是 Watermark。
Watermark 是一種衡量 Event Time 進展的機制,它是數據本身的一個隱藏屬性,
數據本身攜帶着對應的 Watermark。
Watermark 是 用 於 處 理 亂 序 事 件 的 , 而 正 確 的 處 理 亂 序 事 件 , 通 常 用
Watermark 機制結合 window 來實現。
數據流中的 Watermark 用於表示 timestamp 小於 Watermark 的數據,都已經
到達了,因此,window 的執行也是由 Watermark 觸發的。
Watermark 可以理解成一個延遲觸發機制,我們可以設置 Watermark 的延時
時 長 t, 每 次 系 統 會 校 驗 已 經 到 達 的 數 據 中 最 大 的 maxEventTime, 然 后 認 定
eventTime 小於 maxEventTime - t 的所有數據都已經到達,如果有窗口的停止時間
等於 maxEventTime – t,那么這個窗口被觸發執行。
有序流的 Watermarker 如下圖所示:(Watermark 設置為 0)

當 Flink 接收到每一條數據時,都會產生一條 Watermark,這條 Watermark
就等於當前所有到達數據中的 maxEventTime - 延遲時長,也就是說,Watermark
是由數據攜帶的,一旦數據攜帶的 Watermark 比當前未觸發的窗口的停止時間要
晚,那么就會觸發相應窗口的執行。由於 Watermark 是由數據攜帶的,因此,如果
運行過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠都不被觸發。
上圖中,我們設置的允許最大延遲到達時間為 2s,所以時間戳為 7s 的事件對應
的 Watermark 是 5s,時間戳為 12s 的事件的 Watermark 是 10s,如果我們的窗口 1
是 1s~5s,窗口 2 是 6s~10s,那么時間戳為 7s 的事件到達時的 Watermarker 恰好觸
發窗口 1,時間戳為 12s 的事件到達時的 Watermark 恰好觸發窗口 2。
7.2.2 Watermark 的引入
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給 env 創建的每一個 stream 追加時間特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.socketTextStream("localhost", 11111).assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) { override def extractTimestamp(t: String): Long = { // EventTime 是日志生成時間,我們從日志中解析 EventTime t.split(" ")(0).toLong } })
7.3 EvnetTimeWindow API
當使用 EventTimeWindow 時,所有的 Window 在 EventTime 的時間軸上進行划
分,也就是說,在 Window 啟動后,會根據初始的 EventTime 時間每隔一段時間划
分一個窗口,如果 Window 大小是 3 秒,那么 1 分鍾內會把 Window 划分為如下的
形式:
[00:00:00,00:00:03) [00:00:03,00:00:06) ... [00:00:57,00:01:00)
如果 Window 大小是 10 秒,則 Window 會被分為如下的形式:
[00:00:00,00:00:10) [00:00:10,00:00:20) ... [00:00:50,00:01:00)
注意,窗口是左閉右開的,形式為:[window_start_time,window_end_time)。
Window 的設定無關數據本身,而是系統定義好了的,也就是說,Window 會一
直按照指定的時間間隔進行划分,不論這個 Window 中有沒有數據,EventTime 在
這個 Window 期間的數據會進入這個 Window。
Window 會不斷產生,屬於這個 Window 范圍的數據會被不斷加入到 Window 中,
所有未被觸發的 Window 都會等待觸發,只要 Window 還沒觸發,屬於這個 Window
范圍的數據就會一直被加入到 Window 中,直到 Window 被觸發才會停止數據的追
加,而當 Window 觸發之后才接受到的屬於被觸發 Window 的數據會被丟棄。
Window 會在以下的條件滿足時被觸發執行:
watermark 時間 >= window_end_time;
在[window_start_time,window_end_time)中有數據存在。
我們通過下圖來說明 Watermark、EventTime 和 Window 的關系。
7.3.1 滾動窗口(TumblingEventTimeWindows)
// 獲取執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 創建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 對 stream 進行處理並按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) { override def extractTimestamp(element: String): Long = { val sysTime = element.split(" ")(0).toLong println(sysTime) sysTime }
}
).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口 val streamWindow = streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 執行聚合操作 val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) )
// 將聚合數據寫入文件 streamReduce.print
// 執行程序 env.execute("TumblingWindow")
結果是按照 Event Time 的時間窗口計算得出的,而無關系統的時間(包括輸入的快慢)。
7.3.2 滑動窗口(SlidingEventTimeWindows)
// 獲取執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 創建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 對 stream 進行處理並按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) { override def extractTimestamp(element: String): Long = { val sysTime = element.split(" ")(0).toLong println(sysTime) sysTime }
}
).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口 val streamWindow = streamKeyBy.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
// 執行聚合操作 val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) )
// 將聚合數據寫入文件 streamReduce.print
// 執行程序 env.execute("TumblingWindow")
7.3.3 會話窗口(EventTimeSessionWindows)
相鄰兩次數據的 EventTime 的時間差超過指定的時間間隔就會觸發執行。如果
加入 Watermark,那么當觸發執行時,所有滿足時間間隔而還沒有觸發的 Window 會
同時觸發執行。
// 獲取執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 創建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 對 stream 進行處理並按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) { override def extractTimestamp(element: String): Long = { val sysTime = element.split(" ")(0).toLong println(sysTime) sysTime }
}
).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口 val streamWindow = streamKeyBy.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
// 執行聚合操作 val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) )
// 將聚合數據寫入文件 streamReduce.print
// 執行程序 env.execute("TumblingWindow")
測試代碼:
package eventtimewindow import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows} import org.apache.flink.streaming.api.windowing.time.Time object EventTimeWindow01 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //修改時間特性為 EventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream = env.socketTextStream("localhost", 11111).assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) { override def extractTimestamp(element: String): Long = { // eventTime word val eventTime = element.split(" ")(0).toLong println(eventTime) eventTime } } ).map(item => (item.split(" ")(1), 1L)).keyBy(0) /* TumblingEventTimeWindows */ // val streamWindow = stream.window(TumblingEventTimeWindows.of(Time.seconds(5))) // // val streamReduce = streamWindow.reduce( // (item1, item2) => (item1._1, item2._2 + item1._2) // ) // // streamReduce.print() /* SlidingEventTimeWindows */ // val streamWindow = stream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // // val streamReduce = streamWindow.reduce( // (item1, item2) => (item1._1, item2._2 + item1._2) // ) // // streamReduce.print() /* EventTimeSessionWindows */ val streamWindow = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5))) val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item2._2 + item1._2) ) streamReduce.print() env.execute("EventTimeJob") } }
總結
Flink 是一個真正意義上的流計算引擎,在滿足低延遲和低容錯開銷的基礎之上,完美
的解決了 exactly-once 的目標,真是由於 Flink 具有諸多優點,越來越多的企業開始使用 Flink
作為流處理框架,逐步替換掉了原本的 Storm 和 Spark 技術框架。