來源:https://mp.weixin.qq.com/s/_jZr9CIEtu92kE1r6XIFzA
導讀:HiveSQL是數據倉庫與數據分析過程中的必備技能,隨着數據量增加,這一技能越來越重要,熟練應用的同時會帶來效率的問題,
動輒十幾億的數據量如果處理不完善的話有可能導致一個作業運行幾個小時,更嚴重的還有可能因占用過多資源而引發生產問題,所以HQL優化就變得非常重要。
本文我們就深入HQL的原理中,探索HQL優化的方法和邏輯。
1 group by的計算原理
SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
可以看到,group by本身不是全局變量,任務會被分到各個map中進行分組,然后再在reduce中聚合。
默認設置了hive.map.aggr=true,所以會在mapper端先group by一次,最后再把結果merge起來,為了減少reducer處理的數據量。注意看explain的mode是不一樣的。
mapper是hash,reducer是mergepartial。如果把hive.map.aggr=false,那將groupby放到reducer才做,他的mode是complete。
優化點:
Group by主要是面對數據傾斜的問題。
很多聚合操作可以現在map端進行,最后在Reduce端完成結果輸出:
Set hive.map.aggr = true;# 是否在Map端進行聚合,默認為true; Set hive.groupby.mapaggr.checkinterval = 1000000;# 在Map端進行聚合操作的條目數目;
當使用Group by有數據傾斜的時候進行負載均衡:
Set hive.groupby.skewindata = true;# hive自動進行負載均衡;
策略就是把MR任務拆分成兩個MR Job:第一個先做預匯總,第二個再做最終匯總;
第一個Job:
Map輸出結果集中緩存到maptask中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同Group by Key有可能被分到不同的reduce中,從而達到負載均衡的目的;
第二個Job:
根據第一階段處理的數據結果按照group by key分布到reduce中,保證相同的group by key分布到同一個reduce中,最后完成最終的聚合操作。
2 join的優化原理
SELECT a.id,a.dept,b.age FROM a join b ON (a.id = b.id);
1)Map階段:
讀取源表的數據,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;
Map輸出的value為join之后所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag信息,用於標明此value對應哪個表;
按照key進行排序;
2)Shuffle階段:
根據key的值進行hash,並將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位於同一個reduce中。
3)Reduce階段:
根據key的值完成join操作,期間通過Tag來識別不同表中的數據。
在多表join關聯時:
如果 Join 的 key 相同,不管有多少個表,都會合並為一個Map-Reduce,例如:
SELECT pv.pageid, u.age FROM page_view p JOIN user u ON (pv.userid = u.userid) JOIN newuser x ON (u.userid = x.userid);
如果 Join 的 key不同,Map-Reduce 的任務數目和 Join 操作的數目是對應的,例如:
SELECT pv.pageid, u.age FROM page_view p JOIN user u ON (pv.userid = u.userid) JOIN newuser x on (u.age = x.age);
優化點:
1)應該將條目少的表/子查詢放在 Join 操作符的左邊。
2)我們知道文件數目大小,容易在文件存儲端造成瓶頸,給HDFS 帶來壓力,影響處理效率。對此,可以通過合並Map和Reduce的結果文件來消除這樣的影響。用於設置合並屬性的參數有
合並Map輸出文件:hive.merge.mapfiles=true(默認值為真) 合並Reduce端輸出文件:hive.merge.mapredfiles=false(默認值為假) 合並文件的大小:hive.merge.size.per.task=256*1000*1000(默認值為 256000000)
3)Common join即普通的join,性能較差,因為涉及到了shuffle的過程(Hadoop/spark開發的過程中,有一個原則:能避免不使用shuffle就不使用shuffle),可以轉化成map join。
hive.auto.convert.join=true;# 表示將運算轉化成map join方式
使用的前提條件是需要的數據在 Map 的過程中可以訪問到。
1)啟動Task A:Task A去啟動一個MapReduce的local task;通過該local task把small table data的數據讀取進來;之后會生成一個HashTable Files;之后將該文件加載到分布式緩存(Distributed Cache)中來;
2)啟動MapJoin Task:去讀大表的數據,每讀一個就會去和Distributed Cache中的數據去關聯一次,關聯上后進行輸出。
整個階段,沒有reduce 和 shuffle,問題在於如果小表過大,可能會出現OOM。
3 Union與union all優化原理
union將多個結果集合並為一個結果集,結果集去重。代碼為:
select id,name from t1 union select id,name from t2 union select id,name from t3
對應的運行邏輯為:
union all將多個結果集合並為一個結果集,結果集不去重。使用時多與group by結合使用,代碼為:
select all.id, all.name from( select id,name from t1 union all select id,name from t2 union all select id,name from t3 )all group by all.id ,all.name
從上面的兩個邏輯圖可以看到,第二種寫法性能要好。union寫法每兩份數據都要先合並去重一次,再和另一份數據合並去重,會產生較多次的reduce。第二種寫法直接將所有數據合並再一次性去重。
對union all的操作除了與group by結合使用還有一些細節需要注意:
1)對 union all 優化只局限於非嵌套查詢。
原代碼:job有3個:
SELECT * FROM ( SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3 )t3 GROUP BY c1,c2,c3
這樣的結構是不對的,應該修改為:job有1個:
這樣的修改可以減少job數量,進而提高效率。
2)語句中出現count(distinct …)結構時:
原代碼為:
SELECT * FROM ( SELECT * FROM t1 UNION ALL SELECT c1,c2,c3,COUNT(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3 ) t3 GROUP BY c1,c2,c3;
修改為:(采用臨時表消滅 COUNT(DISTINCT)作業不但能解決傾斜問題,還能有效減少jobs)。
INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3; SELECT c1,c2,c3,SUM(income),SUM(uv) FROM ( SELECT c1,c2,c3,income,0 AS uv FROM t1 UNION ALL SELECT c1,c2,c3,0 AS income,1 AS uv FROM t2 ) t3 GROUP BY c1,c2,c3;