Source: http://www.itnose.net/detail/6197823.html
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Description: WordCount explained by Felix
 * @author Hadoop Dev Group
 */
public class WordCount {

    /**
     * MapReduceBase: a base class that implements the Mapper and Reducer interfaces
     * (its methods merely satisfy the interfaces and do nothing themselves).
     * Mapper interface:
     * WritableComparable: classes that implement WritableComparable can be compared
     * with one another; every class used as a key should implement this interface.
     * Reporter can be used to report the progress of the whole application;
     * it is not used in this example.
     */
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        /**
         * LongWritable, IntWritable and Text are Hadoop classes that wrap Java data types.
         * They all implement WritableComparable, so they can be serialized for data
         * exchange in a distributed environment; think of them as replacements for
         * long, int and String respectively.
         */
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /**
         * The map method of the Mapper interface:
         * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
         * Maps a single input k/v pair to an intermediate k/v pair.
         * The output pair does not need to have the same types as the input pair,
         * and an input pair may map to zero or more output pairs.
         * OutputCollector: collects the <k,v> pairs emitted by the Mapper and Reducer.
         * OutputCollector.collect(k, v): adds a (k,v) pair to the output.
         */
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        /**
         * JobConf: the map/reduce job configuration class; it describes the
         * map-reduce work to be executed to the Hadoop framework.
         * Constructors: JobConf(), JobConf(Class exampleClass), JobConf(Configuration conf), etc.
         */
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");                  // set a user-defined job name

        conf.setOutputKeyClass(Text.class);            // set the key class for the job's output data
        conf.setOutputValueClass(IntWritable.class);   // set the value class for the job's output data

        conf.setMapperClass(Map.class);                // set the Mapper class for the job
        conf.setCombinerClass(Reduce.class);           // set the Combiner class for the job
        conf.setReducerClass(Reduce.class);            // set the Reducer class for the job

        conf.setInputFormat(TextInputFormat.class);    // set the InputFormat implementation for the job
        conf.setOutputFormat(TextOutputFormat.class);  // set the OutputFormat implementation for the job

        /**
         * InputFormat describes the input specification of a map-reduce job.
         * FileInputFormat.setInputPaths(): sets the array of paths used as the job's input list.
         * FileOutputFormat.setOutputPath(): sets the path used as the job's output directory.
         */
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);                        // run the job
    }
}
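To try the example, one minimal compile-and-run workflow is sketched below. The jar name wordcount.jar and the HDFS paths /user/hadoop/input and /user/hadoop/output are placeholders chosen for illustration, not part of the original article; note that the output directory must not exist before the job starts, or the run will fail.

# compile against the Hadoop classpath (placeholder jar name and paths)
javac -classpath $(hadoop classpath) -d . WordCount.java
jar cf wordcount.jar WordCount*.class
hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output

Each output file produced under /user/hadoop/output contains tab-separated "word  count" lines, the default format written by TextOutputFormat.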