1.wordcount的代碼如下
public class WordCount { public static class TokenizerMapper extends Mapper < Object, Text, Text, IntWritable > { private final static IntWritable one = new IntWritable( 1 ); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer < Text, IntWritable, Text, IntWritable > { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable < IntWritable > values,Context context) throws IOException, InterruptedException { int sum = 0 ; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2 ) { System.err.println( " Usage: wordcount <in> <out> " ); System.exit( 2 ); } Job job = new Job(conf, " word count " ); job.setJarByClass(WordCount. class ); job.setMapperClass(TokenizerMapper. class ); job.setCombinerClass(IntSumReducer. class ); job.setReducerClass(IntSumReducer. class ); job.setOutputKeyClass(Text. class ); job.setOutputValueClass(IntWritable. class ); FileInputFormat.addInputPath(job, new Path(otherArgs[ 0 ])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[ 1 ])); System.exit(job.waitForCompletion( true ) ? 0 : 1 ); } }
2.一個可以運行的mapreduce程序可以包含哪些元素呢?
JobConf 常用可定制參數
參數
|
作用
|
缺省值
|
其它實現
|
InputFormat
|
將輸入的數據集切割成小數據集 InputSplits, 每一個 InputSplit 將由一個 Mapper 負責處理。此外 InputFormat 中還提供一個 RecordReader 的實現, 將一個 InputSplit 解析成 <key,value> 對提供給 map 函數。
|
TextInputFormat
|
SequenceFileInputFormat
|
OutputFormat
|
提供一個 RecordWriter 的實現,負責輸出最終結果
|
TextOutputFormat
|
SequenceFileOutputFormat
|
OutputKeyClass
|
輸出的最終結果中 key 的類型
|
LongWritable
|
|
OutputValueClass
|
輸出的最終結果中 value 的類型
|
Text
|
|
MapperClass
|
Mapper 類,實現 map 函數,完成輸入的 <key,value> 到中間結果的映射
|
IdentityMapper
|
LongSumReducer,
|
CombinerClass
|
實現 combine 函數,將中間結果中的重復 key 做合並
|
null
|
|
ReducerClass
|
Reducer 類,實現 reduce 函數,對中間結果做合並,形成最終結果
|
IdentityReducer
|
AccumulatingReducer, LongSumReducer
|
InputPath
|
設定 job 的輸入目錄, job 運行時會處理輸入目錄下的所有文件
|
null
|
|
OutputPath
|
設定 job 的輸出目錄,job 的最終結果會寫入輸出目錄下
|
null
|
|
MapOutputKeyClass
|
設定 map 函數輸出的中間結果中 key 的類型
|
如果用戶沒有設定的話,使用 OutputKeyClass
|
|
MapOutputValueClass
|
設定 map 函數輸出的中間結果中 value 的類型
|
如果用戶沒有設定的話,使用 OutputValuesClass
|
|
OutputKeyComparator
|
對結果中的 key 進行排序時的使用的比較器
|
WritableComparable
|
|
PartitionerClass
|
對中間結果的 key 排序后,用此 Partition 函數將其划分為R份,每份由一個 Reducer 負責處理。
|
HashPartitioner
|
KeyFieldBasedPartitioner PipesPartitioner
|
比較容易疑惑的是:
InputFormat:讀取輸入文件,以自定義map的輸入數據格式,傳給map.(如下紅色字體)
public static class TokenizerMapper extends Mapper<
MapOutputKeyClass,MapOutputValueClass:定義了map的輸出數據的格式,reduce的輸入數據格式
public static class TokenizerMapper extends Mapper<
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
OutputKeyClass,OutputValueClass:定義了reduce的輸出數據格式,OutputFormat的輸入格式
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
OutputFormat:將mapreduce的結果數據寫入到文件中去
OutputKeyComparator/OutputValueGroupingComparator:二次排序用的
參考文獻:
1.http://hadoop.apache.org/docs/r0.19.1/cn/mapred_tutorial.html
2.http://caibinbupt.iteye.com/blog/338785
3.http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/index.html
4.http://www.riccomini.name/Topics/DistributedComputing/Hadoop/SortByValue/