查詢操作
group by、 order by、 join 、 distribute by、 sort by、 clusrer by、 union all
底層的實現
mapreduce
常見的聚合操作
count計數
count(*) 所有值不全為NULL時,加1操作 count(1) 不管有沒有值,只要有這條記錄,值就加1 count(col) col列里面的值為null,值不會加1,這個列里面的值不為NULL,才加1
sum求和
sum(可轉成數字的值) 返回bigint
avg求平均值
avg(可轉成數字的值)返回double
distinct不同值個數
count(distinct col)
order by
按照某些字段排序 樣例
select col1,other... from table where conditio order by col1,col2 [asc|desc]
注意 order by后面可以有多列進行排序,默認按字典排序 order by為全局排序 order by需要reduce操作,且只有一個reduce,與配置無關。數據量很大時,慎用。
執行流程
從表中讀取數據,執行where條件,以col1,col2列的值做成組合key,其他列值作為value,然后在把數據傳到同一個reduce中,根據需要的排序方式進行。
group by
按照某些字段的值進行分組,有相同值放到一起。
樣例
select col1 [,col2] ,count(1),sel_expr(聚合操作)from table where condition -->Map端執行 group by col1 [,col2] -->Reduce端執行 [having] -->Reduce端執行
注意 select后面非聚合列,必須出現在group by中 select后面除了普通列就是一些聚合操作 group by后面也可以跟表達式,比如substr(col)
特性 使用了reduce操作,受限於reduce數量,設置reduce參數mapred.reduce.tasks
輸出文件個數與reduce數相同,文件大小與reduce處理的數據量有關。
問題 網絡負載過重 數據傾斜,優化參數hive.groupby.skewindata
為true,會啟動一個優化程序,避免數據傾斜。
執行流程
從表中讀取數據,執行where條件,以col1列分組,把col列的內容作為key,其他列值作為value,上傳到reduce,在reduce端執行聚合操作和having過濾。
eg:
set mapred.reduce.tasks=5; select * from TabOrder order by ch asc,num desc; set mapred.reduce.tasks=3; select ch ,count(1) as num from TabOrder group by ch; set hive.groupby.skewindata = true; select ch ,count(1) as num from TabOrder group by ch having count(1)>2; select col from tablename group by col; <==> select distinct col from tablename;
Join表連接
兩個表m,n之間按照on條件連接,m中的一條記錄和n中的一條記錄組成一條新記錄。
join等值連接(內連接),只有某個值在m和n中同時存在時。
left outer join
左外連接,左邊表中的值無論是否在b中存在時,都輸出;右邊表中的值,只有在左邊表中存在時才輸出。
right outer join
和left outer join
相反。
left semi join
類似exists
。即查找a表中的數據,是否在b表中存在,找出存在的數據。
mapjoin:在map端完成join操作,不需要用reduce,基於內存做join,屬於優化操作。
select m.col as col1, m.col2 as col2, n.col3 as col3 from (select col1,col2 from,test where ... (map端執行) )m (左表) [left outer |right outer | left semi] join n (右表) on m.col=n.col where condition (reduced端執行) set hive.optimize.skewjoin=true;
讀取數據執行where條件,按col列分組,把col列的內容作為key,其他列作為value,傳到reduce,在reduce端執行連接操作和where過濾。
eg:
create table m( ch string, num string ) row format delimited fields terminated by '\t' lines terminated by '\n' stored as textfile; load data local inpath '/liguodong/hivedata/m' into table m; create table n( ch string, num string ) row format delimited fields terminated by '\t' lines terminated by '\n' stored as textfile; load data local inpath '/liguodong/hivedata/n' into table n; select * from m; select * from n; 內連接 select s.ch,s.num,t.num from (select ch,num from m)s join (select ch,num from n)t on s.ch=t.ch; 左外連接 select s.ch,s.num,t.num from (select ch,num from m)s left outer join (select ch,num from n)t on s.ch=t.ch; 右外連接 select s.ch,s.num,t.num from (select ch,num from m)s right outer join (select ch,num from n)t on s.ch=t.ch;
數據輸出對比
select s.ch,s.num from (select ch,num from m)s left semi join (select ch,num from n)t on s.ch=t.ch;
運行結果: A 1 C 5 C 3
MapJoin
mapjoin(map side join) 在map端把小表加載到內存中,然后讀取大表,和內存中的小表完成連接操作。其中使用了分布式緩存技術。
優點 不消耗集群的reduce資源(reduce相對緊缺)。 減少了reduce操作,加快程序執行。 降低網絡負載。
缺點 占用部分內存,所以加載到內存中的表不能過大,因為每個計算節點都會加載一次。 生成較多的小文件。
執行流程
從大表讀取數據,執行where條件。把小表加載到內存中,每讀取大表中的一條數據,都要和內存中的小表數據進行比較。
第一種方式,自動方式 配置以下參數 hive**自動**根據sql,選擇使用common join或者map join
set hive.auto.convert.join=true; hive.mapjoin.smalltable.filesize默認值是25mb
第二種方式,手動指定
select /*+mapjoin(n)*/ m.col, m.col2, n.col3 from m join n on m.col=n.col;
注意:/*+mapjoin(n)*/
不能省略,只需替換表名n值即可。
簡單總結一下,map join的使用場景: 1、關聯操作中有一張表非常小 2、不等值的鏈接操作
select c.city,p.province from (select province,city from city)c join (select province from province)p on c.province=p.province; mapjoin手動方式 select /*+mapjoin(p)*/ c.city,p.province from (select province,city from city)c join (select province from province)p on c.province=p.province;
比較二則的比較時間。
Hive分桶JOIN 對於每一個表(table)或者分區,Hive可以進一步組織成桶,也就是說桶是更為細粒度的數據范圍划分。 Hive是針對某一列進行分桶。 Hive采用對列值哈希,然后除以桶的個數求余的方式決定該條記錄存放在哪個桶當中。 好處 獲得更高的查詢處理效率。 使取樣(sampling)更高效。
create table bucketed_user ( id int, name string ) clustered by (id) sorted by (name) into 4 buckets row format delimited fields terminated by '\t' stored as textfile; set hive.enforce.bucketing=true;
分桶的使用
select * from bucketed_user tablesample(bucket 1 out of 2 on id)
bucket join
set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true; set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
連接兩個在(包含連接列)相同列上划分了桶的表,可以使用Map端連接(Map side join)高效的實現。比如JOIN操作。 對於JOIN操作兩個表有一個相同的列,如果對這兩個表都進行了捅操作。那么將保存相同列值的桶進行JOIN操作就可以,可以大大減少JOIN的數據量。 對於map端連接的情況,兩個表以相同方式划分桶。處理左邊表內某個桶的mapper知道右邊表內相匹配的行在對應的桶內。因此,mapper只需要獲取那個桶(這只是右邊表內存儲數據的·小部分)即可進行連接。 這一優化方法並不一定要求兩個表必須桶的個數相同,兩個表的桶個數是倍數關系也可以。
distribute by、sort by
distribute 分散數據 distribute by col – 按照col列把數據分散到不同的reduce。
Sort排序 sort by col – 按照col列把數據排序
select col1,col2 from M distribute by col1 sort by col1 asc,col2 desc
兩者結合出現,確保每個reduce的輸出都是有序的。
distribute by與group by對比
都是按key值划分數據 都使用reduce操作 **唯一不同的是**distribute by只是單純的分散數據,而group by把相同key的數據聚集到一起,后續必須是聚合操作。
order by與sort by 對比
order by是全局排序 sort by只是確保每個reduce上面輸出的數據有序。如果只有一個reduce時,和order by作用一樣。
執行流程
從表中讀取數據,執行where條件。 設置reduce數為3,以distribute by列的值作為key,其他列值作為value,然后把數據根據key值傳到不同的reduce,然后按sort by字段進行排序。
應用場景 map輸出的文件大小不均 reduce輸出文件大小不均 小文件過多 文件超大
把一個大文件放到一些小文件中 set mapred.reduce.tasks=5;-->下面的city將會輸出到五個文件中 insert overwrite table city selsct time,country,province,city from info distribute by province; 把一些小文件放到一個大文件中 set mapred.reduce.tasks=1;-->下面的province將會輸出到一個大文件中 insert overwrite table province partition(dt='20150719') selsct time,country,province from city distribute by country; 注:province是一個分區表。
cluster by
把有相同值的數據聚集到一起,並排序。 效果等價於distribute by col sort by col cluster by col <==> distribute by col sort by col
union all
多個表的數據合並成一個表,hive不支持union
select col from( select a as col from t1 union all select b as col from t2 )tmp
執行流程
從表中讀取數據,執行where條件。合並到同一個表中。
union all必須滿足如下要求 字段名字一樣 字段類型一樣 字段個數一樣 子表不能有別名 如果需要從合並之后的表中查詢數據,那么合並的表必須要有別名
select * from ( select * from m union all select * from n )temp; 如果兩張表的字段名不一樣,要將一個表修改別名同另一個表的字段名一樣。 select * from ( select col1,col2 from m union all select col1,col3 as col2 from n )temp;