To consolidate my MapReduce knowledge, I worked through a practice exercise I found online.
It touches on these topics: custom serializable types, sorted output, partitioned output, and grouping.
Data and field description:
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75
Data explanation
The number of fields per record is not fixed:
the first field is the course name (four courses in total: computer, math, english, algorithm);
the second field is the student name, followed by the score of each exam; the number of exams a student takes in a given course is not fixed.
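For example, the first record, computer,huangxiaoming,85,86,41,75,93,42,85, says that huangxiaoming took the computer exam 7 times with a score sum of 507, giving an average of 507/7 ≈ 72.4.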
Problem 1: count the number of students who took each course and the course's average score
package com.startbigdata.studentPractise;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/*
Count the number of students who took each course and the course's average score
*/
public class AvgCourse {
static class MapDrv extends Mapper<LongWritable, Text, Text, Text> {
private Text outputKey = new Text();
private Text outputValue = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//split the line into an array of fields
String[] line = value.toString().split(",");
//sum the scores on this line
int sum = 0;
for (int i = 2; i < line.length; i++) {
sum += Integer.parseInt(line[i]);
}
//emit (course, "scoreSum-examCount");
//do not put the score sum into the key, or records for the same course would not be grouped together
this.outputKey.set(line[0]);
outputValue.set(sum + "-" + (line.length - 2));
context.write(outputKey, outputValue);
}
}
static class ReduceDrv extends Reducer<Text, Text, Text, Text> {
private Text outputValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//number of records for this course, i.e. the number of students who took it
int personSum = 0;
//sum of all scores for this course
int courseSum = 0;
//total number of exams taken in this course
int testNum = 0;
//accumulate the student count, score sum and exam count
for (Text value : values) {
//split each value back into its score sum and exam count
String[] str = value.toString().split("-");
personSum++;
courseSum += Integer.parseInt(str[0]);
testNum += Integer.parseInt(str[1]);
}
//average score; use a double so the fraction is not truncated by integer division
double avg = (double) courseSum / testNum;
this.outputValue.set(personSum + " " + String.format("%.2f", avg));
context.write(key, outputValue);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,AvgCourse.class.getName());
job.setJarByClass(AvgCourse.class);
job.setMapperClass(MapDrv.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(ReduceDrv.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);//the reducer emits Text values, not IntWritable
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//delete the output directory if it already exists
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Run command:
hadoop jar /opt/modules/hadoop-2.7.3/jars/hadoopapi.jar com.startbigdata.studentPractise.AvgCourse /tmp1/student.txt /tmp1/out6
Problem 2: compute the average score of every student in each course, and write the results into separate files by course, one course per result file; sort each file by average score from high to low, keeping one decimal place.
A quick way to remember the sort direction: compareTo compares the current object with the next one; a return value of 1 means the two are swapped, anything else leaves them in place.
If it returns 1 when the next object is larger than the current one, the swap moves larger values to the front: descending order.
If it returns 1 when the next object is smaller than the current one, the swap moves smaller values to the front: ascending order.
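As a concrete illustration of the rule above, here is a minimal sketch (the Score class and its value field are made up for the demo; they are not part of the exercise):

import java.util.Arrays;

public class Score implements Comparable<Score> {
    private final int value;
    Score(int value) { this.value = value; }
    public int compareTo(Score o) {
        // positive (swap) when the other value is larger, so larger values
        // move to the front: descending order; swap the operands for ascending
        return Integer.compare(o.value, this.value);
    }
    @Override
    public String toString() { return String.valueOf(value); }
    public static void main(String[] args) {
        Score[] s = { new Score(75), new Score(93), new Score(41) };
        Arrays.sort(s);
        System.out.println(Arrays.toString(s)); // [93, 75, 41]
    }
}

This sketch uses Integer.compare so equal values return 0; the StudentBean below skips that case on purpose, as noted in its compareTo.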
The custom serializable type:
package com.startbigdata.studentPractise;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Custom serializable type
 */
public class StudentBean implements WritableComparable<StudentBean> {//remember to implement the WritableComparable interface
private String course;//course name
private String name;//student name
private Float avgScore;//average score
//a custom Writable needs a public no-arg constructor for deserialization
public StudentBean() {
}
public StudentBean(String course, String name, Float avgScore) {
this.course = course;
this.name = name;
this.avgScore = avgScore;
}
public String getCourse() {
return course;
}
public void setCourse(String course) {
this.course = course;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Float getAvgScore() {
return avgScore;
}
public void setAvgScore(Float avgScore) {
this.avgScore = avgScore;
}
//serialization
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(course);//match each dataOutput.writeXxx call to the field type
dataOutput.writeUTF(name);
dataOutput.writeFloat(avgScore);
}
//deserialization
public void readFields(DataInput dataInput) throws IOException {
//deserialize fields in the same order they were serialized
course = dataInput.readUTF();
name = dataInput.readUTF();
avgScore=dataInput.readFloat();
}
//compareTo decides whether the sort is descending or ascending
public int compareTo(StudentBean o) {
//returns 1 when the other average is larger, so higher averages sort first (descending);
//never returning 0 also means equal averages are not merged into one reduce group
return o.avgScore > this.avgScore ? 1 : -1;
}
// toString defines the output format
@Override
public String toString() {
return course+"\t"+name+"\t"+avgScore;
}
}
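A custom Writable like this is easy to sanity-check outside Hadoop by round-tripping it through a byte buffer. A minimal sketch (the class name StudentBeanRoundTrip is mine; the sample values come from the math,liutao record above, whose seven scores sum to 392, average 56.0):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StudentBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        StudentBean in = new StudentBean("math", "liutao", 56.0f);
        // serialize with write()
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));
        // deserialize with readFields(); the field order must match write()
        StudentBean out = new StudentBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(out); // math	liutao	56.0
    }
}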
The custom partitioner:
package com.startbigdata.studentPractise;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * MapReduce groups the map output kv pairs by key and dispatches them to the reduce tasks.
 * The default dispatch rule is key.hashCode() % (number of reduce tasks).
 * To distribute records by our own rule instead, we override the dispatch (partitioning) component:
 * define a CouresePartition class that extends the abstract class Partitioner,
 * then register it on the job with job.setPartitionerClass(CouresePartition.class).
 */
//custom partitioning rule: one partition per course (I am still not entirely clear on when exactly the partitioner is invoked)
public class CouresePartition extends Partitioner<StudentBean, NullWritable> {
public int getPartition(StudentBean key, NullWritable value, int numReduceTasks) {
/* algorithm
computer
english
math */
if ("algorithm".equals(key.getCourse())){
return 0;
}
else if ("computer".equals(key.getCourse())){
return 1;
}
else if ("english".equals(key.getCourse())){
return 2;
}
else
return 3;
}
}
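For comparison, the default HashPartitioner that this class replaces boils down to the following (a sketch paraphrasing Hadoop's built-in implementation):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask off the sign bit so the result is non-negative, then mod
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

A custom partitioner only works together with a matching number of reduce tasks: the driver below sets job.setNumReduceTasks(4), one per course. With fewer tasks, a returned partition of 3 would fail the job with an "Illegal partition" error; with more, the extra reducers would just produce empty files.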
The MapReduce program:
package com.startbigdata.studentPractise;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
//compute the average score of every student in each course and write the results into one file per course,
//sorted by average score from high to low, keeping one decimal place
public class StudentAvgCourse {
static class MapDrvAvg extends Mapper<LongWritable, Text, StudentBean, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(",");
//compute this student's average score
int sum = 0;//score sum
for (int i = 2; i < line.length; i++) {
sum += Integer.parseInt(line[i]);
}
StudentBean studentBean = new StudentBean();
studentBean.setCourse(line[0]);
studentBean.setName(line[1]);
//round the average to one decimal place, as the problem requires
studentBean.setAvgScore(Math.round(sum * 10f / (line.length - 2)) / 10f);
context.write(studentBean, NullWritable.get());
}
}
static class ReduceDrvAvg extends Reducer<StudentBean, NullWritable,StudentBean,NullWritable>{
@Override
protected void reduce(StudentBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//the partitioner has already split the courses and compareTo sorts by average score, so just emit every key
for (NullWritable nullWritable : values) {
context.write(key, nullWritable);
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,StudentAvgCourse.class.getName());
job.setJarByClass(StudentAvgCourse.class);
//set the custom partitioner
job.setPartitionerClass(CouresePartition.class);
//the number of reduce tasks must match the number of partitions: one per course
job.setNumReduceTasks(4);
job.setMapperClass(MapDrvAvg.class);
job.setMapOutputKeyClass(StudentBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(ReduceDrvAvg.class);
job.setOutputKeyClass(StudentBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//delete the output directory if it already exists
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]),true);
}
boolean wait = job.waitForCompletion(true);
System.exit(wait?0:1);
}
}
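It can be submitted the same way as problem one. Assuming the jar from the earlier command also contains this class, the call would look like this (the output path is arbitrary):
hadoop jar /opt/modules/hadoop-2.7.3/jars/hadoopapi.jar com.startbigdata.studentPractise.StudentAvgCourse /tmp1/student.txt /tmp1/out7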
The figures in the original result screenshot were wrong because an earlier version of the program had a logic error. The bug has since been fixed, but a screenshot of the corrected run was not saved; only the output format shown there still applies.
Problem 3: for each course, find the information of the student with the highest score: course, name and average score
The custom serializable type:
package com.startbigdata.studentPractise;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class StudentMaxBean implements WritableComparable<StudentMaxBean> {
private String course;
private String name;
private Float avgScore;
private Float maxScore;
public StudentMaxBean() {
}
public StudentMaxBean(String course, String name, Float avgScore, Float maxScore) {
this.course = course;
this.name = name;
this.avgScore = avgScore;
this.maxScore = maxScore;
}
public String getCourse() {
return course;
}
public void setCourse(String course) {
this.course = course;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Float getAvgScore() {
return avgScore;
}
public void setAvgScore(Float avgScore) {
this.avgScore = avgScore;
}
public Float getMaxScore() {
return maxScore;
}
public void setMaxScore(Float maxScore) {
this.maxScore = maxScore;
}
public int compareTo(StudentMaxBean o) {
//sort by course first; records with the same course are then sorted by score
//note: the field the custom grouping comparator groups on must be compared first in compareTo
int index = o.course.compareTo(this.course);
if (index == 0) {
//same course: the higher max score sorts first
return o.maxScore > this.maxScore ? 1 : -1;
}
else return index > 0 ? 1 : -1;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(course);//match each dataOutput.writeXxx call to the field type
dataOutput.writeUTF(name);
dataOutput.writeFloat(avgScore);
dataOutput.writeFloat(maxScore);
}
//deserialization
public void readFields(DataInput dataInput) throws IOException {
//deserialize fields in the same order they were serialized
course = dataInput.readUTF();
name = dataInput.readUTF();
avgScore=dataInput.readFloat();
maxScore=dataInput.readFloat();
}
@Override
public String toString() {
return course+"\t"+name+"\t"+avgScore+"\t"+maxScore;
}
}
The custom grouping comparator:
package com.startbigdata.studentPractise;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//custom grouping comparator:
//the shuffle still sorts the keys with compareTo, but reduce groups are formed by comparing only the course field of the composite key
public class CourseGroup extends WritableComparator {
protected CourseGroup() {
super(StudentMaxBean.class, true);//true: let the comparator create StudentMaxBean instances to compare
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
StudentMaxBean astudentMaxBean = (StudentMaxBean)a;
StudentMaxBean bstudentMaxBean =(StudentMaxBean)b;
//for an int field you could subtract directly, e.g. astudentMaxBean.getX() - bstudentMaxBean.getX(); strings need compareTo
return astudentMaxBean.getCourse().compareTo(bstudentMaxBean.getCourse());
}
}
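To see how the sort comparator and the grouping comparator work together, here is a small local sketch (not part of the job; names and scores are made up) that sorts a few StudentMaxBean keys with compareTo and then marks the group boundaries CourseGroup would produce:

package com.startbigdata.studentPractise;

import java.util.Arrays;

public class GroupingDemo {
    public static void main(String[] args) {
        StudentMaxBean[] keys = {
            new StudentMaxBean("math", "a", 70f, 86f),
            new StudentMaxBean("math", "b", 80f, 93f),
            new StudentMaxBean("english", "c", 60f, 91f)
        };
        // sort order: by course, then by maxScore descending
        Arrays.sort(keys);
        // grouping: keys with the same course fall into one reduce call
        CourseGroup group = new CourseGroup();
        for (int i = 0; i < keys.length; i++) {
            boolean newGroup = i == 0 || group.compare(keys[i - 1], keys[i]) != 0;
            System.out.println((newGroup ? "new group -> " : "             ") + keys[i]);
        }
    }
}

Because of the sort order, the first key of every group is the one with the highest maxScore, which is exactly what the reducer below relies on.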
The MapReduce program:
package com.startbigdata.studentPractise;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
//for each course, find the student with the highest score: course, name and average score
public class StudentTop {
static class MapDrv extends Mapper<LongWritable, Text,StudentMaxBean,NullWritable >{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(",");
//this student's highest score in the course; treat the first exam as the starting maximum
Float maxScore = Float.parseFloat(line[2]);
//score sum
Float sumScore = 0.0f;
for (int i = 2; i < line.length; i++) {
//accumulate the total
sumScore += Float.parseFloat(line[i]);
//keep the larger of the running maximum and the current score
if (maxScore < Float.parseFloat(line[i])) maxScore = Float.parseFloat(line[i]);
}
//average score
Float avgScore = sumScore / (line.length - 2);
StudentMaxBean studentMaxBean = new StudentMaxBean(line[0], line[1], avgScore, maxScore);
//emit <student bean, null>
System.out.println("map------------------------" + studentMaxBean.toString());//debug output
context.write(studentMaxBean, NullWritable.get());
}
}
static class ReduceDrv extends Reducer<StudentMaxBean, NullWritable, StudentBean, NullWritable> {
@Override
protected void reduce(StudentMaxBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//within one group (one course) the keys arrive sorted by score, highest first,
//so before the value iterator advances, key holds the top student of this course
StudentBean s = new StudentBean(key.getCourse(), key.getName(), key.getAvgScore());
context.write(s, NullWritable.get());
/*
for topN, iterate instead, and assign the fields inside the loop: Hadoop reuses the key
object and updates it as the value iterator advances, so assigning only once before the
loop would freeze the first record's values:
int i = 0;
for (NullWritable nullWritable : values) {
if (i >= 2) break;
s.setCourse(key.getCourse());
s.setName(key.getName());
s.setAvgScore(key.getAvgScore());
context.write(s, nullWritable);
i++;
}
*/
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,StudentTop.class.getName());
job.setJarByClass(StudentTop.class);
//set the custom grouping comparator
job.setGroupingComparatorClass(CourseGroup.class);
job.setMapperClass(MapDrv.class);
job.setMapOutputKeyClass(StudentMaxBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(ReduceDrv.class);
job.setOutputKeyClass(StudentBean.class);
job.setOutputValueClass(NullWritable.class);
// FileInputFormat.addInputPath(job,new Path(args[0]));
// FileOutputFormat.setOutputPath(job,new Path(args[1]));
FileInputFormat.addInputPath(job,new Path("E:/tmp/student.txt"));
FileOutputFormat.setOutputPath(job,new Path("E:/tmp/out"));
FileSystem fs = FileSystem.get(conf);
//if (fs.exists(new Path(args[1]))){
//fs.delete(new Path(args[1]),true);
// }
boolean wait = job.waitForCompletion(true);
System.exit(wait?0:1);
}
}
I did problem 3 on Windows: an earlier run produced wrong results, and since I have no IDE on the Linux machine I debugged and compiled it on Windows. That is why its driver hard-codes local paths instead of taking arguments like the first two problems.