Flink數據傾斜概述與優化


在大數據處理領域,數據傾斜是一個非常常見的問題,今天我們就簡單講講在flink中如何處理流式數據傾斜問題。

1.數據傾斜的原理和影響

1.1 原理

數據傾斜就是數據的分布嚴重不均,造成一部分數據很多,一部分數據很少的局面。

數據分布理論上都是傾斜的,符合“二八原理”:例如80%的財富集中在20%的人手中、80%的用戶只使用20%的功能、20%的用戶貢獻了80%的訪問量。

數據傾斜的現象,如下圖所示。

1.2 影響

(1)單點問題

數據集中在某些分區上(Subtask),導致數據嚴重不平衡。

(2)GC 頻繁

過多的數據集中在某些 JVM(TaskManager),使得JVM 的內存資源短缺,導致頻繁 GC。

(3)吞吐下降、延遲增大

數據單點和頻繁 GC 導致吞吐下降、延遲增大。

(4)系統崩潰

嚴重情況下,過長的 GC 導致 TaskManager 失聯,系統崩潰。

 

 

2.Flink 如何定位數據傾斜?

步驟1:定位反壓

定位反壓有2種方式:Flink Web UI 自帶的反壓監控(直接方式)、Flink Task Metrics(間接方式)。通過監控反壓的信息,可以獲取到數據處理瓶頸的 Subtask。

參考:【Flink 精選】如何分析及處理反壓?

步驟2:確定數據傾斜

Flink Web UI 自帶Subtask 接收和發送的數據量。當 Subtasks 之間處理的數據量有較大的差距,則該 Subtask 出現數據傾斜。如下圖所示,紅框內的 Subtask 出現數據熱點。

3.Flink 如何處理常見數據傾斜?

 

場景一:數據源 source 消費不均勻

解決思路:通過調整並發度,解決數據源消費不均勻或者數據源反壓的情況。

例如kafka數據源,可以調整 KafkaSource 的並發度解決消費不均勻。

調整並發度的原則:KafkaSource 並發度與 kafka 分區數是一樣的,或者 kafka 分區數是KafkaSource 並發度的整數倍。

場景二:key 分布不均勻的無統計場景

說明:key 分布不均勻的無統計場景,例如上游數據分布不均勻,使用keyBy來打散數據。

解決思路: 通過添加隨機前綴,打散 key 的分布,使得數據不會集中在幾個 Subtask。

 

 

 具體措施:
① 在原來分區 key/uid 的基礎上,加上隨機的前綴或者后綴。
② 使用數據到達的順序seq,作為分區的key。

場景三:key 分布不均勻的統計場景

解決思路:聚合統計前,先進行預聚合,例如兩階段聚合(加鹽局部聚合+去鹽全局聚合)。

 兩階段聚合的具體措施:
① 預聚合:加鹽局部聚合,在原來的 key 上加隨機的前綴或者后綴。
② 聚合:去鹽全局聚合,刪除預聚合添加的前綴或者后綴,然后進行聚合統計。 

4.一個SQL演示案例

我們先來看一個可能產生數據傾斜的sql

select TUMBLE_END(proc_time, INTERVAL '1' MINUTE) as winEnd,plat,count(*) as pv  from source_kafka_table 
group by TUMBLE(proc_time, INTERVAL '1' MINUTE) ,plat

在這個sql里,我們統計一個網站各個端的每分鍾的pv,從kafka消費過來的數據首先會按照端進行分組,然后執行聚合函數count來進行pv的計算。

如果某一個端產生的數據特別大,比如我們的微信小程序端產生數據遠遠大於其他app端的數據,那么把這些數據分組到某一個算子之后,由於這個算子的處理速度跟不上,就會產生數據傾斜。

 查看flink的ui,會看到如下的場景。

對於這種簡單的數據傾斜,我們可以通過對分組的key加上隨機數,再次打散,分別計算打散后不同的分組的pv數,然后在最外層再包一層,把打散的數據再次聚合,這樣就解決了數據傾斜的問題。

優化后的sql如下:

select winEnd,split_index(plat1,'_',0) as plat2,sum(pv) from (

  select TUMBLE_END(proc_time, INTERVAL '1' MINUTE) as winEnd,plat1,count(*) as pv from (

    -- 最內層,將分組的key,也就是plat加上一個隨機數打散
    select plat || '_' || cast(cast(RAND()*100 as int) as string) as plat1 ,proc_time from source_kafka_table 

) group by TUMBLE(proc_time, INTERVAL '1' MINUTE), plat1

) group by winEnd,split_index(plat1,'_',0)

在這個sql的最內層,將分組的key,也就是plat加上一個隨機數打散,然后求打散后的各個分組(也就是sql中的plat1)的pv值,然后最外層,將各個打散的pv求和。

注意:最內層的sql,給分組的key添加的隨機數,范圍不能太大,也不能太小,太大的話,分的組太多,增加checkpoint的壓力,太小的話,起不到打散的作用。

在我的測試中,一天大概十幾億的數據量,5個並行度,隨機數的范圍在100范圍內,就可以正常處理了。

修改后我們看到各個子任務的數據基本均勻了。

 

 

 

https://www.cnblogs.com/qiu-hua/p/14056747.html

https://www.cnblogs.com/qiu-hua/p/14056780.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM