1.wordcount的代碼如下
public
class
WordCount {
public
static
class
TokenizerMapper
extends
Mapper
<
Object, Text, Text, IntWritable
>
{
private
final
static
IntWritable one
=
new
IntWritable(
1
);
private
Text word
=
new
Text();
public
void
map(Object key, Text value, Context context )
throws
IOException, InterruptedException { StringTokenizer itr
=
new
StringTokenizer(value.toString());
while
(itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
public
static
class
IntSumReducer
extends
Reducer
<
Text, IntWritable, Text, IntWritable
>
{
private
IntWritable result
=
new
IntWritable();
public
void
reduce(Text key, Iterable
<
IntWritable
>
values,Context context)
throws
IOException, InterruptedException {
int
sum
=
0
;
for
(IntWritable val : values) { sum
+=
val.get(); } result.set(sum); context.write(key, result); } }
public
static
void
main(String[] args)
throws
Exception { Configuration conf
=
new
Configuration(); String[] otherArgs
=
new
GenericOptionsParser(conf, args).getRemainingArgs();
if
(otherArgs.length
!=
2
) { System.err.println(
"
Usage: wordcount <in> <out>
"
); System.exit(
2
); } Job job
=
new
Job(conf,
"
word count
"
); job.setJarByClass(WordCount.
class
); job.setMapperClass(TokenizerMapper.
class
); job.setCombinerClass(IntSumReducer.
class
); job.setReducerClass(IntSumReducer.
class
); job.setOutputKeyClass(Text.
class
); job.setOutputValueClass(IntWritable.
class
); FileInputFormat.addInputPath(job,
new
Path(otherArgs[
0
])); FileOutputFormat.setOutputPath(job,
new
Path(otherArgs[
1
])); System.exit(job.waitForCompletion(
true
)
?
0
:
1
); } }
2.一個可以運行的mapreduce程序可以包含哪些元素呢?
JobConf 常用可定制參數
| 參數
|
作用
|
缺省值
|
其它實現
|
| InputFormat
|
將輸入的數據集切割成小數據集 InputSplits, 每一個 InputSplit 將由一個 Mapper 負責處理。此外 InputFormat 中還提供一個 RecordReader 的實現, 將一個 InputSplit 解析成 <key,value> 對提供給 map 函數。
|
TextInputFormat
|
SequenceFileInputFormat
|
| OutputFormat
|
提供一個 RecordWriter 的實現,負責輸出最終結果
|
TextOutputFormat
|
SequenceFileOutputFormat
|
| OutputKeyClass
|
輸出的最終結果中 key 的類型
|
LongWritable
|
|
| OutputValueClass
|
輸出的最終結果中 value 的類型
|
Text
|
|
| MapperClass
|
Mapper 類,實現 map 函數,完成輸入的 <key,value> 到中間結果的映射
|
IdentityMapper
|
LongSumReducer,
|
| CombinerClass
|
實現 combine 函數,將中間結果中的重復 key 做合並
|
null
|
|
| ReducerClass
|
Reducer 類,實現 reduce 函數,對中間結果做合並,形成最終結果
|
IdentityReducer
|
AccumulatingReducer, LongSumReducer
|
| InputPath
|
設定 job 的輸入目錄, job 運行時會處理輸入目錄下的所有文件
|
null
|
|
| OutputPath
|
設定 job 的輸出目錄,job 的最終結果會寫入輸出目錄下
|
null
|
|
| MapOutputKeyClass
|
設定 map 函數輸出的中間結果中 key 的類型
|
如果用戶沒有設定的話,使用 OutputKeyClass
|
|
| MapOutputValueClass
|
設定 map 函數輸出的中間結果中 value 的類型
|
如果用戶沒有設定的話,使用 OutputValuesClass
|
|
| OutputKeyComparator
|
對結果中的 key 進行排序時的使用的比較器
|
WritableComparable
|
|
| PartitionerClass
|
對中間結果的 key 排序后,用此 Partition 函數將其划分為R份,每份由一個 Reducer 負責處理。
|
HashPartitioner
|
KeyFieldBasedPartitioner PipesPartitioner
|
比較容易疑惑的是:
InputFormat:讀取輸入文件,以自定義map的輸入數據格式,傳給map.(如下紅色字體)
public static class TokenizerMapper extends Mapper<
MapOutputKeyClass,MapOutputValueClass:定義了map的輸出數據的格式,reduce的輸入數據格式
public static class TokenizerMapper extends Mapper<
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
OutputKeyClass,OutputValueClass:定義了reduce的輸出數據格式,OutputFormat的輸入格式
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
OutputFormat:將mapreduce的結果數據寫入到文件中去
OutputKeyComparator/OutputValueGroupingComparator:二次排序用的
參考文獻:
1.http://hadoop.apache.org/docs/r0.19.1/cn/mapred_tutorial.html
2.http://caibinbupt.iteye.com/blog/338785
3.http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/index.html
4.http://www.riccomini.name/Topics/DistributedComputing/Hadoop/SortByValue/
