1. Requirements
1. Count, for each course, the number of students who took the exam and the course average score.
2. Compute each student's average score in each course and store the results in a separate file per course (one result file per course), sorted by average score from high to low, with scores kept to one decimal place.
3. For each course, find the student with the highest exam score and output the course, the student's name, and the average score.
Data and field description:
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75
Data explanation
The number of fields per record is not fixed:
the first field is the course name (four courses in total: computer, math, english, algorithm);
the second field is the student's name, and the remaining fields are the scores of individual exams, but the number of exams a student took in a given course varies.
2. Problem 1: count each course's number of participants and course average score
Analyzing the data, we can solve this by grouping on the course. The grouping is done by simply using the course name as the mapper's output key, so that all records of one course are handled in the same reduce() call; the mapper only needs to prepare, per record, the number of exams the student took in that course and the student's total score.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Counts each course's number of participants and the course average score.
 * Input records look like: computer,huangxiaoming,85,86,41,75,93,42,85
 */
public class CourseOne {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line is comma-separated; everything from the third field on
            // is one exam score, so the student's exam count is (fields - 2).
            // Using the course name as the key lets the reducer count records
            // to get the number of participants; for the course average it
            // also needs each student's exam count and total score.
            String[] lines = value.toString().split(",");
            // Total score of this student in this course.
            long sum = 0L;
            // Number of exams this student took in this course.
            long totalTimes = lines.length - 2;
            for (int i = 2; i < lines.length; i++) {
                sum += Long.parseLong(lines[i]);
            }
            // Key: course name, e.g. "computer".
            // Value: "examCount_totalScore", a string the reducer can split on '_'.
            context.write(new Text(lines[0]), new Text(totalTimes + "_" + sum));
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // All records of the same course arrive in one group.
            int count = 0;      // number of participants
            int totalScore = 0; // sum of all exam scores
            int examTimes = 0;  // total number of exams
            for (Text t : values) {
                String[] arrs = t.toString().split("_");
                count++;
                totalScore += Integer.parseInt(arrs[1]);
                examTimes += Integer.parseInt(arrs[0]);
            }
            // Course average = total score / total number of exams.
            float avg = totalScore * 1.0F / examTimes;
            context.write(key, new Text(count + "\t" + avg));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CourseOne.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("G:/files/mr/day2/q3/input"));
        FileOutputFormat.setOutputPath(job, new Path("G:/files/mr/day2/q3/output1"));
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }
}
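Run against the sample data, the output lists each course with its participant count and average score; these are the same figures that reappear as a reference comment in the problem-2 partitioner below:

algorithm	6	71.12195
computer	10	69.77273
english	9	66.35294
math	7	73.07843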
3. Problem 2: compute each student's average score in each course, store the results in a separate file per course, sorted by average score from high to low with one decimal place
Custom data type: CourseBean
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CourseBean implements WritableComparable<CourseBean> {

    private String course; // course name
    private String name;   // student name
    private float avg;     // average score

    public String getCourse() { return course; }
    public void setCourse(String course) { this.course = course; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public float getAvg() { return avg; }
    public void setAvg(float avg) { this.avg = avg; }

    public CourseBean(String course, String name, float avg) {
        super();
        this.course = course;
        this.name = name;
        this.avg = avg;
    }

    public CourseBean() {
    }

    /**
     * toString() defines the output format.
     */
    @Override
    public String toString() {
        return course + "\t" + name + "\t" + avg;
    }

    /**
     * Serialization.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeFloat(avg);
    }

    /**
     * Deserialization: read the fields in the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        avg = in.readFloat();
    }

    /**
     * Sort rule: by average score, descending.
     */
    @Override
    public int compareTo(CourseBean o) {
        return Float.compare(o.avg, this.avg);
    }
}
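Because compareTo() alone drives the shuffle sort, the ordering is easy to sanity-check locally. A minimal standalone sketch (my own check, not part of the original post; the averages are the ones the job computes for these three computer students):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CourseBeanSortCheck {
    public static void main(String[] args) {
        List<CourseBean> beans = new ArrayList<CourseBean>();
        beans.add(new CourseBean("computer", "xuzheng", 65.0f));
        beans.add(new CourseBean("computer", "liutao", 83.0f));
        beans.add(new CourseBean("computer", "huanglei", 74.4f));
        // Uses CourseBean.compareTo(), i.e. the same order the shuffle applies.
        Collections.sort(beans);
        // Prints liutao (83.0) first, then huanglei (74.4), then xuzheng (65.0).
        for (CourseBean b : beans) {
            System.out.println(b);
        }
    }
}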
Custom partitioner: CoursePartitioner
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class CoursePartitioner extends Partitioner<CourseBean, NullWritable> {

    /* Output of problem 1, for reference:
       algorithm  6   71.12195
       computer   10  69.77273
       english    9   66.35294
       math       7   73.07843 */
    @Override
    public int getPartition(CourseBean key, NullWritable value, int numPartitions) {
        // Route each course to its own reduce task, and therefore its own file.
        if ("algorithm".equals(key.getCourse())) {
            return 0;
        } else if ("computer".equals(key.getCourse())) {
            return 1;
        } else if ("english".equals(key.getCourse())) {
            return 2;
        } else {
            return 3;
        }
    }
}
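Hard-coding the course list works for this fixed data set. As a more defensive variant (my own sketch, not from the original post; CoursePartitioner2 is a made-up name), a map lookup fails fast if an unexpected course ever appears, instead of silently lumping it into partition 3:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class CoursePartitioner2 extends Partitioner<CourseBean, NullWritable> {

    // Fixed course-to-partition table; must stay in sync with setNumReduceTasks(4).
    private static final Map<String, Integer> COURSES = new HashMap<String, Integer>();
    static {
        COURSES.put("algorithm", 0);
        COURSES.put("computer", 1);
        COURSES.put("english", 2);
        COURSES.put("math", 3);
    }

    @Override
    public int getPartition(CourseBean key, NullWritable value, int numPartitions) {
        Integer partition = COURSES.get(key.getCourse());
        if (partition == null) {
            // Fail fast on unknown course names.
            throw new IllegalArgumentException("Unexpected course: " + key.getCourse());
        }
        return partition;
    }
}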
MapReduce program:
import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Computes each student's average score per course and writes one sorted
 * result file per course.
 * Grouping by course and sorting by average are handled by the custom bean
 * (CourseBean); routing each course into its own file is handled by the
 * custom partitioner (CoursePartitioner). Note that the number of reduce
 * tasks must then be set to match the number of partitions.
 */
public class CourseTwo {

    public static class MyMapper extends Mapper<LongWritable, Text, CourseBean, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split(",");
            long sum = 0L;
            long totalTimes = lines.length - 2;
            for (int i = 2; i < lines.length; i++) {
                sum += Long.parseLong(lines[i]);
            }
            // Average score of this student in this course, one decimal place.
            DecimalFormat df = new DecimalFormat(".0");
            float avg = sum * 1.0f / totalTimes;
            String b = df.format(avg);
            // The bean is the mapper output key: it carries course, name and average.
            CourseBean cb = new CourseBean(lines[0], lines[1], Float.parseFloat(b));
            context.write(cb, NullWritable.get());
        }
    }

    public static class MyReducer extends Reducer<CourseBean, NullWritable, CourseBean, NullWritable> {
        @Override
        protected void reduce(CourseBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // The partitioner and the bean's sort order have already arranged
            // the data, so the reducer just writes the keys back out.
            for (NullWritable nullWritable : values) {
                context.write(key, nullWritable);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CourseTwo.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(CourseBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Use the custom partitioner...
        job.setPartitionerClass(CoursePartitioner.class);
        // ...and set the number of reduce tasks to the number of partitions.
        job.setNumReduceTasks(4);
        FileInputFormat.setInputPaths(job, new Path("G:/files/mr/day2/q3/input"));
        FileOutputFormat.setOutputPath(job, new Path("G:/files/mr/day2/q3/output2"));
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }
}
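With setNumReduceTasks(4) the job writes four files under output2: part-r-00000 (algorithm), part-r-00001 (computer), part-r-00002 (english) and part-r-00003 (math), each sorted by average score from high to low. A practical note (my own, not spelled out in the original post): if the reduce-task count is smaller than the largest partition number the partitioner returns, map tasks fail with an "Illegal partition for key" error; if it is larger, the extra output files are simply empty.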
4. Problem 3: for each course, find the student with the highest exam score and output the course, name and average score
This can be seen as a group-and-take-top-1 problem. Since each student took a course's exam several times, the mapper first computes every student's highest score in a course and emits it together with the related information, and the reducer then takes the per-course maximum. Because the required output is course, name and average score, the mapper also has to compute each student's average for each course.
Based on this analysis we again use a custom bean as the output type. The first concern is the student's highest score within a course, which calls for grouping and taking the max. By default, grouping uses the fields compared in the custom type's compareTo() method; grouping on several fields would force extra iterations in the reduce phase to pick out the value we need. So we define a custom grouping comparator that groups on the course alone.
Custom data type: CourseBean2
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CourseBean2 implements WritableComparable<CourseBean2> {

    private String course;  // course name
    private String name;    // student name
    private float avg;      // average score
    private long maxScore;  // highest exam score

    public long getMaxScore() { return maxScore; }
    public void setMaxScore(long maxScore) { this.maxScore = maxScore; }
    public String getCourse() { return course; }
    public void setCourse(String course) { this.course = course; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public float getAvg() { return avg; }
    public void setAvg(float avg) { this.avg = avg; }

    public CourseBean2(String course, String name, float avg, long maxScore) {
        super();
        this.course = course;
        this.name = name;
        this.avg = avg;
        this.maxScore = maxScore;
    }

    public CourseBean2() {
    }

    @Override
    public String toString() {
        return course + "\t" + name + "\t" + avg + "\t" + maxScore;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeFloat(avg);
        out.writeLong(maxScore);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        avg = in.readFloat();
        maxScore = in.readLong();
    }

    /**
     * Sort by course first; within the same course, by highest score in
     * descending order. Note that the fields used by the custom grouping
     * comparator must be a prefix of the fields compared here: with
     * compareTo on (a, b, c, d, e), the grouping comparator may use (a),
     * (a, b), (a, b, c), (a, b, c, d) or (a, b, c, d, e).
     */
    @Override
    public int compareTo(CourseBean2 o) {
        int index = o.course.compareTo(this.course);
        if (index == 0) {
            // Same course: higher score first.
            return Long.compare(o.maxScore, this.maxScore);
        }
        return index;
    }
}
Custom grouping comparator: CourseGroupComparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom grouping comparator.
 * Without it, grouping falls back to the fields compared in compareTo().
 * Extending WritableComparator takes care of deserializing the keys
 * before they are compared.
 */
public class CourseGroupComparator extends WritableComparator {

    /**
     * The superclass must be told the key class and asked to create key
     * instances (the 'true' flag). Without this constructor the keys
     * passed to compare() are never instantiated, and compare() throws a
     * NullPointerException.
     */
    public CourseGroupComparator() {
        super(CourseBean2.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CourseBean2 cb1 = (CourseBean2) a;
        CourseBean2 cb2 = (CourseBean2) b;
        // Group by course name only.
        return cb1.getCourse().compareTo(cb2.getCourse());
    }
}
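To see how the two comparators cooperate: the shuffle first sorts all keys with CourseBean2.compareTo() (course, then highest score descending); the grouping comparator then only decides which adjacent sorted keys land in the same reduce() call. Since it compares the course alone, each course becomes one group whose first key carries that course's highest score, which is exactly what the reducer below relies on.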
MapReduce program:
package com.jh.hive;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * For each course, finds the student with the highest exam score and outputs
 * the course, the student's name, the average score and the highest score.
 * The custom bean sorts by course and then by highest score descending; the
 * custom grouping comparator groups by course, so the first record of every
 * group is the one we want.
 */
public class CountApp3 {

    static class CountMapper extends Mapper<LongWritable, Text, CourseBean2, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] worlds = value.toString().split(",");
            long sum = 0L;
            long ksCount = worlds.length - 2; // number of exams
            long maxScore = Long.parseLong(worlds[2]);
            String course = worlds[0];
            String name = worlds[1];
            for (int i = 2; i < worlds.length; i++) {
                long score = Long.parseLong(worlds[i]);
                sum += score;
                if (score > maxScore) {
                    maxScore = score;
                }
            }
            // Average score with one decimal place.
            DecimalFormat df = new DecimalFormat(".0");
            float avg = sum * 1.0f / ksCount;
            String format = df.format(avg);
            CourseBean2 courseBean = new CourseBean2(course, name, Float.parseFloat(format), maxScore);
            context.write(courseBean, NullWritable.get());
        }
    }

    static class CountReducer extends Reducer<CourseBean2, NullWritable, CourseBean2, NullWritable> {
        @Override
        protected void reduce(CourseBean2 key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // The keys in each course group arrive sorted by highest score
            // descending, so the first one is the course's top record.
            for (NullWritable nullWritable : values) {
                context.write(key, nullWritable);
                break;
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists.
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(new Path(args[1]))) {
            fileSystem.delete(new Path(args[1]), true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(CountApp3.class);
        job.setMapperClass(CountMapper.class);
        job.setMapOutputKeyClass(CourseBean2.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(CountReducer.class);
        job.setOutputKeyClass(CourseBean2.class);
        job.setOutputValueClass(NullWritable.class);
        job.setGroupingComparatorClass(CourseGroupComparator.class); // install the grouping comparator
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
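Working the sample data through by hand (my own check, not output quoted from the original post), the single result file should contain the following four lines, with courses in the descending name order that compareTo() imposes:

math	huangxiaoming	83.0	99
english	huanglei	83.0	99
computer	liutao	83.0	99
algorithm	liutao	82.0	99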
Reposted from: https://blog.csdn.net/zyz_home/article/details/79937228