1 EventTime的引入
在Flink的流式處理中,絕大部分的業務都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的時間屬性,引入方式如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment // 從調用時刻開始給env創建的每一個stream追加時間特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2 Watermark
2.1 基本概念
我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡、分布式等原因,導致亂序的產生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件的Event Time順序排列的。
那么此時出現一個問題,一旦出現亂序,如果只根據eventTime決定window的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了,這個特別的機制,就是Watermark。
Watermark是一種衡量Event Time進展的機制,它是數據本身的一個隱藏屬性,數據本身攜帶着對應的Watermark。
Watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結合window來實現。
數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,因此,window的執行也是由Watermark觸發的。
Watermark可以理解成一個延遲觸發機制,我們可以設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,然后認定eventTime小於maxEventTime - t的所有數據都已經到達,如果有窗口的停止時間等於maxEventTime – t,那么這個窗口被觸發執行。
有序流的Watermarker如下圖所示:(Watermark設置為0)
亂序流的Watermarker如下圖所示:(Watermark設置為2)
當Flink接收到每一條數據時,都會產生一條Watermark,這條Watermark就等於當前所有到達數據中的maxEventTime - 延遲時長,也就是說,Watermark是由數據攜帶的,一旦數據攜帶的Watermark比當前未觸發的窗口的停止時間要晚,那么就會觸發相應窗口的執行。由於Watermark是由數據攜帶的,因此,如果運行過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠都不被觸發。
上圖中,我們設置的允許最大延遲到達時間為2s,所以時間戳為7s的事件對應的Watermark是5s,時間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那么時間戳為7s的事件到達時的Watermarker恰好觸發窗口1,時間戳為12s的事件到達時的Watermark恰好觸發窗口2。
Watermark 就是觸發前一窗口的“關窗時間”,一旦觸發關門那么以當前時刻為准在窗口范圍內的所有所有數據都會收入窗中。
只要沒有達到水位那么不管現實中的時間推進了多久都不會觸發關窗。
2.2 Watermark的引入
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) { override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } })
7.3 EvnetTimeWindow API
3.1 滾動窗口(TumblingEventTimeWindows)
def main(args: Array[String]): Unit = { // 環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dstream: DataStream[String] = env.socketTextStream("hadoop1",7777) val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text => val arr: Array[String] = text.split(" ") (arr(0), arr(1).toLong, 1) } val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) { override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }) val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0) textKeyStream.print("textkey:") val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.seconds(2))) val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) => set += ts } groupDstream.print("window::::").setParallelism(1) env.execute() } }
結果是按照Event Time的時間窗口計算得出的,而無關系統的時間(包括輸入的快慢)。
3.2 滑動窗口(SlidingEventTimeWindows)
def main(args: Array[String]): Unit = { // 環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dstream: DataStream[String] = env.socketTextStream("hadoop1",7777) val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text => val arr: Array[String] = text.split(" ") (arr(0), arr(1).toLong, 1) } val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) { override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }) val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0) textKeyStream.print("textkey:") val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.seconds(2),Time.milliseconds(500))) val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) => set += ts } groupDstream.print("window::::").setParallelism(1) env.execute() }
3.3 會話窗口(EventTimeSessionWindows)
相鄰兩次數據的EventTime的時間差超過指定的時間間隔就會觸發執行。如果加入Watermark, 會在符合窗口觸發的情況下進行延遲。到達延遲水位再進行窗口觸發。
def main(args: Array[String]): Unit = { // 環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dstream: DataStream[String] = env.socketTextStream("hadoop1",7777) val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text => val arr: Array[String] = text.split(" ") (arr(0), arr(1).toLong, 1) } val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) { override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }) val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0) textKeyStream.print("textkey:") val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(500)) ) windowStream.reduce((text1,text2)=> ( text1._1,0L,text1._3+text2._3) ) .map(_._3).print("windows:::").setParallelism(1) env.execute() }