Flink EventTime 與 Window


第七章 EventTime 與 Window

7.1 EventTime 的引入

  在 Flink 的 流 式 處 理中 , 絕 大 部 分 的 業務都 會 使 用 eventTime,一般只在
eventTime 無法使用時,才會被迫使用 ProcessingTime 或者 IngestionTime。
  如果要使用 EventTime,那么需要引入 EventTime 的時間屬性,
引入方式如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給 env 創建的每一個 stream 追加時間特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

 

 

 

7.2 Watermark

7.2.1 基本概念

  我們知道,流處理從事件產生,到流經 source,再到 operator,中間是有一個過
程和時間的,雖然大部分情況下,流到 operator 的數據都是按照事件產生的時間順
序來的,但是也不排除由於網絡等原因,導致亂序的產生,所謂亂序,就是指 Flink
接收到的事件的先后順序不是嚴格按照事件的 Event Time 順序排列的。

  那么此時出現一個問題,一旦出現亂序,如果只根據 eventTime 決定 window 的
運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有
個機制來保證一個特定的時間后,必須觸發 window 去進行計算了,這個特別的機
制,就是 Watermark。
  Watermark 是一種衡量 Event Time 進展的機制,它是數據本身的一個隱藏屬性,
數據本身攜帶着對應的 Watermark。
   Watermark 是 用 於 處 理 亂 序 事 件 的 , 而 正 確 的 處 理 亂 序 事 件 , 通 常 用
Watermark 機制結合 window 來實現。
  數據流中的 Watermark 用於表示 timestamp 小於 Watermark 的數據,都已經
到達了,因此,window 的執行也是由 Watermark 觸發的。
  Watermark 可以理解成一個延遲觸發機制,我們可以設置 Watermark 的延時
時 長 t, 每 次 系 統 會 校 驗 已 經 到 達 的 數 據 中 最 大 的 maxEventTime, 然 后 認 定
eventTime 小於 maxEventTime - t 的所有數據都已經到達,如果有窗口的停止時間
等於 maxEventTime – t,那么這個窗口被觸發執行。
 
  有序流的 Watermarker 如下圖所示:(Watermark 設置為 0)

  當 Flink 接收到每一條數據時,都會產生一條 Watermark,這條 Watermark
就等於當前所有到達數據中的 maxEventTime - 延遲時長,也就是說,Watermark
是由數據攜帶的,一旦數據攜帶的 Watermark 比當前未觸發的窗口的停止時間要
晚,那么就會觸發相應窗口的執行。由於 Watermark 是由數據攜帶的,因此,如果
運行過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠都不被觸發。
  上圖中,我們設置的允許最大延遲到達時間為 2s,所以時間戳為 7s 的事件對應
的 Watermark 是 5s,時間戳為 12s 的事件的 Watermark 是 10s,如果我們的窗口 1
是 1s~5s,窗口 2 是 6s~10s,那么時間戳為 7s 的事件到達時的 Watermarker 恰好觸
發窗口 1,時間戳為 12s 的事件到達時的 Watermark 恰好觸發窗口 2。 
 
 
 

7.2.2 Watermark 的引入

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給 env 創建的每一個 stream 追加時間特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream
= env.socketTextStream("localhost", 11111).assignTimestampsAndWatermarks(   new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) {   override def extractTimestamp(t: String): Long = {     // EventTime 是日志生成時間,我們從日志中解析 EventTime     t.split(" ")(0).toLong   } })

 

 

 

 

7.3 EvnetTimeWindow API

  當使用 EventTimeWindow 時,所有的 Window 在 EventTime 的時間軸上進行划
分,也就是說,在 Window 啟動后,會根據初始的 EventTime 時間每隔一段時間划
分一個窗口,如果 Window 大小是 3 秒,那么 1 分鍾內會把 Window 划分為如下的
形式:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)

 

如果 Window 大小是 10 秒,則 Window 會被分為如下的形式:
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)
  注意,窗口是左閉右開的,形式為:[window_start_time,window_end_time)。
 
  Window 的設定無關數據本身,而是系統定義好了的,也就是說,Window 會一
直按照指定的時間間隔進行划分,不論這個 Window 中有沒有數據,EventTime 在
這個 Window 期間的數據會進入這個 Window。
  Window 會不斷產生,屬於這個 Window 范圍的數據會被不斷加入到 Window 中,
所有未被觸發的 Window 都會等待觸發,只要 Window 還沒觸發,屬於這個 Window
范圍的數據就會一直被加入到 Window 中,直到 Window 被觸發才會停止數據的追
加,而當 Window 觸發之后才接受到的屬於被觸發 Window 的數據會被丟棄。
 
  Window 會在以下的條件滿足時被觸發執行:
   watermark 時間 >= window_end_time;
   在[window_start_time,window_end_time)中有數據存在。
 
  我們通過下圖來說明 Watermark、EventTime 和 Window 的關系。

 

 

 

7.3.1 滾動窗口(TumblingEventTimeWindows)

// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 創建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 對 stream 進行處理並按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks(   new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {     override def extractTimestamp(element: String): Long = {       val sysTime = element.split(" ")(0).toLong       println(sysTime)       sysTime     }
  }
).map(item
=> (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口 val streamWindow = streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 執行聚合操作 val streamReduce = streamWindow.reduce(   (item1, item2) => (item1._1, item1._2 + item2._2) )
// 將聚合數據寫入文件 streamReduce.print
// 執行程序 env.execute("TumblingWindow")
  結果是按照 Event Time 的時間窗口計算得出的,而無關系統的時間(包括輸入的快慢)。 

 

 

7.3.2 滑動窗口(SlidingEventTimeWindows)

// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 創建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 對 stream 進行處理並按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks(   new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) {     override def extractTimestamp(element: String): Long = {       val sysTime = element.split(" ")(0).toLong       println(sysTime)       sysTime     }
  }
).map(item
=> (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口 val streamWindow = streamKeyBy.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
// 執行聚合操作 val streamReduce = streamWindow.reduce(   (item1, item2) => (item1._1, item1._2 + item2._2) )
// 將聚合數據寫入文件 streamReduce.print
// 執行程序 env.execute("TumblingWindow")

 

 

 

7.3.3 會話窗口(EventTimeSessionWindows)

   相鄰兩次數據的 EventTime 的時間差超過指定的時間間隔就會觸發執行。如果
加入 Watermark,那么當觸發執行時,所有滿足時間間隔而還沒有觸發的 Window 會
同時觸發執行。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 創建 SocketSource val stream = env.socketTextStream("localhost", 11111)
// 對 stream 進行處理並按 key 聚合 val streamKeyBy = stream.assignTimestampsAndWatermarks(   new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) {     override def extractTimestamp(element: String): Long = {       val sysTime = element.split(" ")(0).toLong       println(sysTime)       sysTime     }
  }
).map(item
=> (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口 val streamWindow = streamKeyBy.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
// 執行聚合操作 val streamReduce = streamWindow.reduce(   (item1, item2) => (item1._1, item1._2 + item2._2) )
// 將聚合數據寫入文件 streamReduce.print
// 執行程序 env.execute("TumblingWindow")

 

 
測試代碼:
package eventtimewindow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object EventTimeWindow01 {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //修改時間特性為 EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("localhost", 11111).assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {
        override def extractTimestamp(element: String): Long = {
          // eventTime word
          val eventTime = element.split(" ")(0).toLong
          println(eventTime)
          eventTime
        }
      }
    ).map(item => (item.split(" ")(1), 1L)).keyBy(0)

    /* TumblingEventTimeWindows */
//    val streamWindow = stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//
//    val streamReduce = streamWindow.reduce(
//      (item1, item2) => (item1._1, item2._2 + item1._2)
//    )
//
//    streamReduce.print()

    /* SlidingEventTimeWindows */
//    val streamWindow = stream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
//
//    val streamReduce = streamWindow.reduce(
//      (item1, item2) => (item1._1, item2._2 + item1._2)
//    )
//
//    streamReduce.print()

    /* EventTimeSessionWindows */
    val streamWindow = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))

    val streamReduce = streamWindow.reduce(
      (item1, item2) => (item1._1, item2._2 + item1._2)
    )

    streamReduce.print()


    env.execute("EventTimeJob")
  }

}

 

 

 

總結

  Flink 是一個真正意義上的流計算引擎,在滿足低延遲和低容錯開銷的基礎之上,完美
的解決了 exactly-once 的目標,真是由於 Flink 具有諸多優點,越來越多的企業開始使用 Flink
作為流處理框架,逐步替換掉了原本的 Storm 和 Spark 技術框架。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM