Hive中分組取前N個值


分享兩篇文章,結合看更清楚一點。

背景

假設有一個學生各門課的成績的表單,應用hive取出每科成績前100名的學生成績。

這個就是典型在分組取Top N的需求。

 

解決思路

對於取出每科成績前100名的學生成績,針對學生成績表,根據學科,成績做order by排序,然后對排序后的成績,執行自定義函數row_number(),必須帶一個或者多個列參數,如ROW_NUMBER(col1, ....),它的作用是按指定的列進行分組生成行序列。在ROW_NUMBER(a,b) 時,若兩條記錄的a,b列相同,則行序列+1,否則重新計數。

只要返回row_number()返回值小於100的的成績記錄,就可以返回每個單科成績前一百的學生。

 

解決過程

成績表結構

create table score_table (
  subject        string,
  student       string,
  score           int
)
partitioned by (date string)

 

如果要查詢2012年每科成績前100的學生成績,sql如下

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

com.blue.hive.udf.RowNumber是自定義函數,函數的作用是按指定的列進行分組生成行序列。這里根據每個科目的所有成績,生成序列,序列值從1開始自增。

假設成績表的記錄如下:

復制代碼
物理  80 張三
數學  100 李一
物理  90  張二
數學  90  李二
物理  100 張一
數學  80  李三
.....
復制代碼

經過order by全局排序后,記錄如下

復制代碼
物理  100 張一
物理  90  張二
物理  80 張三
..... 數學 100 李一 數學 90 李二 數學 80 李三
....
復制代碼

接着執行row_number函數,返回值如下

復制代碼
科目  成績 學生   row_number
物理  100 張一      1
物理  90  張二      2
物理  80  張三      3
.....
數學  100 李一      1
數學  90  李二      2
數學  80  李三      3
....
復制代碼

因為hive是基於MAPREADUCE的,必須保證row_number執行是在reducer中執行。上述的語句保證了成績表的記錄,按照科目和成績做了全局排序,然后在reducer端執行row_number函數,如果在map端執行了row_number,那么結果將是錯誤的。

要查看row_number函數在map端還是reducer端執行,可以查看hive的執行計划:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

explain不會執行mapreduce計算,只會顯示執行計划。

 

只要row_number函數在reducer端執行,除了使用order by全局排序配合,也可以使用distribute by + sort by。distribute by可以讓相同科目的成績記錄發送到同一個reducer,而sort by可以在reducer端對記錄做排序。

而使用order by全局排序,只有一個reducer,未能充分利用資源,相比之下,distribute by + sort by在這里更有性能優勢,可以在多個reducer做排序,再做row_number的計算。

sql如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
    (select subject,score,student from score where dt='2012'  distribute by subject sort by subject asc, socre desc) order_score
where row_number(subject) <= 100;

 

如果成績有學院字段college,要找出學院里,單科成績前一百的學生,解決方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,subject,score,student from
    (select college,subject,score,student from score where dt='2012'  order by college asc,subject asc,socre desc) order_score
where row_number(college,subject) <= 100;

 

如果成績有學院字段college,要找出學院里,總成績前一百的學生,解決方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,totalscore,student from
    (select college,student,sum(score) as totalscore from score where dt='2012'  group by college,student  order by college asc,totalscore desc) order_score
where row_number(college) <= 100;

 

row_number的源碼

函數row_number(),必須帶一個或者多個列參數,如ROW_NUMBER(col1, ....),它的作用是按指定的列進行分組生成行序列。在ROW_NUMBER(a,b) 時,若兩條記錄的a,b列相同,則行序列+1,否則重新計數。

復制代碼
package com.blue.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class RowNumber extends UDF {

    private static int MAX_VALUE = 50;
    private static String comparedColumn[] = new String[MAX_VALUE];
    private static int rowNum = 1;

    public int evaluate(Object... args) {
        String columnValue[] = new String[args.length];
        for (int i = 0; i < args.length; i++) 『
            columnValue[i] = args[i].toString();
        }
        if (rowNum == 1) {
            for (int i = 0; i < columnValue.length; i++)
                comparedColumn[i] = columnValue[i];
        }

        for (int i = 0; i < columnValue.length; i++) {
            if (!comparedColumn[i].equals(columnValue[i])) {
                for (int j = 0; j < columnValue.length; j++) {
                    comparedColumn[j] = columnValue[j];
                }
                rowNum = 1;
                return rowNum++;
            }
        }
        return rowNum++;
    }
}
復制代碼

編譯后,打包成一個jar包,如/usr/local/hive/udf/blueudf.jar

然后在hive shell下使用,如下:

add jar /usr/local/hive/udf/blueudf.jar;
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

 

hive 0.12之前可用,0.12之后不可用,只能用窗口函數替代。

參考 http://chiyx.iteye.com/blog/1559460 

-----------------------------------------分割線-----------------------------------------------------

問題:

有如下數據文件 city.txt (id, city, value)

cat city.txt 
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
需要按 city 分組聚合,然后從每組數據中取出前兩條value最大的記錄。

1、這是實際業務中經常會遇到的 group TopK 問題,下面來看看 pig 如何解決:

?
1
2
3
4
5
a = load '/data/city.txt'  using PigStorage( ' ' ) as (id:chararray, city:chararray, value: int );
b = group a by city;
c = foreach b {c1= order a by value desc ; c2=limit c1 2; generate group ,c2.value;};
d = stream c through `sed 's/[(){}]//g' `;
dump d;

結果:

?
1
2
3
(bj,600,300)
(sh,900,400)
(wh,500,200)

這幾行代碼其實也實現了mysql中的 group_concat 函數的功能:

?
1
2
3
4
5
a = load '/data/city.txt'  using PigStorage( ' ' ) as (id:chararray, city:chararray, value: int );
b = group a by city;
c = foreach b {c1= order a by value desc ;  generate group ,c1.value;};
d = stream c through `sed 's/[(){}]//g' `;
dump d;

結果:

?
1
2
3
(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)

2、下面我們再來看看hive如何處理group topk的問題:

本質上HSQL和sql有很多相同的地方,但HSQL目前功能還有很多缺失,至少不如原生態的SQL功能強大,

比起PIG也有些差距,如果SQL中這類分組topk的問題如何解決呢?

?
1
2
3
select * from city a where
2>( select count (1) from city where cname=a.cname and value>a.value)
distribute by a.cname sort by a.cname,a.value desc ;
http://my.oschina.net/leejun2005/blog/78904

但是這種寫法在HQL中直接報語法錯誤了,下面我們只能用hive udf的思路來解決了:

排序city和value,然后對city計數,最后where過濾掉city列計數器大於k的行即可。

好了,上代碼:

(1)定義UDF:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.example.hive.udf;
import org.apache.hadoop.hive.ql. exec .UDF;
      
public final class Rank extends UDF{
     private int  counter;
     private String last_key;
     public int evaluate(final String key ){
       if ( ! key .equalsIgnoreCase(this.last_key) ) {
          this.counter = 0;
          this.last_key = key ;
       }
       return this.counter++;
     }
}

(2)注冊jar、建表、導數據,查詢:

?
1
2
3
4
5
6
7
8
9
add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank' ;
create table city(id int ,cname string,value int ) row format delimited fields terminated by ' ' ;
LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;
select cname, value from (
     select cname,rank(cname) csum,value from (
         select id, cname, value from city distribute by cname sort by cname,value desc
     )a
)b where csum < 2;

(3)結果:

 

?
1
2
3
4
5
6
bj  600
bj  300
sh  900
sh  400
wh  500
wh  200
可以看到,hive相比pig來說,處理起來稍微復雜了點,但隨着hive的日漸完善,以后比pig更簡潔也說不定。

REF:hive中分組取前N個值的實現

http://baiyunl.iteye.com/blog/1466343

 

3、最后我們來看一下原生態的MR:

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.io.IOException;
import java.util.TreeSet;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class GroupTopK {
     // 這個 MR 將會取得每組年齡中 id 最大的前 3 個
     // 測試數據由腳本生成:http://my.oschina.net/leejun2005/blog/76631
     public static class GroupTopKMapper extends
             Mapper<LongWritable, Text, IntWritable, LongWritable> {
         IntWritable outKey = new IntWritable();
         LongWritable outValue = new LongWritable();
         String[] valArr = null ;
 
         public void map(LongWritable key, Text value, Context context)
                 throws IOException, InterruptedException {
             valArr = value.toString().split( "\t" );
             outKey.set(Integer.parseInt(valArr[ 2 ])); // age int
             outValue.set(Long.parseLong(valArr[ 0 ])); // id long
             context.write(outKey, outValue);
         }
     }
 
     public static class GroupTopKReducer extends
             Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
 
         LongWritable outValue = new LongWritable();
 
         public void reduce(IntWritable key, Iterable<LongWritable> values,
                 Context context) throws IOException, InterruptedException {
             TreeSet<Long> idTreeSet = new TreeSet<Long>();
             for (LongWritable val : values) {
                 idTreeSet.add(val.get());
                 if (idTreeSet.size() > 3 ) {
                     idTreeSet.remove(idTreeSet.first());
                 }
             }
             for (Long id : idTreeSet) {
                 outValue.set(id);
                 context.write(key, outValue);
             }
         }
     }
 
     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         String[] otherArgs = new GenericOptionsParser(conf, args)
                 .getRemainingArgs();
 
         System.out.println(otherArgs.length);
         System.out.println(otherArgs[ 0 ]);
         System.out.println(otherArgs[ 1 ]);
 
         if (otherArgs.length != 3 ) {
             System.err.println( "Usage: GroupTopK <in> <out>" );
             System.exit( 2 );
         }
         Job job = new Job(conf, "GroupTopK" );
         job.setJarByClass(GroupTopK. class );
         job.setMapperClass(GroupTopKMapper. class );
         job.setReducerClass(GroupTopKReducer. class );
         job.setNumReduceTasks( 1 );
         job.setOutputKeyClass(IntWritable. class );
         job.setOutputValueClass(LongWritable. class );
         FileInputFormat.addInputPath(job, new Path(otherArgs[ 1 ]));
         FileOutputFormat.setOutputPath(job, new Path(otherArgs[ 2 ]));
         System.exit(job.waitForCompletion( true ) ? 0 : 1 );
     }
}

hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1

結果:

 

hadoop fs -cat /tmp/1/part-r-00000
0       12869695
0       12869971
0       12869976
1       12869813
1       12869870
1       12869951

......

數據驗證:

awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695

可以看到結果沒有問題。 

注:測試數據由以下腳本生成:

http://my.oschina.net/leejun2005/blog/76631

 

PS:

如果說hive類似sql的話,那pig就類似plsql存儲過程了:程序編寫更自由,邏輯能處理的更強大了。

pig中還能直接通過反射調用java的靜態類中的方法,這塊內容請參考之前的相關pig博文。

附幾個HIVE UDAF鏈接,有興趣的同學自己看下:

Hive UDAF和UDTF實現group by后獲取top值 http://blog.csdn.net/liuzhoulong/article/details/7789183
hive中自定義函數(UDAF)實現多行字符串拼接為一行 http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
編寫Hive UDAF http://www.fuzhijie.me/?p=118
Hive UDAF開發 http://richiehu.blog.51cto.com/2093113/386113


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM