hive中分組取前N個值的實現


背景

假設有一個學生各門課的成績的表單,應用hive取出每科成績前100名的學生成績。

這個就是典型在分組取Top N的需求。

 

解決思路

對於取出每科成績前100名的學生成績,針對學生成績表,根據學科,成績做order by排序,然后對排序后的成績,執行自定義函數row_number(),必須帶一個或者多個列參數,如ROW_NUMBER(col1, ....),它的作用是按指定的列進行分組生成行序列。在ROW_NUMBER(a,b) 時,若兩條記錄的a,b列相同,則行序列+1,否則重新計數。

只要返回row_number()返回值小於100的的成績記錄,就可以返回每個單科成績前一百的學生。

 

解決過程

成績表結構

create table score_table (
  subject        string,
  student       string,
  score           int
)
partitioned by (date string)

 

如果要查詢2012年每科成績前100的學生成績,sql如下

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

com.blue.hive.udf.RowNumber是自定義函數,函數的作用是按指定的列進行分組生成行序列。這里根據每個科目的所有成績,生成序列,序列值從1開始自增。

假設成績表的記錄如下:

物理  80 張三
數學  100 李一
物理  90  張二
數學  90  李二
物理  100 張一
數學  80  李三
.....

經過order by全局排序后,記錄如下

物理  100 張一
物理  90  張二
物理  80 張三
..... 數學
100 李一 數學 90 李二 數學 80 李三
....

接着執行row_number函數,返回值如下

科目  成績 學生   row_number
物理  100 張一      1
物理  90  張二      2
物理  80  張三      3
.....
數學  100 李一      1
數學  90  李二      2
數學  80  李三      3
....

因為hive是基於MAPREADUCE的,必須保證row_number執行是在reducer中執行。上述的語句保證了成績表的記錄,按照科目和成績做了全局排序,然后在reducer端執行row_number函數,如果在map端執行了row_number,那么結果將是錯誤的。

要查看row_number函數在map端還是reducer端執行,可以查看hive的執行計划:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

explain不會執行mapreduce計算,只會顯示執行計划。

 

只要row_number函數在reducer端執行,除了使用order by全局排序配合,也可以使用distribute by + sort by。distribute by可以讓相同科目的成績記錄發送到同一個reducer,而sort by可以在reducer端對記錄做排序。

而使用order by全局排序,只有一個reducer,未能充分利用資源,相比之下,distribute by + sort by在這里更有性能優勢,可以在多個reducer做排序,再做row_number的計算。

sql如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
    (select subject,score,student from score where dt='2012'  distribute by subject sort by subject asc, socre desc) order_score
where row_number(subject) <= 100;

 

如果成績有學院字段college,要找出學院里,單科成績前一百的學生,解決方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,subject,score,student from
    (select college,subject,score,student from score where dt='2012'  order by college asc,subject asc,socre desc) order_score
where row_number(college,subject) <= 100;

 

如果成績有學院字段college,要找出學院里,總成績前一百的學生,解決方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,totalscore,student from
    (select college,student,sum(score) as totalscore from score where dt='2012'  group by college,student  order by college asc,totalscore desc) order_score
where row_number(college) <= 100;

 

row_number的源碼

函數row_number(),必須帶一個或者多個列參數,如ROW_NUMBER(col1, ....),它的作用是按指定的列進行分組生成行序列。在ROW_NUMBER(a,b) 時,若兩條記錄的a,b列相同,則行序列+1,否則重新計數。

package com.blue.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class RowNumber extends UDF {

    private static int MAX_VALUE = 50;
    private static String comparedColumn[] = new String[MAX_VALUE];
    private static int rowNum = 1;

    public int evaluate(Object... args) {
        String columnValue[] = new String[args.length];
        for (int i = 0; i < args.length; i++) 『
            columnValue[i] = args[i].toString();
        }
        if (rowNum == 1) {
            for (int i = 0; i < columnValue.length; i++)
                comparedColumn[i] = columnValue[i];
        }

        for (int i = 0; i < columnValue.length; i++) {
            if (!comparedColumn[i].equals(columnValue[i])) {
                for (int j = 0; j < columnValue.length; j++) {
                    comparedColumn[j] = columnValue[j];
                }
                rowNum = 1;
                return rowNum++;
            }
        }
        return rowNum++;
    }
}

編譯后,打包成一個jar包,如/usr/local/hive/udf/blueudf.jar

然后在hive shell下使用,如下:

add jar /usr/local/hive/udf/blueudf.jar;
create temporary function row_number as 'com.blue.hive.udf.RowNumber';
select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_score
where row_number(subject) <= 100;

 

hive 0.12之前可用,0.12之后不可用,只能用窗口函數替代。

參考 http://chiyx.iteye.com/blog/1559460 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM