分享兩篇文章,結合看更清楚一點。
背景
假設有一個學生各門課的成績的表單,應用hive取出每科成績前100名的學生成績。
這個就是典型在分組取Top N的需求。
解決思路
對於取出每科成績前100名的學生成績,針對學生成績表,根據學科,成績做order by排序,然后對排序后的成績,執行自定義函數row_number(),必須帶一個或者多個列參數,如ROW_NUMBER(col1, ....),它的作用是按指定的列進行分組生成行序列。在ROW_NUMBER(a,b) 時,若兩條記錄的a,b列相同,則行序列+1,否則重新計數。
只要返回row_number()返回值小於100的的成績記錄,就可以返回每個單科成績前一百的學生。
解決過程
成績表結構
create table score_table ( subject string, student string, score int ) partitioned by (date string)
如果要查詢2012年每科成績前100的學生成績,sql如下
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
(select subject,score,student from score where dt='2012' order by subject,socre desc) order_score
where row_number(subject) <= 100;
com.blue.hive.udf.RowNumber是自定義函數,函數的作用是按指定的列進行分組生成行序列。這里根據每個科目的所有成績,生成序列,序列值從1開始自增。
假設成績表的記錄如下:
物理 80 張三 數學 100 李一 物理 90 張二 數學 90 李二 物理 100 張一 數學 80 李三
.....
經過order by全局排序后,記錄如下
物理 100 張一 物理 90 張二 物理 80 張三
..... 數學 100 李一 數學 90 李二 數學 80 李三
....
接着執行row_number函數,返回值如下
科目 成績 學生 row_number 物理 100 張一 1 物理 90 張二 2 物理 80 張三 3 ..... 數學 100 李一 1 數學 90 李二 2 數學 80 李三 3 ....
因為hive是基於MAPREADUCE的,必須保證row_number執行是在reducer中執行。上述的語句保證了成績表的記錄,按照科目和成績做了全局排序,然后在reducer端執行row_number函數,如果在map端執行了row_number,那么結果將是錯誤的。
要查看row_number函數在map端還是reducer端執行,可以查看hive的執行計划:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select subject,score,student from
(select subject,score,student from score where dt='2012' order by subject,socre desc) order_score
where row_number(subject) <= 100;
explain不會執行mapreduce計算,只會顯示執行計划。
只要row_number函數在reducer端執行,除了使用order by全局排序配合,也可以使用distribute by + sort by。distribute by可以讓相同科目的成績記錄發送到同一個reducer,而sort by可以在reducer端對記錄做排序。
而使用order by全局排序,只有一個reducer,未能充分利用資源,相比之下,distribute by + sort by在這里更有性能優勢,可以在多個reducer做排序,再做row_number的計算。
sql如下:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
(select subject,score,student from score where dt='2012' distribute by subject sort by subject asc, socre desc) order_score
where row_number(subject) <= 100;
如果成績有學院字段college,要找出學院里,單科成績前一百的學生,解決方法如下:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,subject,score,student from
(select college,subject,score,student from score where dt='2012' order by college asc,subject asc,socre desc) order_score
where row_number(college,subject) <= 100;
如果成績有學院字段college,要找出學院里,總成績前一百的學生,解決方法如下:
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,totalscore,student from
(select college,student,sum(score) as totalscore from score where dt='2012' group by college,student order by college asc,totalscore desc) order_score
where row_number(college) <= 100;
row_number的源碼
函數row_number(),必須帶一個或者多個列參數,如ROW_NUMBER(col1, ....),它的作用是按指定的列進行分組生成行序列。在ROW_NUMBER(a,b) 時,若兩條記錄的a,b列相同,則行序列+1,否則重新計數。
package com.blue.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public class RowNumber extends UDF {
private static int MAX_VALUE = 50;
private static String comparedColumn[] = new String[MAX_VALUE];
private static int rowNum = 1;
public int evaluate(Object... args) {
String columnValue[] = new String[args.length];
for (int i = 0; i < args.length; i++) 『
columnValue[i] = args[i].toString();
}
if (rowNum == 1) {
for (int i = 0; i < columnValue.length; i++)
comparedColumn[i] = columnValue[i];
}
for (int i = 0; i < columnValue.length; i++) {
if (!comparedColumn[i].equals(columnValue[i])) {
for (int j = 0; j < columnValue.length; j++) {
comparedColumn[j] = columnValue[j];
}
rowNum = 1;
return rowNum++;
}
}
return rowNum++;
}
}
編譯后,打包成一個jar包,如/usr/local/hive/udf/blueudf.jar
然后在hive shell下使用,如下:
add jar /usr/local/hive/udf/blueudf.jar;
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
(select subject,score,student from score where dt='2012' order by subject,socre desc) order_score
where row_number(subject) <= 100;
hive 0.12之前可用,0.12之后不可用,只能用窗口函數替代。
參考 http://chiyx.iteye.com/blog/1559460
-----------------------------------------分割線-----------------------------------------------------
問題:
有如下數據文件 city.txt (id, city, value)
cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
需要按 city 分組聚合,然后從每組數據中取出前兩條value最大的記錄。
1、這是實際業務中經常會遇到的 group TopK 問題,下面來看看 pig 如何解決:
|
1
2
3
4
5
|
a =
load
'/data/city.txt'
using PigStorage(
' '
)
as
(id:chararray, city:chararray, value:
int
);
b =
group
a
by
city;
c = foreach b {c1=
order
a
by
value
desc
; c2=limit c1 2; generate
group
,c2.value;};
d = stream c through `sed
's/[(){}]//g'
`;
dump d;
|
結果:
|
1
2
3
|
(bj,600,300)
(sh,900,400)
(wh,500,200)
|
這幾行代碼其實也實現了mysql中的 group_concat 函數的功能:
|
1
2
3
4
5
|
a =
load
'/data/city.txt'
using PigStorage(
' '
)
as
(id:chararray, city:chararray, value:
int
);
b =
group
a
by
city;
c = foreach b {c1=
order
a
by
value
desc
; generate
group
,c1.value;};
d = stream c through `sed
's/[(){}]//g'
`;
dump d;
|
結果:
|
1
2
3
|
(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)
|
2、下面我們再來看看hive如何處理group topk的問題:
本質上HSQL和sql有很多相同的地方,但HSQL目前功能還有很多缺失,至少不如原生態的SQL功能強大,
比起PIG也有些差距,如果SQL中這類分組topk的問題如何解決呢?
|
1
2
3
|
select
*
from
city a
where
2>(
select
count
(1)
from
city
where
cname=a.cname
and
value>a.value)
distribute
by
a.cname sort
by
a.cname,a.value
desc
;
|
但是這種寫法在HQL中直接報語法錯誤了,下面我們只能用hive udf的思路來解決了:
排序city和value,然后對city計數,最后where過濾掉city列計數器大於k的行即可。
好了,上代碼:
(1)定義UDF:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.
exec
.UDF;
public
final class Rank extends UDF{
private
int
counter;
private String last_key;
public
int
evaluate(final String
key
){
if ( !
key
.equalsIgnoreCase(this.last_key) ) {
this.counter = 0;
this.last_key =
key
;
}
return
this.counter++;
}
}
|
(2)注冊jar、建表、導數據,查詢:
|
1
2
3
4
5
6
7
8
9
|
add
jar Rank.jar;
create
temporary
function
rank
as
'com.example.hive.udf.Rank'
;
create
table
city(id
int
,cname string,value
int
) row format delimited fields terminated
by
' '
;
LOAD
DATA
LOCAL
INPATH
'city.txt'
OVERWRITE
INTO
TABLE
city;
select
cname, value
from
(
select
cname,rank(cname) csum,value
from
(
select
id, cname, value
from
city distribute
by
cname sort
by
cname,value
desc
)a
)b
where
csum < 2;
|
(3)結果:
|
1
2
3
4
5
6
|
bj 600
bj 300
sh 900
sh 400
wh 500
wh 200
|
REF:hive中分組取前N個值的實現
http://baiyunl.iteye.com/blog/1466343
3、最后我們來看一下原生態的MR:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
import
java.io.IOException;
import
java.util.TreeSet;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.util.GenericOptionsParser;
public
class
GroupTopK {
// 這個 MR 將會取得每組年齡中 id 最大的前 3 個
// 測試數據由腳本生成:http://my.oschina.net/leejun2005/blog/76631
public
static
class
GroupTopKMapper
extends
Mapper<LongWritable, Text, IntWritable, LongWritable> {
IntWritable outKey =
new
IntWritable();
LongWritable outValue =
new
LongWritable();
String[] valArr =
null
;
public
void
map(LongWritable key, Text value, Context context)
throws
IOException, InterruptedException {
valArr = value.toString().split(
"\t"
);
outKey.set(Integer.parseInt(valArr[
2
]));
// age int
outValue.set(Long.parseLong(valArr[
0
]));
// id long
context.write(outKey, outValue);
}
}
public
static
class
GroupTopKReducer
extends
Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
LongWritable outValue =
new
LongWritable();
public
void
reduce(IntWritable key, Iterable<LongWritable> values,
Context context)
throws
IOException, InterruptedException {
TreeSet<Long> idTreeSet =
new
TreeSet<Long>();
for
(LongWritable val : values) {
idTreeSet.add(val.get());
if
(idTreeSet.size() >
3
) {
idTreeSet.remove(idTreeSet.first());
}
}
for
(Long id : idTreeSet) {
outValue.set(id);
context.write(key, outValue);
}
}
}
public
static
void
main(String[] args)
throws
Exception {
Configuration conf =
new
Configuration();
String[] otherArgs =
new
GenericOptionsParser(conf, args)
.getRemainingArgs();
System.out.println(otherArgs.length);
System.out.println(otherArgs[
0
]);
System.out.println(otherArgs[
1
]);
if
(otherArgs.length !=
3
) {
System.err.println(
"Usage: GroupTopK <in> <out>"
);
System.exit(
2
);
}
Job job =
new
Job(conf,
"GroupTopK"
);
job.setJarByClass(GroupTopK.
class
);
job.setMapperClass(GroupTopKMapper.
class
);
job.setReducerClass(GroupTopKReducer.
class
);
job.setNumReduceTasks(
1
);
job.setOutputKeyClass(IntWritable.
class
);
job.setOutputValueClass(LongWritable.
class
);
FileInputFormat.addInputPath(job,
new
Path(otherArgs[
1
]));
FileOutputFormat.setOutputPath(job,
new
Path(otherArgs[
2
]));
System.exit(job.waitForCompletion(
true
) ?
0
:
1
);
}
}
|
hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1
結果:
hadoop fs -cat /tmp/1/part-r-00000
0 12869695
0 12869971
0 12869976
1 12869813
1 12869870
1 12869951
......
數據驗證:
awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695
可以看到結果沒有問題。
注:測試數據由以下腳本生成:
http://my.oschina.net/leejun2005/blog/76631
PS:
如果說hive類似sql的話,那pig就類似plsql存儲過程了:程序編寫更自由,邏輯能處理的更強大了。
pig中還能直接通過反射調用java的靜態類中的方法,這塊內容請參考之前的相關pig博文。
附幾個HIVE UDAF鏈接,有興趣的同學自己看下:
Hive UDAF和UDTF實現group by后獲取top值 http://blog.csdn.net/liuzhoulong/article/details/7789183
hive中自定義函數(UDAF)實現多行字符串拼接為一行 http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
編寫Hive UDAF http://www.fuzhijie.me/?p=118
Hive UDAF開發 http://richiehu.blog.51cto.com/2093113/386113

