Hive數據傾斜原因和解決辦法(Data Skew)
什么是數據傾斜(Data Skew)?
數據傾斜是指在原本應該並行處理的數據集中,某一部分的數據顯著多於其它部分,從而使得該部分數據的處理速度成為整個數據集處理的瓶頸。
假設數據分布不均勻,某個key對應幾十萬條數據,其他key對應幾百條或幾十條數據,那么在處理數據的時候,大量相同的key會被分配(partition)到同一個分區里,造成"一個人累死,其他人閑死“的情況,具體表現在:有些任務很快就處理完了,而有些任務則遲遲未能處理完,導致整體任務最終耗時過長甚至是無法完成。
數據傾斜分為map端傾斜和reduce端傾斜。
1.1 操作:
1.2 原因:
1)、key分布不均勻
2)、業務數據本身的特性
3)、建表時考慮不周
4)、某些SQL語句本身就有數據傾斜
1.3 表現:
任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大。
單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大於平均時長。
要真正了解數據傾斜,需要知道MapReduce的工作原理。(以下摘自:https://www.zhihu.com/question/27593027)
舉個 word count 的入門例子,它的map 階段就是形成 (“aaa”,1)的形式,然后在reduce 階段進行 value 相加,得出 “aaa” 出現的次數。若進行 word count 的文本有100G,其中 80G 全部是 “aaa” ,剩下 20G 是其余單詞,那就會形成 80G 的數據量交給同一個 reduce 進行相加,其余 20G 根據分配到不同 reduce 進行相加的情況。如此就造成了數據傾斜,臨床反應就是 reduce 跑到 99%,然后一直在原地等着那80G 的reduce 跑完。
2、數據傾斜的解決方案
2.1 參數調節:
hive.map.aggr=true
Map 端部分聚合,相當於Combiner
hive.groupby.skewindata=true
有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計划會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
2.2 SQL語句調節:
如何Join:
關於驅動表的選取,選用join key分布最均勻的表作為驅動表。
做好列裁剪和filter操作,以達到兩表做join的時候,數據量相對變小的效果。
當出現小文件過多,需要合並小文件。可以通過set hive.merge.mapfiles=true來解決。
大小表Join:
解決方法:小表在join左側,大表在右側,或使用mapjoin 將小表加載到內存中。然后再對比較大的表進行map操作。
大表Join大表
遇到需要進行join的但是關聯字段有數據為null,如表一的id需要和表二的id進行關聯,null值的reduce就會落到一個節點上
解決方法1:子查詢中過濾掉null值,id為空的不參與關聯
解決方法2:用case when給空值分配隨機的key值(字符串+rand())
count (distinct)大量相同特殊值
如果數據量非常大,執行如select a,count(distinct b) from t group by a;類型的SQL時,會出現數據傾斜的問題。
解決方法:使用sum()…group by代替。如select a,sum(1) from (select a, b from t group by a,b) group by a;
如果還有其他計算,如需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
實例:
場景:如日志中,常會有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關聯,會碰到數據傾斜的問題。
解決方法1: user_id為空的不參與關聯
select * from log a
join users b
on a.user_id is not null
and a.user_id = b.user_id
union all
select * from log a
where a.user_id is null;
解決方法2 :賦與空值分新的key值
select *
from log a
left outer join users b
on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1中 log讀取兩次,jobs是2。解決方法2 job數是1 。這個優化適合無效 id (比如 -99 , ’’, null 等) 產生的傾斜問題。把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。
特殊情況特殊處理:
在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最后union回去。
實例:
不同數據類型關聯產生數據傾斜:
場景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時,默認的Hash操作會按int型的id來進行分配,這樣會導致所有string類型id的記錄都分配到一個Reducer中。
解決方法:把數字類型轉換成字符串類型
select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
下面是另一種總結方法:
數據傾斜產生的原因:
1,map端:輸入文件的大小不均勻
2,reduce端:key分布不均勻,導致partition不均勻
數據傾斜的解決辦法:
1,當出現小文件過多時:合並小文件
可以通過set hive.merge.mapfiles=true來解決。
2,當group by分組的維度過少,每個維度的值過多時:調優參數
(1)設置在map階段做部分聚合操作
hive.map.aggr=true
效率更高但需要更多的內存。
(2)設置數據傾斜時負載均衡
hive.groupby.skewindata=true
當選項設定為true,生成的查詢計划會有兩個MRJob。第一個MRJob 中,Map的輸出結果集合會隨機分布到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;第二個MRJob再根據預處理的數據結果按照GroupBy Key分布到Reduce中(這個過程可以保證相同的GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作。
起至關作用的是第(2)項,它分為了兩個mapreduce,第一個在shuffle過程中partition時隨機給key打標記,使其分布在不同的reduce上計算,但不能完成全部運算,所以需要第二次mapreduce整合回歸正常的shuffle,由於數據分布不均問題在第一次時得到改善,所以基本解決數據傾斜問題。
3,調節SQL語句
(1)關聯字段帶空值的兩表Join時:把空值的key變成一個字符串加上隨機數,這樣就可以把傾斜的數據分到不同的reduce上,此外由於空值關聯不起來,所以處理后並不影響最終結果。
(2)大小表Join時:使用map join讓小表(1000條以下的記錄條數) 先進內存,在map端完成reduce。
在 hive 中,能夠在 HQL 語句中直接指定該次查詢使用map join,具體做法是:在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */,提示優化器轉化為map join(早期的 Hive 版本的優化器是不能自動優化 map join 的)。
select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid = b.movieid;
在 hive0.11 版本以后會自動開啟 map join 優化,由兩個參數控制:
set hive.auto.convert.join=true; //設置 MapJoin 優化自動開啟
set hive.mapjoin.smalltable.filesize=25000000 //設置小表不超過多大時開啟 mapjoin 優化
(3)大表Join大表時:把大表切分成小表,然后分別map join。
(4)count(distinct xx)時有大量相同的特殊值:用sum() group by的方式來替換count(distinct)完成計算。如:如select a,count(distinct b) from t group by a,用select a,sum(1) from (select a,b from t group by a,b) group by a替代。
(5)其他情況:如果傾斜的key數量比較少,那么將傾斜的數據單獨拿出來處理,最后union回去;如果傾斜的key數量比較多,那么給key增加隨機前/后綴,使得原來Key相同的數據變為Key不相同的數據,從而使傾斜的數據集分散到不同的任務中,在Join的另一側數據中,將與傾斜Key對應的部分數據和隨機前/后綴集作笛卡爾乘積,從而保證無論數據傾斜側傾斜Key如何加前綴,都能與之正常Join。