MapReduce Exercises ----- Student Score Problems


1. Requirements

1. Count the number of students who took each course and the course's average score.
2. Compute each participating student's average score per course, write each course into its own result file (one course per file), sort by average score from high to low, and keep one decimal place.
3. For each course, find the student with the highest score and output the course, name, and average score.

Data and field description:

computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

Data explanation
The number of fields per line is not fixed:
The first field is the course name; there are four courses in total: computer, math, english, algorithm.
The second field is the student name; the remaining fields are individual exam scores, and the number of exams a student took in a given course is not fixed.

2. Problem 1: count the number of students who took each course and the course average score

Looking at the data, we can solve this by grouping on the course. The grouping happens on the mapper side: we use the course name as the output key, so all records for one course are processed in the same reduce call. The mapper only has to prepare, for each student, the number of exams taken in the course and that student's total score.
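For example, for the first sample line the mapper emits the course name as the key and the string "examCount_totalScore" as the value:

input : computer,huangxiaoming,85,86,41,75,93,42,85
output: key = computer, value = 7_507   (7 exams; 85+86+41+75+93+42+85 = 507)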

import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
/**
 * Problem 1: count the number of students who took each course and the course average score
 * Sample input line: computer,huangxiaoming,85,86,41,75,93,42,85
 */
public class CourseOne {
 
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            // Parse the line: fields are comma-separated, so splitting on ','
            // yields an array where field 0 is the course, field 1 is the student
            // name, and every field from index 2 on is one exam score.
            // To count participants it is enough to use the course as the key and
            // count the records per course in the reduce phase; for the course
            // average we also need each student's exam count and total score.
            String[] lines = value.toString().split(",");
            // sum accumulates this student's total score in this course
            long sum = 0L;
            // totalTimes is this student's number of exams in this course:
            // the array length minus the two leading fields (course, name)
            long totalTimes = lines.length - 2;
            // accumulate the scores
            for (int i = 2; i < lines.length; i++) {
                sum += Long.parseLong(lines[i]);
            }
            // Emit the course name as the key (e.g. "computer") and the string
            // "examCount_totalScore" as the value for easy parsing in the reducer
            context.write(new Text(lines[0]), new Text(totalTimes + "_" + sum));
        }
    }
    
    public static class MyReducer extends Reducer<Text,Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values,Context context)
                throws IOException, InterruptedException {
            // All records of one course arrive in the same group.
            // Number of students who took this course
            int count = 0;
            // Total score across all students and exams
            int totalScore = 0;
            // Total number of exams
            int examTimes = 0;
            for (Text t : values) {
                String[] arrs = t.toString().split("_");
                count++;
                totalScore += Integer.parseInt(arrs[1]);
                examTimes += Integer.parseInt(arrs[0]);
            }
            
            // Course average = total score / total number of exams
            float avg = totalScore * 1.0F / examTimes;
            // Emit: course, number of participants, average score
            context.write(key, new Text(count + "\t" + avg));
        
        }
    }
    
    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(CourseOne.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.setInputPaths(job, new Path("G:/files/mr/day2/q3/input"));
        FileOutputFormat.setOutputPath(job,new Path("G:/files/mr/day2/q3/output1") );
        
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0:1);
    }
}
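
For reference, on the sample data this job's output should look like the per-course figures quoted in the partitioner comment in the next section (exact float formatting may vary):

algorithm	6	71.12195
computer	10	69.77273
english	9	66.35294
math	7	73.07843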

3. Problem 2: compute each participating student's average score per course, one result file per course, sorted by average score from high to low, with one decimal place

Custom data type: CourseBean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
 
public class CourseBean implements WritableComparable<CourseBean>{
    private String course; // course name
    private String name;   // student name
    private float avg;     // average score
    
    public String getCourse() {
        return course;
    }
    public void setCourse(String course) {
        this.course = course;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public float getAvg() {
        return avg;
    }
    public void setAvg(float avg) {
        this.avg = avg;
    }
    
    public CourseBean(String course, String name, float avg) {
        super();
        this.course = course;
        this.name = name;
        this.avg = avg;
    }
    
    public CourseBean() {
 
    }
    
    
    /**
     * toString defines the text output format
     */
    @Override
    public String toString() {
        return course + "\t" + name + "\t" + avg;
    }
    
    /**
     * Serialization
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeFloat(avg);
    }
    
    /**
     * Deserialization
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        avg = in.readFloat();
    }
    
    // Sort rule: descending by average score.
    // Deliberately never return 0: two students with the same average must stay
    // distinct keys, otherwise MapReduce would merge them into one group and
    // one of the records would be lost.
    @Override
    public int compareTo(CourseBean o) {
        return o.avg > this.avg ? 1 : -1;
    }
}

Custom partitioner: CoursePartitioner

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
 
public class CoursePartitioner extends Partitioner<CourseBean, NullWritable>{
 
    /* Per-course student counts and averages for the sample data:
       algorithm    6    71.12195
       computer    10    69.77273
       english      9    66.35294
       math         7    73.07843 */
    @Override
    public int getPartition(CourseBean key, NullWritable value, int numPartitions) {
        if("algorithm".equals(key.getCourse())){
            return 0;
        }else if("computer".equals(key.getCourse())){
            return 1;
        }else if("english".equals(key.getCourse())){
            return 2;
        }else{
            return 3;
        }
    }
 
}
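
Hardcoding the four course names is the simplest way to guarantee exactly one course per output file. For comparison, a hash-based partitioner would cope with an unknown set of courses (a minimal sketch below; HashCoursePartitioner is a hypothetical name, not from the original post), but two courses could then hash into the same reduce task and share a file, so it would not satisfy this problem's one-course-per-file requirement:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class HashCoursePartitioner extends Partitioner<CourseBean, NullWritable> {
    @Override
    public int getPartition(CourseBean key, NullWritable value, int numPartitions) {
        // Mask the sign bit so the partition index is always non-negative
        return (key.getCourse().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}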

MapReduce program:

import java.io.IOException;
import java.text.DecimalFormat;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
/**
 * Problem 2: compute each participating student's average score per course,
 * sorted by average in descending order, with one course per result file.
 * A custom bean (CourseBean) is used as the map output key so the data can be
 * grouped by course and sorted by average; a custom Partitioner
 * (CoursePartitioner) routes each course to its own reduce task and output
 * file, so the number of reduce tasks must be set to match the partitions.
 */
public class CourseTwo {
 
    public static class MyMapper extends Mapper<LongWritable, Text, CourseBean, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split(",");
            long sum = 0L;
            long totalTimes = lines.length-2;
            for (int i = 2; i < lines.length; i++) {
                sum += Long.parseLong(lines[i]);
            }
            // Format the average to one decimal place
            DecimalFormat df = new DecimalFormat(".0");
            // This student's average score in this course
            float avg = sum * 1.0f / totalTimes;
            String b = df.format(avg);
            // Build the mapper output key
            CourseBean cb = new CourseBean(lines[0], lines[1], Float.parseFloat(b));
            
            context.write(cb, NullWritable.get());
        }
    }
    
    public static class MyReducer extends Reducer<CourseBean, NullWritable,CourseBean, NullWritable>{
        @Override
        protected void reduce(CourseBean key, Iterable<NullWritable> values,Context context)
                throws IOException, InterruptedException {
            // The partitioner has already separated the courses and the bean's
            // compareTo has sorted by average, so the reducer just writes the keys through.
            for (NullWritable nullWritable : values) {
                context.write(key, nullWritable);
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(CourseTwo.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        
        job.setOutputKeyClass(CourseBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        // Use the custom partitioner
        job.setPartitionerClass(CoursePartitioner.class);
        // Must match the number of partitions defined in CoursePartitioner
        job.setNumReduceTasks(4);
        
        FileInputFormat.setInputPaths(job, new Path("G:/files/mr/day2/q3/input"));
        FileOutputFormat.setOutputPath(job,new Path("G:/files/mr/day2/q3/output2") );
        
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0:1);
    }
}
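
With four reduce tasks and the partition mapping above, the job writes one result file per course under the output directory (part-r-NNNNN is Hadoop's standard reducer output naming):

output2/part-r-00000    algorithm
output2/part-r-00001    computer
output2/part-r-00002    english
output2/part-r-00003    math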

4. Problem 3: for each course, find the student with the highest score: course, name, and average score

This can be seen as a top-1-per-group problem. Each student took a course's exam several times, so the mapper first finds each student's highest score in each course and emits it together with the related information; the reducer then takes the maximum per course. Since the required output is course, name, and average score, the mapper must also compute each student's per-course average.

Based on this analysis we again use a custom output type (a bean). Grouping to find the max would, by default, use all the fields compared in the custom key's compareTo() method; grouping on multiple fields would multiply the groups and the looping needed in the reduce phase. So we define a custom grouping comparator that groups by course only, as the worked example below shows.
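For example, for the line computer,huangxiaoming,85,86,41,75,93,42,85 the mapper emits a key carrying the course, the name, the one-decimal average (507 / 7 = 72.4), and the student's maximum score:

CourseBean2(course="computer", name="huangxiaoming", avg=72.4, maxScore=93)

With keys sorted by (course, maxScore descending) and grouped by course only, the first key of each reduce group belongs to that course's top student.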

Custom data type: CourseBean2

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
import org.apache.hadoop.io.WritableComparable;
 
public class CourseBean2 implements WritableComparable<CourseBean2>{
    private String course;
    private String name;
    private float avg;
    private long maxScore;
    
    
    public long getMaxScore() {
        return maxScore;
    }
    public void setMaxScore(long maxScore) {
        this.maxScore = maxScore;
    }
    public String getCourse() {
        return course;
    }
    public void setCourse(String course) {
        this.course = course;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public float getAvg() {
        return avg;
    }
    public void setAvg(float avg) {
        this.avg = avg;
    }
    
    public CourseBean2(String course, String name, float avg, long maxScore) {
        super();
        this.course = course;
        this.name = name;
        this.avg = avg;
        this.maxScore = maxScore;
    }
    
    public CourseBean2() {
 
    }
    
    
    
    
    @Override
    public String toString() {
        return course+"\t"+name + "\t" + avg +"\t"+maxScore;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeFloat(avg);
        out.writeLong(maxScore);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        avg = in.readFloat();
        maxScore = in.readLong();
    }
    @Override
    public int compareTo(CourseBean2 o) {
        /* Sort by course first; within the same course, sort by maxScore in
           descending order.
           Important: the fields used by the custom grouping comparator must be
           a prefix of the fields compared here. E.g. if compareTo compares
           (a, b, c), the grouping comparator may group on (a), (a, b), or
           (a, b, c), but not on (b) or (c) alone. */
        int index = o.course.compareTo(this.course);
        if (index == 0) {
            // Same course: never return 0, so records with equal maxScore are
            // not collapsed into one key during the sort
            return o.maxScore > this.maxScore ? 1 : -1;
        } else {
            return index > 0 ? 1 : -1;
        }
    }
    
}

Custom grouping comparator: CourseGroupComparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
 
/**
 * Custom grouping comparator.
 * Without one, grouping defaults to the fields compared in the key's
 * compareTo() method. Extend WritableComparator to get deserialization and
 * comparison support.
 */
public class CourseGroupComparator  extends WritableComparator{
    
    /**
     * The no-arg constructor must call super(CourseBean2.class, true) so that
     * the comparator instantiates the key objects; without it, compare() below
     * receives null arguments and throws a NullPointerException.
     */
    public CourseGroupComparator() {
        super(CourseBean2.class, true);
    }
    
    
    /**
     * Group solely by course name: keys with the same course are sent to the
     * same reduce call, regardless of name, avg, or maxScore.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CourseBean2 cb1 = (CourseBean2) a;
        CourseBean2 cb2 = (CourseBean2) b;
        return cb1.getCourse().compareTo(cb2.getCourse());
    }
}
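
To see how sorting and grouping interact on the sample data, consider the computer course: the keys arrive at the reducer sorted by maxScore descending, roughly

computer	liutao	83.0	99    <- written (first key of the group)
computer	liujialing	64.1	96
...

The grouping comparator puts all of these under one reduce call, so the reducer only has to write the first key it sees.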

MapReduce program:

package com.jh.hive;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Problem 3: for each course, output the top student's course, name, and
 * average score. The mapper emits one CourseBean2 per (course, student) with
 * the student's average and maximum score; the keys sort by (course, maxScore
 * descending) and CourseGroupComparator groups them by course, so the first
 * key in each reduce group belongs to the course's top student.
 */
public class CountApp3 {
    static class CountMapper extends Mapper<LongWritable, Text, CourseBean2, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, CourseBean2, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            long sum = 0L;
            long examCount = fields.length - 2;
            long maxScore = Long.parseLong(fields[2]);
            String course = fields[0];
            String name = fields[1];
            for (int i = 2; i < fields.length; i++) {
                long score = Long.parseLong(fields[i]);
                sum += score;
                if (score > maxScore) {
                    maxScore = score;
                }
            }
            // Format the average to one decimal place
            DecimalFormat df = new DecimalFormat(".0");
            float avg = sum * 1.0f / examCount;
            String format = df.format(avg);
            CourseBean2 courseBean = new CourseBean2(course, name, Float.parseFloat(format), maxScore);
            context.write(courseBean, NullWritable.get());
        }
    }
    static class CountReducer extends Reducer<CourseBean2, NullWritable, CourseBean2, NullWritable>{
        @Override
        protected void reduce(CourseBean2 arg0, Iterable<NullWritable> arg1,
                Reducer<CourseBean2, NullWritable, CourseBean2, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Keys in this group are sorted by maxScore descending and grouped
            // by course, so the first key is the course's top student. The key
            // object is updated as the values are iterated, which is why only
            // the first iteration writes it out.
            int count = 0;
            for (NullWritable nullWritable : arg1) {
                count++;
                if (count == 1) {
                    context.write(arg0, nullWritable);
                }
            }
        }
            
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(conf);
        // Delete the output path if it already exists so the job can be rerun
        if (fileSystem.exists(new Path(args[1]))) {
            fileSystem.delete(new Path(args[1]), true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(CountApp3.class);
        job.setMapperClass(CountMapper.class);
        job.setMapOutputKeyClass(CourseBean2.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(CountReducer.class);
        job.setOutputKeyClass(CourseBean2.class);
        job.setOutputValueClass(NullWritable.class);

        job.setGroupingComparatorClass(CourseGroupComparator.class); // register the custom grouping comparator

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}    
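
On the sample data (run with a single reduce task), the output should be roughly the following; note that toString() also appends each student's maxScore, and courses appear in the descending order imposed by compareTo():

math	huangxiaoming	83.0	99
english	huanglei	83.0	99
computer	liutao	83.0	99
algorithm	liutao	82.0	99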

Reposted from: https://blog.csdn.net/zyz_home/article/details/79937228

