第六篇:Eclipse上運行第一個Hadoop實例 - WordCount(單詞統計程序)


需求

       計算出文件中每個單詞的頻數。要求輸出結果按照單詞的字母順序進行排序。每個單詞和其頻數占一行,單詞和頻數之間有間隔。

       比如,輸入兩個文件,其一內容如下:

       hello world

       hello hadoop

       hello mapreduce

       另一內容如下:

       bye world

       bye hadoop

       bye mapreduce

       對應上面給出的輸入樣例,其輸出樣例為:

       bye        3

       hadoop    2

       hello      3

       mapreduce   2

       world     2

方案制定

       對該案例,可設計出如下的MapReduce方案:

1. Map階段各節點完成由輸入數據到單詞切分再到單詞搜集的工作

2. shuffle階段完成相同單詞的聚集再到分發到各個Reduce節點的工作 (shuffle階段是MapReduce的默認過程)

3. Reduce階段負責接收所有單詞並計算各自頻數

代碼示例

  1 /**
  2  *  Licensed under the Apache License, Version 2.0 (the "License");
  3  *  you may not use this file except in compliance with the License.
  4  *  You may obtain a copy of the License at
  5  *
  6  *      http://www.apache.org/licenses/LICENSE-2.0
  7  *
  8  *  Unless required by applicable law or agreed to in writing, software
  9  *  distributed under the License is distributed on an "AS IS" BASIS,
 10  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 11  *  See the License for the specific language governing permissions and
 12  *  limitations under the License.
 13  */
 14 
 15 
 16 package org.apache.hadoop.examples;
 17 
 18 import java.io.IOException;
 19 import java.util.StringTokenizer;
 20 
 21 //導入各種Hadoop包
 22 import org.apache.hadoop.conf.Configuration;
 23 import org.apache.hadoop.fs.Path;
 24 import org.apache.hadoop.io.IntWritable;
 25 import org.apache.hadoop.io.Text;
 26 import org.apache.hadoop.mapreduce.Job;
 27 import org.apache.hadoop.mapreduce.Mapper;
 28 import org.apache.hadoop.mapreduce.Reducer;
 29 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 30 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 31 import org.apache.hadoop.util.GenericOptionsParser;
 32 
 33 // 主類
 34 public class WordCount {
 35         
 36     // Mapper類
 37     public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
 38         
 39         // new一個值為1的整數對象 
 40         private final static IntWritable one = new IntWritable(1);
 41         // new一個空的Text對象
 42         private Text word = new Text();
 43       
 44         // 實現map函數
 45         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 46             
 47             // 創建value的字符串迭代器
 48             StringTokenizer itr = new StringTokenizer(value.toString());
 49         
 50             // 對數據進行再次分割並輸出map結果。初始格式為<字節偏移量,單詞> 目標格式為<單詞,頻率>
 51             while (itr.hasMoreTokens()) {
 52                     word.set(itr.nextToken());
 53                     context.write(word, one);
 54             }
 55         }
 56     }
 57         
 58     // Reducer類
 59     public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
 60     
 61         // new一個值為空的整數對象
 62         private IntWritable result = new IntWritable();
 63 
 64         // 實現reduce函數
 65         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
 66                 
 67             int sum = 0;
 68             for (IntWritable val : values) {
 69                 sum += val.get();
 70             }
 71                 
 72             // 得到本次計算的單詞的頻數
 73             result.set(sum);
 74                         
 75             // 輸出reduce結果
 76             context.write(key, result);
 77         }
 78     }
 79 
 80     // 主函數
 81     public static void main(String[] args) throws Exception {
 82     
 83         // 獲取配置參數
 84         Configuration conf = new Configuration();
 85         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 86                 
 87         // 檢查命令語法
 88         if (otherArgs.length != 2) {
 89                 System.err.println("Usage: wordcount <in> <out>");
 90                 System.exit(2);
 91         }
 92                 
 93         // 定義作業對象
 94         Job job = new Job(conf, "word count");
 95         // 注冊分布式類
 96         job.setJarByClass(WordCount.class);
 97         // 注冊Mapper類
 98         job.setMapperClass(TokenizerMapper.class);
 99         // 注冊合並類
100         job.setCombinerClass(IntSumReducer.class);
101         // 注冊Reducer類
102         job.setReducerClass(IntSumReducer.class);
103         // 注冊輸出格式類
104         job.setOutputKeyClass(Text.class);
105         job.setOutputValueClass(IntWritable.class);
106         // 設置輸入輸出路徑
107         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
108         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
109                 
110         // 運行程序
111         System.exit(job.waitForCompletion(true) ? 0 : 1);
112     }
113 }

運行方法

1. 打開Eclipse並啟動Hdfs(方法請參考前文)

2. 新建一個MapReduce工程:”file" -> "new" -> "project",然后選擇 "Map/Reduce Project"

  

3. 設置輸入目錄及文件

在項目工程包里面新建一個名為input的目錄,里面存放需要處理的輸入文件。這里選用2個文件名分別為file01和file02的文件進行測試。文件內容同需求示例。

       

4. 將輸入文件傳輸入Hdfs

在終端輸入以下命令即可將整個目錄傳輸進Hdfs(input目錄下的所有文件將會被送進Hdfs下名為input01的目錄里),請根據MapReduce工程包實際路徑對如下命令略作修改即可:

1 ./bin/hadoop fs -put ../workspace/Hadoop_t1/input/ input01

5. 在工程包中新建一個WordCount類並將上面的源代碼拷貝進去。

6. 調整項目運行參數:右鍵項目 -> “Run As" -> ”Run Configurations"

       

需要添加的就是"Program arguments"下的那些代碼。它們其實是作為命令行參數傳遞進程序的,第一段是輸入文件路徑;第二段是輸出文件路徑。

路徑的格式為 "[主機IP地址:hdfs端口] + [輸入/輸出目錄在hdfs中的路徑]"。

可以輸入以下命令查看輸入目錄路徑:

1 ./bin/hadoop fs -ls

       

7. 點擊"Run"運行程序。

8. 執行以下命令查看結果:

1 ./bin/hadoop fs -cat output01/*

       

       這些主機和Hdfs的文件傳遞,顯示也可以使用Eclipse,更方便容易。在此就不提了。

  

小結

1. 多多熟練Hadoop平台下MapReduce項目基本創建流程。

2. WordCount是一個很經典的Hadoop示例,它雖然簡單,但具有很大的代表性。

3. 從某個程度上來說也反映了其設計的初衷,對日志文件的分析。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM