Flink如何分析及處理反壓?


1.概念

反壓(backpressure)是流式計算中十分常見的問題。 反壓意味着數據管道中某個節點成為瓶頸,處理速率跟不上上游發送數據的速率,而需要對上游進行限速。由於實時計算應用通常使用消息隊列來進行生產端和消費端的解耦,消費端數據源是 pull-based 的,所以 反壓通常是從某個節點傳導至數據源並降低數據源(比如 Kafka consumer)的攝入速率。
① 節點有性能瓶頸可能是該節點所在的機器有故障(網絡、磁盤等)、機器的網絡延遲和磁盤不足、頻繁GC、數據熱點等原因。

② 大多數消息中間件,例如kafka的consumer從broker把數據pull到本地,而producer把數據push到broker。

2.反壓的影響

反壓並不會直接影響作業的可用性,它表明作業處於亞健康的狀態,有潛在的性能瓶頸並可能導致更大的數據處理延遲。通常來說,對於一些對延遲要求不高或者數據量較少的應用,反壓的影響可能並不明顯。然而對於規模比較大的 Flink 作業,反壓可能會導致嚴重的問題。
反壓會影響checkpoint

① checkpoint時長:checkpoint barrier跟隨普通數據流動,如果數據處理被阻塞,使得checkpoint barrier流經整個數據管道的時長變長,導致checkpoint 總體時間變長。

② state大小:為保證Exactly-Once准確一次,對於有兩個以上輸入管道的 Operator,checkpoint barrier需要對齊,即接受到較快的輸入管道的barrier后,它后面數據會被緩存起來但不處理,直到較慢的輸入管道的barrier也到達。這些被緩存的數據會被放到state 里面,導致checkpoint變大。

checkpoint是保證准確一次的關鍵,checkpoint時間變長有可能導致checkpoint超時失敗,而state大小可能拖慢checkpoint甚至導致OOM。 

3.Flink 的反壓機制

網絡流控的實現:動態反饋/自動反壓

Consumer 需要及時給 Producer 做一個 feedback,即告知 Producer 能夠承受的速率是多少。動態反饋分為兩種:

負反饋:接受速率小於發送速率時發生,告知 Producer 降低發送速率

正反饋:發送速率小於接收速率時發生,告知 Producer 可以把發送速率提上來

3.1Flink 反壓機制

Flink 的數據交換有3種:
  1. 同一個 Task 的數據交換;
  2. 不同 Task 同 JVM 下的數據交換;
  3. 不同 Task 且不同 TaskManager 之間的交換。

3.1.1同一個 Task 的數據交換

通過算子鏈 operator chain 串聯多個算子,主要作用是避免了序列化和網絡通信的開銷。
算子鏈 operator chain 串聯多個算子的條件:

① 上下游的並行度一致

② 下游節點的入度為1

③ 上下游節點共享同一個slot

④ 下游節點的 chain 策略為 ALWAYS(例如 map、flatmap、filter等默認是ALWAYS)

⑤ 上游節點的 chain 策略為 ALWAYS 或 HEAD(source默認是HEAD)

⑥ 兩個節點間數據分區方式是 forward

⑦ 用戶沒有禁用 chain

3.1.2不同 Task 同 TaskManager 的數據交換

在 TaskA 中,算子輸出的數據首先通過 record Writer 進行序列化,然后傳遞給 result Partition 。接着,數據通過 local channel 傳遞給 TaskB 的 Input Gate,然后傳遞給 record reader 進行反序列。

3.1.3不同 Task 且不同 TaskManager 之間的交換

 與上述3.1.2的不同點是數據先傳遞給 netty ,通過 netty 把數據推送到遠程端的 Task 。

3.2Flink(before V1.5)的TCP-based反壓機制

1.5 版本之前是采用 TCP 流控機制,而沒有采用feedback機制。

  • Flink1.5 版本之前的TCP-based 反壓機制
發送端 Flink 有一層Network Buffer,底層用Netty通信即有一層Channel Buffer,最后Socket通信也有Buffer,同理接收端也有對應的3級 Buffer。Flink (before V1.5)實質是利用 TCP 的流控機制來實現 feedback 。
  • TCP 利用滑動窗口實現網絡流控
TCP報文段首部有16位窗口字段,當接收方收到發送方的數據后,ACK響應報文中就將自身緩沖區的剩余大小設置到放入16位窗口字段。該窗口字段值是隨網絡傳輸的情況變化的,窗口越大,網絡吞吐量越高。

參考:1.【計算機網絡】3.1 運輸層 - TCP/UDP協議

           2.Apache Flink 進階教程(七):網絡流控及反壓剖析

例子:TCP 利用滑動窗口限制流量

步驟1:發送端將 4,5,6 發送,接收端也能接收全部數據。

 步驟2:consumer 消費了 2 ,接收端的窗口會向前滑動一格,即窗口空余1格。接着向發送端發送 ACK = 7、window = 1。

步驟3:發送端將 7 發送后,接收端接收到 7 ,但是接收端的 consumer 故障不能消費數據。這時候接收端向發送端發送 ACK = 8、window = 0 ,由於這個時候 window = 0,發送端是不能發送任何數據,也就會使發送端的發送速度降為 0。

  • TCP-based 反壓機制的缺點
① 單個Task的反壓,阻塞了整個TaskManager的socket,導致checkpoint barrier也無法傳播,最終導致checkpoint時間增長甚至checkpoint超時失敗。

② 反壓路徑太長,導致反壓時間延遲。

3.3Flink(since V1.5)的 Credit-based 反壓機制

在 Flink 層面實現反壓機制,通過 ResultPartition 和 InputGate 傳輸 feedback 。
Credit-base 的 feedback 步驟:

① 每一次 ResultPartition 向 InputGate 發送數據的時候,都會發送一個 backlog size 告訴下游准備發送多少消息,下游就會去計算有多少的 Buffer 去接收消息。(backlog 的作用是為了讓消費端感知到我們生產端的情況)

② 如果下游有充足的 Buffer ,就會返還給上游 Credit (表示剩余 buffer 數量),告知發送消息(圖上兩個虛線是還是采用 Netty 和 Socket 進行通信)。

生產段發送backlog=1

 消費端返回credit=3

 當生產端用完buffer,返回credit=0

 生產端也出現了數據積壓

 4.定位反壓節點

4.1 Flink Web UI 自帶的反壓監控 —— 直接方式

Flink Web UI 的反壓監控提供了 Subtask 級別的反壓監控。監控的原理是 通過Thread.getStackTrace() 采集在 TaskManager 上正在運行的所有線程,收集在緩沖區請求中阻塞的線程數(意味着下游阻塞),並計算緩沖區阻塞線程數與總線程數的比值 rate。其中,rate < 0.1 為 OK,0.1 <= rate <= 0.5 為 LOW,rate > 0.5 為 HIGH。
以下兩種場景可能導致反壓:

① 該節點發送速率跟不上它的產生數據速率。該場景一般是單輸入多輸出的算子,例如FlatMap。定位手段是因為這是從 Source Task 到 Sink Task 的第一個出現反壓的節點,所以該節點是反壓的根源節點。

② 下游的節點處理數據的速率較慢,通過反壓限制了該節點的發送速率。定位手段是從該節點開始繼續排查下游節點。

注意事項:

① 因為Flink Web UI 反壓面板是監控發送端的,所以反壓的根源節點並不一定會在反壓面板體現出高反壓。如果某個節點是性能瓶頸並不會導致它本身出現高反壓,而是導致它的上游出現高反壓。總體來看,如果找到第一個出現反壓的節點,則反壓根源是這個節點或者是它的下游節點。

② 通過反壓面板無法區分上述兩種狀態,需要結合 Metrics 等監控手段來定位。如果作業的節點數很多或者並行度很大,即需要采集所有 Task 的棧信息,反壓面板的壓力也會很大甚至不可用。

4.2 Flink Task Metrics —— 間接方式

(1)回顧 Flink Credit-based 網絡

① TaskManager 之間的數據傳輸

不同的 TaskManager 上的兩個 Subtask 通常情況下,channel 數量等於分組 key 的數量或者等於算子並發度。這些 channel 會復用同一個 TaskManager 進程的 TCP 請求,並且共享接收端 Subtask 級別的 Buffer Pool。

② 接收端

每個 channel 在初始階段會被分配固定數量的獨享 Exclusive Buffer,用於存儲接收到的數據。算子 Operator 使用后再次釋放 Exclusive  Buffer。說明:channel 接收端空閑的 Buffer 數量稱為 Credit,Credit 會被定時同步給發送端,用於決定發送多少個 Buffer 的數據。

③ 流量較大的場景

接收端,channel 寫滿 Exclusive Buffer 后,Flink 會向 Buffer Pool 申請剩余的 Floating Buffer。發送端,一個 Subtask 所有的 Channel 會共享同一個 Buffer Pool,因此不區分 Exclusive Buffer 和 Floating Buffer。

(2)Flink Task Metrics 監控反壓

Network 和 task I/O metrics 是輕量級反壓監視器,用於正在持續運行的作業,其中一下幾個 metrics 是最有用的反壓指標。

 
采用 Metrics 分析反壓的思路: 如果一個 Subtask 的發送端 Buffer 占用率很高,則表明它被下游反壓限速了;如果一個 Subtask 的接受端 Buffer 占用很高,則表明它將反壓傳導至上游
解釋:

① outPoolUsage 和 inPoolUsage 同為低表明當前 Subtask 是正常的,同為高分別表明當前 Subtask 被下游反壓。

② 如果一個 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影響,所以可以排查它本身是反壓根源的可能性。

③ 如果一個 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,則表明它有可能是反壓的根源。因為通常反壓會傳導至其上游,導致上游某些 Subtask 的 outPoolUsage 為高。

注意:反壓有時是短暫的且影響不大,比如來自某個 channel 的短暫網絡延遲或者 TaskManager 的正常 GC,這種情況下可以不用處理。
下表把 inPoolUsage 分為 floatingBuffersUsage 和 exclusiveBuffersUsage,並且總結上游 Task outPoolUsage 與 floatingBuffersUsage 、 exclusiveBuffersUsage 的關系,進一步的分析一個 Subtask 和其上游 Subtask 的反壓情況。
解析:

① floatingBuffersUsage 為高則表明反壓正在傳導至上游。

② exclusiveBuffersUsage 則表明了反壓可能存在傾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,則存在傾斜。因為少數 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,當 exclusive buffer 消耗完,就會使用floating Buffer)。 

5.Flink 如何分析反壓

上述主要通過 TaskThread 定位反壓,而分析反壓原因類似一個普通程序的性能瓶頸

(1)數據傾斜

通過 Web UI 各個 SubTask 的 Records Sent 和 Record Received 來確認,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一個分析數據傾斜的有用指標。解決方式把數據分組的 key 進行本地/預聚合來消除/減少數據傾斜。

(2)用戶代碼的執行效率

TaskManager 進行 CPU profile,分析 TaskThread 是否跑滿一個 CPU 核:如果沒有跑滿,需要分析 CPU 主要花費在哪些函數里面,比如生產環境中偶爾會卡在 Regex 的用戶函數(ReDoS);如果沒有跑滿,需要看 Task Thread 阻塞在哪里,可能是用戶函數本身有些同步的調用,可能是 checkpoint 或者 GC 等系統活動

(3)TaskManager 的內存以及 GC

TaskManager JVM 各區內存不合理導致的頻繁 Full GC 甚至失聯。可以加上 -XX:+PrintGCDetails 來打印 GC 日志的方式來觀察 GC 的問題。推薦TaskManager 啟用 G1 垃圾回收器來優化 GC。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM