何謂數據傾斜?數據傾斜指的是,並行處理的數據集 中,某一部分(如Spark的一個Partition)的數據顯著多於其它部分,從而使得該部分的處理速度成為整個數據集處理的瓶頸。
表現為整體任務基本完成,但仍有少量子任務的reduce還在運行。
數據傾斜的原因:
1.join
一個表較小,但key集中,分發到一個或者幾個reduce上的數據遠高於平均值;
大表與大表關聯,但分桶的判斷字段0值或者空值過多,這些空值或者0值都由一個reduce處理
2.group by
分組的維度過少,每個維度的值過多,導致處理某值的reduce耗時很久
3.count distinct
特殊值過多,處理特殊值耗時
綜上所述原因就是:
key值分布不均,數據本身的原因(特殊值過多),sql語句不合理,表建的不合理
解決數據傾斜的方法:
1.參數配置
hive> set hive.map.aggr=true; 設置map端聚合
hive> set hive.groupby.skewindata=true; 當數據傾斜時,進行負責均衡
2.語句優化
小表與大表join時,使用mapjoin 將小表加載到內存中。
scala> hivecon.sql("select /*MAPJOIN(tbsex)*/ b.custname,b.nianling,a.sexname from tbsex a join cust b on a.id=b.sex").show
+---------------+--------+-------+
| custname|nianling|sexname|
+---------------+--------+-------+
| hello| 100| man|
| wangwu| 47| man|
| liuqin| 56| man|
| hello| 100| man|
| wangwu| 47| man|
| liuqin| 56| man|
| nihao| 5| woman|
| mahuateng| 1001| woman|
| liuyang| 32| woman|
| nihao| 5| woman|
| zhangsan| 20| woman|
| wangwu| 85| woman|
|tianyt_touch100| 50| woman|
| mahuateng| 1001| woman|
| liuyang| 32| woman|
| zhangsan| 20| woman|
| wangwu| 85| woman|
|tianyt_touch100| 50| woman|
+---------------+--------+-------+
如果關聯的key存在空值,可以過濾掉空值再進行關聯也可以為空值賦一個隨機值
scala> hivecon.sql("select b.custname,b.nianling,a.sexname from tbsex a join cust b on b.sex is not null and a.id=b.sex").show
+---------------+--------+-------+
| custname|nianling|sexname|
+---------------+--------+-------+
| hello| 100| man|
| wangwu| 47| man|
| liuqin| 56| man|
| hello| 100| man|
| wangwu| 47| man|
| liuqin| 56| man|
| nihao| 5| woman|
| mahuateng| 1001| woman|
| liuyang| 32| woman|
| nihao| 5| woman|
| zhangsan| 20| woman|
| wangwu| 85| woman|
|tianyt_touch100| 50| woman|
| mahuateng| 1001| woman|
| liuyang| 32| woman|
| zhangsan| 20| woman|
| wangwu| 85| woman|
|tianyt_touch100| 50| woman|
+---------------+--------+-------+
把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。 concat('hehe',rand())
count distinct 引起的數據傾斜,可以先去重后再進行統計
scala> hivecon.sql("select sex,count(distinct custname) from cust group by sex").show
+----+------------------------+
| sex|count(DISTINCT custname)|
+----+------------------------+
|null| 1|
| 1| 6|
| 0| 3|
+----+------------------------+
scala> hivecon.sql("select sex,count(1) from (select sex,custname from cust group by custname, sex) mm group by sex").show
+----+--------+
| sex|count(1)|
+----+--------+
|null| 1|
| 1| 6|
| 0| 3|
+----+--------+
3.map和reduce優化
小文件過多的時候合並小文件
hive> set hive.merge.mapfiles=true;
單個文件過大可以設置map的個數