前言
前面的一篇給大家寫了一些MapReduce的一些程序,像去重、詞頻統計、統計分數、共現次數等。這一篇給大家介紹的是關於Combiner優化操作。
一、Combiner概述
1.1、為什么需要Combiner
我們map任務處理的結果是存放在運行map任務的節點上。
map處理的數據的結果在進入reduce的時候,reduce會通過遠程的方式去獲取數據。
在map處理完數據之后,數據量特別大的話。reduce再去處理數據它就要通過網絡去獲取很多的數據。
這樣會導致一個問題是:大量的數據會對網絡帶寬造成一定的影響。
有沒有一種方式能夠類似reduce一樣,在map端處理完數據之后,然后在reduce端進行一次簡單的數據處理?
MapReudce正常處理是:
map處理完,中間結果存放在map節點上。reduce處理的數據通過網絡形式拿到reduce所在的節點上。
如果我們能夠在map端進行一次類似於reduce的操作,這樣會使進入reduce的數據就會少很多。
我們把在map端所執行的類似於reduce的操作成為Combiner。
1.2、Combiner介紹
1) 前提
每一個map都可能會產生大量的本地輸出
2)Combiner功能
對map端的輸出先做一次合並
3)目的
減少在map和reduce節點之間的數據傳輸量, 以提高網絡IO性能。
二、使用Combiner優化Mapduce執行
2.1、使用前提
不能對最原始的map的數據流向reduce造成影響。也就是說map端進入reduce的數據不收Combiner的影響。
數據輸入的鍵值類型和數據輸出的鍵值類型一樣的reduce我們可以把它當做Combiner來使用
舉例:
我們前面一篇博客中有一個處理的是求用戶的好友列表的數據。
我們之后進入map端的數據類型為LongWritable,Text,而map端輸出的數據類型為Text,Text(用戶,好友),進入reduce之后reduce的輸入類型為Text,Text,
最后reduce的輸出也是Text,Text(用戶,好友列表)。
這樣總結:
reduce的輸入類型等於reduce輸出的數據類型,這樣符合Combiner的情況。(這樣我們就不需要去自定義數據類型了)
2.2、怎么使用
其實Combiner的本質就是一個reducer,那我們要寫Combiner我們就要繼承reducer。
下面寫一個例子,首先你需要了解我前面寫的一個專利引用的例子,才能了解專利文件數據格式。
需求:求這個專利以及這個專利它引用了哪些專利。

import com.briup.bd1702.hadoop.mapred.utils.PatentRecordParser; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class PatentReferenceWithCombiner_0010 extends Configured implements Tool{ public static class PatentReferenceMapper extends Mapper<LongWritable,Text,Text,Text>{ private PatentRecordParser parser=new PatentRecordParser(); private Text key=new Text(); private Text value=new Text(); @Override protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ parser.parse(value); if(parser.isValid()){ this.key.set(parser.getPatentId()); this.value.set(parser.getRefPatentId()); context.write(this.key,this.value); } } } public static class PatentReferenceReducer extends Reducer<Text,Text,Text,Text>{ private Text value=new Text(); @Override protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{ StringBuffer refPatentIds=null; for(Text value:values){ refPatentIds.append(value.toString()+","); } this.value.set(refPatentIds.toString()); context.write(key,value); } } @Override public int run(String[] args) throws Exception{ Configuration conf=getConf(); Path input=new Path(conf.get("input")); Path output=new Path(conf.get("output")); // 構建Job對象,並設置驅動類名和Job名,用於提交作業 Job job=Job.getInstance(conf,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); // 給Job設置Mapper類以及map方法輸出的鍵值類型 job.setMapperClass(PatentReferenceMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 給Job設置Reducer類及reduce方法輸出的鍵值類型 job.setReducerClass(PatentReferenceReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設置文件的讀取方式,文本文件;輸出方式,文本文件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // 給Job是定輸入文件的路徑和輸出結果的路徑 TextInputFormat.addInputPath(job,input); TextOutputFormat.setOutputPath(job,output); // 設置Combiner job.setCombinerClass(PatentReferenceReducer.class); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception{ System.exit(ToolRunner.run(new P00010_PatentReferenceWithCombiner_0010(),args)); } }
注意:
1)我們在作業配置中設置Mapper和Reducer,所以我們的Combiner也需要設置的:
2)在這個例子中的Reducer直接就可以當作Combiner去使用。(Combiner的本質就是一個reducer)
2.3、利用Combiner計算每一年的平均氣溫
1)分析
如果我們不用Cobiner的時候,map輸出是(年份,溫度),進入reduce中的集合就是這一年中所有的溫度值。我們在設置一個變量來疊加一下我們有多少個這樣的溫度。
然后把所有的溫度加起來除以遍歷的個數。這是正常情況下!
如果我們利用Combiner計算每一年的平均氣溫的時候,我們在map端先算一次平均溫度,然后到reduce計算一個總的平均氣溫。
從上圖來說,我們看出來雖然滿足數據輸入的鍵值類型和數據輸出的鍵值類型一樣的reduce,但是這是不符合我們的數學邏輯。
分析上圖:我們不可能那把每個平均值拿出來除以個數吧,這樣做是錯誤的。
2)解決
上圖分析:我們可以把溫度和個數組合起來,自定義一個數據類型(AV)。
注意:我們Combiner和Reduce的數據輸入和輸出不一樣,所以程序中的Reduce就不能作為Reduce了,
我們需要單獨去編寫一個Combiner,但是我們注意到Combiner和Reduce的實現(算法或應用程序內容)是一模一樣的。
3)代碼實現Combiner計算每一年的平均氣溫
第一:寫一個AverageValue類

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.VIntWritable; import org.apache.hadoop.io.Writable; class AverageValue implements Writable{ private VIntWritable num; private DoubleWritable avgValue; AverageValue(){ num=new VIntWritable(); avgValue=new DoubleWritable(); } AverageValue(AverageValue av){ num=new VIntWritable(av.num.get()); avgValue=new DoubleWritable(av.avgValue.get()); } @Override public void write(DataOutput out) throws IOException{ num.write(out); avgValue.write(out); } @Override public void readFields(DataInput in) throws IOException{ num.readFields(in); avgValue.readFields(in); } public void set(int num,double avgValue){ this.num.set(num); this.avgValue.set(avgValue); } public void set(VIntWritable num,DoubleWritable avgValue){ set(num.get(),avgValue.get()); } public VIntWritable getNum(){ return num; } public void setNum(VIntWritable num){ this.num=num; } public DoubleWritable getAvgValue(){ return avgValue; } public void setAvgValue(DoubleWritable avgValue){ this.avgValue=avgValue; } @Override public String toString(){ return "AverageValue{"+"num="+num+", avgValue="+avgValue+'}'; }
第二:實現

import com.briup.bd1702.hadoop.mapred.utils.WeatherRecordParser; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class AverageTemperatureWithCombiner_0010 extends Configured implements Tool{ static class AverageTemperatureWithCombinerMapper extends Mapper<LongWritable,Text, IntWritable,AverageValue>{ private WeatherRecordParser parser=new WeatherRecordParser(); private IntWritable year=new IntWritable(); private AverageValue value=new AverageValue(); @Override protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ parser.parse(value); if(parser.isValid()){ this.year.set(parser.getYear()); this.value.set(1, parser.getTemperature()); context.write(this.year,this.value); } } } static class AverageTemperatureWithCombinerCombiner extends Reducer<IntWritable,AverageValue, IntWritable,AverageValue>{ private AverageValue value=new AverageValue(); @Override protected void reduce(IntWritable key, Iterable<AverageValue> values,Context context) throws IOException, InterruptedException{ int sum=0; double value=0; for(AverageValue av:values){ sum+=av.getNum().get(); value+=av.getAvgValue().get() *av.getNum().get(); } this.value.set(sum,value/sum); context.write(key,this.value); } } static class AverageTemperatureWithCombinerReducer extends Reducer<IntWritable,AverageValue,IntWritable,DoubleWritable>{ private DoubleWritable value=new DoubleWritable(); @Override protected void reduce(IntWritable key,Iterable<AverageValue> values,Context context) throws IOException, InterruptedException{ int sum=0; double value=0; for(AverageValue av:values){ sum+=av.getNum().get(); value+=av.getAvgValue().get()* av.getNum().get(); } this.value.set(value/sum); context.write(key,this.value); } } @Override public int run(String[] args) throws Exception{ Configuration conf=getConf(); Path input=new Path(conf.get("input")); Path output=new Path(conf.get("output")); Job job=Job.getInstance(conf,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); job.setMapperClass(AverageTemperatureWithCombinerMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(AverageValue.class); job.setReducerClass(AverageTemperatureWithCombinerReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(DoubleWritable.class); job.setCombinerClass(AverageTemperatureWithCombinerCombiner.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job,input); TextOutputFormat.setOutputPath(job,output); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception{ System.exit(ToolRunner.run(new P00030_AverageTemperatureWithCombiner_0010(),args)); } }
2.4、計算每一年每個氣象站的平均溫度
1)分析
我們的key可以有兩種方式:
使用一個Text和氣象站拼接起來作為key,來計算平均溫度。
我們可以創建一個數據類型,使用年份和氣象站形成一個聯合的key(聯合腱),我們就寫一個YeayStation,對於YearStation既要序列化又要可比較大小要實現WritableComparable<T>。
Hadoop的hash值用來干什么的?
我們需要使用hash值是因為在數據分區的時候,也就是確定哪個數據進入哪個reduce的時候。需要通過hashCode和reduce個數取余的結果確定進入哪個reduce。(IntWritable的默認hash值是它代表int類型數字的本身)
所以說數據分區主要是用的HashCode(key的值得hashCode)。
需要比較大小是因為進入同一個reduce的多組數據誰先進入,要比較它key值得大小。誰小誰先進入。
那我們這個復合鍵需不要重寫hashCode和equals方法?
如果我們不去重寫的話,我們使用的是Object的hashCode()方法。當我們一個YearStation對象重復去使用的時候,所有的hashCode都一樣。
所以我們還是盡可能的去重寫hashCode和equals方法。我們需要year和stationId同時參與分區,那我們重寫的hashcode同時和這兩個參數有關系。
2)代碼實現
第一:編寫YearStation類(聯合腱)

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class YearStation implements WritableComparable<YearStation>{ private IntWritable year; private Text stationId; public YearStation(){ year=new IntWritable(); stationId=new Text(); } /* 復制構造器 */ public YearStation(YearStation ys){ year=new IntWritable(ys.year.get()); stationId=new Text(ys.stationId.toString()); } public void set(YearStation ys){ year=new IntWritable(ys.year.get()); stationId=new Text(ys.stationId.toString()); } public void set(IntWritable year,Text stationId){ this.year=new IntWritable(year.get()); this.stationId=new Text(stationId.toString()); } public void set(int year,String stationId){ this.year=new IntWritable(year); this.stationId=new Text(stationId); } @Override public int compareTo(YearStation o){ int yearComp=year.compareTo(o.year); int stationIdComp=stationId.compareTo(o.stationId); return yearComp!=0?yearComp:stationIdComp; } @Override public void write(DataOutput out) throws IOException{ year.write(out); stationId.write(out); } @Override public void readFields(DataInput in) throws IOException{ year.readFields(in); stationId.readFields(in); } @Override public boolean equals(Object o){ if(this==o) return true; if(!(o instanceof YearStation)) return false; YearStation that=(YearStation)o; if(!year.equals(that.year)) return false; return stationId.equals(that.stationId); } @Override public int hashCode(){ int result=year.hashCode(); result=127*result+stationId.hashCode(); return Math.abs(result); } @Override public String toString(){ return year+"\t"+stationId; } }
注意:在這個需求中,我們需要重寫toString()方法,因為我們這個鍵最后要輸出到HDFS中的結果文件中去的。如果不重寫可能是一個YearStation的地址。
我們知道reduce做輸出最后產生的就是結果文件,那么reduce輸出的key和value以什么分割的?其實就是制表符("\t")。所以toString()方法中我們也用這個
第二:實現計算每一年每個氣象站的平均溫度

import com.briup.bd1702.hadoop.mapred.utils.WeatherRecordParser; import com.briup.bd1702.hadoop.mapred.utils.YearStation; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class AverageTemperatureByYearStationWithCombiner_0010 extends Configured implements Tool{ static class AvgTempByYSWithCombMapper extends Mapper<LongWritable,Text,YearStation,AverageValue>{ private YearStation ys=new YearStation(); private AverageValue av=new AverageValue(); private WeatherRecordParser parser=new WeatherRecordParser(); @Override protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ parser.parse(value); if(parser.isValid()){ ys.set(parser.getYear(),parser.getStationId()); av.set(1,parser.getTemperature()); context.write(ys,av); } } } static class AvgTempByYSWithCombCombiner extends Reducer<YearStation,AverageValue,YearStation,AverageValue>{ private AverageValue av=new AverageValue(); @Override protected void reduce(YearStation key,Iterable<AverageValue> values,Context context) throws IOException, InterruptedException{ int sum=0; double count=0.0; for(AverageValue av:values){ sum+=av.getNum().get(); count+=av.getAvgValue().get()*av.getNum().get(); } av.set(sum,count/sum); context.write(key,av); } } static class AvgTempByYSWithCombReducer extends Reducer<YearStation,AverageValue,YearStation,DoubleWritable>{ private DoubleWritable result=new DoubleWritable(); @Override protected void reduce(YearStation key,Iterable<AverageValue> values,Context context) throws IOException, InterruptedException{ int sum=0; double count=0; for(AverageValue av:values){ sum+=av.getNum().get(); count+=av.getAvgValue().get()*av.getNum().get(); } result.set(count/sum); context.write(key,result); } } @Override public int run(String[] args) throws Exception{ Configuration conf=getConf(); Path input=new Path(conf.get("input")); Path output=new Path(conf.get("output")); Job job=Job.getInstance(conf,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); job.setMapperClass(AvgTempByYSWithCombMapper.class); job.setMapOutputKeyClass(YearStation.class); job.setMapOutputValueClass(AverageValue.class); job.setCombinerClass(AvgTempByYSWithCombCombiner.class); job.setReducerClass(AvgTempByYSWithCombReducer.class); job.setOutputKeyClass(YearStation.class); job.setOutputValueClass(DoubleWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job,input); TextOutputFormat.setOutputPath(job,output); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception{ System.exit(ToolRunner.run(new P00050_AverageTemperatureByYearStationWithCombiner_0010(),args)); } }
喜歡就推薦哦!