Flink水印機制(watermark)


Flink流處理時間方式

  • EventTime

    時間發生的時間,例如:點擊網站上的某個鏈接的時間

  • IngestionTime

    某個Flink節點的source operator接收到數據的時間,例如:某個source消費到kafka中的數據

  • ProcessingTime

    某個Flink節點執行某個operation的時間,例如:timeWindow接收到數據的時間

   

設置Flink流處理的時間類型

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

問題

1. 使用時間窗口來統計10分鍾內的用戶流量

2. 有一個時間窗口

  • 開始時間為:2017-03-19 10:00:00
  • 結束時間為:2017-03-19 10:10:00

3. 有一個數據,因為網絡延遲

  • 事件發生的時間為:2017-03-19 10: 10 :00
  • 但進入到窗口的時間為:2017-03-19 10:10: 02 ,延遲了2秒中

4. 時間窗口並沒有將 59 這個數據計算進來,導致數據統計不正確

這種處理方式,根據消息進入到window時間,來進行計算。在網絡有延遲的時候,會引起計算誤差。

   

水印(watermark)

水印就是一個時間戳,可以給每個消息添加一個 允許一定延遲 的時間戳

  • 窗口可以繼續計算一定時間范圍內延遲的消息
  • 添加水印后,窗口會等 5 秒,再執行計算。若超過5秒,則舍棄。
  • 窗口執行計算時間由 水印時間 來觸發,當接收到消息的 watermark >= endtime ,觸發計算

       

   

Flink提供添加水印的API

        val watermarkData: DataStream[Message] =

        clicklogDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]

        {

           var currentTimestamp: Long = 0L

           val maxDelayTime = 5000L

           var watermark: Watermark = null

        // 獲取當前的水印

           override def getCurrentWatermark = {

            watermark = new Watermark(currentTimestamp - maxDelayTime)

            watermark

          }

           // 時間戳抽取操作

           override def extractTimestamp(t: Message, l: Long) = {

            val timeStamp = t.timestamp

            currentTimestamp = Math.max(timeStamp, currentTimestamp)

            currentTimestamp

          }

         })

   

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM