Hadoop Learning Path (20): Finding Top N with MapReduce


Preface

In Hadoop, sorting is the soul of MapReduce. Both the MapTask and the ReduceTask sort their data by key; this is the default behavior of the MR framework, whether or not your business logic needs it.

Technical Points

The MapReduce framework mainly uses two kinds of sorting: quicksort and a heap-based priority queue (PriorityQueue).

Mapper Phase

Data written from map() into the circular in-memory buffer is sorted by partition and then by key, using the framework's modified quicksort. When the buffer reaches 80% capacity, the data is spilled to disk as an IFile. After the map task finishes, the IFiles are merge-sorted into one large file (using a heap-based priority queue), from which each reducer pulls its own partition.
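Both the 80% threshold and the buffer size are configurable. As a reference, the relevant Hadoop 2.x+ properties look like the fragment below (the values shown are the usual defaults):

```xml
<!-- mapred-site.xml: map-side sort buffer and spill settings (defaults shown) -->
<property>
  <name>mapreduce.task.io.sort.mb</name>
  <value>100</value>
</property>
<property>
  <name>mapreduce.map.sort.spill.percent</name>
  <value>0.80</value>
</property>
<property>
  <!-- number of streams merged at once when IFiles are merge-sorted -->
  <name>mapreduce.task.io.sort.factor</name>
  <value>10</value>
</property>
```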

Reducer Phase

Data fetched from the mapper side is already partially sorted, so the Reduce Task only needs one merge sort to make it fully ordered. For efficiency, Hadoop overlaps the sort and reduce phases: during the sort phase, the Reduce Task builds a min-heap over the in-memory and on-disk files, keeps an iterator pointing at the heap root, and keeps advancing that iterator so that records with equal keys are handed to reduce() in sequence. Advancing the iterator is really just repeatedly re-adjusting the min-heap (build heap → take the root → re-adjust → take the root ...), which is what lets sort and reduce run in parallel.
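The heap-based merge described above can be sketched in plain Java (no Hadoop), with in-memory lists standing in for sorted spill segments: each segment contributes its head element to a min-heap, and repeatedly popping and re-adjusting the heap yields one globally sorted stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Minimal sketch of the k-way merge a Reduce Task performs over sorted segments.
public class KWayMerge {

    public static List<String> merge(List<List<String>> sortedSegments) {
        // Heap entries are {segmentIndex, offsetWithinSegment},
        // ordered by the key each entry currently points at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] e) -> sortedSegments.get(e[0]).get(e[1])));
        for (int i = 0; i < sortedSegments.size(); i++) {
            if (!sortedSegments.get(i).isEmpty()) {
                heap.add(new int[]{i, 0});
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();                  // take the heap root (smallest key)
            List<String> seg = sortedSegments.get(head[0]);
            out.add(seg.get(head[1]));
            if (head[1] + 1 < seg.size()) {            // advance the segment; heap re-adjusts
                heap.add(new int[]{head[0], head[1] + 1});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
                Arrays.asList("a", "d", "f"),
                Arrays.asList("b", "c", "e"))));       // prints [a, b, c, d, e, f]
    }
}
```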

Grouped Top N Analysis

A scenario that comes up all the time in data processing: group table rows by some field, then find the top few records within each group. Below we solve this grouped Top N problem with MapReduce; it could equally be implemented with tools such as Hive.
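Before diving into the MR jobs, the core idea of grouped Top N can be sketched in plain Java: keep one bounded min-heap per group, so memory stays proportional to (number of groups × N) regardless of how many records stream past. The class and method names here are illustrative, not part of the post's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Grouped Top N with one bounded min-heap per group.
public class GroupedTopN {

    /** records: each element is {group, score}; returns top n scores per group, largest first. */
    public static Map<String, List<Double>> topN(List<String[]> records, int n) {
        Map<String, PriorityQueue<Double>> heaps = new HashMap<>();
        for (String[] rec : records) {
            PriorityQueue<Double> heap =
                    heaps.computeIfAbsent(rec[0], k -> new PriorityQueue<>());
            heap.add(Double.parseDouble(rec[1]));
            if (heap.size() > n) {
                heap.poll();                      // evict the smallest: heap keeps the top n
            }
        }
        Map<String, List<Double>> result = new TreeMap<>();
        for (Map.Entry<String, PriorityQueue<Double>> e : heaps.entrySet()) {
            List<Double> top = new ArrayList<>(e.getValue());
            top.sort(Collections.reverseOrder()); // largest first
            result.put(e.getKey(), top);
        }
        return result;
    }
}
```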

Sample Scenario

computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

1. Data Description

The number of fields per line is not fixed:
the first field is the course name (four courses in total: computer, math, english, algorithm),
the second is the student name, and the remaining fields are the scores of each exam.

2. Requirements:
1. For each course, count the number of students who took it and the course average score.

2. Compute each student's average score per course and write the results into separate files by course (one result file per course), sorted by average score from high to low, with scores kept to one decimal place.

3. For each course, find the student with the highest average score, outputting course, name, and average score.

Problem 1

CourseScoreMR1.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CourseScoreMR1 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR1.class);
        job.setMapperClass(CourseScoreMR1Mapper.class);
        job.setReducerClass(CourseScoreMR1Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_1");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        /**
         * Input fields: course, name, score, score, ...
         *
         * value == algorithm,huangzitao,85,86,41,75,93,42,85,75
         *
         * Output key   : course
         * Output value : avgScore (this student's average for the course)
         *
         * APIs for formatting numeric values: NumberFormat
         * (SimpleDateFormat is the analogue for dates)
         */

        Text outKey = new Text();
        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];

            int sum = 0;
            int count = 0;

            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            double avgScore = 1D * sum / count;

            outKey.set(course);
            outValue.set(avgScore);

            context.write(outKey, outValue);
        }
    }

    public static class CourseScoreMR1Reducer extends Reducer<Text, DoubleWritable, Text, Text> {

        Text outValue = new Text();

        /**
         * key    : course
         * values : the per-student averages for that course, e.g. 98.7, 87.6
         */
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

            double sum = 0;
            int count = 0;

            for (DoubleWritable dw : values) {
                sum += dw.get();
                count++;
            }

            double lastAvgScore = sum / count;

            outValue.set(count + "\t" + lastAvgScore);

            context.write(key, outValue);
        }
    }
}
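The mapper's comment mentions NumberFormat for formatting numeric values, and requirement 2 asks for one decimal place. A small sketch of that formatting (the class and method names here are illustrative; a fixed Locale avoids locale-dependent decimal separators):

```java
import java.text.NumberFormat;
import java.util.Locale;

// Format an average score to exactly one decimal place using NumberFormat.
public class OneDecimal {

    public static String format(double avg) {
        NumberFormat nf = NumberFormat.getNumberInstance(Locale.US); // '.' as decimal separator
        nf.setMinimumFractionDigits(1);
        nf.setMaximumFractionDigits(1);
        return nf.format(avg);
    }

    public static void main(String[] args) {
        System.out.println(format(244.0 / 3)); // prints 81.3
        System.out.println(format(85));        // prints 85.0
    }
}
```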

Problem 2

CourseScoreMR2.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.pojo.CourseScore;
import com.ghgj.mr.exercise.ptn.CSPartitioner;

public class CourseScoreMR2 {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR2.class);
        job.setMapperClass(CourseScoreMR2Mapper.class);
        // No custom reducer needed: the shuffle already sorts by CourseScore,
        // so the default (identity) reducer writes the sorted records out.
//        job.setReducerClass(CourseScoreMR2Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
//        job.setOutputKeyClass(CourseScore.class);
//        job.setOutputValueClass(NullWritable.class);

        // One reduce task per course => one result file per course
        job.setPartitionerClass(CSPartitioner.class);
        job.setNumReduceTasks(4);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_2");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;

            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            double avgScore = 1D * sum / count;

            cs.setCourse(course);
            cs.setName(name);
            cs.setScore(avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // unused: the identity reducer is sufficient for this job
        }
    }
}

CSPartitioner.java

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import com.ghgj.mr.exercise.pojo.CourseScore;

public class CSPartitioner extends Partitioner<CourseScore, NullWritable> {

    /**
     * Route each course to its own reduce task (and thus its own output file).
     */
    @Override
    public int getPartition(CourseScore key, NullWritable value, int numPartitions) {

        String course = key.getCourse();
        if (course.equals("math")) {
            return 0;
        } else if (course.equals("english")) {
            return 1;
        } else if (course.equals("computer")) {
            return 2;
        } else {
            return 3;
        }
    }
}
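Why a custom partitioner at all? Hadoop's default HashPartitioner assigns a partition by hashing the whole key, so with 4 reduce tasks two courses could land in the same partition and some output files would mix courses or stay empty. The default formula is easy to demonstrate in plain Java (the class name here is illustrative):

```java
// The partition formula Hadoop's default HashPartitioner uses:
// (hashCode & Integer.MAX_VALUE) % numPartitions -- deterministic,
// but with no guarantee that distinct keys land in distinct partitions.
public class HashPartitionDemo {

    public static int partitionOf(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        for (String course : new String[]{"computer", "math", "english", "algorithm"}) {
            System.out.println(course + " -> " + partitionOf(course, 4));
        }
    }
}
```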

Problem 3

CourseScoreMR3.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ghgj.mr.exercise.gc.CourseScoreGC;
import com.ghgj.mr.exercise.pojo.CourseScore;

public class CourseScoreMR3 {

    private static final int TOPN = 3;

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJarByClass(CourseScoreMR3.class);
        job.setMapperClass(CourseScoreMR2Mapper.class);
        job.setReducerClass(CourseScoreMR2Reducer.class);

        job.setMapOutputKeyClass(CourseScore.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(CourseScore.class);
        job.setOutputValueClass(NullWritable.class);

//        job.setPartitionerClass(CSPartitioner.class);
//        job.setNumReduceTasks(4);

        // Specify the grouping rule: all records of one course form one group
        job.setGroupingComparatorClass(CourseScoreGC.class);

        Path inputPath = new Path("E:\\bigdata\\cs\\input");
        Path outputPath = new Path("E:\\bigdata\\cs\\output_3_last");
        FileInputFormat.setInputPaths(job, inputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable> {

        CourseScore cs = new CourseScore();

        /**
         * value = math,huangxiaoming,85,75,85,99,66,88,75,91
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(",");

            String course = split[0];
            String name = split[1];

            int sum = 0;
            int count = 0;

            for (int i = 2; i < split.length; i++) {
                int tempScore = Integer.parseInt(split[i]);
                sum += tempScore;
                count++;
            }

            double avgScore = 1D * sum / count;

            cs.setCourse(course);
            cs.setName(name);
            cs.setScore(avgScore);

            context.write(cs, NullWritable.get());
        }
    }

    public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {

        /**
         * reduce() is invoked once per group of key-value pairs whose keys
         * compare as equal -- and "equal" here is decided by the grouping
         * comparator, i.e. by course only.
         *
         * The values iterator can only be traversed once. As it advances, the
         * framework updates the value object in place, and the key object is
         * updated along with it to the current record's key. This is
         * reasonable, and it is exactly what the loop below relies on.
         */
        @Override
        protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            int count = 0;

            // Within a group, records arrive sorted by average score in
            // descending order (per CourseScore's compareTo), so the first
            // TOPN iterations see the course's top N students.
            for (NullWritable nvl : values) {
                context.write(key, nvl);
                if (++count == TOPN) {
                    return;
                }
            }
        }
    }
}

CourseScoreGC.java

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import com.ghgj.mr.exercise.pojo.CourseScore;

/**
 * Specifies the grouping rule for the reduce phase.
 */
public class CourseScoreGC extends WritableComparator {

    public CourseScoreGC() {
        super(CourseScore.class, true);
    }

    /**
     * Method semantics (the name is a good hint):
     *
     * Parameters   : two objects that will serve as keys in the MR program.
     * Return value : an int; a return value of 0 means the two keys compare
     *                as equal, i.e. they belong to the same group.
     *
     * For our requirement, the grouping rule is the course field only.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        CourseScore cs1 = (CourseScore) a;
        CourseScore cs2 = (CourseScore) b;

        return cs1.getCourse().compareTo(cs2.getCourse());
    }
}
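The CourseScore POJO itself is never shown in the post. For the grouping trick in Problem 3 to work, its compareTo must order by course ascending and, within a course, by average score descending. That ordering can be sketched in plain Java as below; the real class would additionally implement org.apache.hadoop.io.WritableComparable and serialize course/name/score in write()/readFields(). The class name here is a stand-in.

```java
// Hypothetical sketch of the ordering CourseScore must implement:
// course ascending, then average score descending.
public class CourseScoreSketch implements Comparable<CourseScoreSketch> {

    String course;
    String name;
    double score;

    public CourseScoreSketch(String course, String name, double score) {
        this.course = course;
        this.name = name;
        this.score = score;
    }

    @Override
    public int compareTo(CourseScoreSketch o) {
        int byCourse = this.course.compareTo(o.course); // primary: course ascending
        if (byCourse != 0) {
            return byCourse;
        }
        return Double.compare(o.score, this.score);     // secondary: higher score first
    }
}
```

Because the sort comparator sees the score while the grouping comparator ignores it, each reduce() group starts at the highest-scoring record of its course.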

 

