如何快速地編寫和運行一個屬於自己的 MapReduce 例子程序


大數據的時代, 到處張嘴閉嘴都是Hadoop, MapReduce, 不跟上時代怎么行? 可是對一個hadoop的新手, 寫一個屬於自己的MapReduce程序還是小有點難度的, 需要建立一個maven項目, 還要搞清楚各種庫的依賴, 再加上編譯運行, 基本上頭大兩圈了吧。 這也使得很多只是想簡單了解一下MapReduce的人望而卻步。

本文會教你如何用最快最簡單的方法編寫和運行一個屬於自己的MapReduce程序, let's go!

首先有兩個前提:

1. 有一個已經可以運行的hadoop 集群(也可以是偽分布系統), 上面的hdfs和mapreduce工作正常 (這個真的是最基本的了, 不再累述, 不會的請參考 http://hadoop.apache.org/docs/current/)

2. 集群上安裝了JDK (編譯運行時會用到)

 

正式開始

1. 首先登入hadoop 集群里面的一個節點, 創建一個java源文件, 偷懶起見, 基本盜用官方的word count (因為本文的目的是教會你如何快編寫和運行一個MapReduce程序, 而不是如何寫好一個功能齊全的MapReduce程序)

內容如下:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class myword {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(myword.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
                                                             

與官方版本相比, 主要做了兩處修改

1) 為了簡單起見,去掉了開頭的 package org.apache.hadoop.examples; 

2) 將類名從 WordCount 改為 myword, 以體現是我們自己的工作成果 :)

 

2.  拿到hadoop 運行的class path, 主要為編譯所用

運行命令 

hadoop classpath

保存打出的結果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:

/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/*:/usr/lib/gphd/hadoop/.//*:/usr/lib/gphd/hadoop-hdfs/./:/usr/lib/gphd/hadoop-hdfs/lib/*:/usr/lib/gphd/hadoop-hdfs/.//*:/usr/lib/gphd/hadoop-yarn/lib/*:/usr/lib/gphd/hadoop-yarn/.//*:/usr/lib/gphd/hadoop-mapreduce/lib/*:/usr/lib/gphd/hadoop-mapreduce/.//*::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-core.jar:/usr/lib/gphd/pxf/pxf-api.jar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexd.jar::/usr/lib/gphd/zookeeper/zookeeper.jar:/usr/lib/gphd/hbase/lib/hbase-common.jar:/usr/lib/gphd/hbase/lib/hbase-protocol.jar:/usr/lib/gphd/hbase/lib/hbase-client.jar:/usr/lib/gphd/hbase/lib/hbase-thrift.jar:/usr/lib/gphd/hbase/lib/htrace-core-2.01.jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-service.jar:/usr/lib/gphd/hive/lib/libthrift-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-metastore.jar:/usr/lib/gphd/hive/lib/libfb303-0.9.0.jar:/usr/lib/gphd/hive/lib/hive-common.jar:/usr/lib/gphd/hive/lib/hive-exec.jar:/usr/lib/gphd/hive/lib/postgresql-jdbc.jar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/*:

 

3. 編譯

運行命令

javac -classpath xxx ./myword.java 

xxx部分就是上一步里面取到的class path

運行完此命令后, 當前目錄下會生成一些.class 文件, 例如:

myword.class  myword$IntSumReducer.class  myword$TokenizerMapper.class

 

4. 將class文件打包成.jar文件

運行命令

jar -cvf myword.jar ./*.class

至此, 目標jar 文件成功生成

 

5. 准備一些文本文件, 上傳到hdfs, 以做word count的input

例子:

隨意創建一些文本文件, 保存到mapred_test 文件夾

運行命令

hadoop fs -put ./mapred_test/

確保此文件夾成功上傳到hdfs 當前用戶根目錄下

 

6. 運行我們的程序

運行命令

hadoop jar ./myword.jar myword mapred_test output

 

順利的話, 此命令會正常進行, 一個MapReduce job 會開始工作, 輸出的結果會保存在 hdfs 當前用戶根目錄下的output 文件夾里面。

 

至此大功告成!

如果還需要更多的功能, 我們可以修改前面的源文件以達到一個真正有用的MapReduce job。

但是原理大同小異, 練手的話, 基本夠了。

 

一個拋磚引玉的簡單例子, 歡迎板磚。

 

版權聲明:

本文由 雷子-曉飛爸 所有,發布於http://www.cnblogs.com/npumenglei/ 如果轉載,請注明出處,在未經作者同意下將本文用於商業用途,將追究其法律責任。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM