spark數據傾斜處理
- 危害:
- 當出現數據傾斜時,小量任務耗時遠高於其它任務,從而使得整體耗時過大,未能充分發揮分布式系統的並行計算優勢。
- 當發生數據傾斜時,部分任務處理的數據量過大,可能造成內存不足使得任務失敗,並進而引進整個應用失敗。
表現:同一個stage的多個task執行時間不一致。
- 原因:
- 機器本身性能,導致速度不一致。
- 數據來源的問題:
- 從數據源直接讀取。如讀取HDFS,Kafka
- 讀取上一個Stage的Shuffle數據
如何緩解/消除數據傾斜
-
kafka:取決於kafka topic中消息在partition是否分布均勻。隨機partition沒問題,其他沒有分布均勻的情況需要另外處理。
-
hdfs: 使文件可切分或者保證各文件的數據量基本一致。
-
shuffle: 調整並行度分散同一個Task的不同Key。
Spark在做Shuffle時,默認使用HashPartitioner(非Hash Shuffle)對數據進行分區。如果並行度設置的不合適,可能造成大量不相同的Key對應的數據被分配到了同一個Task上,造成該Task所處理的數據遠大於其它Task,從而造成數據傾斜。
簡單說,數據根據並行度做hashpartionter時,可能沒分配均勻,此時可以通過調整並行度來改變。
優勢
實現簡單,可在需要Shuffle的操作算子上直接設置並行度或者使用spark.default.parallelism設置。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=[num_tasks]設置並行度。可用最小的代價解決問題。一般如果出現數據傾斜,都可以通過這種方法先試驗幾次,如果問題未解決,再嘗試其它方法。
劣勢
適用場景少,只能將分配到同一Task的不同Key分散開,但對於同一Key傾斜嚴重的情況該方法並不適用。並且該方法一般只能緩解數據傾斜,沒有徹底消除問題。從實踐經驗來看,其效果一般。
自定義Partitioner
原理:
使用自定義的Partitioner(默認為HashPartitioner),將原本被分配到同一個Task的不同Key分配到不同Task。
將Reduce side Join轉變為Map side Join
原理:
通過Spark的Broadcast機制,將Reduce側Join轉化為Map側Join,避免Shuffle從而完全消除Shuffle帶來的數據傾斜
為skew的key增加隨機前/后綴
原理:
為數據量特別大的Key增加隨機前/后綴,使得原來Key相同的數據變為Key不相同的數據,從而使傾斜的數據集分散到不同的Task中,徹底解決數據傾斜問題。Join另一則的數據中,與傾斜Key對應的部分數據,與隨機前綴集作笛卡爾乘積,從而保證無論數據傾斜側傾斜Key如何加前綴,都能與之正常Join。
大表隨機添加N種隨機前綴,小表擴大N倍
原理:
如果出現數據傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時更適合直接對存在數據傾斜的數據集全部加上隨機前綴,然后對另外一個不存在嚴重數據傾斜的數據集整體與隨機前綴集作笛卡爾乘積(即將數據量擴大N倍)
為什么小表要擴大?
因為兩個表的數據本來是可以join上,現在加上大表加上隨機前綴,小表也需要加上同樣的前綴才能join上。
優勢
對大部分場景都適用,效果不錯。
劣勢
需要將一個數據集整體擴大N倍,會增加資源消耗。
總結
避免spark數據傾斜的辦法,就是在了解其執行機制的基礎上,盡可能的分散key。針對不同的情況,采取不同的策略。