舉例子:
hive> select * from zz0;
111111
222222
888888
hive> select * from zz1;
111111
333333
444444
888888
hive> select * from zz0 join zz1 on zz0.uid = zz1.uid;
111111 111111
888888 888888
hive> select * from zz0 left outer join zz1 on zz0.uid = zz1.uid;
111111 111111
222222 NULL
888888 888888
hive> select * from zz0 right outer join zz1 on zz0.uid = zz1.uid;
NULL
111111 111111
NULL 333333
NULL 444444
888888 888888
hive> select * from zz0 full outer join zz1 on zz0.uid = zz1.uid;
NULL
111111 111111
222222 NULL
NULL 333333
NULL 444444
888888 888888
hive> select * from zz0 left semi join zz1 on zz0.uid = zz1.uid;
111111 111111
888888 888888
寫好Hive 程序的五個提示
使用Hive可以高效而又快速地編寫復雜的MapReduce查詢邏輯。但是某些情況下,因為不熟悉數據特性,或沒有遵循Hive的優化約定,Hive計算任務會變得非常低效,甚至無法得到結果。一個”好”的Hive程序仍然需要對Hive運行機制有深入的了解。
有一些大家比較熟悉的優化約定包括:Join中需要將大表寫在靠右的位置;盡量使用UDF而不是transfrom……諸如此類。下面討論5個性能和邏輯相關的問題,幫助你寫出更好的Hive程序。
全排序
Hive的排序關鍵字是SORT BY,它有意區別於傳統數據庫的ORDER BY也是為了強調兩者的區別–SORT BY只能在單機范圍內排序。考慮以下表定義:
CREATE TABLE if not exists t_order( id int, -- 訂單編號 sale_id int, -- 銷售ID customer_id int, -- 客戶ID product _id int, -- 產品ID amount int -- 數量 ) PARTITIONED BY (ds STRING);
在表中查詢所有銷售記錄,並按照銷售ID和數量排序:
set mapred.reduce.tasks=2; Select sale_id, amount from t_order Sort by sale_id, amount;
這一查詢可能得到非期望的排序。指定的2個reducer分發到的數據可能是(各自排序):
Reducer1:
Sale_id | amount 0 | 100 1 | 30 1 | 50 2 | 20
Reducer2:
Sale_id | amount 0 | 110 0 | 120 3 | 50 4 | 20
因為上述查詢沒有reduce key,hive會生成隨機數作為reduce key。這樣的話輸入記錄也隨機地被分發到不同reducer機器上去了。為了保證reducer之間沒有重復的sale_id記錄,可以使用DISTRIBUTE BY關鍵字指定分發key為sale_id。改造后的HQL如下:
set mapred.reduce.tasks=2; Select sale_id, amount from t_order Distribute by sale_id Sort by sale_id, amount;
這樣能夠保證查詢的銷售記錄集合中,銷售ID對應的數量是正確排序的,但是銷售ID不能正確排序,原因是hive使用hadoop默認的HashPartitioner分發數據。
這就涉及到一個全排序的問題。解決的辦法無外乎兩種:
1.) 不分發數據,使用單個reducer:
set mapred.reduce.tasks=1;
這一方法的缺陷在於reduce端成為了性能瓶頸,而且在數據量大的情況下一般都無法得到結果。但是實踐中這仍然是最常用的方法,原因是通常排序的查詢是為了得到排名靠前的若干結果,因此可以用limit子句大大減少數據量。使用limit n后,傳輸到reduce端(單機)的數據記錄數就減少到n* (map個數)。
2.) 修改Partitioner,這種方法可以做到全排序。這里可以使用Hadoop自帶的TotalOrderPartitioner(來自於Yahoo!的TeraSort項目),這是一個為了支持跨reducer分發有序數據開發的Partitioner,它需要一個SequenceFile格式的文件指定分發的數據區間。如果我們已經生成了這一文件(存儲在/tmp/range_key_list,分成100個reducer),可以將上述查詢改寫為
set mapred.reduce.tasks=100; set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; set total.order.partitioner.path=/tmp/ range_key_list; Select sale_id, amount from t_order Cluster by sale_id Sort by amount;
有很多種方法生成這一區間文件(例如hadoop自帶的o.a.h.mapreduce.lib.partition.InputSampler工具)。這里介紹用Hive生成的方法,例如有一個按id有序的t_sale表:
CREATE TABLE if not exists t_sale ( id int, name string, loc string );
則生成按sale_id分發的區間文件的方法是:
create external table range_keys(sale_id int) row format serde 'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe' stored as inputformat 'org.apache.hadoop.mapred.TextInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat' location '/tmp/range_key_list'; insert overwrite table range_keys select distinct sale_id from source t_sale sampletable(BUCKET 100 OUT OF 100 ON rand()) s sort by sale_id;
生成的文件(/tmp/range_key_list目錄下)可以讓TotalOrderPartitioner按sale_id有序地分發reduce處理的數據。區間文件需要考慮的主要問題是數據分發的均衡性,這有賴於對數據深入的理解。
怎樣做笛卡爾積?
當Hive設定為嚴格模式(hive.mapred.mode=strict)時,不允許在HQL語句中出現笛卡爾積,這實際說明了Hive對笛卡爾積支持較弱。因為找不到Join key,Hive只能使用1個reducer來完成笛卡爾積。
當然也可以用上面說的limit的辦法來減少某個表參與join的數據量,但對於需要笛卡爾積語義的需求來說,經常是一個大表和一個小表的Join操作,結果仍然很大(以至於無法用單機處理),這時MapJoin才是最好的解決辦法。
MapJoin,顧名思義,會在Map端完成Join操作。這需要將Join操作的一個或多個表完全讀入內存。
MapJoin的用法是在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */提示優化器轉化為MapJoin(目前Hive的優化器不能自動優化MapJoin)。其中tablelist可以是一個表,或以逗號連接的表的列表。tablelist中的表將會讀入內存,應該將小表寫在這里。
PS:有用戶說MapJoin在子查詢中可能出現未知BUG。在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給Join添加一個Join key,原理很簡單:將小表擴充一列join key,並將小表的條目復制數倍,join key各不相同;將大表擴充一列join key為隨機數。
怎樣寫exist in子句?
Hive不支持where子句中的子查詢,SQL常用的exist in子句需要改寫。這一改寫相對簡單。考慮以下SQL查詢語句:
SELECT a.key, a.value FROM a WHERE a.key in (SELECT b.key FROM B);
可以改寫為
SELECT a.key, a.value FROM a LEFT OUTER JOIN b ON (a.key = b.key) WHERE b.key <> NULL;
一個更高效的實現是利用left semi join改寫為:
SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key);
left semi join是0.5.0以上版本的特性。
Hive怎樣決定reducer個數?
Hadoop MapReduce程序中,reducer個數的設定極大影響執行效率,這使得Hive怎樣決定reducer個數成為一個關鍵問題。遺憾的是Hive的估計機制很弱,不指定reducer個數的情況下,Hive會猜測確定一個reducer個數,基於以下兩個設定:
1. hive.exec.reducers.bytes.per.reducer(默認為1000^3)
2. hive.exec.reducers.max(默認為999)
計算reducer數的公式很簡單:
N=min(參數2,總輸入數據量/參數1)
通常情況下,有必要手動指定reducer個數。考慮到map階段的輸出數據量通常會比輸入有大幅減少,因此即使不設定reducer個數,重設參數2還是必要的。依據Hadoop的經驗,可以將參數2設定為0.95*(集群中TaskTracker個數)。
合並MapReduce操作
Multi-group by
Multi-group by是Hive的一個非常好的特性,它使得Hive中利用中間結果變得非常方便。例如,
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid = b.userid and a.ds='2009-03-20' ) ) subq1 INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20') SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20') SELECT subq1.school, COUNT(1) GROUP BY subq1.school
上述查詢語句使用了Multi-group by特性連續group by了2次數據,使用不同的group by key。這一特性可以減少一次MapReduce操作。
Multi-distinct
Multi-distinct是淘寶開發的另一個multi-xxx特性,使用Multi-distinct可以在同一查詢/子查詢中使用多個distinct,這同樣減少了多次MapReduce操作。