Flink流處理時間方式
-
EventTime
時間發生的時間,例如:點擊網站上的某個鏈接的時間
-
IngestionTime
某個Flink節點的source operator接收到數據的時間,例如:某個source消費到kafka中的數據
-
ProcessingTime
某個Flink節點執行某個operation的時間,例如:timeWindow接收到數據的時間
設置Flink流處理的時間類型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
問題
1. 使用時間窗口來統計10分鍾內的用戶流量
2. 有一個時間窗口
- 開始時間為:2017-03-19 10:00:00
- 結束時間為:2017-03-19 10:10:00
3. 有一個數據,因為網絡延遲
- 事件發生的時間為:2017-03-19 10: 10 :00
- 但進入到窗口的時間為:2017-03-19 10:10: 02 ,延遲了2秒中
4. 時間窗口並沒有將 59 這個數據計算進來,導致數據統計不正確
這種處理方式,根據消息進入到window時間,來進行計算。在網絡有延遲的時候,會引起計算誤差。
水印(watermark)
水印就是一個時間戳,可以給每個消息添加一個 允許一定延遲 的時間戳
- 窗口可以繼續計算一定時間范圍內延遲的消息
- 添加水印后,窗口會等 5 秒,再執行計算。若超過5秒,則舍棄。
-
窗口執行計算時間由 水印時間 來觸發,當接收到消息的 watermark >= endtime ,觸發計算
Flink提供添加水印的API
val watermarkData: DataStream[Message] = clicklogDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] { var currentTimestamp: Long = 0L val maxDelayTime = 5000L var watermark: Watermark = null // 獲取當前的水印 override def getCurrentWatermark = { watermark = new Watermark(currentTimestamp - maxDelayTime) watermark } // 時間戳抽取操作 override def extractTimestamp(t: Message, l: Long) = { val timeStamp = t.timestamp currentTimestamp = Math.max(timeStamp, currentTimestamp) currentTimestamp } }) |