spark調優篇-數據傾斜(匯總)


數據傾斜

為什么會數據傾斜

spark 中的數據傾斜並不是說原始數據存在傾斜,原始數據都是一個一個的 block,大小都一樣,不存在數據傾斜;

而是指 shuffle 過程中產生的數據傾斜,由於不同的 key 對應的數據量不同導致不同 task 處理的數據量不同

 

注意:數據傾斜與數據過量不同,數據傾斜是某幾個 task 處理的數據量很大,數據過量是所有 task 處理的數據量都很大

 

數據傾斜的表現

大部分 task 都快速執行完畢,少數 task 執行緩慢,甚至報錯 OOM,即使最終運行完畢,也叫數據傾斜

 

數據傾斜的后果

1. 程序執行緩慢

2. 報錯 OOM

 

定位數據傾斜問題

1. 經驗:查看代碼中的 shuffle 算子,如 reduceByKey、countByKey、groupByKey、join 等,根據代碼邏輯判斷是否會出現數據傾斜

2. spark log 文件:log 記錄錯誤發生在代碼的哪一行,從而再根據自己的理解定位哪個 shuffle 算子

3. spark web UI:

 

解決方案

聚合原數據

避免 shuffle 

1. 既然數據傾斜是 shuffle 產生的,那沒有 shuffle 肯定就沒有數據傾斜了;

2. 沒有 shuffle 意味着在 原數據層面已經進行了 “shuffle”

這種方法適合 原數據方便實現大量數據合並操作 的數據源,確切的說就是 Hive;

比如在 Hive 中按照 key 進行分組,把每個 key 的 value 拼成一個 字符串(當然也可以是其他操作,根據需求定),那么這種數據進入 spark 后,無需 shuffle 了,直接 map 即可

 

縮小 key 粒度

比如把按 月 統計變成按 周 統計;

key 的粒度縮小,每個 key 對應的數據量降低,task 處理的數據量減少;

這樣可以保證程序順利執行,減少 OOM,但是可能引起更大的數據傾斜

 

增大 key 粒度

比如把按 周 統計變成按 月 統計;

key 的粒度增大,每個 key 對應的數據量增大,task 處理的數據量增大;

這樣可以降低數據傾斜的可能,但是 可能引起 OOM

 

過濾導致傾斜的 key

極端方法,扔掉導致傾斜的 key,不建議

 

提高 shuffle 操作的 reduce 並行度

增加 reduce 並行度其實就是增加 reduce 端 task 的數量, 這樣每個 task 處理的數據量減少,避免 oom

 

reduce 端並行度設置

1. 在很多 shuffle 算子中可直接指定並行度,如 reduceByKey(lambda x, y: x+y, 10)

  // 注意如果並行度大於 executor 數 x executor core 數,以小為准

2. 在 sparkSQL 中 groupBy、join 等 shuffle 操作,需要設定 spark 參數:spark.sql.shuffle.partitions

  // 該參數默認為 200

 

這樣做是有缺陷的

map 端不斷地寫入數據,reduce task 不斷地從指定位置讀取數據,如果 task 很多,讀取的速度增加,但是每個 key 對應的 reduce 處理的總量沒變,

所以它並沒有從根本上解決數據傾斜的問題,只是盡量去減少 reduce task 的數據量,適用於 較多 key 對應的數據量都很大的問題;

試想,如果只有 1 個 key 數據量較大,那么其他 key 高並行就是資源的浪費;

 

使用隨機 key 進行雙重聚合

雙重聚合,就是聚合多次,大致思路為:

由於一個 key 對應的數據量太大,我先給這個 key 加個隨機數,num_key,強行把一個 key 變成 多個 key,這樣每個 key 的數據量減小,然后按  num_key 進行聚合,聚合之后,把 num_key 再轉回 key,然后對 key 再次聚合;

把一次聚合變成多次聚合,如圖

這種方法適合於 reduceByKey、groupByKey 等算子,不適合 join 算子

 

將 reduce join 變成 map join 

正常情況下,join 會產生 shuffle 過程,而且是 reduce join,即先將相同 key 對應的 value 匯聚到一個 reduce task 中,再進行 join,如下圖

 

 

 

如果其中有一個 RDD 很小,就可以采用 廣播小 RDD + map 大 RDD 實現 join 功能,

此時沒有 shuffle 操作,自然不會有數據傾斜

 

注意:RDD 是不能廣播的,只有將 RDD 通過 collect 拉取到 Driver 內存中才能進行廣播

 

如果是兩個 RDD 都很大,則不適合這種方法 

 

sample 采樣對傾斜 key 單獨進行 join

思考一個問題,如果 RDD 中只有一個 key,shuffle 如何做?

在 spark 中,如果只有一個 key,shuffle 過程會將 所有 value 打散,分配到不同的 reduce task 中;

 

那么如果某個 RDD 中因為單個 key 導致數據傾斜,我們可以把這個 key 拿出來形成一個新的 RDD1,其他 key 形成一個 RDD2,然后用 RDD1 再與目標 RDD 進行 join 操作,根據 spark 運行機制,他會把 RDD1 打散分配到多個 task 中進行 join 操作;

這樣可以避免因 這個 key 對應的數據量太大導致數據傾斜,甚至 OOM

如果一個 RDD 中有多個 key 導致數據傾斜,則此法不適用

 

使用隨機數以及擴容進行 join

如果有多個 key 導致數據傾斜,無法通過采樣確定哪個 key 數據量大,只能把多個 key 都提出來,這樣效率貌似不太高;

此時我們只有一種解決方案:對一個 RDD 擴容,對另一個 RDD 稀釋,再進行 join

具體做法如圖

注意:如果 RDD 過大,進行 隨機數擴容后,可能產生 OOM

 

repartition

這個也是較常用的方法,它的本質就是減少 task 處理的數據量,一般發生在 shuffle 之前,當然它本身也是個 shuffle 操作

 

總結

數據傾斜 是 某些 task 處理了大量數據,所以數據傾斜很可能引起 OOM,數據傾斜和 OOM 的某些解決辦法可以通用;

解決數據傾斜的大致思路為:

1. 避免 shuffle,可在數據輸入 spark 之前進行 shuffle,或者 用 map 等替代 shuffle

  // 聚合原始數據、廣播小 RDD

2. 如果避免不了 shuffle,就減少 reduce task 的數據量

  // 縮小 key 粒度、增加 reduce task 數量

  // 通過隨機數多次聚合,減少每次聚合的數據量,針對 reduceByKey、groupByKey 等

3. 其他情況,單個 key 數據量大,多個 key 數據量大,針對 join


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM