定位問題:
如下圖:
1) flink的checkpoint生成超時, 失敗:

2) 查看jobmanager日志,定位問題:

3) 找大神幫忙定位問題, 原來是出現了背壓的問題, 緩沖區的數據處理不過來,barrier流動慢,導致checkpoint生成時間長, 出現超時的現象. (checkpoint超時時間設置了30分鍾)
下圖是背壓過高, input 和 output緩沖區都占滿的情況

4) 背壓的情況也可以在flink后台的job的JobGraph中查看

下面說說flink感應反壓的過程:
下面這張圖簡單展示了兩個 Task 之間的數據傳輸以及 Flink 如何感知到反壓的:

記錄“A”進入了 Flink 並且被 Task 1 處理。(這里省略了 Netty 接收、反序列化等過程)
記錄被序列化到 buffer 中。
該 buffer 被發送到 Task 2,然后 Task 2 從這個 buffer 中讀出記錄。
注意:記錄能被 Flink 處理的前提是,必須有空閑可用的 Buffer。
結合上面兩張圖看:Task 1 在輸出端有一個相關聯的 LocalBufferPool(稱緩沖池1),Task 2 在輸入端也有一個相關聯的 LocalBufferPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buffer 來序列化記錄 “A”,我們就序列化並發送該 buffer。
這里我們需要注意兩個場景:
本地傳輸:如果 Task 1 和 Task 2 運行在同一個 worker 節點(TaskManager),該 buffer 可以直接交給下一個 Task。一旦 Task 2 消費了該 buffer,則該 buffer 會被緩沖池1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就會趕不上 Task 1 取 buffer 的速度,導致緩沖池1無可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1 的降速。
遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節點上,那么 buffer 會在發送到網絡(TCP Channel)后被回收。在接收端,會從 LocalBufferPool 中申請 buffer,然后拷貝網絡中的數據到 buffer 中。如果沒有可用的 buffer,會停止從 TCP 連接中讀取數據。在輸出端,通過 Netty 的水位值機制來保證不往網絡中寫入太多數據(后面會說)。如果網絡中的數據(Netty輸出緩沖中的字節數)超過了高水位值,我們會等到其降到低水位值以下才繼續寫入數據。這保證了網絡中不會有太多的數據。如果接收端停止消費網絡中的數據(由於接收端緩沖池沒有可用 buffer),網絡中的緩沖數據就會堆積,那么發送端也會暫停發送。另外,這會使得發送端的緩沖池得不到回收,writer 阻塞在向 LocalBufferPool 請求 buffer,阻塞了 writer 往 ResultSubPartition 寫數據。
這種固定大小緩沖池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得 Task 生產數據的速度不會快於消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數據傳輸自然地擴展到更復雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。
解決辦法:
1) 首先說一下flink原來的JobGraph, 如下圖, 產生背壓的是中間的算子,

2) 背壓是什么??
如果您看到任務的背壓警告(例如High),這意味着它生成的數據比下游算子可以消耗的速度快。下游工作流程中的記錄(例如從源到匯)和背壓沿着相反的方向傳播到流上方。
以一個簡單的Source -> Sink工作為例。如果您看到警告Source,這意味着Sink消耗數據的速度比Source生成速度慢。Sink正在向上游算子施加壓力Source。
可以得出: 第三個算子的處理數據速度比第二個算子生成數據的速度, 明顯的解決方法: 提高第三個算子的並發度, 問題又出現了: 並發度要上調到多少呢?
3) 第一次上調, 從原來的10並發 上調到 40
觀察緩存池對比的情況:
並發是10的buffer情況: (背壓的情況比較嚴重, 曲線持續性地達到峰值, 會導致資源占光)

並發是40的buffer情況:(有了比較大的改善, 但是還是存在背壓的問題, 因為曲線有達到頂峰的時候)

4) 從網上了解到flink的並發度的優化策略后, 有了一個比較好的解決方法, 把第三個算子的並行度設置成100, 與第二個算子的並發度一致:
這樣做的好處是, flink會自動將條件合適的算子鏈化, 形成算子鏈,
滿足上下游形成算子鏈的條件比較苛刻的:
1.上下游的並行度一致
2.下游節點的入度為1 (也就是說下游節點沒有來自其他節點的輸入)
3.上下游節點都在同一個 slot group 中(下面會解釋 slot group)
4.下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS)
5.上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)
6.兩個節點間數據分區方式是 forward(參考理解數據流的分區)
7.用戶沒有禁用 chain
算子鏈的好處: 鏈化成算子鏈可以減少線程與線程間的切換和數據緩沖的開銷,並在降低延遲的同時提高整體吞吐量。
flink還有另外一種優化手段就是槽共享,
flink默認開啟slot共享(所有operator都在default共享組)
默認情況下,Flink 允許同一個job里的不同的子任務可以共享同一個slot,即使它們是不同任務的子任務但是可以分配到同一個slot上。 這樣的結果是,一個 slot 可以保存整個管道pipeline, 換句話說, flink會安排並行度一樣的算子子任務在同一個槽里運行
意思是每一個taskmanager的slot里面都可以運行上述的整個完整的流式任務, 減少了數據在不同機器不同分區之間的傳輸損耗, (如果算子之間的並發度不同, 會造成數據分區的重新分配(rebalance, shuffle, hash....等等), 就會導致數據需要在不同機器之間傳輸)
優化后的JobGraph, 如下圖,


再次觀察緩存池對比的情況:
並發是100的buffer情況: (背壓的情況已經大大緩解)


checkpoint生成的時間沒有出現超時的情況

作者:feng504x
鏈接:https://www.jianshu.com/p/74c031b1ec29
來源:簡書