Getting Started with MapReduce Programming (WordCount, TopN)


After successfully setting up the Hadoop cluster, I used the official built-in WordCount example to do a simple test of a MapReduce program. This example is essentially the HelloWorld of MapReduce programming: its structure is clear and easy to understand, and it is a convenient way to explain how MapReduce works. This post records the process of getting started writing simple MapReduce programs in Eclipse. There will not be much original code; most of it references and borrows from existing, well-written examples.

I. Hello 'MapReduce' World: the WordCount program

1. Create a Java project named WordCount in Eclipse.

  

2. Import the required jars (you can create a User Library in Eclipse for these three jars to make them easier to reuse):

  ①commons-cli-1.2.jar

  ②hadoop-common-2.7.3.jar

  ③hadoop-mapreduce-client-core-2.7.3.jar

 

3. Configure the Build Path and make sure the project references the three jars above.

 

4. Create a package named zmt.test, add a new class named WordCount under it, and type in the official source code:

package zmt.test;


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    
    public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
        
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()){
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    
    }
    
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
            
            int sum = 0;
            for (IntWritable val : values){
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i=0; i<otherArgs.length-1; ++i){
            FileInputFormat.addInputPath(job,  new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
    
}
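To make the data flow of the example concrete, here is a small plain-Java sketch (my own illustration, not part of the official example; the class name and sample line are made up) of what the map, shuffle, and reduce phases compute for a single line: map tokenizes and emits (word, 1), the framework groups by key, and the reducer sums the ones.

import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

// Illustration only: simulates WordCount's map -> group-by-key -> sum pipeline in memory.
public class WordCountFlowDemo {

    public static void main(String[] args) {
        String line = "hello hadoop hello mapreduce";        // made-up sample input
        Map<String, Integer> counts = new HashMap<String, Integer>();

        StringTokenizer itr = new StringTokenizer(line);      // same tokenization as TokenizerMapper
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();                    // "map": emit (word, 1)
            counts.merge(word, 1, Integer::sum);              // "shuffle + reduce": group by key, sum the 1s
        }

        // Prints "word<TAB>count", the same format that ends up in part-r-00000.
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}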

 

5. Right-click the project and export it as a jar file named WC.jar.

 

6. Copy WC.jar to the Master host in the virtual machine. With VMware Tools installed, the file can simply be dragged and dropped. Here it is copied to /home/admin/Documents/.

 

7. Prepare the text file whose word frequencies will be counted. Here I reuse the README.txt from the earlier Hadoop setup.

  Upload the file to HDFS: hdfs dfs -put README.txt /data/input
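As a side note, the same upload can also be done from Java with the HDFS API. This is only a minimal sketch, assuming the client's classpath contains the cluster's core-site.xml/hdfs-site.xml so that fs.defaultFS points at this cluster; the class name HdfsUpload is made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: programmatic equivalent of `hdfs dfs -put README.txt /data/input`.
public class HdfsUpload {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();          // picks up core-site.xml / hdfs-site.xml
        try (FileSystem fs = FileSystem.get(conf)) {
            fs.mkdirs(new Path("/data/input"));            // no-op if the directory already exists
            fs.copyFromLocalFile(new Path("README.txt"), new Path("/data/input/README.txt"));
        }
    }
}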

 

8. Run the job:

  hadoop jar /home/admin/Documents/WC.jar zmt.test.WordCount /data/input/WC /data/output/WC

  Note the fully qualified entry class zmt.test.WordCount: when developing more complex jobs you need to explicitly specify the entry point of the MapReduce program.
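For more complex jobs, a common way to keep the entry point explicit and flexible is the Tool/ToolRunner pattern, which also handles generic Hadoop options (-D, -files, and so on). The following is only a sketch of that pattern, not part of the original post: WordCountDriver is a hypothetical class, assumed to live in the same zmt.test package so it can reuse the Mapper and Reducer defined above.

package zmt.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver class (not part of the original post) reusing the Mapper/Reducer above.
public class WordCountDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains any -D options parsed by ToolRunner.
        Job job = Job.getInstance(getConf(), "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic Hadoop options before handing args to run().
        System.exit(ToolRunner.run(new Configuration(), new WordCountDriver(), args));
    }
}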

 

9. Check the results. The console output of the job shows its progress and counters; the word counts themselves are written to /data/output/WC/part-r-00000.
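If you prefer to read the output programmatically rather than with hdfs dfs -cat, here is a minimal sketch using the HDFS API. The class name is made up, and the path matches the output path used in step 8.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: prints the reducer output file, one "word<TAB>count" line at a time.
public class ReadWordCountOutput {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf);
             BufferedReader reader = new BufferedReader(new InputStreamReader(
                     fs.open(new Path("/data/output/WC/part-r-00000")), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}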

 

10. Track the job and monitor how the MapReduce program is running in the ResourceManager web UI:

  http://192.168.222.134:8088/cluster

 

II. The TopN problem: finding the N largest numbers

  The TopN problem is another good beginner example. It helps you better understand the workflow of a MapReduce program and, more importantly, see which parts of the program are fixed boilerplate patterns and which parts can be changed or written differently.

  Steps that duplicate WordCount are not described again; only the key code and operations are given.
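Before the code, the one piece of TopN that is not MapReduce boilerplate is worth isolating: a TreeMap bounded to N entries. Keep inserting values, and whenever the map grows beyond N entries drop the smallest key; what remains are the N largest values seen so far. A tiny stand-alone sketch of that trick (class name and sample data are made up):

import java.util.TreeMap;

// The core trick both MyMapper and MyReducer below rely on: a TreeMap capped at N entries.
public class TopNTreeMapDemo {

    public static void main(String[] args) {
        int n = 3;
        int[] numbers = {5, 17, 3, 42, 8, 23, 1};        // made-up sample data
        TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>();

        for (int num : numbers) {
            tree.put(num, num);                           // TreeMap keeps keys sorted ascending
            if (tree.size() > n) {
                tree.remove(tree.firstKey());             // evict the current smallest
            }
        }

        // descendingKeySet() yields the N largest values from big to small: 42, 23, 17
        for (Integer val : tree.descendingKeySet()) {
            System.out.println(val);
        }
    }
}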

1. Generate random numbers

  

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;


public class Num_Generator {

    public static void main(String[] args) {

        Random random = new Random();
        String filename = "random_num";

        // Generate 10 files, each holding one million random ints, one per line.
        for (int i = 0; i < 10; i++) {

            String tmp_name = filename + i + ".txt";
            File file = new File(tmp_name);

            // try-with-resources flushes and closes the writer automatically.
            try (BufferedWriter bw = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(file), "UTF-8"))) {

                for (int j = 0; j < 1000000; j++) {
                    int rm = random.nextInt();   // uniform over the full Integer range
                    bw.write(rm + "");
                    bw.newLine();
                }

            } catch (IOException e) {
                // FileNotFoundException and UnsupportedEncodingException are IOExceptions too.
                e.printStackTrace();
            }

            System.out.println(i + ":Complete.");
        }

    }

}

 

  This program generates 10 files, each containing one million random numbers in the Integer range. Once they are generated, copy them to the virtual machine and upload them to HDFS.

 

2. Write the TopN program (this program was adapted from another blog post; embarrassingly, I have lost the link (;′⌒`))

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class TopN {
    
    public static class MyMapper extends Mapper<Object, Text, NullWritable, IntWritable>
    {
        // Holds at most N values per map task. TreeMap keeps its keys sorted in
        // ascending order, so firstKey() is always the smallest value currently held.
        private TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>();

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            System.out.println("Mapper("+context.getConfiguration().getInt("N", 10)+"):in setup...");
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
            // Each input line contains exactly one integer.
            String key_num = value.toString();
            int num = Integer.parseInt(key_num);

            // Note: TreeMap keys are unique, so duplicate values collapse into one entry.
            tree.put(num, num);
            if (tree.size() > context.getConfiguration().getInt("N", 10))
                tree.remove(tree.firstKey());   // evict the smallest once the map exceeds N entries
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            System.out.println("Mapper("+context.getConfiguration().getInt("N", 10)+"):in cleanup...");
            // Emit this task's local Top-N only after every line has been mapped.
            for (Integer text : tree.values()) {
                context.write(NullWritable.get(), new IntWritable(text));
            }
        }

    }
    
    public static class MyReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable>
    {
        // Same bounded-TreeMap trick as the mapper, applied to the merged candidates.
        private TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>();

        @Override
        public void reduce(NullWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
            // Every map output shares the single NullWritable key, so this one call
            // sees the local Top-N candidates of every map task.
            for (IntWritable value : values){
                tree.put(value.get(), value.get());
                if (tree.size() > context.getConfiguration().getInt("N", 10)) {
                    tree.remove(tree.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Write the global Top-N in descending order.
            for (Integer val : tree.descendingKeySet()){
                context.write(NullWritable.get(), new IntWritable(val));
            }
        }

    }
    
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length < 3) {
            System.err.println("Usage: TopN <N> <in> [<in>...] <out>");
            System.exit(2);
        }

        // The first remaining argument is N; pass it to the tasks through the Configuration
        // (this must happen before Job.getInstance copies the configuration).
        conf.setInt("N", Integer.parseInt(otherArgs[0]));
        System.out.println("N:" + otherArgs[0]);

        Job job = Job.getInstance(conf, "TopN");
        job.setJarByClass(TopN.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Arguments 1..n-2 are input paths; the last argument is the output path.
        for (int i = 1; i < otherArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
}
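One detail the driver leaves implicit: a global Top-N requires that a single reducer see all the candidates. Because every map output uses the same NullWritable key, all records end up in one reduce group, and Hadoop's default is a single reduce task anyway; if the cluster configuration raises that default, the job can pin it explicitly. This is an optional addition of mine, not part of the original code:

// Fragment: would go inside TopN.main(), next to job.setReducerClass(MyReducer.class).
// Pins the job to one reduce task so exactly one global Top-N list is produced,
// even if mapreduce.job.reduces is set to a larger value on the cluster.
job.setNumReduceTasks(1);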

 

 

3. Run the test; the parameter N must be supplied:

  hadoop jar /home/hadoop/hadoop-2.7.3/share/hadoop/mapreduce/TopN.jar TopN 12 /data/input/test1 /data/output/TT

 

4. View the results:

  hdfs dfs -cat /data/output/TT/part-r-00000

  

[root@Master myscript]# hdfs dfs -cat /data/output/TT/part-r-00000
2147483194
2147483070
2147483066
2147482879
2147482835
2147482469
2147482152
2147481212
2147481174
2147480379
2147479927
2147479795

 

