Hadoop 6、第一個mapreduce程序 WordCount


1、程序代碼

Map:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for(String word : words){
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

Reduce:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class wordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    protected void reduce(Text arg0, Iterable<IntWritable> arg1,Context arg2)
            throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable i : arg1){
            sum += i.get();
        }
        arg2.write(arg0, new IntWritable(sum));
    }
    
}

Main:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("wordCount");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(wordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outPath = new Path("/usr/output/wc/");
            if(fs.exists(outPath)){
                fs.delete(outPath, true);
            }
            FileOutputFormat.setOutputPath(job, outPath);
            Boolean result = job.waitForCompletion(true);
            if(result){
                System.out.println("Job is complete!");
            }else{
                System.out.println("Job is fail!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

2、打包程序

將Java程序打成Jar包,並上傳到Hadoop服務器上(任何一台在啟動的NameNode節點即可)

 

3、數據源

數據源是如下:

hadoop java text hdfs
tom jack java text
job hadoop abc lusi
hdfs tom text

將該內容放到txt文件中,並放到HDFS的/usr/input(是HDFS下不是Linux下),可以使用Eclipse插件上傳:

 

4、執行Jar包

# hadoop jar jar路徑  類的全限定名(Hadoop需要配置環境變量)
$ hadoop jar wc.jar com.raphael.wc.RunJob

執行完成以后會在HDFS的/usr下新創建一個output目錄:

查看執行結果:

abc	1
hadoop	2
hdfs	2
jack	1
java	2
job	1
lusi	1
text	3
tom	2

完成了單詞個數的統計。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM