1. Program Code
Map:
package com.raphael.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // For every input line, split it into words and emit (word, 1).
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
Reduce:
package com.raphael.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // For every word, sum up the 1s emitted by the mappers and write (word, total).
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Main:
package com.raphael.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        try {
            FileSystem fs = FileSystem.get(config);

            Job job = Job.getInstance(config);
            job.setJobName("wordCount");
            job.setJarByClass(RunJob.class);

            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Input directory on HDFS.
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));

            // Delete the output directory if it already exists; otherwise the job would fail.
            Path outPath = new Path("/usr/output/wc/");
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
            FileOutputFormat.setOutputPath(job, outPath);

            boolean result = job.waitForCompletion(true);
            if (result) {
                System.out.println("Job completed successfully!");
            } else {
                System.out.println("Job failed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2. Package the Program
Package the Java program into a Jar and upload it to the Hadoop server (any running NameNode node will do), for example as sketched below.
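A minimal packaging sketch, assuming the three source files sit under com/raphael/wc/ to match their package, the compiled classes go into a bin/ directory, and `hadoop classpath` is available on the build machine (directory and file names are only illustrative):
$ mkdir -p bin
$ javac -classpath $(hadoop classpath) -d bin com/raphael/wc/*.java
$ jar cvf wc.jar -C bin .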

3. Data Source
The data source is as follows:
hadoop java text hdfs tom jack java text job hadoop abc lusi hdfs tom text
Put this content into a txt file and place it under /usr/input on HDFS (on HDFS, not on the local Linux filesystem). You can upload it with the Eclipse plugin, or from the command line as sketched below:
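A command-line alternative to the Eclipse plugin, assuming the local file is named words.txt (the filename is only an example):
$ hadoop fs -mkdir -p /usr/input
$ hadoop fs -put words.txt /usr/input/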

4. Run the Jar
# hadoop jar <path to jar> <fully qualified class name>   (Hadoop's environment variables must be configured)
$ hadoop jar wc.jar com.raphael.wc.RunJob
After the job finishes, a new output directory is created under /usr on HDFS:
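For example, the directory (the path set in RunJob) can be checked with:
$ hadoop fs -ls /usr/output/wc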

View the result:
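With a single reducer the result normally ends up in a file named part-r-00000, so it can be printed with a command along these lines:
$ hadoop fs -cat /usr/output/wc/part-r-00000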
abc	1
hadoop	2
hdfs	2
jack	1
java	2
job	1
lusi	1
text	3
tom	2
This completes the word count.
