MapReduce Practice Problems 1


I wanted to consolidate my MapReduce skills, so I found some practice problems online.
Topics covered: custom serializable types, sorted output, partitioned output, and grouping.

Data and field description:
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75
Data explanation
The number of fields per record is not fixed:
the first field is the course name (four courses in total: computer, math, english, algorithm);
the second field is the student name, followed by the score of each exam. The number of times a student takes a given course's exam is not fixed.
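For example, the record computer,huangxiaoming,85,86,41,75,93,42,85 means huangxiaoming took the computer exam 7 times, with a score sum of 507 and an average of about 72.4.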
Problem 1: for each course, count the number of students who took it and compute the course average.

package com.startbigdata.studentPractise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
Problem 1: for each course, count the number of examinees and compute the course average.
 */
public class AvgCourse {
    static class MapDrv extends Mapper<LongWritable, Text,Text, Text>{
        private final static Text outputValue = new Text();
        private Text outputKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //split the line into fields
            String[] line = value.toString().split(",");
            //first sum up the scores on this line
            int sum = 0;
            for (int i = 2;i<line.length;i++){
                sum+=Integer.parseInt(line[i]);
            }

           //emit the result as (course, "scoreSum-examCount");
            //do not bake the sum into the key (e.g. <course, sum>), otherwise records
            //of the same course would not be grouped into one reduce call
            this.outputKey.set(line[0]);
            outputValue.set(sum+"-"+(line.length-2));
            context.write(outputKey,outputValue);
        }
    }

    static class ReduceDrv extends Reducer<Text, Text,Text,Text>{
        private Text outputValue = new Text();


        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            //counts the records for this course, i.e. the number of students who took it
            int personSum = 0;
            //sum of all scores for the course
            int courseSum=0;

            //total number of exams taken
            int testNum = 0;

            //accumulate the student count, score sum and exam count
            for (Text value:values) {
                //split each value into its score sum and exam count
              String[] str = value.toString().split("-");
              personSum++;
              courseSum+=Integer.parseInt(str[0]);
              testNum+=Integer.parseInt(str[1]);

            }

            //average score (integer division truncates the fraction)
            int avg = courseSum / testNum;
            this.outputValue.set(personSum+" "+avg);
            context.write(key,outputValue);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,AvgCourse.class.getName());

        job.setJarByClass(AvgCourse.class);

        job.setMapperClass(MapDrv.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ReduceDrv.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);//the reducer emits Text values

        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))){
            fs.delete(new Path(args[1]),true);
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}

Run command:
hadoop jar /opt/modules/hadoop-2.7.3/jars/hadoopapi.jar com.startbigdata.studentPractise.AvgCourse /tmp1/student.txt /tmp1/out6
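As a sanity check against the data set above: the algorithm course appears in 6 records (6 students) with 41 exams in total and a score sum of 2916, so its line in the output should be

algorithm	6 71

(2916 / 41 is 71 after the reducer's integer division; the key and value are separated by a tab.)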

Problem 2: compute each participating student's average score per course and write the results into separate files by course, one course per result file, sorted by average score from high to low, with scores kept to one decimal place.
A quick mnemonic for compareTo-based sorting: the framework compares the current object with the next one and swaps them when compareTo returns 1 (a positive value); otherwise it leaves them alone.
If compareTo returns 1 when the next object is larger than the current one, the swap moves larger elements forward: descending order.
If compareTo returns 1 when the next object is smaller than the current one, the swap moves smaller elements forward: ascending order.
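
A minimal, self-contained illustration of the rule above (plain Java, not Hadoop; the Score and SortDemo classes are made up for this demo):

import java.util.Arrays;

//hypothetical class, used only to demonstrate a descending compareTo
class Score implements Comparable<Score> {
    final float value;
    Score(float value) { this.value = value; }

    //returns a positive number when the next object is larger,
    //so larger values move to the front: descending order
    public int compareTo(Score o) {
        return Float.compare(o.value, this.value);
    }

    @Override
    public String toString() { return String.valueOf(value); }
}

public class SortDemo {
    public static void main(String[] args) {
        Score[] scores = { new Score(75f), new Score(91f), new Score(62f) };
        Arrays.sort(scores);
        System.out.println(Arrays.toString(scores)); //prints [91.0, 75.0, 62.0]
    }
}

Flipping the two arguments of Float.compare gives ascending order instead.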

Custom serializable type:
package com.startbigdata.studentPractise;


import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom serializable type
 */
public class StudentBean implements WritableComparable<StudentBean> {//remember to implement the WritableComparable interface
    private String course;//course name
    private String name;//student name
    private Float avgScore;//average score

    //a custom Writable needs a no-arg constructor
    public StudentBean() {
    }

    public StudentBean(String course, String name, Float avgScore) {
        this.course = course;
        this.name = name;
        this.avgScore = avgScore;
    }

    public String getCourse() {
        return course;
    }

    public void setCourse(String course) {
        this.course = course;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Float getAvgScore() {
        return avgScore;
    }

    public void setAvgScore(Float avgScore) {
        this.avgScore = avgScore;
    }

    //serialization
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(course);//match dataOutput.writeXxx to the field type
        dataOutput.writeUTF(name);
        dataOutput.writeFloat(avgScore);
    }

    //deserialization
    public void readFields(DataInput dataInput) throws IOException {
        //deserialize the fields in the same order they were serialized
        course = dataInput.readUTF();
        name = dataInput.readUTF();
        avgScore = dataInput.readFloat();
    }
    //defines the sort order (descending by average score here)
    public int compareTo(StudentBean o) {
        //positive when the other bean's average is larger, so higher averages sort first;
        //never returning 0 keeps students with equal averages as distinct keys during grouping
        return o.avgScore > this.avgScore ? 1 : -1;
    }

    //toString defines what gets written to the output file
    @Override
    public String toString() {
        return course+"\t"+name+"\t"+avgScore;
    }
}
Custom partitioner:
package com.startbigdata.studentPractise;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * MapReduce groups the map output key/value pairs by key and dispatches each group to a reduce task.
 * The default dispatch rule is: key.hashCode() % numReduceTasks.
 * To dispatch (group) records by our own rule, we replace the partitioning component, Partitioner:
 * define a CouresePartition that extends the abstract class Partitioner,
 * then register it on the job: job.setPartitionerClass(CouresePartition.class).
 */

//custom partition rule: one partition (and therefore one result file) per course
public class CouresePartition extends Partitioner<StudentBean, NullWritable> {

    public int getPartition(StudentBean key, NullWritable nullWritable, int numPartitions) {
        /* algorithm -> 0, computer -> 1, english -> 2, math -> 3 */

        if ("algorithm".equals(key.getCourse())){
            return 0;
        }
        else if ("computer".equals(key.getCourse())){
            return 1;

        }
        else if ("english".equals(key.getCourse())){
            return 2;
        }

        else
            return 3;
    }
}
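
With four reduce tasks (job.setNumReduceTasks(4) in the driver below), each return value above maps to one reducer and therefore to one result file:

part-r-00000 -> algorithm
part-r-00001 -> computer
part-r-00002 -> english
part-r-00003 -> math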
The MapReduce program:
package com.startbigdata.studentPractise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

 //Problem 2: compute each participating student's average score per course and write the
 // results into separate files by course: one course per result file, sorted by average
 // score from high to low, with scores kept to one decimal place.

public class StudentAvgCourse {

    static class MapDrvAvg extends Mapper<LongWritable, Text, StudentBean, NullWritable>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] ling = value.toString().split(",");
            //求每個學生的平均分
            int sum = 0;//存儲總分數
            for (int i = 2;i<ling.length;i++){
                sum+=Long.parseLong(ling[i]);
            }
            StudentBean studentBean = new StudentBean();
            studentBean.setName(ling[1]);
            studentBean.setCourse(ling[0]);
            studentBean.setAvgScore(sum*1.0f/(ling.length-2));

            context.write(studentBean,NullWritable.get());
        }
    }
    static class ReduceDrvAvg extends Reducer<StudentBean, NullWritable,StudentBean,NullWritable>{

        @Override
        protected void reduce(StudentBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //the partitioner and the bean's compareTo have already done the work, so just emit each record
            for (NullWritable nullWritable : values) {
                context.write(key, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,StudentAvgCourse.class.getName());
        job.setJarByClass(StudentAvgCourse.class);
         //register the custom partitioner
        job.setPartitionerClass(CouresePartition.class);
        //one reduce task per partition (four courses)
        job.setNumReduceTasks(4);

        job.setMapperClass(MapDrvAvg.class);
        job.setMapOutputKeyClass(StudentBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(ReduceDrvAvg.class);
        job.setOutputKeyClass(StudentBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))){
            fs.delete(new Path(args[1]),true);
        }

        boolean wait = job.waitForCompletion(true);
        System.exit(wait?0:1);

    }
}
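
The job can be launched the same way as problem one (the output directory name here is only an example):

hadoop jar /opt/modules/hadoop-2.7.3/jars/hadoopapi.jar com.startbigdata.studentPractise.StudentAvgCourse /tmp1/student.txt /tmp1/out7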


Note: the numbers in the result screenshot that originally followed here were wrong, due to a logic error in an earlier version of the program. The error has been fixed, but a screenshot of the corrected result was not saved; only the output format it showed is accurate.

Problem 3: for each course, find the information of the student with the highest score: course, name and average score.

Custom serializable type:
package com.startbigdata.studentPractise;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class StudentMaxBean implements WritableComparable<StudentMaxBean> {
    private String course;
    private String name;
    private Float avgScore;
    private Float maxScore;

    public StudentMaxBean() {
    }

    public StudentMaxBean(String course, String name, Float avgScore, Float maxScore) {
        this.course = course;
        this.name = name;
        this.avgScore = avgScore;
        this.maxScore = maxScore;
    }

    public String getCourse() {
        return course;
    }

    public void setCourse(String course) {
        this.course = course;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Float getAvgScore() {
        return avgScore;
    }

    public void setAvgScore(Float avgScore) {
        this.avgScore = avgScore;
    }

    public Float getMaxScore() {
        return maxScore;
    }

    public void setMaxScore(Float maxScore) {
        this.maxScore = maxScore;
    }

    public int compareTo(StudentMaxBean o) {
        //sort by course first; records of the same course are then sorted by score
        //note: the field the custom grouping comparator groups on (course) must be
        //compared first here, otherwise records of one course are not adjacent after the sort

        int index = o.course.compareTo(this.course);
        if (index == 0) {
            //positive when the other bean's max score is larger, so higher scores sort
            //first; never return 0, so that equal scores remain distinct keys
            return o.maxScore > this.maxScore ? 1 : -1;
        }
        return index;
    }

    //serialization
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(course);//match dataOutput.writeXxx to the field type
        dataOutput.writeUTF(name);
        dataOutput.writeFloat(avgScore);
        dataOutput.writeFloat(maxScore);
    }

    //deserialization
    public void readFields(DataInput dataInput) throws IOException {
        //read the fields back in the same order they were written
        course = dataInput.readUTF();
        name = dataInput.readUTF();
        avgScore=dataInput.readFloat();
        maxScore=dataInput.readFloat();
    }

    @Override
    public String toString() {
        return course+"\t"+name+"\t"+avgScore+"\t"+maxScore;
    }
}
Custom grouping comparator:
package com.startbigdata.studentPractise;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

   //custom grouping comparator:
   //the shuffle sorts on the whole composite key, but grouping uses only one field
   //of that key (the course), so all records of a course form one reduce group
public class CourseGroup extends WritableComparator  {

    protected CourseGroup() {
        //true: have the comparator create StudentMaxBean instances to compare
        super(StudentMaxBean.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        StudentMaxBean aBean = (StudentMaxBean) a;
        StudentMaxBean bBean = (StudentMaxBean) b;
        //compare only the course field (for an int field, a simple subtraction would do)
        return aBean.getCourse().compareTo(bBean.getCourse());
    }
}
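
How the pieces fit together: the shuffle sorts the composite keys with StudentMaxBean.compareTo (course first, then maxScore descending), while CourseGroup compares only the course field. All records of one course therefore arrive in a single reduce call, already sorted, and the first key the reducer sees is the student with the highest score in that course. One caveat: Hadoop reuses the key object, so iterating over the values mutates the key in place; any fields you need must be copied out before (or while) advancing the iterator.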
The MapReduce program:
package com.startbigdata.studentPractise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


//Problem 3: for each course, find the student with the highest score: course, name and average score

public class StudentTop {
    static class MapDrv extends Mapper<LongWritable, Text,StudentMaxBean,NullWritable >{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split(",");
            //求每個同學每門課程最高分,默認第一次考試為最高分
            Float maxScore = Float.parseFloat(line[2]);
            //存儲總分
            Float sumScore = 0.0f;
            for (int i=2;i<line.length;i++){
                //求總分
                sumScore+=Float.parseFloat(line[i]);
                //如果后次比前次分數高就將maxScore替換,否則不變
                if (maxScore<Float.parseFloat(line[i])) maxScore=Float.parseFloat(line[i]);
            }
            //平均分
            Float avgScore = sumScore/(line.length-2)*1.0f;
            StudentMaxBean studentMaxBean = new StudentMaxBean(line[0],line[1],avgScore,maxScore);
            //輸出<課程名,學生信息>
            System.out.println("map------------------------"+studentMaxBean.toString());
            context.write(studentMaxBean,NullWritable.get());
        }

       }

    static class ReduceDrv extends Reducer<StudentMaxBean, NullWritable,StudentBean,NullWritable>{

        @Override
        protected void reduce(StudentMaxBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //within one group the keys arrive sorted by maxScore descending, so the first key
            //is the top student of this course; copy its fields before touching the values
            //iterator, because iterating mutates the (reused) key object in place
            StudentBean s = new StudentBean(key.getCourse(), key.getName(), key.getAvgScore());
            context.write(s, NullWritable.get());

      /*
      for top N, iterate and copy the key's fields inside the loop instead
      (the key object changes as the iterator advances):
            int i = 0;
            for (NullWritable nullWritable : values) {
                if (i >= 2) break;
                s.setCourse(key.getCourse());
                s.setName(key.getName());
                s.setAvgScore(key.getAvgScore());
                context.write(s, nullWritable);
                i++;
            }
*/
        }


    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,StudentTop.class.getName());

        job.setJarByClass(StudentTop.class);
        //register the custom grouping comparator
        job.setGroupingComparatorClass(CourseGroup.class);

        job.setMapperClass(MapDrv.class);
       // job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(StudentMaxBean.class);
         job.setMapOutputKeyClass(StudentMaxBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(ReduceDrv.class);
        job.setOutputKeyClass(StudentBean.class);
        job.setOutputValueClass(NullWritable.class);

       // FileInputFormat.addInputPath(job,new Path(args[0]));
       // FileOutputFormat.setOutputPath(job,new Path(args[1]));
        FileInputFormat.addInputPath(job,new Path("E:/tmp/student.txt"));
        FileOutputFormat.setOutputPath(job,new Path("E:/tmp/out"));

        FileSystem fs = FileSystem.get(conf);
        //if (fs.exists(new Path(args[1]))){
        //fs.delete(new Path(args[1]),true);
      // }

        boolean wait = job.waitForCompletion(true);
        System.exit(wait?0:1);
    }
}

I did problem 3 on Windows: an earlier version of the program produced wrong results, and since I have no IDE on my Linux machine I debugged it locally on Windows. That is why the driver differs a little from the first two problems (hard-coded local paths, and the output-directory cleanup is commented out).

