hive數據傾斜處理


Hive數據傾斜原因和解決辦法(Data Skew)

什么是數據傾斜(Data Skew)?

數據傾斜是指在原本應該並行處理的數據集中,某一部分的數據顯著多於其它部分,從而使得該部分數據的處理速度成為整個數據集處理的瓶頸。

假設數據分布不均勻,某個key對應幾十萬條數據,其他key對應幾百條或幾十條數據,那么在處理數據的時候,大量相同的key會被分配(partition)到同一個分區里,造成"一個人累死,其他人閑死“的情況,具體表現在:有些任務很快就處理完了,而有些任務則遲遲未能處理完,導致整體任務最終耗時過長甚至是無法完成。

數據傾斜分為map端傾斜和reduce端傾斜。

1.1 操作:

1.2 原因:

1)、key分布不均勻

2)、業務數據本身的特性

3)、建表時考慮不周

4)、某些SQL語句本身就有數據傾斜

 

1.3 表現:

任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大。

單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大於平均時長。

 

 

要真正了解數據傾斜,需要知道MapReduce的工作原理。(以下摘自:https://www.zhihu.com/question/27593027

舉個 word count 的入門例子,它的map 階段就是形成 (“aaa”,1)的形式,然后在reduce 階段進行 value 相加,得出 “aaa” 出現的次數。若進行 word count 的文本有100G,其中 80G 全部是 “aaa” ,剩下 20G 是其余單詞,那就會形成 80G 的數據量交給同一個 reduce 進行相加,其余 20G 根據分配到不同 reduce 進行相加的情況。如此就造成了數據傾斜,臨床反應就是 reduce 跑到 99%,然后一直在原地等着那80G 的reduce 跑完。

 

2、數據傾斜的解決方案

2.1 參數調節:

hive.map.aggr=true

Map 端部分聚合,相當於Combiner

hive.groupby.skewindata=true

有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計划會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。

2.2 SQL語句調節:

如何Join:

關於驅動表的選取,選用join key分布最均勻的表作為驅動表。
做好列裁剪和filter操作,以達到兩表做join的時候,數據量相對變小的效果。
當出現小文件過多,需要合並小文件。可以通過set hive.merge.mapfiles=true來解決。

大小表Join:

解決方法:小表在join左側,大表在右側,或使用mapjoin 將小表加載到內存中。然后再對比較大的表進行map操作。

大表Join大表

遇到需要進行join的但是關聯字段有數據為null,如表一的id需要和表二的id進行關聯,null值的reduce就會落到一個節點上

解決方法1:子查詢中過濾掉null值,id為空的不參與關聯

解決方法2:用case when給空值分配隨機的key值(字符串+rand())

count (distinct)大量相同特殊值

如果數據量非常大,執行如select a,count(distinct b) from t group by a;類型的SQL時,會出現數據傾斜的問題。

解決方法:使用sum()…group by代替。如select a,sum(1) from (select a, b from t group by a,b) group by a;
如果還有其他計算,如需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。

實例:

場景:如日志中,常會有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關聯,會碰到數據傾斜的問題。

解決方法1: user_id為空的不參與關聯

select * from log a
  join users b
  on a.user_id is not null
  and a.user_id = b.user_id
union all
select * from log a
  where a.user_id is null;

解決方法2 :賦與空值分新的key值

select *
  from log a
  left outer join users b
  on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;

結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1中 log讀取兩次,jobs是2。解決方法2 job數是1 。這個優化適合無效 id (比如 -99 , ’’, null 等) 產生的傾斜問題。把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。

特殊情況特殊處理:

在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最后union回去。

實例:

不同數據類型關聯產生數據傾斜:

場景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時,默認的Hash操作會按int型的id來進行分配,這樣會導致所有string類型id的記錄都分配到一個Reducer中。

解決方法:把數字類型轉換成字符串類型

select * from users a
  left outer join logs b
  on a.usr_id = cast(b.user_id as string)

下面是另一種總結方法:

數據傾斜產生的原因:

1,map端:輸入文件的大小不均勻

2,reduce端:key分布不均勻,導致partition不均勻

 

數據傾斜的解決辦法:

1,當出現小文件過多時:合並小文件

可以通過set hive.merge.mapfiles=true來解決。

 

2,當group by分組的維度過少,每個維度的值過多時:調優參數

(1)設置在map階段做部分聚合操作

hive.map.aggr=true

 效率更高但需要更多的內存。

 

(2)設置數據傾斜時負載均衡

hive.groupby.skewindata=true

當選項設定為true,生成的查詢計划會有兩個MRJob。第一個MRJob 中,Map的輸出結果集合會隨機分布到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;第二個MRJob再根據預處理的數據結果按照GroupBy Key分布到Reduce中(這個過程可以保證相同的GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作。

 

起至關作用的是第(2)項,它分為了兩個mapreduce,第一個在shuffle過程中partition時隨機給key打標記,使其分布在不同的reduce上計算,但不能完成全部運算,所以需要第二次mapreduce整合回歸正常的shuffle,由於數據分布不均問題在第一次時得到改善,所以基本解決數據傾斜問題。

 

3,調節SQL語句

(1)關聯字段帶空值的兩表Join時:把空值的key變成一個字符串加上隨機數,這樣就可以把傾斜的數據分到不同的reduce上,此外由於空值關聯不起來,所以處理后並不影響最終結果。

 

(2)大小表Join時:使用map join讓小表(1000條以下的記錄條數) 先進內存,在map端完成reduce。

在 hive 中,能夠在 HQL 語句中直接指定該次查詢使用map join,具體做法是:在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */,提示優化器轉化為map join(早期的 Hive 版本的優化器是不能自動優化 map join 的)。

select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid = b.movieid;

在 hive0.11 版本以后會自動開啟 map join 優化,由兩個參數控制:

set hive.auto.convert.join=true; //設置 MapJoin 優化自動開啟
set hive.mapjoin.smalltable.filesize=25000000 //設置小表不超過多大時開啟 mapjoin 優化

 

(3)大表Join大表時:把大表切分成小表,然后分別map join。

 

(4)count(distinct xx)時有大量相同的特殊值:用sum() group by的方式來替換count(distinct)完成計算。如:如select a,count(distinct b) from t group by a,用select a,sum(1) from (select a,b from t group by a,b) group by a替代。

 

(5)其他情況:如果傾斜的key數量比較少,那么將傾斜的數據單獨拿出來處理,最后union回去;如果傾斜的key數量比較多,那么給key增加隨機前/后綴,使得原來Key相同的數據變為Key不相同的數據,從而使傾斜的數據集分散到不同的任務中,在Join的另一側數據中,將與傾斜Key對應的部分數據和隨機前/后綴集作笛卡爾乘積,從而保證無論數據傾斜側傾斜Key如何加前綴,都能與之正常Join。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM