前言
前面一篇博文寫的是Combiner優化MapReduce執行,也就是使用Combiner在map端執行減少reduce端的計算量。
一、作業的默認配置
MapReduce程序的默認配置
1)概述
在我們的MapReduce程序中有一些默認的配置。所以說當我們程序如果要使用這些默認配置時,可以不用寫。
我們的一個MapReduce程序一定會有Mapper和Reducer,但是我們程序中不寫的話,它也有默認的Mapper和Reducer。
當我們使用默認的Mapper和Reducer的時候,map和reducer的輸入和輸出都是偏移量和數據文件的一行數據,所以就是相當於原樣輸出!
2)默認的MapReduce程序
/** * 沒有指定Mapper和Reducer的最小作業配置 */ public class MinimalMapReduce { public static void main(String[] args) throws Exception{ // 構建新的作業 Configuration conf=new Configuration(); Job job = Job.getInstance(conf, "MinimalMapReduce"); job.setJarByClass(MinimalMapReduce.class); // 設置輸入輸出路徑 FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // ᨀ交作業運行 System.exit(job.waitForCompletion(true)?0:1); } }
輸入是:
輸出是:
二、作業的配置方式
MapReduce的類型配置
1)用於配置類型的屬性
在命令行中,怎么去配置呢?
比如說mapreduce.job.inputformat.class。首先我們要繼承Configured實現Tool工具才能這樣去指定:
-Dmapreduce.job.inputformat.class = 某一個類的類全名(一定要記得加報名)
這是Map端的輸出類型控制
這是整個MapReduce程序輸出類型控制,其實就是reduce的類型格式控制
2)No Reducer的MapReduce程序--Mapper
第一步:寫一個TokenCounterMapper繼承Mapper

/** * 將輸入的文本內容拆分為word,做一個簡單輸出的Mapper */ public class TokenCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private Text word=new Text(); private static final IntWritable one=new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub StringTokenizer itr=new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word, one); } } }
第二步:寫一個NoReducerMRDriver完成作業配置

/** *沒有設置Reducer的MR程序 */ public class NoReducerMRDriver { public static void main(String[] args) throws Exception { // 構建新的作業 Configuration conf=new Configuration(); Job job = Job.getInstance(conf, "NoReducer"); job.setJarByClass(NoReducerMRDriver.class); // 設置Mapper job.setMapperClass(TokenCounterMapper.class); // 設置reducer的數量為0 job.setNumReduceTasks(0); // 設置輸出格式 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置輸入輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // ᨀ交運行作業 System.exit(job.waitForCompletion(true)?0:1); } }
輸入:
結果:
注意:如果作業擁有0個Reducer,則Mapper結果直接寫入OutputFormat而不經key值排序。
3)No Mapper的MapReduce程序--Reducer
第一步:寫一個TokenCounterReducer繼承Reducer

/** * 將reduce輸入的values內容拆分為word,做一個簡單輸出的Reducer */ public class TokenCounterReducer extends Reducer<LongWritable, Text, Text, IntWritable>{ private Text word=new Text(); private static final IntWritable one=new IntWritable(1); @Override protected void reduce(LongWritable key, Iterable<Text> values,Reducer<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub for(Text value:values){ StringTokenizer itr=new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word, one); } } } }
第二步:寫一個NoMapperMRDrive完成作業配置

/** *沒有設置Mapper的MR程序 */ public class NoMapperMRDriver { public static void main(String[] args) throws Exception { // 構建新的作業 Configuration conf=new Configuration(); Job job = Job.getInstance(conf, "NoMapper"); job.setJarByClass(NoMapperMRDriver.class); // 設置Reducer job.setReducerClass(TokenCounterReducer.class); // 設置輸出格式 job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設置輸入輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // ᨀ交運行作業 System.exit(job.waitForCompletion(true)?0:1); } }
輸入:
輸出:
三、Mapper類和Reducer類以及它們的子類(實現類)
3.1、Mapper概述
Mapper:封裝了應用程序Mapper階段的數據處理邏輯
1)ChainMapper
方便用戶編寫鏈式Map任務, 即Map階段包含多個Mapper,即可以別寫多個自定義map去參與運算。
2)InverseMapper
一個能交換key和value的Mapper
3)RegexMapper
檢查輸入是否匹配某正則表達式, 輸出匹配字符串和計數器(用的很少)
4)TockenCounterMapper
將輸入分解為獨立的單詞, 輸出個單詞和計數器(以空格分割單詞,value值為1)
3.2、Reducer概述
Mapper:封裝了應用程序Mapper階段的數據處理邏輯
1)ChainMapper:
方便用戶編寫鏈式Map任務, 即Map階段只能有一個Reducer,后面還可以用ChainMapper去多加Mapper。
2)IntSumReducer/LongSumReducer
對各key的所有整型值求和
3.2、寫一個實例去使用
注意:這里用到了一個輸入格式為KeyValueTextInputFormat,我們查看源碼注釋:
我們需要用mapreduce.input.keyvaluelinerecordreader.key.value.separator去指定key和value的分隔符是什么,它的默認分隔符是"\t"也就是tab鍵。
這個需要在配置文件中去指定,但是我們知道在配置文件中能設置的在程序中也是可以設置的。
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");
代碼實現:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.chain.ChainReducer; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.map.InverseMapper; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class PatentReference_0010 extends Configured implements Tool{ static class PatentReferenceMapper extends Mapper<Text,Text,Text,IntWritable>{ private IntWritable one=new IntWritable(1); @Override protected void map(Text key,Text value,Context context) throws IOException, InterruptedException{ context.write(key,one); } } @Override public int run(String[] args) throws Exception{ Configuration conf=getConf(); Path input=new Path(conf.get("input")); Path output=new Path(conf.get("output")); conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",","); Job job=Job.getInstance(conf,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); ChainMapper.addMapper(job,InverseMapper.class, // 輸入的鍵值類型由InputFormat決定 Text.class,Text.class, // 輸出的鍵值類型與輸入的鍵值類型相反 Text.class,Text.class,conf); ChainMapper.addMapper(job,PatentReferenceMapper.class, // 輸入的鍵值類型由前一個Mapper輸出的鍵值類型決定 Text.class,Text.class, Text.class,IntWritable.class,conf); ChainReducer.setReducer(job,IntSumReducer.class, Text.class,IntWritable.class, Text.class,IntWritable.class,conf); ChainReducer.addMapper(job,InverseMapper.class, Text.class,IntWritable.class, IntWritable.class,Text.class,conf); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); KeyValueTextInputFormat.addInputPath(job,input); TextOutputFormat.setOutputPath(job,output); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception{ System.exit(ToolRunner.run(new P00010_PatentReference_0010(),args)); } }
在Job job=Job.getInstance(conf,this.getClass().getSimpleName());設置中,job把conf也就是配置文件做了一個拷貝,因為hadoop要重復利用一個對象,如果是引用的話,發現值得改變就都改變了。
喜歡就點個“推薦”哦!