Flink中watermark為什么選擇最小一條(源碼分析)


昨天在社區群看到有人問,為什么水印取最小的一條?這里分享一下自己的理解

首先水印一般是設置為:(事件時間 - 指定的值)  這里的作用是解決遲到數據的問題,從源碼來看一下它如何解決的

先來看下windowOperator.java接收到數據以后做了什么

在processElement方法中,會遍歷這條數據屬於的所有窗口執行

將窗口window作為Context的namaspace,這個window后面會被設置成每個定時器的namespace

因為這里是事件時間窗口所以會默認注冊一個事件時間trigger,這是默認trigger的onElement方法

如果窗口的右邊界已經小於當前水印時間了,就直接觸發窗口計算

當返回continue時,也就是說水印還沒有達到,這條數據屬於的窗口的右邊界,也就是說窗口還沒有到觸發的時機

可以看到這里他把這個數據屬於的窗口的右邊界注冊成為一個觸發器(timer) 

這個觸發器初始化的時候觸發器的時間等於窗口右邊界,且設置了觸發器的namespace = window

這個timer有什么用呢,來看一下窗口觸發的邏輯

所有的上游數據會從這里接收,在StreamInputProcessor.java的processInput()方法中有這樣一段邏輯,當接收到水印

里面又調用了

從名字就可以知道是取了一個最小的水印,具體更新最小水印時間邏輯如下

這里就是我們的問題了,為什么他選取了最小的一個水印?

看看這段代碼的后面他又做了什么

handleWatermark中調用peocessWatermark()方法

然后會走到這里

這里先使用最小水印更新了  當前的水印!!!!!!

這里會判斷定時器時間是否小於當前最小水印時間(是觸發 定時器的條件!!!!!)

當定時器的時間小於當前的事件時間時觸發,在onEventTime()方法中

這里看到當返回fire時,會調用emitWindowContents()這個方法里面就會調用我們真正用戶的process()方法了,而那個windowState.get()則是拿到了一個窗口中的所有數據

而,是否觸發窗口就看onEventTime()方法是否返回Fire,具體實現如下

time就是定時器的時間,這個window就是定時器的namespace,就是這個定時器創建時傳入的窗口,這里就是這個定時器時間的窗口

判斷定時器的時間,變量time(前面我們將數據屬於的窗口的右邊界作為定時器的時間)是否等於這個定時器的window窗口(初始化傳入的namespace參數這里就是注冊定時器的window)右邊界的時間,來決定窗口是否觸發

!!!那既然最小水印是觸發定時器的條件,定時器到時會觸發窗口,那我們為什么會選擇最小的水印來作為觸發條件呢?

看下面這張圖

可以看到一個窗口可能會有接收到許多的上游,每一個上游的流都會帶有事件時間,那我們哪知道選用哪個流的水印時間作為窗口觸發的條件呢?

有個最簡單的辦法就是:如果我上游每個流中取最小的水印,那就證明其他的水印時間肯定是大於最小的這個,我最小的一條流都達到了窗口的觸發時間,那其他來自上游的流肯定都已經超過這個觸發時間了,那我就可以觸發這個窗口了

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM