Hadoop version tested: 2.4
Use case for map-side aggregation: we only care about a small part of the whole data set, and that part fits in memory.
Advantage: it can greatly reduce the amount of data transferred over the network, improving efficiency.
General approach: in the Mapper's map function, read all the records and accumulate them in a List (or another in-memory structure); then, in the cleanup function, process that list and emit only the small amount of data we care about.
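In skeleton form, the pattern looks like this (a minimal sketch of my own, using a HashMap for the counting; the full example below uses an ArrayList instead):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class InMapAggregationSkeleton extends Mapper<LongWritable, Text, Text, IntWritable> {

    // per-mapper in-memory state, built up across all map() calls
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    protected void map(LongWritable key, Text value, Context cxt) {
        // accumulate in memory instead of writing each record to the context
        for (String word : value.toString().split(" ")) {
            Integer c = counts.get(word);
            counts.put(word, c == null ? 1 : c + 1);
        }
    }

    @Override
    protected void cleanup(Context cxt) throws IOException, InterruptedException {
        // emit the aggregated result once, after the whole split has been read
        Text k = new Text();
        IntWritable v = new IntWritable();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            k.set(e.getKey());
            v.set(e.getValue());
            cxt.write(k, v);
        }
    }
}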
Example:
In the map function, split each line on spaces and add every word to an in-memory collection; in the cleanup function, emit the words with the highest counts, together with those counts.
package fz.inmap.aggregation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.PriorityQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InMapArrgegationDriver extends Configured implements Tool {

    private static final Logger log = LoggerFactory.getLogger(InMapArrgegationDriver.class);

    public static void main(String[] args) throws Exception {
        // propagate the job's exit code to the shell
        System.exit(ToolRunner.run(new Configuration(), new InMapArrgegationDriver(), args));
    }
    @Override
    public int run(String[] arg0) throws Exception {
        if (arg0.length != 3) {
            System.err.println("Usage:\nfz.inmap.aggregation.InMapArrgegationDriver <in> <out> <maxNum>");
            return -1;
        }
        Configuration conf = getConf();
        Path in = new Path(arg0[0]);
        Path out = new Path(arg0[1]);
        // delete the output directory if it already exists
        out.getFileSystem(conf).delete(out, true);
        // pass the requested number of results down to the mappers
        conf.set("maxResult", arg0[2]);

        Job job = Job.getInstance(conf, "in map aggregation job");
        job.setJarByClass(getClass());
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(InMapMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // map-only job: all aggregation happens inside the mappers
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }
    protected static class InMapMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // per-mapper in-memory state, accumulated across all map() calls
        private ArrayList<Word> words = new ArrayList<Word>();
        private PriorityQueue<Word> queue;
        private int maxResult;

        @Override
        protected void setup(Context cxt) {
            maxResult = cxt.getConfiguration().getInt("maxResult", 10);
        }

        @Override
        protected void map(LongWritable key, Text value, Context cxt) {
            String[] line = value.toString().split(" "); // split on blanks
            for (String word : line) {
                Word curr = new Word(word, 1);
                if (words.contains(curr)) {
                    // increase the existing word's frequency
                    // (note: this linear scan makes the counting O(n^2) overall;
                    // a HashMap<String, Integer> would be more efficient)
                    for (Word w : words) {
                        if (w.equals(curr)) {
                            w.frequency++;
                            break;
                        }
                    }
                } else {
                    words.add(curr);
                }
            }
        }

        @Override
        protected void cleanup(Context cxt) throws IOException, InterruptedException {
            Text outputKey = new Text();
            IntWritable outputValue = new IntWritable();
            // PriorityQueue requires a capacity of at least 1, so guard
            // against an empty input split
            queue = new PriorityQueue<Word>(Math.max(1, words.size()));
            queue.addAll(words);
            // poll() returns the highest-frequency word first because
            // Word.compareTo sorts in descending frequency order
            for (int i = 0; i < maxResult; i++) {
                Word tail = queue.poll();
                if (tail != null) {
                    outputKey.set(tail.value);
                    outputValue.set(tail.frequency);
                    log.info("key is {},value is {}", outputKey, outputValue);
                    cxt.write(outputKey, outputValue);
                }
            }
        }
    }
}
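Once the classes are packaged into a jar, the job can be launched along these lines (the jar name and the input/output paths here are hypothetical):

hadoop jar inmap-aggregation.jar fz.inmap.aggregation.InMapArrgegationDriver /user/fz/input /user/fz/out 10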
The Word class used above:
package fz.inmap.aggregation;

public class Word implements Comparable<Word> {

    public String value;
    public int frequency;

    public Word(String value, int frequency) {
        this.value = value;
        this.frequency = frequency;
    }

    @Override
    public int compareTo(Word o) {
        // descending order: higher frequencies sort first, so
        // PriorityQueue.poll() returns the most frequent word
        return o.frequency - this.frequency;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Word) {
            // words are compared case-insensitively
            return value.equalsIgnoreCase(((Word) obj).value);
        } else {
            return false;
        }
    }
}
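One caveat: Word overrides equals but not hashCode, which breaks the equals/hashCode contract. ArrayList.contains and PriorityQueue only use equals and compareTo, so the code above still works, but the class would misbehave as a key in a HashMap or HashSet. A matching override could look like this (my addition, not in the original code):

    @Override
    public int hashCode() {
        // must be consistent with the case-insensitive equals above
        return value.toLowerCase().hashCode();
    }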
To check the result, you can read the task logs (the mapper logs every emitted key/value pair via log.info), or inspect the output files directly, for example with hdfs dfs -cat on the part-m-* files under the output directory.
Summary: although map-side aggregation can greatly reduce the amount of data transferred over the network and improve efficiency, we still have to consider the actual setting before applying it. For example, suppose we used the code above to find the 10 most frequent words globally. Each mapper outputs only the top 10 words of its own input split and never sees the rest of the data, so when those partial lists are merged on the reducer side, a word that narrowly misses every per-mapper top 10 but is frequent overall gets dropped, and the final result is wrong. A safer variant is sketched below: emit all per-mapper counts and compute the global top N in a single reducer.
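A minimal sketch of that variant (my own illustration, not from the original post): the mapper keeps its in-memory counting but emits every aggregated word in cleanup, and a single reducer sums the partial counts and selects the global top N in its own cleanup. It reuses the Word class above.

package fz.inmap.aggregation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.PriorityQueue;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GlobalTopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private ArrayList<Word> totals = new ArrayList<Word>();
    private int maxResult;

    @Override
    protected void setup(Context cxt) {
        maxResult = cxt.getConfiguration().getInt("maxResult", 10);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context cxt) {
        // sum the partial counts emitted by the individual mappers
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        totals.add(new Word(key.toString(), sum));
    }

    @Override
    protected void cleanup(Context cxt) throws IOException, InterruptedException {
        // only here are the true global counts known, so the top N is correct
        PriorityQueue<Word> queue = new PriorityQueue<Word>(Math.max(1, totals.size()));
        queue.addAll(totals);
        for (int i = 0; i < maxResult; i++) {
            Word w = queue.poll();
            if (w == null) break;
            cxt.write(new Text(w.value), new IntWritable(w.frequency));
        }
    }
}

In run(), the driver would then set job.setReducerClass(GlobalTopNReducer.class) and job.setNumReduceTasks(1) instead of 0.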
Share, grow, be happy.
Please cite the blog address when reposting: http://blog.csdn.net/fansy1990