Flink中Periodic水印和Punctuated水印實現原理(源碼分析)


在用戶代碼中,我們設置生成水印和事件時間的方法assignTimestampsAndWatermarks()中這里有個方法的重載

我們傳入的對象分為兩種

AssignerWithPunctuatedWatermarks(可以理解為每條數據都會產生水印,如果不想產生水印,返回一個null的水印)

AssignerWithPeriodicWatermarks(周期性的生成水印)

來看一下源碼中是如何實現這兩種水印的

二話不說打開org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.java

這個類的processElement方法

 

看到源碼這里這段邏輯就 非常的清晰了

先通過用戶的代碼獲取到事件時間,注入到element里面就直接往下個opeartor發送了

然后通過用戶代碼獲取水印,這里會判斷水印是否為null

不為null的就直接往下游emit 了

現在看一下AssignerWithPeriodicWatermarks如何周期的發送生成的水印

直接打開TimestampsAndPeriodicWatermarksOperator.java這個類

這里先不看processElement()方法,先看open方法

 

可以看到它將  當前時間其實就是System.currentTimeMillis()+ watermarkInterval水印間隔 注冊作為了一個timer定時器

這樣就知道了,當他過了這個水印間隔時間以后肯定會觸發操作

來看一下這個間隔時間以后觸發了什么操作

 

可以看到,他先是獲取了當前的水印時間,然后直接emit出去了????

Periodic模式明明是在接收數據的processElement()發送水印的

然后又再次注冊了一個 當前時間+間隔的 timer,這樣就無限的觸發下去了

既然他在這里發送了水印,來看下他的processElement方法

 

果然他周期性的發送水印以后,接收數據的processElement()方法里面就沒有發送水印了

只有獲取事件時間的邏輯了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM