1.概念
① 節點有性能瓶頸可能是該節點所在的機器有故障(網絡、磁盤等)、機器的網絡延遲和磁盤不足、頻繁GC、數據熱點等原因。
② 大多數消息中間件,例如kafka的consumer從broker把數據pull到本地,而producer把數據push到broker。
2.反壓的影響
反壓會影響checkpoint ① checkpoint時長:checkpoint barrier跟隨普通數據流動,如果數據處理被阻塞,使得checkpoint barrier流經整個數據管道的時長變長,導致checkpoint 總體時間變長。 ② state大小:為保證Exactly-Once准確一次,對於有兩個以上輸入管道的 Operator,checkpoint barrier需要對齊,即接受到較快的輸入管道的barrier后,它后面數據會被緩存起來但不處理,直到較慢的輸入管道的barrier也到達。這些被緩存的數據會被放到state 里面,導致checkpoint變大。 checkpoint是保證准確一次的關鍵,checkpoint時間變長有可能導致checkpoint超時失敗,而state大小可能拖慢checkpoint甚至導致OOM。
3.Flink 的反壓機制
網絡流控的實現:動態反饋/自動反壓
Consumer 需要及時給 Producer 做一個 feedback,即告知 Producer 能夠承受的速率是多少。動態反饋分為兩種:
負反饋:接受速率小於發送速率時發生,告知 Producer 降低發送速率
正反饋:發送速率小於接收速率時發生,告知 Producer 可以把發送速率提上來
3.1Flink 反壓機制
Flink 的數據交換有3種:- 同一個 Task 的數據交換;
- 不同 Task 同 JVM 下的數據交換;
- 不同 Task 且不同 TaskManager 之間的交換。
3.1.1同一個 Task 的數據交換
通過算子鏈 operator chain 串聯多個算子,主要作用是避免了序列化和網絡通信的開銷。
算子鏈 operator chain 串聯多個算子的條件:
① 上下游的並行度一致
② 下游節點的入度為1
③ 上下游節點共享同一個slot
④ 下游節點的 chain 策略為 ALWAYS(例如 map、flatmap、filter等默認是ALWAYS)
⑤ 上游節點的 chain 策略為 ALWAYS 或 HEAD(source默認是HEAD)
⑥ 兩個節點間數據分區方式是 forward
⑦ 用戶沒有禁用 chain
3.1.2不同 Task 同 TaskManager 的數據交換
3.1.3不同 Task 且不同 TaskManager 之間的交換

與上述3.1.2的不同點是數據先傳遞給 netty ,通過 netty 把數據推送到遠程端的 Task 。
3.2Flink(before V1.5)的TCP-based反壓機制
1.5 版本之前是采用 TCP 流控機制,而沒有采用feedback機制。
- Flink1.5 版本之前的TCP-based 反壓機制

- TCP 利用滑動窗口實現網絡流控
參考:1.【計算機網絡】3.1 運輸層 - TCP/UDP協議
2.Apache Flink 進階教程(七):網絡流控及反壓剖析
例子:TCP 利用滑動窗口限制流量
步驟1:發送端將 4,5,6 發送,接收端也能接收全部數據。

步驟2:consumer 消費了 2 ,接收端的窗口會向前滑動一格,即窗口空余1格。接着向發送端發送 ACK = 7、window = 1。


- TCP-based 反壓機制的缺點

① 單個Task的反壓,阻塞了整個TaskManager的socket,導致checkpoint barrier也無法傳播,最終導致checkpoint時間增長甚至checkpoint超時失敗。
② 反壓路徑太長,導致反壓時間延遲。
3.3Flink(since V1.5)的 Credit-based 反壓機制
在 Flink 層面實現反壓機制,通過 ResultPartition 和 InputGate 傳輸 feedback 。Credit-base 的 feedback 步驟: ① 每一次 ResultPartition 向 InputGate 發送數據的時候,都會發送一個 backlog size 告訴下游准備發送多少消息,下游就會去計算有多少的 Buffer 去接收消息。(backlog 的作用是為了讓消費端感知到我們生產端的情況) ② 如果下游有充足的 Buffer ,就會返還給上游 Credit (表示剩余 buffer 數量),告知發送消息(圖上兩個虛線是還是采用 Netty 和 Socket 進行通信)。
生產段發送backlog=1
消費端返回credit=3

當生產端用完buffer,返回credit=0

生產端也出現了數據積壓

4.定位反壓節點
4.1 Flink Web UI 自帶的反壓監控 —— 直接方式

以下兩種場景可能導致反壓:
① 該節點發送速率跟不上它的產生數據速率。該場景一般是單輸入多輸出的算子,例如FlatMap。定位手段是因為這是從 Source Task 到 Sink Task 的第一個出現反壓的節點,所以該節點是反壓的根源節點。
② 下游的節點處理數據的速率較慢,通過反壓限制了該節點的發送速率。定位手段是從該節點開始繼續排查下游節點。
注意事項:
① 因為Flink Web UI 反壓面板是監控發送端的,所以反壓的根源節點並不一定會在反壓面板體現出高反壓。如果某個節點是性能瓶頸並不會導致它本身出現高反壓,而是導致它的上游出現高反壓。總體來看,如果找到第一個出現反壓的節點,則反壓根源是這個節點或者是它的下游節點。
② 通過反壓面板無法區分上述兩種狀態,需要結合 Metrics 等監控手段來定位。如果作業的節點數很多或者並行度很大,即需要采集所有 Task 的棧信息,反壓面板的壓力也會很大甚至不可用。
4.2 Flink Task Metrics —— 間接方式
(1)回顧 Flink Credit-based 網絡

① TaskManager 之間的數據傳輸 不同的 TaskManager 上的兩個 Subtask 通常情況下,channel 數量等於分組 key 的數量或者等於算子並發度。這些 channel 會復用同一個 TaskManager 進程的 TCP 請求,並且共享接收端 Subtask 級別的 Buffer Pool。 ② 接收端 每個 channel 在初始階段會被分配固定數量的獨享 Exclusive Buffer,用於存儲接收到的數據。算子 Operator 使用后再次釋放 Exclusive Buffer。說明:channel 接收端空閑的 Buffer 數量稱為 Credit,Credit 會被定時同步給發送端,用於決定發送多少個 Buffer 的數據。 ③ 流量較大的場景 接收端,channel 寫滿 Exclusive Buffer 后,Flink 會向 Buffer Pool 申請剩余的 Floating Buffer。發送端,一個 Subtask 所有的 Channel 會共享同一個 Buffer Pool,因此不區分 Exclusive Buffer 和 Floating Buffer。
(2)Flink Task Metrics 監控反壓
Network 和 task I/O metrics 是輕量級反壓監視器,用於正在持續運行的作業,其中一下幾個 metrics 是最有用的反壓指標。


解釋:
① outPoolUsage 和 inPoolUsage 同為低表明當前 Subtask 是正常的,同為高分別表明當前 Subtask 被下游反壓。
② 如果一個 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影響,所以可以排查它本身是反壓根源的可能性。
③ 如果一個 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,則表明它有可能是反壓的根源。因為通常反壓會傳導至其上游,導致上游某些 Subtask 的 outPoolUsage 為高。
注意:反壓有時是短暫的且影響不大,比如來自某個 channel 的短暫網絡延遲或者 TaskManager 的正常 GC,這種情況下可以不用處理。

解析:
① floatingBuffersUsage 為高則表明反壓正在傳導至上游。
② exclusiveBuffersUsage 則表明了反壓可能存在傾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,則存在傾斜。因為少數 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,當 exclusive buffer 消耗完,就會使用floating Buffer)。
5.Flink 如何分析反壓
上述主要通過 TaskThread 定位反壓,而分析反壓原因類似一個普通程序的性能瓶頸。
(1)數據傾斜
通過 Web UI 各個 SubTask 的 Records Sent 和 Record Received 來確認,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一個分析數據傾斜的有用指標。解決方式把數據分組的 key 進行本地/預聚合來消除/減少數據傾斜。
(2)用戶代碼的執行效率
對 TaskManager 進行 CPU profile,分析 TaskThread 是否跑滿一個 CPU 核:如果沒有跑滿,需要分析 CPU 主要花費在哪些函數里面,比如生產環境中偶爾會卡在 Regex 的用戶函數(ReDoS);如果沒有跑滿,需要看 Task Thread 阻塞在哪里,可能是用戶函數本身有些同步的調用,可能是 checkpoint 或者 GC 等系統活動。
(3)TaskManager 的內存以及 GC
TaskManager JVM 各區內存不合理導致的頻繁 Full GC 甚至失聯。可以加上 -XX:+PrintGCDetails 來打印 GC 日志的方式來觀察 GC 的問題。推薦TaskManager 啟用 G1 垃圾回收器來優化 GC。