什么是數據傾斜(Data Skew)?
數據傾斜是指在原本應該並行處理的數據集中,某一部分的數據顯著多於其它部分,從而使得該部分數據的處理速度成為整個數據集處理的瓶頸。
假設數據分布不均勻,某個key對應幾十萬條數據,其他key對應幾百條或幾十條數據,那么在處理數據的時候,大量相同的key會被分配(partition)到同一個分區里,造成"一個人累死,其他人閑死“的情況,具體表現在:有些任務很快就處理完了,而有些任務則遲遲未能處理完,導致整體任務最終耗時過長甚至是無法完成。
數據傾斜分為map端傾斜和reduce端傾斜。
要真正了解數據傾斜,需要知道MapReduce的工作原理。(以下摘自:https://www.zhihu.com/question/27593027)
舉個 word count 的入門例子,它的map 階段就是形成 (“aaa”,1)的形式,然后在reduce 階段進行 value 相加,得出 “aaa” 出現的次數。若進行 word count 的文本有100G,其中 80G 全部是 “aaa” ,剩下 20G 是其余單詞,那就會形成 80G 的數據量交給同一個 reduce 進行相加,其余 20G 根據分配到不同 reduce 進行相加的情況。如此就造成了數據傾斜,臨床反應就是 reduce 跑到 99%,然后一直在原地等着那80G 的reduce 跑完。
數據傾斜產生的原因:
1,map端:輸入文件的大小不均勻
2,reduce端:key分布不均勻,導致partition不均勻
數據傾斜的解決辦法:
1,當出現小文件過多時:合並小文件
可以通過set hive.merge.mapfiles=true來解決。
2,當group by分組的維度過少,每個維度的值過多時:調優參數
(1)設置在map階段做部分聚合操作
hive.map.aggr=true
效率更高但需要更多的內存。
(2)設置數據傾斜時負載均衡
hive.groupby.skewindata=true
當選項設定為true,生成的查詢計划會有兩個MRJob。第一個MRJob 中,Map的輸出結果集合會隨機分布到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;第二個MRJob再根據預處理的數據結果按照GroupBy Key分布到Reduce中(這個過程可以保證相同的GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作。
起至關作用的是第(2)項,它分為了兩個mapreduce,第一個在shuffle過程中partition時隨機給key打標記,使其分布在不同的reduce上計算,但不能完成全部運算,所以需要第二次mapreduce整合回歸正常的shuffle,由於數據分布不均問題在第一次時得到改善,所以基本解決數據傾斜問題。
3,調節SQL語句
(1)關聯字段帶空值的兩表Join時:把空值的key變成一個字符串加上隨機數,這樣就可以把傾斜的數據分到不同的reduce上,此外由於空值關聯不起來,所以處理后並不影響最終結果。
(2)大小表Join時:使用map join讓小表(1000條以下的記錄條數) 先進內存,在map端完成reduce。
在 hive 中,能夠在 HQL 語句中直接指定該次查詢使用map join,具體做法是:在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */,提示優化器轉化為map join(早期的 Hive 版本的優化器是不能自動優化 map join 的)。
select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid = b.movieid;
在 hive0.11 版本以后會自動開啟 map join 優化,由兩個參數控制:
set hive.auto.convert.join=true; //設置 MapJoin 優化自動開啟
set hive.mapjoin.smalltable.filesize=25000000 //設置小表不超過多大時開啟 mapjoin 優化
(3)大表Join大表時:把大表切分成小表,然后分別map join。
(4)count(distinct xx)時有大量相同的特殊值:用sum() group by的方式來替換count(distinct)完成計算。如:如select a,count(distinct b) from t group by a,用select a,sum(1) from (select a,b from t group by a,b) group by a替代。
(5)其他情況:如果傾斜的key數量比較少,那么將傾斜的數據單獨拿出來處理,最后union回去;如果傾斜的key數量比較多,那么給key增加隨機前/后綴,使得原來Key相同的數據變為Key不相同的數據,從而使傾斜的數據集分散到不同的任務中,在Join的另一側數據中,將與傾斜Key對應的部分數據和隨機前/后綴集作笛卡爾乘積,從而保證無論數據傾斜側傾斜Key如何加前綴,都能與之正常Join。