一、Hive 執行過程實例分析
1、join
對於 join 操作:
SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid);
執行的最后結果條數: page_view 表中的 userid 數目 * user 表中的 userid 數目
實現過程:
Map:
(1)以 JOIN ON 條件中的列作為 Key,如果有多個列,則 Key 是這些列的組合
(2)以 JOIN 之后所關心的列作為 Value,當有多個列時, Value 是這些列的組合。在 Value 中還會包含表的 Tag 信息,用於標明此 Value 對應於哪個表。
(3) 按照 Key 進行排序。
Shuffle:
(1) 根據 Key 的值進行 Hash,並將 Key/Value 對按照 Hash 值推至不同對 Reduce 中。
Reduce:
(1) Reducer 根據 Key 值進行 Join 操作,並且通過 Tag 來識別不同的表中的數據。
具體實現過程:
2、group by
對於 group by:
SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;
3、distinct
對於 distinct:
SELECT age, count(distinct pageid) FROM pv_users GROUP BY age;
按照 age 分組,然后統計每個分組里面的不重復的 pageid 有多少個。
二、hive優化策略
1、hadoop框架計算特性
(1) 數據量大不是問題,數據傾斜是個問題。
(2) jobs 數比較多的作業運行效率相對比較低,比如即使有幾百行的表,如果多次關聯多次匯總,產生十幾個 jobs,耗時很長。原因是 map reduce 作業初始化的時間是比較長的。
(3) sum,count,max,min 等 UDAF,不怕數據傾斜問題,hadoop 在 map 端的匯總合並優化,使數據傾斜不成問題。
(4)count(distinct userid),在數據量大的情況下,效率較低,如果是多 count(distinct userid,month)
效率更低,因為 count(distinct)是按 group by 字段分組,按 distinct 字段排序,一般這種分布 方式是很傾斜的,比如男 uv,女 uv,淘寶一天 30 億的 pv,如果按性別分組,分配 2 個 reduce, 每個 reduce 處理 15 億數據。
2、優化常用手段
(1)好的模型設計事半功倍。
(2)解決數據傾斜問題。
(3) 減少 job 數。
(4) 設置合理的 map reduce 的 task 數,能有效提升性能。 (比如, 10w+級別的計算,用 160 個 reduce,那是相當的浪費, 1 個足夠)。
(5) 了 解 數 據 分 布 , 自 己 動 手 解 決 數 據 傾 斜 問 題 是 個 不 錯 的 選 擇 。 set hive.groupby.skewindata=true;這是通用的算法優化,但算法優化有時不能適應特定業務背景, 開發人員了解業務,了解數據,可以通過業務邏輯精確有效的解決數據傾斜問題。
(6) 數據量較大的情況下,慎用 count(distinct), group by 容易產生傾斜問題。
(7) 對小文件進行合並,是行至有效的提高調度效率的方法,假如所有的作業設置合理的文 件數,對雲梯的整體調度效率也會產生積極的正向影響。
(8) 優化時把握整體,單個作業最優不如整體最優。
3、全排序
Cluster by: 對同一字段分桶並排序,不能和 sort by 連用
Distribute by: 分桶,保證同一字段值只存在一個結果當中
Sort by: 單機排序,單個 reduce 結果
Order by: 全局排序
一定要區分這四種排序的使用。
4、怎樣做笛卡爾積
當 Hive 設定為嚴格模式( hive.mapred.mode=strict)時,不允許在 HQL 語句中出現笛卡爾積,這實際說明了 Hive 對笛卡爾積支持較弱。因為找不到 Join key, Hive 只能使用 1 個 reducer來完成笛卡爾積。
當然也可以用上面說的 limit 的辦法來減少某個表參與 join 的數據量,但對於需要笛卡爾積語義的需求來說,經常是一個大表和一個小表的 Join 操作,結果仍然很大(以至於無法用單機處理),這時 MapJoin 才是最好的解決辦法。
MapJoin,顧名思義,會在 Map 端完成 Join 操作。這需要將 Join 操作的一個或多個表完全讀入內存。
MapJoin的用法是在查詢/子查詢的 SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */提示優化器轉化為 MapJoin (目前 Hive 的優化器不能自動優化 MapJoin)。其中 tablelist 可以是一個表,或以逗號連接的表的列表。 tablelist 中的表將會讀入內存,應該將小表寫在這里。
PS:有用戶說 MapJoin 在子查詢中可能出現未知 BUG。在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給 Join 添加一個 Join key, 原理很簡單:將小表擴充一列 join key,並將小表的條目復制數倍, join key 各不相同;將大表擴充一列 join key 為隨機數。
精髓就在於復制幾倍,最后就有幾個 reduce 來做, 而且大表的數據是前面小表擴張 key 值范圍里面隨機出來的,所以復制了幾倍 n,就相當於這個隨機范圍就有多大 n,那么相應的,大表的數據就被隨機的分為了 n 份。並且最后處理所用的 reduce 數量也是 n,而且也不會出現數據傾斜。
5、怎樣寫in/exists語句
雖然經過測驗, hive1.2.1 也支持 in 操作,但還是推薦使用 hive 的一個高效替代方案: left semi join
6、怎樣決定 reduce 的個數(設置reduce的個數比分桶數大於或等於,不能小於)
Hadoop MapReduce 程序中, reducer 個數的設定極大影響執行效率,這使得 Hive 怎樣決定reducer 個數成為一個關鍵問題。遺憾的是 Hive 的估計機制很弱,不指定 reducer 個數的情況下, Hive 會猜測確定一個 reducer 個數,基於以下兩個設定:
1. hive.exec.reducers.bytes.per.reducer(默認為 256000000)
2. hive.exec.reducers.max(默認為 1009)
3. mapreduce.job.reduces=-1(設置一個常量 reducetask 數量)
計算 reducer 數的公式很簡單:
N=min(參數 2,總輸入數據量/參數 1)
通常情況下,有必要手動指定 reducer 個數。考慮到 map 階段的輸出數據量通常會比輸入有大幅減少,因此即使不設定 reducer 個數,重設參數 2 還是必要的。依據 Hadoop 的經驗, 可以將參數 2 設定為 0.95*(集群中 datanode 個數)。
7、合並mapreduce操作
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid =
b.userid and a.ds='2009-03-20' ) ) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school
上述查詢語句使用了 Multi-group by特性連續 group by了 2 次數據,使用不同的 group by key。這一特性可以減少一次 MapReduce 操作
8、Bucketing和Sampling
Bucket 是指將數據以指定列的值為 key 進行 hash, hash 到指定數目的桶中。這樣就可以支持高效采樣了。
如下例就是以 userid 這一列為 bucket 的依據,共設置 32 個 buckets
Sampling 可以在全體數據上進行采樣,這樣效率自然就低,它還是要去訪問所有數據。而
如果一個表已經對某一列制作了 bucket,就可以采樣所有桶中指定序號的某個桶,這就減
少了訪問量。
如下例所示就是采樣了 page_view 中 32 個桶中的第三個桶。
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);
9、Partition(分區,查詢時where條件可以是范圍)
Partition 就是分區。分區通過在創建表時啟用 partition by 實現,用來 partition 的維度並不是實際數據的某一列,具體分區的標志是由插入內容時給定的。當要查詢某一分區的內容時可以采用 where 語句,形似 where tablename.partition_key > a 來實現。
創建含分區的表
10、join
Join 原則: 在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在Join 操作符的左邊。原因是在 Join 操作的 Reduce 階段,位於 Join 操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的幾率。對於一條語句中有多個 Join 的情況,如果 Join 的條件相同,比如查詢
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);
如果 Join 的 key 相同,不管有多少個表,都會則會合並為一個 Map-Reduce一個 Map-Reduce 任務,而不是 ‘ n’ 個
在做 OUTER JOIN 的時候也是一樣
如果 join 的條件不相同,比如:
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x on (u.age = x.age);
Map-Reduce 的任務數目和 Join 操作的數目是對應的,上述查詢和以下查詢是等價的
INSERT OVERWRITE TABLE tmptable
SELECT * FROM page_view p JOIN user u
ON (pv.userid = u.userid);
INSERT OVERWRITE TABLE pv_users
SELECT x.pageid, x.age FROM tmptable x
JOIN newuser y ON (x.age = y.age);
11、小文件合並
文件數目過多,會給 HDFS 帶來壓力,並且會影響處理效率,可以通過合並 Map 和 Reduce的結果文件來消除這樣的影響:
hive.merge.mapfiles = true 是否和並 Map 輸出文件,默認為 True
hive.merge.mapredfiles = false 是否合並 Reduce 輸出文件,默認為 False
hive.merge.size.per.task = 256*1000*1000 合並文件的大小
12、group by
Map 端部分聚合:
並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端
進行部分聚合,最后在 Reduce 端得出最終結果。
MapReduce 的 combiner 組件
參數包括:
hive.map.aggr = true 是否在 Map 端進行聚合,默認為 True
hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操作的條目數目
有數據傾斜的時候進行負載均衡:
hive.groupby.skewindata = false
在 MR 的第一個階段中, Map 的輸出結果集合會緩存到 maptaks 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的
Reduce 中,從而達到負載均衡的目的;第二個階段 再根據預處理的數據結果按照 Group ByKey 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce中),最后完成最終的聚合操作。