MapReduce 應用舉例:單詞計數
WorldCount可以說是MapReduce中的helloworld了,下面來看看hadoop中的例子worldcount對其進行的處理過程,也能對mapreduce的執行過程有一個清晰的認識,特別是對於每一個階段的函數執行所產生的鍵值對
單詞 計數主要完成的功能是:統計一系列文本文件中每個單詞出現的次數,如下圖所示。下面將 通過分析源代碼幫助讀者摸清 MapReduce 程序的基本結構。
圖 3-1 單詞計數
WordCount 詳細的執行步驟如下:
(1) 將文件拆分成 splits,由於測試用的文件較小,所以每個文件為一個 split,並將文件按行分割形成<key, value>對,如圖 3-2 所示。這一步由 MapReduce 框架自動完成,其中偏 移量(即 key 值)包括了回車所占的字符數(Windows 和 Linux 環境下會不同)。
圖 3-2 分割過程
(2) 將分割好的<key, value>對交給用戶定義的 map 方法進行處理,生成新的<key, value> 對,如圖 3-3 所示。
圖 3-3 執行 map
(3) 得到 map 方法輸出的<key, value>對后,Mapper 會將它們按照 key 值進行排序,並 執行 Combine 過程,將 key 值相同的 value 值累加,得到 Mapper 的最終輸出結果。
圖 3-4 map 端排序以及 combine 過程
(4) Reducer 先對從 Mapper 接收的數據進行排序,再交由用戶自定義的 reduce 方法進行 處理,得到新的<key, value>對,並作為 WordCount 的輸出結果,如圖 3-5 所示。
圖 3-5 reduce 端排序以及輸出結果
以上就是wordcount在mapreduce中執行的具體細節,這里面對於中間的鍵值對產生描述的很詳細,這是理解mapreduce很好的資料;
下面來看看hadoop源碼中提供的這一源代碼:這份代碼我的注釋很詳細,但是運行時需要導入很多包,還要給Eclipse配置hadoop的環境,這里主要是分析worldcount的源碼;
19 import java.io.IOException; 20 import java.util.StringTokenizer; 21 22 import org.apache.hadoop.conf.Configuration; 23 24 import org.apache.hadoop.fs.Path; 25 import org.apache.hadoop.io.IntWritable; 26 import org.apache.hadoop.io.Text; 27 import org.apache.hadoop.mapreduce.Job; 28 import org.apache.hadoop.mapreduce.Mapper; 29 import org.apache.hadoop.mapreduce.Reducer; 30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 32 import org.apache.hadoop.util.GenericOptionsParser; 33 34 public class WordCount { 35 36 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { 37 38 private final static IntWritable one = new IntWritable(1);// 初始的單詞都是1次,即使重復 39 private Text word = new Text();// word表示單詞 40 /* 41 * 重寫map方法,讀取初試划分的每一個鍵值對,即行偏移量和一行字符串,key為偏移量,value為該行字符串 42 */ 43 44 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 45 /* 46 * 因為每一行就是一個spilt,並會為之生成一個mapper,所以我們的參數,key就是偏移量,value就是一行字符串 47 */ 48 StringTokenizer itr = new StringTokenizer(value.toString());// value是一行的字符串,這里將其切割成多個單詞 49 while (itr.hasMoreTokens()) {// 多個單詞 50 word.set(itr.nextToken());// 每個word 51 context.write(word, one);// one代表1,最開始每個單詞都是1次,context直接將<word,1>寫到本地磁盤上 52 // write函數直接將兩個參數封裝成<key,value> 53 } 54 } 55 } 56 57 public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 58 private IntWritable result = new IntWritable(); 59 60 /* 61 * 重寫reduce函數,key為單詞,values是reducer從多個mapper中得到數據后進行排序並將相同key組 62 * 合成<key.list<V>>中的list<V>,也就是說明排序這些工作都是mapper和reducer自己去做的, 63 * 我們只需要專注與在map和reduce函數中處理排序處理后的結果 64 */ 65 public void reduce(Text key, Iterable<IntWritable> values, Context context) 66 throws IOException, InterruptedException { 67 /* 68 * 因為在同一個spilt對應的mapper中,會將其進行combine,使得其中單詞(key)不重復,然后將這些鍵值對按照 69 * hash函數分配給對應的reducer,reducer進行排序,和組合成list,然后再調用的用戶自定義的這個函數, 70 * 所以有values 71 * 這一Iterable對象,說明,這個reducer排序后有多少個鍵值對,就會有多少次調用這個算法,每一次都會進行寫, 72 * 並且key在整個 並行的多個節點中是唯一的 73 * 74 */ 75 int sum = 0; 76 for (IntWritable val : values) { 77 sum += val.get(); 78 } 79 result.set(sum); 80 context.write(key, result); 81 } 82 } 83 84 public static void main(String[] args) throws Exception { 85 Configuration conf = new Configuration(); 86 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 87 if (otherArgs.length < 2) { 88 System.err.println("Usage: wordcount <in> [<in>...] <out>"); 89 System.exit(2); 90 } 91 @SuppressWarnings("deprecation") 92 Job job = new Job(conf, "word count"); 93 job.setJarByClass(WordCount.class);// 本次作業的job 94 job.setMapperClass(TokenizerMapper.class);// map函數 95 job.setCombinerClass(IntSumReducer.class);// combine的實現個reduce函數一樣,都是將相同的單詞組合成一個鍵值對 96 job.setReducerClass(IntSumReducer.class);// reduce函數 97 job.setOutputKeyClass(Text.class);// 鍵key的類型, 98 job.setOutputValueClass(IntWritable.class);// value的類型 99 for (int i = 0; i < otherArgs.length - 1; ++i) { 100 FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//輸入輸出參數的獲取,說明可以是多個輸入文件 101 } 102 FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//參數的最后一個是輸出文件 103 System.exit(job.waitForCompletion(true) ? 0 : 1); 104 } 105 }