Hadoop學習筆記(1):WordCount程序的實現與總結


開篇語:

這幾天開始學習Hadoop,花費了整整一天終於把偽分布式給搭好了,激動之情無法言表······

搭好環境之后,按着書本的代碼,實現了這個被譽為Hadoop中的HelloWorld的程序--WordCount,以此開啟學習Hadoop的篇章。

本篇旨在總結WordCount程序的基本結構和工作原理,有關環境的搭建這塊,網上有很多的教程,大家可以自行找谷歌或百度。

 

何為MapReduce:

在開始看WordCount的代碼之前,先簡要了解下什么是MapReduce。HDFS和MapReduce是Hadoop的兩個重要核心,其中MR是Hadoop的分布式計算模型。MapReduce主要分為兩步Map步和Reduce步,引用網上流傳很廣的一個故事來解釋,現在你要統計一個圖書館里面有多少本書,為了完成這個任務,你可以指派小明去統計書架1,指派小紅去統計書架2,這個指派的過程就是Map步,最后,每個人統計完屬於自己負責的書架后,再對每個人的結果進行累加統計,這個過程就是Reduce步。

 

WordCount程序:

程序的功能:假設現在有n個文本,WordCount程序就是利用MR計算模型來統計這n個文本中每個單詞出現的總次數。

 

圖一

現在有兩個文件:

  •   File 0:有兩行,第一行的內容為“Hello World”,第二行的內容為“Hello Hadoop”
  •   File 1:有兩行,第一行的內容為“Bye World”,第二行的內容為“Bye Hadoop”

假設我們現在要統計這兩個文件每種單詞出現的次數,首先我們要對每個文本進行處理,即把其中的句子划分成詞語,按照上面講到的統計圖書的故事,我們會將這兩個文件分派給兩個人,讓這兩個人各自去處理,待這兩個人都處理完成之后,再對結果進行匯總統計,在圖中充當這兩個人角色的就是Map1和Map2,Map步的輸入為<key,value>對,輸出也為<key,value>對,實現Map步的代碼如下:

 

 1 // 繼承Mapper類,Mapper類的四個泛型參數分別為:輸入key類型,輸入value類型,輸出key類型,輸出value類型
 2     public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
 3         
 4         private final static IntWritable one = new IntWritable(1); //輸出的value的類型,可以理解為int
 5         private Text word = new Text(); //輸出的key的類型,可以理解為String
 6 
 7         @Override
 8         public void map(LongWritable key, Text value, Context context) 
 9                 throws IOException, InterruptedException {
10             
11             String line = value.toString();  //每行句子
12             StringTokenizer tokenizer = new StringTokenizer(line);
13             
14             while (tokenizer.hasMoreTokens()) {
15                 word.set(tokenizer.nextToken());
16                 context.write(word, one);  //輸出
17             }
18             
19         }
20     }

 現在來分析和解讀一下代碼中的Map步:

  • 首先,要實現Map步,應該實現一個類,這個類繼承了Mapper類並且重寫其中的map方法。
  • 現在來說下重寫這個map方法有什么意義。繼續拿統計圖書的例子來說,當小明被指派到書架1統計圖書的時候,小明可以偷懶,對於那些他不想統計的書,他可以不統計;小明也可以很盡責,統計的結果達到百分百准確。總而言之,小明只要拿出統計結果給負責匯總的人就可以了,至於他是怎么處理的,負責匯總的人管不着。而重寫這個map方法,就對應於實現這個處理的過程,負責將輸入的<key,value>對進行處理統計,並且輸出<key,value>對給下一步處理。這部分代碼參見圖二中的第一個黃色框(Map步的輸入)和第二個黃色框(Map步的輸出)。

圖二

 

WordCount程序中的Map步的輸出結果為<單詞,1>對,在這里有一個合並處理步驟,將擁有相同key值的鍵值對進行合並,形成一個<key,valuelist>,這個<key,valuelist>的鍵值對集合,作為Reduce步的輸入。現在來看一下實現Reduce步的代碼:

 1 // Reduce類,繼承了Reducer類
 2     public static class Reduce extends
 3             Reducer<Text, IntWritable, Text, IntWritable> {
 4 
 5         @Override
 6                 //在這里,reduce步的輸入相當於<單詞,valuelist>,如<Hello,<1,1>>
 7         public void reduce(Text key, Iterable<IntWritable> values,
 8                 Context context) throws IOException, InterruptedException {
 9             int sum = 0;
10             for (IntWritable val : values) {
11                 sum += val.get();
12             }
13             context.write(key, new IntWritable(sum));
14         }
15     }

 

現在來分析和解讀一下代碼中的Reduce步:

  • 要實現Reduce步,應該實現一個類,這個類繼承了Reducer類並且重寫其中的reduce方法。
  • 這個Reduce步就相當於在統計圖書中那個匯總統計的人,負責對手下的工作結果進行匯總,Reduce步的輸入和輸出同樣為<key,value>。這部分代碼參見圖三中的第一個綠色框(Reduce步的輸入)和第二個綠色框(Reduce步的輸出)。

圖三

最后再來看一下主函數吧,在Hadoop中,每個MapReduce任務被當做一個Job(作業),在執行任務之前,首先要對任務進行一些配置,代碼如下:

 

 1 Job job = new Job(); // 創建一個作業對象
 2 job.setJarByClass(WordCount.class); // 設置運行/處理該作業的類
 3 job.setJobName("WordCount");  
 4 
 5 FileInputFormat.addInputPath(job, new Path(args[0]));//設置這個作業輸入數據的路徑
 6 FileOutputFormat.setOutputPath(job, new Path(args[1]));//設置這個作業輸出結果的路徑
 7 
 8 job.setMapperClass(Map.class);//設置實現了Map步的類
 9 job.setReducerClass(Reduce.class);//設置實現了Reduce步的類
10 
11 job.setOutputKeyClass(Text.class);//設置輸出結果key的類型
12 job.setOutputValueClass(IntWritable.class);//設置輸出結果value的類型
13 
14 System.exit(job.waitForCompletion(true) ? 0 : 1);//執行作業

 

來看一下Job設置了哪些東西:

  • 設置處理該作業的類,setJarByClass()
  • 設置這個作業的名字,setJobName()
  • 設置這個作業輸入數據所在的路徑
  • 設置這個作業輸出結果保存的路徑
  • 設置實現了Map步的類,setMapperClass()
  • 設置實現了Reduce步的類,setReducerClass()
  • 設置輸出結果key的類型,setOutputKeyClass()
  • 設置輸出結果value的類型,setOuputValueClass()
  • 執行作業

倒回看圖一,會發現還有一個如圖四的東西:

圖四

那么圖四中的這個TextInputFormat又是干嗎的呢?

TextInputFormat是Hadoop默認的輸入方法,在TextInputFormat中,每個文件(或其一部分)都會單獨作為Map的輸入,之后,每一行數據都會產生一個<key,value>形式:其中key值是每個數據的記錄在數據分片中的字節偏移量,而value值是每行的內容。所以,圖5中畫紅圈的兩個數據應該是有誤的(在上面只是為了方便表示),正確的值應該是第二行第一個字符的偏移量才對。

圖五

學習資料:

陸嘉恆,《Hadoop實戰》,機械工業出版社。

 

最后附上完整源代碼:

 1 import java.io.IOException;
 2 import java.util.StringTokenizer;
 3 
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.io.IntWritable;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
16 import org.apache.hadoop.util.GenericOptionsParser;
17 import org.apache.hadoop.mapreduce.Reducer.Context;
18 
19 public class WordCount {
20 
21     // 繼承Mapper類,Mapper類的四個泛型參數分別為:輸入key類型,輸入value類型,輸出key類型,輸出value類型
22     public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
23         
24         private final static IntWritable one = new IntWritable(1); // output value
25         private Text word = new Text(); // output key
26 
27         @Override
28         public void map(LongWritable key, Text value, Context context) 
29                 throws IOException, InterruptedException {
30             
31             String line = value.toString();
32             StringTokenizer tokenizer = new StringTokenizer(line);
33             
34             while (tokenizer.hasMoreTokens()) {
35                 word.set(tokenizer.nextToken());
36                 context.write(word, one);
37             }
38             
39         }
40     }
41 
42     // Reduce類,繼承了Reducer類
43     public static class Reduce extends
44             Reducer<Text, IntWritable, Text, IntWritable> {
45 
46         @Override
47         public void reduce(Text key, Iterable<IntWritable> values,
48                 Context context) throws IOException, InterruptedException {
49             int sum = 0;
50             for (IntWritable val : values) {
51                 sum += val.get();
52             }
53             context.write(key, new IntWritable(sum));
54         }
55     }
56 
57     public static void main(String[] args) throws Exception {
58 
59         if (args.length != 2) {
60             System.err
61                     .println("Usage: MaxTemperature <input path> <output path>");
62             System.exit(-1);
63         }
64 
65         Job job = new Job(); // 創建一個作業對象
66         job.setJarByClass(WordCount.class); // 設置運行/處理該作業的類
67         job.setJobName("WordCount");  
68 
69         FileInputFormat.addInputPath(job, new Path(args[0]));
70         FileOutputFormat.setOutputPath(job, new Path(args[1]));
71 
72         job.setMapperClass(Map.class);
73         job.setReducerClass(Reduce.class);
74 
75         job.setOutputKeyClass(Text.class);
76         job.setOutputValueClass(IntWritable.class);
77 
78         System.exit(job.waitForCompletion(true) ? 0 : 1);
79 
80     }
81 
82 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM