Hadoop世界中的HelloWorld之WordCount具體分析


MapReduce 應用舉例:單詞計數 

WorldCount可以說是MapReduce中的helloworld了,下面來看看hadoop中的例子worldcount對其進行的處理過程,也能對mapreduce的執行過程有一個清晰的認識,特別是對於每一個階段的函數執行所產生的鍵值對

單詞 計數主要完成的功能是:統計一系列文本文件中每個單詞出現的次數,如下圖所示。下面將 通過分析源代碼幫助讀者摸清 MapReduce 程序的基本結構。  

 

圖 3-1 單詞計數 

WordCount 詳細的執行步驟如下:

(1) 將文件拆分成 splits,由於測試用的文件較小,所以每個文件為一個 split,並將文件按行分割形成<key, value>對,如圖 3-2 所示。這一步由 MapReduce 框架自動完成,其中偏 移量(即 key 值)包括了回車所占的字符數(Windows 和 Linux 環境下會不同)。  

圖 3-2 分割過程

(2) 將分割好的<key, value>對交給用戶定義的 map 方法進行處理,生成新的<key, value> 對,如圖 3-3 所示。  

圖 3-3 執行 map

(3) 得到 map 方法輸出的<key, value>對后,Mapper 會將它們按照 key 值進行排序,並 執行 Combine 過程,將 key 值相同的 value 值累加,得到 Mapper 的最終輸出結果。  


圖 3-4 map 端排序以及 combine 過程

(4) Reducer 先對從 Mapper 接收的數據進行排序,再交由用戶自定義的 reduce 方法進行 處理,得到新的<key, value>對,並作為 WordCount 的輸出結果,如圖 3-5 所示。  

 圖 3-5 reduce 端排序以及輸出結果 

以上就是wordcount在mapreduce中執行的具體細節,這里面對於中間的鍵值對產生描述的很詳細,這是理解mapreduce很好的資料;

下面來看看hadoop源碼中提供的這一源代碼:這份代碼我的注釋很詳細,但是運行時需要導入很多包,還要給Eclipse配置hadoop的環境,這里主要是分析worldcount的源碼;

 19 import java.io.IOException;
 20 import java.util.StringTokenizer;
 21 
 22 import org.apache.hadoop.conf.Configuration;
 23 
 24 import org.apache.hadoop.fs.Path;
 25 import org.apache.hadoop.io.IntWritable;
 26 import org.apache.hadoop.io.Text;
 27 import org.apache.hadoop.mapreduce.Job;
 28 import org.apache.hadoop.mapreduce.Mapper;
 29 import org.apache.hadoop.mapreduce.Reducer;
 30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 32 import org.apache.hadoop.util.GenericOptionsParser;
 33 
 34 public class WordCount {
 35 
 36     public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
 37 
 38         private final static IntWritable one = new IntWritable(1);// 初始的單詞都是1次,即使重復
 39         private Text word = new Text();// word表示單詞
 40         /*
 41          * 重寫map方法,讀取初試划分的每一個鍵值對,即行偏移量和一行字符串,key為偏移量,value為該行字符串
 42          */
 43 
 44         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 45             /*
 46              * 因為每一行就是一個spilt,並會為之生成一個mapper,所以我們的參數,key就是偏移量,value就是一行字符串
 47              */
 48             StringTokenizer itr = new StringTokenizer(value.toString());// value是一行的字符串,這里將其切割成多個單詞
 49             while (itr.hasMoreTokens()) {// 多個單詞
 50                 word.set(itr.nextToken());// 每個word
 51                 context.write(word, one);// one代表1,最開始每個單詞都是1次,context直接將<word,1>寫到本地磁盤上
 52                 // write函數直接將兩個參數封裝成<key,value>
 53             }
 54         }
 55     }
 56 
 57     public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 58         private IntWritable result = new IntWritable();
 59 
 60         /*
 61          * 重寫reduce函數,key為單詞,values是reducer從多個mapper中得到數據后進行排序並將相同key組
 62          * 合成<key.list<V>>中的list<V>,也就是說明排序這些工作都是mapper和reducer自己去做的,
 63          * 我們只需要專注與在map和reduce函數中處理排序處理后的結果
 64          */
 65         public void reduce(Text key, Iterable<IntWritable> values, Context context)
 66                 throws IOException, InterruptedException {
 67             /*
 68              * 因為在同一個spilt對應的mapper中,會將其進行combine,使得其中單詞(key)不重復,然后將這些鍵值對按照
 69              * hash函數分配給對應的reducer,reducer進行排序,和組合成list,然后再調用的用戶自定義的這個函數,
 70              * 所以有values
 71              * 這一Iterable對象,說明,這個reducer排序后有多少個鍵值對,就會有多少次調用這個算法,每一次都會進行寫,
 72              * 並且key在整個 並行的多個節點中是唯一的
 73              * 
 74              */
 75             int sum = 0;
 76             for (IntWritable val : values) {
 77                 sum += val.get();
 78             }
 79             result.set(sum);
 80             context.write(key, result);
 81         }
 82     }
 83 
 84     public static void main(String[] args) throws Exception {
 85         Configuration conf = new Configuration();
 86         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 87         if (otherArgs.length < 2) {
 88             System.err.println("Usage: wordcount <in> [<in>...] <out>");
 89             System.exit(2);
 90         }
 91         @SuppressWarnings("deprecation")
 92         Job job = new Job(conf, "word count");
 93         job.setJarByClass(WordCount.class);// 本次作業的job
 94         job.setMapperClass(TokenizerMapper.class);// map函數
 95         job.setCombinerClass(IntSumReducer.class);// combine的實現個reduce函數一樣,都是將相同的單詞組合成一個鍵值對
 96         job.setReducerClass(IntSumReducer.class);// reduce函數
 97         job.setOutputKeyClass(Text.class);// 鍵key的類型,
 98         job.setOutputValueClass(IntWritable.class);// value的類型
 99         for (int i = 0; i < otherArgs.length - 1; ++i) {
100             FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//輸入輸出參數的獲取,說明可以是多個輸入文件
101         }
102         FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//參數的最后一個是輸出文件
103         System.exit(job.waitForCompletion(true) ? 0 : 1);
104     }
105 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM