Implementing MapReduce Programs in Scala (1): Computing the Average


Input (tab-separated name and score, matching the split("\t") in the code below):

Chinese scores:

a  89

b  88

c  90

d  77

Math scores:

a  80

b  90

c  98

d  98

Output (each value is the average of the student's two subject scores, e.g. a: (89 + 80) / 2 = 84.5):

a  84.5

b  89

c  94

d  87.5

Scala (Spark) implementation: group the records by name, then compute each student's total score and number of subjects, and divide to get the average.

import org.apache.spark.{SparkConf, SparkContext}

object AverageScore {
  def main(args: Array[String]): Unit = {
    // SparkConf requires an application name; "local" runs the job in-process.
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("AverageScore"))
    val one = sc.textFile("/spark/test/", 2)
    one.filter(_.trim.length > 0)
      .map(text => {
        // Each line is "name\tscore".
        (text.split("\t")(0).trim, text.split("\t")(1).trim.toInt)
      })
      .groupByKey()
      .map(text => {
        var sum = 0
        var num = 0.0
        for (x <- text._2) {
          sum = sum + x // add the score itself, not the key
          num += 1
        }
        val avg = sum / num
        val format = f"$avg%1.2f".toDouble
        (text._1, format)
      })
      .collect()
      .foreach(x => println("name: " + x._1 + ", average score: " + x._2))

    sc.stop()
  }
}
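groupByKey pulls every score for a key onto one executor before averaging; for larger inputs, folding (sum, count) pairs with reduceByKey avoids that. Below is a minimal sketch under the same assumptions as the code above (tab-separated "name\tscore" lines under /spark/test/); the object name is only for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object AverageScoreReduceByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("AverageScoreReduceByKey"))

    sc.textFile("/spark/test/", 2)
      .filter(_.trim.length > 0)
      .map { line =>
        val fields = line.split("\t")
        (fields(0).trim, (fields(1).trim.toInt, 1)) // (name, (score, 1))
      }
      // Partial sums and counts combine map-side before the shuffle.
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum.toDouble / count }
      .collect()
      .foreach { case (name, avg) => println(f"name: $name, average score: $avg%1.2f") }

    sc.stop()
  }
}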

The same job implemented with Hadoop MapReduce:
package HadoopvsSpark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by Administrator on 2017/5/25.
 */
public class AverageScore {

    public static class AverageMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The key is the byte offset; the line content is the value.
            String line = value.toString();
            if (line.trim().length() > 0) {
                String[] arr = line.split("\t");
                context.write(new Text(arr[0].trim()), new IntWritable(Integer.valueOf(arr[1].trim())));
            }
        }
    }

    public static class AverageReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            for (IntWritable val : values) {
                sum += val.get();
                count++;
            }
            // Divide by the actual number of subjects rather than a hard-coded constant.
            context.write(key, new DoubleWritable((double) sum / count));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(AverageScore.class);

        job.setMapperClass(AverageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(AverageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputdir = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        // Delete the output directory if it already exists so the job can be rerun.
        if (fs.exists(outputdir)) {
            fs.delete(outputdir, true);
        }
        FileOutputFormat.setOutputPath(job, outputdir);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
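Since the post contrasts Hadoop with Spark (the package is even named HadoopvsSpark), a higher-level Spark DataFrame version of the same average may be useful for comparison. This is a minimal sketch, assuming Spark 2.x and the same tab-separated input under /spark/test/; the object name AverageScoreDF is illustrative, not part of the original code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.avg

object AverageScoreDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("AverageScoreDF").getOrCreate()
    import spark.implicits._

    spark.read.textFile("/spark/test/")        // each line: "name\tscore"
      .filter(_.trim.nonEmpty)
      .map { line =>
        val parts = line.split("\t")
        (parts(0).trim, parts(1).trim.toInt)
      }
      .toDF("name", "score")
      .groupBy("name")
      .agg(avg("score").as("average"))          // mean over however many subjects exist
      .orderBy("name")
      .show()

    spark.stop()
  }
}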

Reference blog:
http://blog.csdn.net/kwu_ganymede/article/details/50482948

