Flink中接收端反壓以及Credit機制 (源碼分析)


先上一張圖整體了解Flink中的反壓

 

 

       可以看到每個task都會有自己對應的IG(inputgate)對接上游發送過來的數據和RS(resultPatation)對接往下游發送數據, 整個反壓機制通過inputgate,resultPatation公用一個一定大小的memorySegmentPool來實現(Flink中memorySegment作為內存使用的抽象,類比bytebuffer), 公用一個pool當接收上游數據時Decoder,往下游發送數據時Encoder,都會向pool中請求內存memorySegment 。因為是公共pool,也就是說運行時,當接受的數據占用的內存多了,往下游發送的數據就少了,這樣是個什么樣的情況呢?

比如說你sink端堵塞了,背壓了寫不進去,那這個task的resultPatation無法發送數據了,也就無法釋放memorySegment了,相應的用於接收數據的memorySegment就會越來越少,直到接收數據端拿不到memorySegment了,也就無法接收上游數據了,既然這個task無法接收數據了,自然引起這個task的上一個task數據發送端無法發送,那上一個task又反壓了,所以這個反壓從發生反壓的地方,依次的往上游擴散直到source,這個就是flink的天然反壓。

從源碼來看一下flink是如何實現的

來到數據接收的地方StreamInputProcessor.java中processInput()方法中

這里通過通過handler的getNextNonBlocked()方法獲取到了bufferOrEvent后面就會將這個bufferOrEvent解析成record數據然后使用用戶的代碼處理了

其實這里的handler分為兩種

  1. BarrierBuffer    
  2. BarrierTracker

區別主要是barrierbuffer實現了barrier對齊的數據緩存,用於實現一次語義,這里以后隨緣更新到容錯機制的時候講

來看一下getNextNonBlocked()方法

這個看到了通過會通過上游inputGate獲取數據,具體看一下getNextBufferOrEvent()其中有兩個比較重要的調用

 

 

 先看requestPartitions()

 先遍歷了所有的inputchannel然后調用了requestSubpartition()在其中

 先看一下1處,這里返回了一個Netty的Client來看一下createPartitionRequestClient是怎么創建的

可以看到源碼的描述,這里其實就是創建與上游發送數據端的tcp連接的client端,用來接收上游數據的

接着

這里如果已經建立TCP連接就直接拿,與上游還沒有建立tcp連接的話就會先初始化Client端,通過這個connect()方法

來看一下第一次是如何初始化連接的

看到這個應該熟悉Netty的同學一眼就了解了,在1處就是Client的具體邏輯了,然后與上游端口建立連接

來看一下具體的Client端具體的邏輯,這里最好對netty有一定的認識

  1. 1處是一個用於Encoder 的ChannelOutboundHandler常規的編碼器沒有什么好說的
  2.  2處是用於Decoder的ChannelinboundHandler常規的解碼器沒有什么好說的

  3.  3處 這里分為兩種Handler,區別主要是在notifyCreditAvailable()方法

       

    PartitionRequestClientHandler: 不帶信任機制的

         

      CreditBasedPartitionRequestClientHandler:帶credit信任機制的

      

    

     

    這里取出了所有的帶有信任的上游inputChannel並且向其響應發送了一個Credit對象

 

那帶Credit機制的handler何時觸發userEventTriggered()來觸發向上游發送Credit呢?

先不慌,先來看下client接收到數據后做了什么,看下Nettyclient端的channelRead()方法(這里只看credit機制的)

 decodeMsg()方法中

decodeBufferOrEvent()方法

在沒有Credit機制的PartitionRequestClientHandler中

requestBuffer()方法就是請求memorySegmentPool中的memorySegment

這里不能確保能獲取到,所以會用一個while(true)一直掛着

 在Credit機制的CreditBasedPartitionRequestClientHandler中

 

請求requestBuffer()方法就是請求memorySegmentPool中的memorySegment因為信任機制在請求前就已經保證有足夠的memorySegment所以不會請求不到,這里請求不到直接就拋異常了

然后OnBuffer( )方法

 1處將將這個buffer加入到了這個receivedBuffers的ArrayDeque中,這里要注意receivedBuffers,這個queue后面會用到(后面處理數據就是循環的從這個queue中poll拉數據出來)

這里還要注意onBuffer方法還傳入了backlog參數,這里是一個積壓的數據量(既發送端還沒有發送的且沒有獲取到Credit的數據量(buffer為單位)其實就是subpartation中的數據量,發送端會把這個積壓量往接收端發,接收端會用這個積壓量來判斷是否可以發送Credit給上游 )

接着會根據積壓的數據量

 

當可用的buffer數 <(擠壓的數據量 + 已經分配給信任Credit的buffer量) 時,就會向Pool中繼續請求buffer,這里請求不到也會一直while形成柱塞反壓

然后通過notifyCreditAvailable()方法發送Credit,具體來看一下 

 

 

 

可用看到這里就觸發了前面說到的向上游發送Credit的方法了

到這里,Nettyclient端的初始化以及Netty的處理邏輯就講完了

現在回到最最開始的地方

requestPartition()那里創建nettyclient后

currentChannel.getNextBuffer()方法中

前面我們說到的NettyClient端channelRead讀取數據后會把數據放到一個recivedBuffers的queue中,這里就是去那個queue中取數據然后返回到我們的 數據接收的地方StreamInputProcessor.java中processInput()方法中的得到上游數據以后,就是開始執行我們用戶的代碼了調用processElement方法了。

然后while(true)開始了下一輪拉取數據然后處理的過程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM