map/reduce程序執行時,reduce節點大部分執行完畢,但是有一個或者幾個reduce節點運行很慢,導致整個程序的處理時間很長,這是因為某一個key的條數比其他key多很多(有時是百倍或者千倍之多),這條key所在的reduce節點所處理的數據量比其他節點就大很多,從而導致某幾個節點遲遲運行不完,此稱之為數據傾斜。
1.萬能膏葯:hive.groupby.skewindata=true
當選項設定為 true,生成的查詢計划會有兩個 MR Job。
第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的
第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
1.1.參數調優:hive.map.aggr=true. Map端部分聚合,相當於Combiner
2. 大小表關聯:
可以使用Map Join讓小的維度表(1000條以下的記錄條數)先進內存。在map端完成reduce.
3. 大表和大表關聯:
把空值NULL的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由於null值關聯不上,處理后並不影響最終結果。例如Demo1.空值數據傾斜
4. count distinct大量相同特殊值:
count distinct時,將值為空的情況單獨處理。如果是計算count distinct,可以不用處理,直接過濾,在最后結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
Demo1.空值數據傾斜
場景:如日志中,常會有信息丟失的問題,比如全網日志中的user_id,如果取其中的user_id和bmw_users關聯,會碰到數據傾斜的問題。
解決方法1: user_id為空的不參與關聯
Select * From log a Join bmw_users b On a.user_id is not nullAnd a.user_id = b.user_id Union all Select * from log a where a.user_id is null;
解決方法2 :賦予空值新的key值
Select * from log a left outer Join bmw_users b on case when a.user_id is null then concat(‘dp_hive’,rand())
else a.user_id end = b.user_id;
結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。
方法1的log讀取兩次,jobs是2。方法2的job數是1。這個優化適合無效id(比如-99,’’,null等)產生的傾斜問題。
把空值的key變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。
Demo2.不同數據類型關聯產生數據傾斜
場景:一張表s8的日志,每個商品一條記錄,要和商品表關聯。但關聯卻碰到傾斜的問題,s8的日志中有字符串商品id,也有數字的商品id,類型是string的,但商品中的數字id是bigint的。
問題原因:把s8的商品id轉成數字id做hash來分配reduce,所以字符串id的s8日志,都到一個reduce上了,解決的方法驗證了這個猜測。
解決方法:把數字類型轉換成字符串類型
Select * from s8_log a Left outer join r_auction_auctions b On a.auction_id = cast(b.auction_id as string);
Demo3.大表Join的數據偏斜
MapReduce編程模型下開發代碼需要考慮數據偏斜的問題,Hive代碼也是一樣。數據偏斜的原因包括以下兩點:
1. Map輸出key數量極少,導致reduce端退化為單機作業。
2. Map輸出key分布不均,少量key對應大量value,導致reduce端單機瓶頸。
Hive中我們使用MapJoin解決數據偏斜的問題,即將其中的某個小表(全量)分發到所有Map端的內存進行Join,從而避免了reduce。這要求分發的表可以被全量載入內存。
極限情況下,Join兩邊的表都是大表,就無法使用MapJoin。這種問題最為棘手,目前已知的解決思路有兩種:
1. 如果是上述情況1,考慮先對Join中的一個表去重,以此結果過濾無用信息。
這樣一般會將其中一個大表轉化為小表,再使用MapJoin 。一個實例是廣告投放效果分析,
例如將廣告投放者信息表i中的信息填充到廣告曝光日志表w中,使用投放者id關聯。因為實際廣告投放者數量很少(但是投放者信息表i很大),因此可以考慮先在w表中去重查詢所有實際廣告投放者id列表,以此Join過濾表i,這一結果必然是一個小表,就可以使用MapJoin。
select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;
2. 如果是上述情況2,考慮切分Join中的一個表為多片,以便將切片全部載入內存,然后采用多次MapJoin得到結果。
一個實例是商品瀏覽日志分析,例如將商品信息表i中的信息填充到商品瀏覽日志表w中,使用商品id關聯。但是某些熱賣商品瀏覽量很大,造成數據偏斜。例如,以下語句實現了一個inner join邏輯,將商品信息表拆分成2個表:
select * from( select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat from w left outer join i sampletable(1 out of 2 on id) i1) union all( select w.id, w.time, w.amount, i2.name, i2.loc, i2.cat from w left outer join i sampletable(1 out of 2 on id) i2);