Flink assignAscendingTimestamps 生成水印的三個重載方法


先簡單介紹一下Timestamp 和Watermark 的概念: 

1. Timestamp和Watermark都是基於事件的時間字段生成的
2. Timestamp和Watermark是兩個不同的東西,並且一旦生成都跟事件數據沒有關系了(所有即使事件中不再包含生成Timestamp和Watermark的字段也沒關系)
3. 事件數據和 Timestamp 一一對應(事件在流中傳遞以StreamRecord對象表示,value 和 timestamp 是它的兩個成員變量)
4. Watermark 在生成之后與事件數據沒有直接關系,Watermark 作為一個消息,和事件數據一樣在流中傳遞(Watermark 和StreamRecord 具有相同的父類:StreamElement)
5. Timestamp 與 Watermark 在生成之后,會在下游window算子中做比較,判斷事件數據是否是過期數據
6. 只有window算子才會用Watermark判斷事件數據是否過期

Flink 在流上手動生成水印有三個重載的方法(忽略過期的一個)

 

  1. assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]

此方法是數據流的快捷方式,其中已知元素時間戳在每個並行流中單調遞增。在這種情況下,系統可以通過跟蹤上升時間戳自動且完美地生成水印。

val input = env.addSource(source)
.map(json => {
        val id = json.get("id").asText()
        val createTime = json.get("createTime").asText()
        val amt = json.get("amt").asText()
        LateDataEvent("key", id, createTime, amt)
      })
      // flink auto create timestamp & watermark
      .assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

注:這種方法創建時間戳與水印最簡單,返回一個long類型的數字就可以了

 

2.assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 

基於給定的水印生成器生成水印,即使沒有新元素到達也會定期檢查給定水印生成器的新水印,以指定允許延遲時間
val input = env.addSource(source)
      .map(json => {
        val id = json.get("id").asText()
        val createTime = json.get("createTime").asText()
        val amt = json.get("amt").asText()
        LateDataEvent("key", id, createTime, amt)
      })
      // assign timestamp & watermarks periodically(定期生成水印)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
       override def extractTimestamp(element: LateDataEvent): Long = {
         println("want watermark : " + sdf.parse(element.createTime).getTime)
         sdf.parse(element.createTime).getTime
       }
     })
      

3.assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

此方法僅基於流元素創建水印,對於通過[[AssignerWithPunctuatedWatermarks#extractTimestamp(Object,long)]]處理的每個元素,
調用[[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]]方法,如果返回的水印值大於以前的
水印,會發出新的水印,
此方法可以完全控制水印的生成,但是要注意,每秒生成數百個水印會影響性能


val input = env.addSource(source)
      .map(json => {
        val id = json.get("id").asText()
        val createTime = json.get("createTime").asText()
        val amt = json.get("amt").asText()
        LateDataEvent("key", id, createTime, amt)
      })
      // assign timestamp & watermarks every event
      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
      // check extractTimestamp emitted watermark is non-null and large than previously
      override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
        new Watermark(extractedTimestamp)
      }
      // generate next watermark
      override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
        val eventTime = sdf.parse(element.createTime).getTime
        eventTime
      }
    })

注:本文基於全部事件時間

 歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM