在用戶代碼中,我們設置生成水印和事件時間的方法assignTimestampsAndWatermarks()中這里有個方法的重載
我們傳入的對象分為兩種
AssignerWithPunctuatedWatermarks(可以理解為每條數據都會產生水印,如果不想產生水印,返回一個null的水印)
AssignerWithPeriodicWatermarks(周期性的生成水印)
來看一下源碼中是如何實現這兩種水印的
二話不說打開org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.java
這個類的processElement方法
看到源碼這里這段邏輯就 非常的清晰了
先通過用戶的代碼獲取到事件時間,注入到element里面就直接往下個opeartor發送了
然后通過用戶代碼獲取水印,這里會判斷水印是否為null
不為null的就直接往下游emit 了
現在看一下AssignerWithPeriodicWatermarks如何周期的發送生成的水印
直接打開TimestampsAndPeriodicWatermarksOperator.java這個類
這里先不看processElement()方法,先看open方法
可以看到它將 當前時間其實就是System.currentTimeMillis()+ watermarkInterval水印間隔 注冊作為了一個timer定時器
這樣就知道了,當他過了這個水印間隔時間以后肯定會觸發操作
來看一下這個間隔時間以后觸發了什么操作
可以看到,他先是獲取了當前的水印時間,然后直接emit出去了????
Periodic模式明明是在接收數據的processElement()發送水印的
然后又再次注冊了一個 當前時間+間隔的 timer,這樣就無限的觸發下去了
既然他在這里發送了水印,來看下他的processElement方法
果然他周期性的發送水印以后,接收數據的processElement()方法里面就沒有發送水印了
只有獲取事件時間的邏輯了