Flink中發送端反壓以及Credit機制(源碼分析)


上一篇《Flink接收端反壓機制》說到因為Flink每個Task的接收端和發送端是共享一個bufferPool的,形成了天然的反壓機制,當Task接收數據的時候,接收端會根據積壓的數據量以及可用的buffer數量(可用的memorySegment數)來決定是否向上游發送Credit(簡而言之就是當我還有空間的時候,我向上游也就是上一個Task的發送端發送一個ack消息,表明我還有空間你可以發送數據過來,如果下游沒有給你Credit就證明下游已經堵了,沒有空間了也就不能繼續往下游發送了)

現在從源碼來看一下Task的數據發送端,也就是Netty的Server端的實現

先看Task初始化的時候TaskManagerRunner.java中startTaskManager()方法中

 這個connectionManager其實分為兩種,Netty,local一看就知道netty這種肯定是對應需要通過網絡傳輸,本地模式這里就不講了

 

這個地方看到Flink的client和server都初始化了,需要注意的是其實這個地方client端只是初始化了一些配置,並沒有調用bind()方法啟動起來,這里看過上一篇文章的同學就會知道,client只有當第一次需要拉取上游subpatition數據的時候才會啟動起來也就是bind(),

而server端在這里也就是task啟動的時候就啟動起來了,繼續看server端如何啟動的server.init()方法

 init方法中,這里可以看到,這是Flink1.6以前只有基於netty的tcp網絡層反壓,這里是通過bootstrap的兩個參數

    ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK  大小為兩倍的memorySegment大小

    ChannelOption.WRITE_BUFFER_LOW_WATER_MARK   大小為memorySegment + 1

接着

  

       

1處2處常規的Netty定長編解碼器,沒有什么好說的

看看3處,4這里先不講后面會提到

 看到3是一個inboundHandler,反壓機制時他的用處是用來接收來自下游響應的Credit,來看他的ChannelRead0方法

 當接收到的消息是一個Credit信任的時候

先是

增加了這個reader的可用的Credit可用數

然后

 其實了解了接收端的反壓,發送端接收到了下游的credit,那發送數據的時候肯定有一個地方會先判斷是否有可用的Credit才決定是否往下發數據

其實就是這個帶星號的地方判斷,然后下面就是常規的從queue中拉取reader往netty下游writeAndFlash()數據了,沒什么好講的

來看一下他判斷Credit是否滿足的地方

可以看到只有當

    有數據且可用的Credit數量大於0

    或者有數據且數據是一個事件而不是record的時候,才返回true往下游發送

 

可以看到這個 enqueueAvailableReader()方法比較重要,里面包含了判斷credit以及往后下游發送數據的邏輯

那這個enqueueAvailableReader()方法除了會在接收到下游的Credit的時候觸發一次,還有哪會被觸發呢

既然是往下游發送數據那我們task處理完數據以后應該也會調用這個方法

於是來看一下Task發送數據,以前的文章講過,這里就不贅述了,直接看到RecordWriterOutput的emit()

會先將record寫入到這個Serializer里面去

然后copyFromSerializerToTargetChannel()方法中

 先去localBufferPool中請求buffer,這里就是反壓了

請求到buffer了以后

這個調用鏈有點長不全列出來了

最后

 這個requestQueue其實是前面Netty初始化時具體邏輯中的4,是一個ChannelInboundHandlerAdapter

 這個Inbound一開始我也很疑惑,這個Inbound沒有重寫他的channelRead()方法,那這個不就直接轉發數據了嗎,那他的作用是干嘛的呢

繼續往下看

原來發送數據的時候會觸發這個inbound的eventTrigger

看下userEventTriggered()具體觸發了什么

 

這個方法就很眼熟了,就是前面到接收到下游發送過來的Credit時會觸發一次的方法,用來判斷是否有Credit以及通過netty往下游發送數據

這里在發送數據的時候果然又觸發了,后面就是判斷是否有Credit滿足往下游發送數據的條件,然后往下游發送數據

也就是說

    當接收到下游返回的Credit的時候會觸發一次,是否能往下游寫數據的判斷並拉queue數據寫數據

    每次Task處理完數據以后emit,也會觸發一次判斷並拉queue數據寫數據

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM