flink的背壓問題產生原因和解決方法


定位問題:

如下圖:

1) flink的checkpoint生成超時, 失敗:

 
checkpoint超時

 

2) 查看jobmanager日志,定位問題:

 
jobmanager日志

 

3) 找大神幫忙定位問題, 原來是出現了背壓的問題,  緩沖區的數據處理不過來,barrier流動慢,導致checkpoint生成時間長, 出現超時的現象. (checkpoint超時時間設置了30分鍾)

下圖是背壓過高, input 和 output緩沖區都占滿的情況

 
buffer緩沖區情況

 

4) 背壓的情況也可以在flink后台的job的JobGraph中查看

 
背壓過高

 

下面說說flink感應反壓的過程:

下面這張圖簡單展示了兩個 Task 之間的數據傳輸以及 Flink 如何感知到反壓的:

 
flink感知背壓

 

記錄“A”進入了 Flink 並且被 Task 1 處理。(這里省略了 Netty 接收、反序列化等過程)

記錄被序列化到 buffer 中。

該 buffer 被發送到 Task 2,然后 Task 2 從這個 buffer 中讀出記錄。

注意記錄能被 Flink 處理的前提是,必須有空閑可用的 Buffer。

結合上面兩張圖看:Task 1 在輸出端有一個相關聯的 LocalBufferPool(稱緩沖池1),Task 2 在輸入端也有一個相關聯的 LocalBufferPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buffer 來序列化記錄 “A”,我們就序列化並發送該 buffer。

這里我們需要注意兩個場景:

本地傳輸:如果 Task 1 和 Task 2 運行在同一個 worker 節點(TaskManager),該 buffer 可以直接交給下一個 Task。一旦 Task 2 消費了該 buffer,則該 buffer 會被緩沖池1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就會趕不上 Task 1 取 buffer 的速度,導致緩沖池1無可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1 的降速。

遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節點上,那么 buffer 會在發送到網絡(TCP Channel)后被回收。在接收端,會從 LocalBufferPool 中申請 buffer,然后拷貝網絡中的數據到 buffer 中。如果沒有可用的 buffer,會停止從 TCP 連接中讀取數據。在輸出端,通過 Netty 的水位值機制來保證不往網絡中寫入太多數據(后面會說)。如果網絡中的數據(Netty輸出緩沖中的字節數)超過了高水位值,我們會等到其降到低水位值以下才繼續寫入數據。這保證了網絡中不會有太多的數據。如果接收端停止消費網絡中的數據(由於接收端緩沖池沒有可用 buffer),網絡中的緩沖數據就會堆積,那么發送端也會暫停發送。另外,這會使得發送端的緩沖池得不到回收,writer 阻塞在向 LocalBufferPool 請求 buffer,阻塞了 writer 往 ResultSubPartition 寫數據。

這種固定大小緩沖池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得 Task 生產數據的速度不會快於消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數據傳輸自然地擴展到更復雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。

解決辦法:

1)  首先說一下flink原來的JobGraph, 如下圖,  產生背壓的是中間的算子, 

 
flink job graph

 

2) 背壓是什么??  

如果您看到任務的背壓警告(例如High),這意味着它生成的數據比下游算子可以消耗的速度快。下游工作流程中的記錄(例如從源到匯)和背壓沿着相反的方向傳播到流上方。

以一個簡單的Source -> Sink工作為例。如果您看到警告Source,這意味着Sink消耗數據的速度比Source生成速度慢。Sink正在向上游算子施加壓力Source。

可以得出:  第三個算子的處理數據速度比第二個算子生成數據的速度,  明顯的解決方法:  提高第三個算子的並發度,  問題又出現了:  並發度要上調到多少呢? 

3) 第一次上調, 從原來的10並發 上調到 40 

    觀察緩存池對比的情況: 

    並發是10的buffer情況: (背壓的情況比較嚴重, 曲線持續性地達到峰值, 會導致資源占光)

 
10並發的buffer情況

 

       並發是40的buffer情況:(有了比較大的改善, 但是還是存在背壓的問題, 因為曲線有達到頂峰的時候)

 
40並發的buffer情況

4)  從網上了解到flink的並發度的優化策略后, 有了一個比較好的解決方法, 把第三個算子的並行度設置成100, 與第二個算子的並發度一致:

這樣做的好處是, flink會自動將條件合適的算子鏈化, 形成算子鏈,

滿足上下游形成算子鏈的條件比較苛刻的:

        1.上下游的並行度一致

        2.下游節點的入度為1 (也就是說下游節點沒有來自其他節點的輸入)

        3.上下游節點都在同一個 slot group 中(下面會解釋 slot group)

        4.下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS)

        5.上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)

6.兩個節點間數據分區方式是 forward(參考理解數據流的分區

        7.用戶沒有禁用 chain

算子鏈的好處: 鏈化成算子鏈可以減少線程與線程間的切換和數據緩沖的開銷,並在降低延遲的同時提高整體吞吐量。

flink還有另外一種優化手段就是槽共享,

flink默認開啟slot共享(所有operator都在default共享組)

默認情況下,Flink 允許同一個job里的不同的子任務可以共享同一個slot,即使它們是不同任務的子任務但是可以分配到同一個slot上。 這樣的結果是,一個 slot 可以保存整個管道pipeline, 換句話說,  flink會安排並行度一樣的算子子任務在同一個槽里運行

意思是每一個taskmanager的slot里面都可以運行上述的整個完整的流式任務, 減少了數據在不同機器不同分區之間的傳輸損耗, (如果算子之間的並發度不同, 會造成數據分區的重新分配(rebalance, shuffle, hash....等等), 就會導致數據需要在不同機器之間傳輸)

優化后的JobGraph, 如下圖, 

 
合並算子鏈

 

 
taskmanager和slot中的task情況

 

再次觀察緩存池對比的情況: 

並發是100的buffer情況: (背壓的情況已經大大緩解)

 
100並發的buffer情況

 

 
背壓正常

 

checkpoint生成的時間沒有出現超時的情況

 
checkpoint正常



作者:feng504x
鏈接:https://www.jianshu.com/p/74c031b1ec29
來源:簡書


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM