前言
在Hadoop中,排序是MapReduce的靈魂,MapTask和ReduceTask均會對數據按Key排序,這個操作是MR框架的默認行為,不管你的業務邏輯上是否需要這一操作。
技術點
MapReduce框架中,用到的排序主要有兩種:快速排序和基於堆實現的優先級隊列(PriorityQueue)。
Mapper階段
從map輸出到環形緩沖區的數據會被排序(這是MR框架中改良的快速排序),這個排序涉及partition和key,當緩沖區容量占用80%,會spill數據到磁盤,生成IFile文件,Map結束後,會將IFile文件排序合併成一個大文件(基於堆實現的優先級隊列),以供不同的reduce來拉取相應的數據。
Reducer階段
從Mapper端取回的數據已是部分有序,Reduce Task只需進行一次歸並排序即可保證數據整體有序。為了提高效率,Hadoop將sort階段和reduce階段並行化,在sort階段,Reduce Task為內存和磁盤中的文件建立了小頂堆,保存了指向該小頂堆根節點的迭代器,並不斷的移動迭代器,以將key相同的數據順次交給reduce()函數處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程(建堆→取堆頂元素→重新建堆→取堆頂元素...),這樣,sort和reduce可以並行進行。
分組Top N分析
在數據處理中,經常會碰到這樣一個場景:對表數據按照某一字段分組,然後找出各自組內最大的幾條記錄的情形。針對這種分組Top N問題,我們利用Hive、MapReduce等多種工具實現一下。
場景模擬
computer,huangxiaoming,85,86,41,75,93,42,85 computer,xuzheng,54,52,86,91,42 computer,huangbo,85,42,96,38 english,zhaobenshan,54,52,86,91,42,85,75 english,liuyifei,85,41,75,21,85,96,14 algorithm,liuyifei,75,85,62,48,54,96,15 computer,huangjiaju,85,75,86,85,85 english,liuyifei,76,95,86,74,68,74,48 english,huangdatou,48,58,67,86,15,33,85 algorithm,huanglei,76,95,86,74,68,74,48 algorithm,huangjiaju,85,75,86,85,85,74,86 computer,huangdatou,48,58,67,86,15,33,85 english,zhouqi,85,86,41,75,93,42,85,75,55,47,22 english,huangbo,85,42,96,38,55,47,22 algorithm,liutao,85,75,85,99,66 computer,huangzitao,85,86,41,75,93,42,85 math,wangbaoqiang,85,86,41,75,93,42,85 computer,liujialing,85,41,75,21,85,96,14,74,86 computer,liuyifei,75,85,62,48,54,96,15 computer,liutao,85,75,85,99,66,88,75,91 computer,huanglei,76,95,86,74,68,74,48 english,liujialing,75,85,62,48,54,96,15 math,huanglei,76,95,86,74,68,74,48 math,huangjiaju,85,75,86,85,85,74,86 math,liutao,48,58,67,86,15,33,85 english,huanglei,85,75,85,99,66,88,75,91 math,xuzheng,54,52,86,91,42,85,75 math,huangxiaoming,85,75,85,99,66,88,75,91 math,liujialing,85,86,41,75,93,42,85,75 english,huangxiaoming,85,86,41,75,93,42,85 algorithm,huangdatou,48,58,67,86,15,33,85 algorithm,huangzitao,85,86,41,75,93,42,85,75
一、數據解釋
數據字段個數不固定:
第一個是課程名稱,總共四個課程,computer,math,english,algorithm,
第二個是學生姓名,后面是每次考試的分數
二、統計需求:
1、統計每門課程的參考人數和課程平均分
2、統計每門課程參考學生的平均分,並且按課程存入不同的結果文件,要求一門課程一個結果文件,並且按平均分從高到低排序,分數保留一位小數
3、求出每門課程參考學生成績最高的學生的信息:課程,姓名和平均分
第一題
CourseScoreMR1.java

1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FileSystem; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.DoubleWritable; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 15 public class CourseScoreMR1 { 16 17 public static void main(String[] args) throws Exception { 18 19 Configuration conf = new Configuration(); 20 FileSystem fs = FileSystem.get(conf); 21 Job job = Job.getInstance(conf); 22 23 24 job.setJarByClass(CourseScoreMR1.class); 25 job.setMapperClass(CourseScoreMR1Mapper.class); 26 job.setReducerClass(CourseScoreMR1Reducer.class); 27 28 job.setMapOutputKeyClass(Text.class); 29 job.setMapOutputValueClass(DoubleWritable.class); 30 job.setOutputKeyClass(Text.class); 31 job.setOutputValueClass(Text.class); 32 33 34 Path inputPath = new Path("E:\\bigdata\\cs\\input"); 35 Path outputPath = new Path("E:\\bigdata\\cs\\output_1"); 36 FileInputFormat.setInputPaths(job, inputPath); 37 if(fs.exists(outputPath)){ 38 fs.delete(outputPath, true); 39 } 40 FileOutputFormat.setOutputPath(job, outputPath); 41 42 43 boolean isDone = job.waitForCompletion(true); 44 System.exit(isDone ? 
0 : 1); 45 } 46 47 public static class CourseScoreMR1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable>{ 48 49 /** 50 * 數據的三個字段: course , name, score 51 * 52 * value == algorithm,huangzitao,85,86,41,75,93,42,85,75 53 * 54 * 輸出的key和value: 55 * 56 * key : course 57 * 58 * value : avgScore 59 * 60 * 格式化數值相關的操作的API : NumberFormat 61 * SimpleDateFormat 62 */ 63 64 Text outKey = new Text(); 65 DoubleWritable outValue = new DoubleWritable(); 66 67 @Override 68 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 69 70 String[] split = value.toString().split(","); 71 72 String course = split[0]; 73 74 int sum = 0; 75 int count = 0; 76 77 for(int i = 2; i<split.length; i++){ 78 int tempScore = Integer.parseInt(split[i]); 79 sum += tempScore; 80 81 count++; 82 } 83 84 double avgScore = 1D * sum / count; 85 86 87 outKey.set(course); 88 outValue.set(avgScore); 89 90 context.write(outKey, outValue); 91 } 92 93 } 94 95 public static class CourseScoreMR1Reducer extends Reducer<Text, DoubleWritable, Text, Text>{ 96 97 98 Text outValue = new Text(); 99 /** 100 * key : course 101 * 102 * values : 98.7 87.6 103 */ 104 @Override 105 protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { 106 107 double sum = 0; 108 int count = 0; 109 110 for(DoubleWritable dw : values){ 111 sum += dw.get(); 112 count ++; 113 } 114 115 double lastAvgScore = sum / count; 116 117 outValue.set(count+"\t" + lastAvgScore); 118 119 context.write(key, outValue); 120 } 121 } 122 }
第二題
CourseScoreMR2.java

1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FileSystem; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.NullWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 15 import com.ghgj.mr.exercise.pojo.CourseScore; 16 import com.ghgj.mr.exercise.ptn.CSPartitioner; 17 18 public class CourseScoreMR2{ 19 20 public static void main(String[] args) throws Exception { 21 22 Configuration conf = new Configuration(); 23 24 FileSystem fs = FileSystem.get(conf); 25 Job job = Job.getInstance(conf); 26 27 28 job.setJarByClass(CourseScoreMR2.class); 29 job.setMapperClass(CourseScoreMR2Mapper.class); 30 // job.setReducerClass(CourseScoreMR2Reducer.class); 31 32 job.setMapOutputKeyClass(CourseScore.class); 33 job.setMapOutputValueClass(NullWritable.class); 34 // job.setOutputKeyClass(CourseScore.class); 35 // job.setOutputValueClass(NullWritable.class); 36 37 38 job.setPartitionerClass(CSPartitioner.class); 39 job.setNumReduceTasks(4); 40 41 42 Path inputPath = new Path("E:\\bigdata\\cs\\input"); 43 Path outputPath = new Path("E:\\bigdata\\cs\\output_2"); 44 FileInputFormat.setInputPaths(job, inputPath); 45 if(fs.exists(outputPath)){ 46 fs.delete(outputPath, true); 47 } 48 FileOutputFormat.setOutputPath(job, outputPath); 49 50 51 boolean isDone = job.waitForCompletion(true); 52 System.exit(isDone ? 
0 : 1); 53 } 54 55 public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable>{ 56 57 CourseScore cs = new CourseScore(); 58 59 /** 60 * value = math,huangxiaoming,85,75,85,99,66,88,75,91 61 */ 62 @Override 63 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 64 65 String[] split = value.toString().split(","); 66 67 String course = split[0]; 68 String name = split[1]; 69 70 int sum = 0; 71 int count = 0; 72 73 for(int i = 2; i<split.length; i++){ 74 int tempScore = Integer.parseInt(split[i]); 75 sum += tempScore; 76 77 count++; 78 } 79 80 double avgScore = 1D * sum / count; 81 82 cs.setCourse(course); 83 cs.setName(name); 84 cs.setScore(avgScore); 85 86 context.write(cs, NullWritable.get()); 87 } 88 89 } 90 91 public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable>{ 92 93 @Override 94 protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { 95 96 97 } 98 } 99 }
CSPartitioner.java

1 import org.apache.hadoop.io.NullWritable; 2 import org.apache.hadoop.mapreduce.Partitioner; 3 4 import com.ghgj.mr.exercise.pojo.CourseScore; 5 6 public class CSPartitioner extends Partitioner<CourseScore,NullWritable>{ 7 8 /** 9 * 10 */ 11 @Override 12 public int getPartition(CourseScore key, NullWritable value, int numPartitions) { 13 14 String course = key.getCourse(); 15 if(course.equals("math")){ 16 return 0; 17 }else if(course.equals("english")){ 18 return 1; 19 }else if(course.equals("computer")){ 20 return 2; 21 }else{ 22 return 3; 23 } 24 25 26 } 27 28 29 }
第三題
CourseScoreMR3.java

1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FileSystem; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.NullWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 15 import com.ghgj.mr.exercise.gc.CourseScoreGC; 16 import com.ghgj.mr.exercise.pojo.CourseScore; 17 18 public class CourseScoreMR3{ 19 20 private static final int TOPN = 3; 21 22 public static void main(String[] args) throws Exception { 23 24 Configuration conf = new Configuration(); 25 FileSystem fs = FileSystem.get(conf); 26 Job job = Job.getInstance(conf); 27 28 29 job.setJarByClass(CourseScoreMR3.class); 30 job.setMapperClass(CourseScoreMR2Mapper.class); 31 job.setReducerClass(CourseScoreMR2Reducer.class); 32 33 job.setMapOutputKeyClass(CourseScore.class); 34 job.setMapOutputValueClass(NullWritable.class); 35 job.setOutputKeyClass(CourseScore.class); 36 job.setOutputValueClass(NullWritable.class); 37 38 39 // job.setPartitionerClass(CSPartitioner.class); 40 // job.setNumReduceTasks(4); 41 42 43 // 指定分組規則 44 job.setGroupingComparatorClass(CourseScoreGC.class); 45 46 47 Path inputPath = new Path("E:\\bigdata\\cs\\input"); 48 Path outputPath = new Path("E:\\bigdata\\cs\\output_3_last"); 49 FileInputFormat.setInputPaths(job, inputPath); 50 if(fs.exists(outputPath)){ 51 fs.delete(outputPath, true); 52 } 53 FileOutputFormat.setOutputPath(job, outputPath); 54 55 56 boolean isDone = job.waitForCompletion(true); 57 System.exit(isDone ? 
0 : 1); 58 } 59 60 public static class CourseScoreMR2Mapper extends Mapper<LongWritable, Text, CourseScore, NullWritable>{ 61 62 CourseScore cs = new CourseScore(); 63 64 /** 65 * value = math,huangxiaoming,85,75,85,99,66,88,75,91 66 */ 67 @Override 68 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 69 70 String[] split = value.toString().split(","); 71 72 String course = split[0]; 73 String name = split[1]; 74 75 int sum = 0; 76 int count = 0; 77 78 for(int i = 2; i<split.length; i++){ 79 int tempScore = Integer.parseInt(split[i]); 80 sum += tempScore; 81 82 count++; 83 } 84 85 double avgScore = 1D * sum / count; 86 87 cs.setCourse(course); 88 cs.setName(name); 89 cs.setScore(avgScore); 90 91 context.write(cs, NullWritable.get()); 92 } 93 94 } 95 96 public static class CourseScoreMR2Reducer extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable>{ 97 98 int count = 0; 99 100 /** 101 * reducer階段的reduce方法的調用參數:key相同的額一組key-vlaue 102 * 103 * redcuer階段,每次遇到一個不同的key的key_value組, 那么reduce方法就會被調用一次。 104 * 105 * 106 * values這個迭代器只能迭代一次。 107 * values迭代器在迭代的過程中迭代出來的value會變,同時,這個value所對應的key也會跟着變 合理 108 * 109 */ 110 @Override 111 protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { 112 113 114 int count = 0; 115 116 for(NullWritable nvl : values){ 117 System.out.println("*********************************** " + (++count) + " " + key.toString()); 118 119 if(count == 3){ 120 return; 121 } 122 } 123 124 125 // 原樣輸出 126 /*for(NullWritable nvl : values){ 127 context.write(key, nvl); 128 }*/ 129 130 131 // 輸出每門課程的最高分數 , 預期結果中,key的顯示都是一樣的 132 // for(NullWritable nvl : values){ 133 // System.out.println(key + " - " nvl); 134 // 135 // valueList.add(nvl); 136 // } 137 138 // List<Value> valueList = null; 139 // 預期結果中,key的顯示都是一樣的 140 /*int count = 0; 141 for(NullWritable nvl : values){ 142 count++; 143 } 144 for(int i = 0; i<count; i++){ 
145 valueList.get(i) = value 146 System.out.println(key + " - "+ value); 147 }*/ 148 149 150 // math hello 1 151 // math hi 2 152 } 153 } 154 }
CourseScoreGC.java

1 import org.apache.hadoop.io.WritableComparable; 2 import org.apache.hadoop.io.WritableComparator; 3 4 import com.ghgj.mr.exercise.pojo.CourseScore; 5 6 /** 7 * 分組規則的指定 8 */ 9 public class CourseScoreGC extends WritableComparator{ 10 11 public CourseScoreGC(){ 12 super(CourseScore.class, true); 13 } 14 15 /** 16 * 17 * 方法的定義解釋: 18 * 19 * 方法的意義:一般來說,都可以從方法名找到一些提示 20 * 方法的參數:將來你的MR程序中,要作為key的兩個對象,是否是相同的對象 21 * 方法的返回值: 返回值類型為int 當返回值為0的時候。證明, 兩個參數對象,經過比較之后,是同一個對象 22 * 23 * 在我們的需求中: 分組規則是 Course 24 * 25 */ 26 @Override 27 public int compare(WritableComparable a, WritableComparable b) { 28 29 CourseScore cs1 = (CourseScore)a; 30 CourseScore cs2 = (CourseScore)b; 31 32 int compareTo = cs1.getCourse().compareTo(cs2.getCourse()); 33 34 return compareTo; 35 } 36 }