MapReduce Java API實例-統計平均成績


場景

MapReduce Java API實例-統計單詞出現頻率:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119410169

在上面對單個txt文件進行統計的基礎上,Mapreduce也是支持文件夾下多個文件處理的。

統計學生各科平均成績,每科成績為一個文件。

在Map階段和上面統計單次頻率差不多,然后在Reduce階段求出總和后,除以科目數,

並將輸出value的數據類型設置為FloatWritable即可。

新建三個數據集,chinese.txt、math.txt、english.txt

分別代表三科成績,每科成績的格式如下

 

 

每科成績左邊是姓名,右邊是成績,並且姓名和成績之間是用空格分開。

注意這里是一個空格,因為下面處理的規則就是按照中間一個空格來處理的。

這點要尤為注意,並且如果這個文件是在Windows上新建並添加的空格,一定要注意排查上傳到Centos以及HDFS集群中是否格式變化。

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。

實現

1、Map實現代碼

package com.badao.averagegrade;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class AverageGradeMapper extends Mapper<Object,Text,Text,IntWritable> {
    //1、編寫map函數,通過繼承Mapper類實現里面的map函數
    //   Mapper類當中的第一個函數是Object,也可以寫成Long
    //   第一個參數對應的值是行偏移量

    //2、第二個參數類型通常是Text類型,Text是Hadoop實現的String 類型的可寫類型
    //   第二個參數對應的值是每行字符串

    //3、第三個參數表示的是輸出key的數據類型

    //4、第四個參數表示的是輸出value的數據類型,IntWriable 是Hadoop實現的int類型的可寫數據類型

    public final static IntWritable one = new IntWritable(1);
    public Text word = new Text();

    //key 是行偏移量
    //value是每行字符串
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] str = value.toString().split(" ");
        context.write(new Text(str[0]),new IntWritable(Integer.parseInt(str[1])));
    }
}

 

2、Reduce代碼

package com.badao.averagegrade;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


//第一個參數類型是輸入值key的數據類型,map中間輸出key的數據類型
//第二個參數類型是輸入值value的數據類型,map中間輸出value的數據類型
//第三個參數類型是輸出值key的數據類型,他的數據類型要跟job.setOutputKeyClass(Text.class) 保持一致
//第四個參數類型是輸出值value的數據類型,它的數據類型要跟job.setOutputValueClass(IntWriable.class) 保持一致

public class AverageGradeReducer extends Reducer<Text, IntWritable,Text, FloatWritable> {

    public FloatWritable result = new FloatWritable();


    //key就是單詞  values是單詞出現頻率列表
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable val:values)
        {
            //get就是取出IntWriable的值
            sum += val.get();
        }
        //3表示科目數
        result.set((float)sum/3);
        context.write(key,result);
    }
}

 

3、Job代碼

package com.badao.averagegrade;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;

public class AverageGradeJob {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        wordCountLocal();
    }

    public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://192.168.148.128:9000");
        //conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        System.setProperty("HADOOP_USER_NAME","root");
        //實例化一個作業,word count是作業的名字
        Job job = Job.getInstance(conf, "averagegrade");
        //指定通過哪個類找到對應的jar包
        job.setJarByClass(AverageGradeJob.class);
        //為job設置Mapper類
        job.setMapperClass(AverageGradeMapper.class);
        //為job設置reduce類
        job.setReducerClass(AverageGradeReducer.class);
        job.setMapOutputValueClass(IntWritable.class);
        //為job的輸出數據設置key類
        job.setOutputKeyClass(Text.class);
        //為job輸出設置value類
        job.setOutputValueClass(FloatWritable.class);
        //為job設置輸入路徑,輸入路徑是存在的文件夾/文件
        FileInputFormat.addInputPath(job,new Path("/grade"));
        //為job設置輸出路徑
        FileOutputFormat.setOutputPath(job,new Path("/averageGrade3"));
        job.waitForCompletion(true);
    }

}

 

然后將上面的三個成績的txt上傳到集群HDFS中,運行job

 

 

可以在集群HDFS中看到生成統計好的文件,查看paat-r-00000的內容

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM