Flink中Idle停滯流機制(源碼分析)


前幾天在社區群上,有人問了一個問題

  既然上游最小水印會決定窗口觸發,那如果我上游其中一條流突然沒有了數據,我的窗口還會繼續觸發嗎?

看到這個問題,我蒙了????

對哈,因為我是選擇上游所有流中水印最小的一條作為當前水印時間,那萬一最小水印的那條流突然里面沒有數據了

那我的最小水印不就一直不往前走了,一直是那個沒有數據流的水印了嗎,因為它的水印最小,而且一直不會更新了

????然后窗口再也不觸發????

思考了一下,發現好像也對,當我有一個上游的水印沒來的時候,我就等着唄,誰知道他是不是延遲了

但是!!!

  萬一他真的就是正常的,出現這種hash極端數據傾斜的情況怎么辦呢,MQ的一個partation就是沒有數據

那難不成我還真不計算了,一直等着?

懷着這個疑問

首先我想到的是,難道是在生成水印的時候,這條流沒有數據了,我為了不讓流停下來,就算沒數據也周期性的發送水印?

於是有了這篇文章    Flink中Periodic水印和Punctuated水印實現原理(源碼分析)

但是,無果!!! 

那想要流不停下計算只能在source端實現了,於是看了下源碼

 

看到sourceFunction.java接口的這個方法時,便解開了我的疑惑

上面就是說事件時間處理時,可以把流標記為 idle停滯的,就是說這個流不會再發送數據和水印了

且允許下游任務推進

ok 找到了那現在來看一下它是如何實現的,看下具體實現類

 

 

 

 這里看到這個streamStatus 的停滯idle狀態會被emit廣播往下游發送

 既然往下發了,看下下游接收到這個status是做了什么

 打開StreamInputProcessor.java的processInput()方法  (這里是task端運行job的邏輯以后隨緣更新到會細講)

 

 這里接收到了某上游流的狀態改變了,這里毫無疑問就是更新stream的狀態

 

修改了stream和channel的狀態為idle 停滯 以后呢

來到水印更新的邏輯 (這里不了解的可以看看這里  Flink中watermark為什么選擇最小一條(源碼分析)

 

前面就是說如果是來自已經是idle停滯的流的水印,那我就忽略這條水印

然后來看看,來自沒有停滯idle的流的水印,是如何更新當前水印的 findAndOutputNewMinWatermarkAcrossAlignedChannels方法

 

注意到這里

會先判斷這個channel是否是idel的!!!!

也就是說當某一個上游的流沒有數據停滯了,他是不會參與水印更新邏輯的

真相大白,水印還是會繼續往前推進不會停下,計算不會停下

 

這里就引出了一個思考也是自己在思考的

  這里暴露的接口其實是留給我們source源自己實現的,什么時候我們認為流變成了停滯的,我們想他繼續強

制推進,繼續計算,應該都是要我們自己去決定的,就是說,我是等着數據來才計算呢,還是我繼續強制流繼續

執行呢,其實是根據自己對source的設計來的,這也是自己的一個思考,自己也沒有細研究以后會研究一下主流

source的設計,看能不能解開自己的疑惑

 

五分鍾以后    這!!!FlinkKafkaConsumerBase.java

 難道沒有offset就停滯了,這么簡單嗎


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM