Flink中TaskManager端執行用戶邏輯過程(源碼分析)


TaskManager接收到來自JobManager的jobGraph轉換得到的TDD對象,啟動了任務,在StreamInputProcessor類的processInput()方法中

通過一個while(true)中不停的拉取上游的數據,然后調用streamOperator.processElement(record)調用用戶實現的方法去處理數據拉取的數據

首先先來看下這個operator對象

然后看看OneInputStreamOperator類的UML

這里所有的實現類沒有全部列出,只列了一些代表

看到這里,寫過Flink的streamAPI的同學,肯定感覺到很熟悉!!!!!!

這里!不就是我們常寫flink代碼的那些算子嘛

對沒有錯,我們程序中實現的那些算子邏輯,最后都會被封裝成一個OneInputStreamOperator,這里具體看一個最熟悉的Fliter

來看一下StreamFilter的processElement方法

!!!這里傳入一個數據后,這個userFunction調用了filter方法並且把數據放進去了

當返回true通過這個output.collect發送出去了

這不就對應了我們用戶自己實現的filter算子嘛,沒錯這個方法其實就是客戶端的filter方法,這個userFunction包含了用戶實現filter算子的邏輯

(!!!!!就是說這個processElement方法會調用用戶的邏輯)

(所以這個userFunction可以帶上client的方法實現,這對我們很重要,特別是對flink源碼修改,為clientApi添加新功能方法,運行時可以通過這里拿到)

繼續

來看看這個output.collect()方法

然后

 

看到這個,等等等等

我不是從這個processElement()方法進來的嗎,怎么又開始調processElement()方法了

難道遞歸了? 不對不對

這里operator不是上一個operator了,而是這個output對象的(這里是chainOutPut)

看下這個output對象

看下UML類圖,也是只列舉了重要的

先看chainingOutPut的屬性

 

發現了又出現了OneInputStreamOperator對象

看到這個實現類的名字!chain聯想起了什么

Flink會將可以chain在一起的算子在streamGraph轉換成jobGraph的時候根據條件chain在一起

一驚!

來分別看一下ChainingOutPut和RecordWriterOutput的collect()方法有什么區別

在chain中

 在RecordWriter中

這里chain的ouput,又繼續調用了下一個operator的processElement方法,然后又在processElement方法中又調用output.collect( ),collect中又調用了下一個operator的processElement方法

整個過程就是個無限的循環,直到,某一個operator的ouput不為ChainingOutPut,當變為RecordWriterOutput時

上面看到RecordWriterOutput的processElement直接emit發送出去了這個數據,再也沒有繼續調用processElement方法了

這里也就對應了,flink中的責任鏈,chain在一起的算子會一個接着一個執行,直到無法chain,就會往下游發送emit了

來看一下UML類圖幫助理解

 

 里中有我,我中有你,一直相互調用直到無法chain,然后emit往下游發送(這里肯定就有發送端的反壓邏輯,以后隨緣更新)

那這里的循環調用理解了就會想,那如何確定第一個operator調用,然后進入整個調用鏈呢

回到TaskManager接收到JobManager的TDD以后初始化整個任務的時候

StreamTask.java中invoke方法中

 先是初始化了一個OperatorChain,里面其實就是一個數組StreamOperator

在他初始化的時候,其實就是為我們所有的streamOutputs設置了他的output以及會根據jobManager發送過來的TDD(包含信息)

設置成對應的ChainingOutPut還是RecordWriterOutput,chainOutput會設置他的的operator

然后獲取了getHeadOperator()其實就是獲取了他調用連中的第一個

然后在

 

將這個第一個operator關聯到了inputProcessor對象里面

后面就簡單了在inputProcessor.processInput中就進入了while(true)循環拉取上游數據的邏輯

然后

在這里調用的第一個processElement方法就是我們的那個headOperator

這樣整個調用責任鏈就開始從第一個Operator運行起來了

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM