先跟遠哥一起先了解一下什么是MapReduce吧。
在Hadoop中,MapReduce過程分三個步驟:Map(主要是分解並行的任務)、Combine(主要是為了提高Reduce的效率)和Reduce(把處理后的結果再匯總起來) 。
關於如何搭建Hadoop運行環境,可以閱讀我的另外一篇博文:http://www.cnblogs.com/taven/archive/2012/08/12/2634145.html
好了,我們先看一下運行一個Hadoop作業的啟動代碼:
job.setJarByClass(WordCount. class);
job.setMapperClass( TokenizerMapper. class);
job.setCombinerClass( IntSumReducer. class);
job.setReducerClass( IntSumReducer. class);
job.setOutputKeyClass(Text. class);
job.setOutputValueClass(IntWritable. class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion( true) ? 0 : 1);
看到沒,在運行一個Hadoop作業之前,先得指定 MapperClass 、CombinerClass 、ReducerClass .
假設我們交給Hadoop去分析的一個文本內容為:
lixy csy lixy zmde nitamade hehe
realy amoeba woyou weibo hehe
好了,提供的內容很簡單,就是3行文本,第1行文本包含n個單詞,第2行是空的,第3行也包含n個單詞,單詞與單詞之間用空格隔開,下面我們來看看MapperClass 是如何實現的,又是如何運行的呢?看看 TokenizerMapper 的代碼:
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println( "TokenizerMapper.map...");
System.out.println( "Map key:"+key.toString()+ " Map value:"+value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
String tmp = itr.nextToken();
word.set(tmp);
context.write(word, one);
System.out.println( "tmp:"+tmp+ " one:"+one);
}
System.out.println( "context:"+context.toString());
}
}
注:這里遠哥要說一下“IntWritable one = new IntWritable(1);”的用意,因為我們不管一個單詞會出現幾次,只要出現,我們就計算1次,所以“context.write(word, one)”這行代碼將一個單詞寫入的時候,值永遠是1;
在運行的時候,根據你文件中內容的情況,上面的 map(Object key, Text value, Context context) 方法可能會被調用多次,將本例子提供的文件內容執行后,控制台輸出內容如下(為了方便閱讀,我添加了一些換行):
Map key:0 Map value: lixy csy lixy zmde nitamade hehe
tmp:lixy one:1
tmp:csy one:1
tmp:lixy one:1
tmp:zmde one:1
tmp:nitamade one:1
tmp:hehe one:1
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3
TokenizerMapper.map...
Map key:34 Map value:
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3
TokenizerMapper.map...
Map key:36 Map value: realy amoeba woyou weibo hehe
tmp:realy one:1
tmp:amoeba one:1
tmp:woyou one:1
tmp:weibo one:1
tmp:hehe one:1
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3
IntSumReducer.reduce...
val.get():1
Reduce key: amoeba Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: csy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
val.get():1
Reduce key: hehe Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:2
IntSumReducer.reduce...
val.get():1
val.get():1
Reduce key: lixy Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:2
IntSumReducer.reduce...
val.get():1
Reduce key: nitamade Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: realy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: weibo Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: woyou Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: zmde Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1
從TokenizerMapper的 map(Object key, Text value, Context context) 調用的信息輸出情況可以分析出,文件內容中有兩行,所以該方法一共調用了2次(因為TextInputFormat類型的,都是按行處理)。
每一行的內容會在value參數中傳進來,也就是說每一行的內容都對應了一個key,這個key為此行的開頭位置在本文件中的所在位置(所以第1行的key是0,第2行的key是34,第3行的key是36),一般為數字的。
在這個map方法中,我們可以加入一些自己的處理邏輯,比如根據空格來取得每個單詞,然后我們需要將處理后的結果,寫入到 context 參數中,便於hadoop處理完后續的處理邏輯。(這里我們需要注意的是“IntWritable one”變量都是數值1)
上面看了map的過程,接下來我們再看reduce的過程,先看看 IntSumReducer 的代碼:
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
System.out.println( "IntSumReducer.reduce...");
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
System.out.println( "val.get():" + val.get());
}
result.set(sum);
context.write(key, result);
System.out.println( "Reduce key:" + key.toString() + " Reduce result:" + result.get());
System.out.println( "Reduce Context:" + context + " Result:" + result);
}
}
執行調用后,控制台輸出內容如下:
val.get():1
Reduce key: amoeba Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: csy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():2
Reduce key: hehe Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:2
IntSumReducer.reduce...
val.get():2
Reduce key: lixy Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:2
IntSumReducer.reduce...
val.get():1
Reduce key: nitamade Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: realy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: weibo Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: woyou Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
IntSumReducer.reduce...
val.get():1
Reduce key: zmde Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1
通過執行 reduce(Text key, Iterable<IntWritable> values, Context context) 方法,奇跡發生了,hadoop傳到這里的參數,已經去重了。什么意思呢?就是說,參數key里面是單詞名稱,如果一個單詞出現2次,那么參數values里面就會2個值,但是key只有1次。像“lixy”這個單詞在第一行出現了2次,那么這里的key只出現1次,但是后面的values會有2個IntWritable,並且值都是1,這個為1的值其實就是你在map的時候,自己定的。
這個例子只是簡單的說明了MapReduce的一個簡單使用,實際上他的功能遠不只這些,還可以實現一些對數據庫中數據的排序、統計等等,特別是對一些非常非常龐大的數據表。當你有一個1T的文本內容,需要統計里面每個單詞分別出現多少次的時候,一台計算機去計算,會需要很長時間的,有可能光加載就要很長時間,但是如果你交給hadoop,並且配了幾台機器一起跑的話,hadoop能把這1T的文本內容分成很多個小段,分發到不同的物理機器上,並行執行你的MapReduce邏輯,然后將幾台物理機器上處理完成的內容匯總后給你,整個過程是分片、並行處理完成的,效率大大提高。