數據傾斜
為什么會數據傾斜
spark 中的數據傾斜並不是說原始數據存在傾斜,原始數據都是一個一個的 block,大小都一樣,不存在數據傾斜;
而是指 shuffle 過程中產生的數據傾斜,由於不同的 key 對應的數據量不同導致不同 task 處理的數據量不同
注意:數據傾斜與數據過量不同,數據傾斜是某幾個 task 處理的數據量很大,數據過量是所有 task 處理的數據量都很大
數據傾斜的表現
大部分 task 都快速執行完畢,少數 task 執行緩慢,甚至報錯 OOM,即使最終運行完畢,也叫數據傾斜
數據傾斜的后果
1. 程序執行緩慢
2. 報錯 OOM
定位數據傾斜問題
1. 經驗:查看代碼中的 shuffle 算子,如 reduceByKey、countByKey、groupByKey、join 等,根據代碼邏輯判斷是否會出現數據傾斜
2. spark log 文件:log 記錄錯誤發生在代碼的哪一行,從而再根據自己的理解定位哪個 shuffle 算子
3. spark web UI:
解決方案
聚合原數據
避免 shuffle
1. 既然數據傾斜是 shuffle 產生的,那沒有 shuffle 肯定就沒有數據傾斜了;
2. 沒有 shuffle 意味着在 原數據層面已經進行了 “shuffle”
這種方法適合 原數據方便實現大量數據合並操作 的數據源,確切的說就是 Hive;
比如在 Hive 中按照 key 進行分組,把每個 key 的 value 拼成一個 字符串(當然也可以是其他操作,根據需求定),那么這種數據進入 spark 后,無需 shuffle 了,直接 map 即可
縮小 key 粒度
比如把按 月 統計變成按 周 統計;
key 的粒度縮小,每個 key 對應的數據量降低,task 處理的數據量減少;
這樣可以保證程序順利執行,減少 OOM,但是可能引起更大的數據傾斜
增大 key 粒度
比如把按 周 統計變成按 月 統計;
key 的粒度增大,每個 key 對應的數據量增大,task 處理的數據量增大;
這樣可以降低數據傾斜的可能,但是 可能引起 OOM
過濾導致傾斜的 key
極端方法,扔掉導致傾斜的 key,不建議
提高 shuffle 操作的 reduce 並行度
增加 reduce 並行度其實就是增加 reduce 端 task 的數量, 這樣每個 task 處理的數據量減少,避免 oom
reduce 端並行度設置
1. 在很多 shuffle 算子中可直接指定並行度,如 reduceByKey(lambda x, y: x+y, 10)
// 注意如果並行度大於 executor 數 x executor core 數,以小為准
2. 在 sparkSQL 中 groupBy、join 等 shuffle 操作,需要設定 spark 參數:spark.sql.shuffle.partitions
// 該參數默認為 200
這樣做是有缺陷的
map 端不斷地寫入數據,reduce task 不斷地從指定位置讀取數據,如果 task 很多,讀取的速度增加,但是每個 key 對應的 reduce 處理的總量沒變,
所以它並沒有從根本上解決數據傾斜的問題,只是盡量去減少 reduce task 的數據量,適用於 較多 key 對應的數據量都很大的問題;
試想,如果只有 1 個 key 數據量較大,那么其他 key 高並行就是資源的浪費;
使用隨機 key 進行雙重聚合
雙重聚合,就是聚合多次,大致思路為:
由於一個 key 對應的數據量太大,我先給這個 key 加個隨機數,num_key,強行把一個 key 變成 多個 key,這樣每個 key 的數據量減小,然后按 num_key 進行聚合,聚合之后,把 num_key 再轉回 key,然后對 key 再次聚合;
把一次聚合變成多次聚合,如圖
這種方法適合於 reduceByKey、groupByKey 等算子,不適合 join 算子
將 reduce join 變成 map join
正常情況下,join 會產生 shuffle 過程,而且是 reduce join,即先將相同 key 對應的 value 匯聚到一個 reduce task 中,再進行 join,如下圖
如果其中有一個 RDD 很小,就可以采用 廣播小 RDD + map 大 RDD 實現 join 功能,
此時沒有 shuffle 操作,自然不會有數據傾斜
注意:RDD 是不能廣播的,只有將 RDD 通過 collect 拉取到 Driver 內存中才能進行廣播
如果是兩個 RDD 都很大,則不適合這種方法
sample 采樣對傾斜 key 單獨進行 join
思考一個問題,如果 RDD 中只有一個 key,shuffle 如何做?
在 spark 中,如果只有一個 key,shuffle 過程會將 所有 value 打散,分配到不同的 reduce task 中;
那么如果某個 RDD 中因為單個 key 導致數據傾斜,我們可以把這個 key 拿出來形成一個新的 RDD1,其他 key 形成一個 RDD2,然后用 RDD1 再與目標 RDD 進行 join 操作,根據 spark 運行機制,他會把 RDD1 打散分配到多個 task 中進行 join 操作;
這樣可以避免因 這個 key 對應的數據量太大導致數據傾斜,甚至 OOM
如果一個 RDD 中有多個 key 導致數據傾斜,則此法不適用
使用隨機數以及擴容進行 join
如果有多個 key 導致數據傾斜,無法通過采樣確定哪個 key 數據量大,只能把多個 key 都提出來,這樣效率貌似不太高;
此時我們只有一種解決方案:對一個 RDD 擴容,對另一個 RDD 稀釋,再進行 join
具體做法如圖
注意:如果 RDD 過大,進行 隨機數擴容后,可能產生 OOM
repartition
這個也是較常用的方法,它的本質就是減少 task 處理的數據量,一般發生在 shuffle 之前,當然它本身也是個 shuffle 操作
總結
數據傾斜 是 某些 task 處理了大量數據,所以數據傾斜很可能引起 OOM,數據傾斜和 OOM 的某些解決辦法可以通用;
解決數據傾斜的大致思路為:
1. 避免 shuffle,可在數據輸入 spark 之前進行 shuffle,或者 用 map 等替代 shuffle
// 聚合原始數據、廣播小 RDD
2. 如果避免不了 shuffle,就減少 reduce task 的數據量
// 縮小 key 粒度、增加 reduce task 數量
// 通過隨機數多次聚合,減少每次聚合的數據量,針對 reduceByKey、groupByKey 等
3. 其他情況,單個 key 數據量大,多個 key 數據量大,針對 join