MapReduce 應用實例


Hadoop 版本2.8.0

  • 前期准備工作:

1. 設置用戶環境變量 PATH 和 CLASSPATH

方便執行 Hadoop 命令時不用轉移到對應的目錄下,shell 除了會在當前目錄下還會到 PATH 指定位置尋找可執行文件。

使用 javac 命令編譯 .java 文件時,如果沒有指定 -classpath 選項,會到 CLASSPATH 下尋找程序里 import 的類。使用 echo $PATH 命令可察看對應的環境變量。

vi ~/.bash_profile

# set HADOOP ENVIRONMENT

HADOOP_HOME=~/hadoop-2.8.0

CLASSPATH=$CLASSPATH:$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.8.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.8.0.jar

export PATH=$PATH:$HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

使用 source ~/.bash_profile 使修改生效。有時從系統環境變量中 /etc/profile 去除某一路徑可能導致生效不及時,通過重新登陸一次,可以使其重新加載。上述導入的 CLASSPATH 是 MapReduce 函數常用的三個 jar 包,Hadoop-2.8.0 的資源包都在 hadoop-2.8.0/share/hadoop 路徑下。

  • WordCount

1. 輸入文件准備

  1. 新建輸入文件 file1 和 file2。其中:

    file1 的文件內容是:

    hello world

    file2 的文件內容是:

    hello hadoop

    hello mapreduce

  2. 在 HDFS 文件系統中創建輸入文件夾(hadoop 可執行文件是在 hadoop-2.8.0/bin 目錄下,前面已經將其加入系統路徑中,下面命令在 HDFS 根目錄下創建文件夾 wordcount_input)

    hadoop fs -mkdir /wordcount_input

  3. 上傳本地目錄 ~/files 下的輸入文件 file1 和 file2 文件到集群上的輸入文件夾

    hadoop fs -put ~/files/* /wordcount_input

2. WordCount 代碼

 1 package test;
 2 
 3 import java.io.IOException;
 4 import java.util.StringTokenizer;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.IntWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Reducer;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.util.GenericOptionsParser;
16 
17 public class WordCount {
18         public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
19                 private final IntWritable one = new IntWritable(1);
20                 private Text word = new Text();
21 
22                 public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
23                         StringTokenizer itr = new StringTokenizer(value.toString());
24                         while (itr.hasMoreTokens()) {
25                                 word.set(itr.nextToken());
26                                 context.write(word, one);
27                         }
28                 }
29         }
30         public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
31                 private IntWritable result = new IntWritable();
32                 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
33                         int sum = 0;
34                         for (IntWritable val : values) {
35                                 sum += val.get();
36                         }
37                         result.set(sum);
38                         context.write(key, result);
39                 }
40         }
41         public static void main (String[] args) throws Exception {
42                 Configuration conf = new Configuration();
43                 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
44                 if (otherArgs.length != 2) {
45                         System.err.println("Usage: wordcount <in> <out>");
46                         System.exit(2);
47                 }
48                 Job job = new Job(conf, "word count");
49                 job.setJarByClass(WordCount.class);
50                 job.setMapperClass(TokenizerMapper.class);
51                 job.setCombinerClass(IntSumReducer.class);
52                 job.setReducerClass(IntSumReducer.class);
53                 job.setOutputKeyClass(Text.class);
54                 job.setOutputValueClass(IntWritable.class);
55                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
56                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
57                 System.exit(job.waitForCompletion(true) ? 0 : 1);
58         }
59 }
WordCount.java

3. 編譯 WordCount.java 程序

  javac -d ~/files ~/files/WordCount.java

  上述命令將 ~/files/WordCount.java 的 java 文件編譯后結果存放在 -d 選項指定的目錄下,java 文件中指定的 package 打包命令會使編譯生成的字節碼 class 文件放置在自動創建的包目錄下,比如在本例程序開頭 package test 命令,會使在 ~/files 目錄下創建 test 子目錄,里面包含編譯生成的文件。

4. 將編譯結果打包成 Jar 包

  jar cvf wordcount.jar ~/files/test

  上述命令將之前生產的 package 下的 class 文件進行打包,並對生成的 jar 包進行命名。

5. 在集群上運行 WordCount 程序,命令行指定參數

  hadoop jar ~/files/wordcount.jar test.WordCount /wordcount_input /wordcount_output

  上述命令需要指定 Jar 包的路徑,同時還需要指定包含 package 路徑的類名。

6. 查看輸出結果

  hadoop fs -cat /wordcount_output/part-r-00000

    [lb@host98 ~/files]$hadoop fs -cat /wordcount_output/part-r-00000

    17/06/28 15:49:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

    hadoop 1

    hello 3

    mapreduce 1

    world 1


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM