1、MapReduce整體流程
- 並行讀取文本中的內容,然后進行MapReduce操作。
- Map過程:並行讀取文本,對讀取的單詞進行map操作,每個詞都以<key,value>形式生成。
我的理解:
一個有三行文本的文件進行MapReduce操作。
讀取第一行Hello World Bye World ,分割單詞形成Map。
<Hello,1> <World,1> <Bye,1> <World,1>
讀取第二行Hello Hadoop Bye Hadoop ,分割單詞形成Map。
<Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>
讀取第三行Bye Hadoop Hello Hadoop,分割單詞形成Map。
<Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>
- Reduce操作是對map的結果進行排序,合並,最后得出詞頻。
我的理解:
經過進一步處理(combiner),將形成的Map根據相同的key組合成value數組。
<Bye,1,1,1> <Hadoop,1,1,1,1> <Hello,1,1,1> <World,1,1>
循環執行Reduce(K,V[]),分別統計每個單詞出現的次數。
<Bye,3> <Hadoop,4> <Hello,3> <World,2>
2、WordCount源碼
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; /** * * 描述:WordCount explains by York * @author Hadoop Dev Group */ publicclass WordCount { /** * 建立Mapper類TokenizerMapper繼承自泛型類Mapper * Mapper類:實現了Map功能基類 * Mapper接口: * WritableComparable接口:實現WritableComparable的類可以相互比較。所有被用作key的類應該實現此接口。 * Reporter 則可用於報告整個應用的運行進度,本例中未使用。 * */ publicstaticclass TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ /** * IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 數據類型的類,這些類實現了WritableComparable接口, * 都能夠被串行化從而便於在分布式環境中進行數據交換,你可以將它們分別視為int,String 的替代品。 * 聲明one常量和word用於存放單詞的變量 */ privatefinalstatic IntWritable one =new IntWritable(1); private Text word =new Text(); /** * Mapper中的map方法: * void map(K1 key, V1 value, Context context) * 映射一個單個的輸入k/v對到一個中間的k/v對 * 輸出對不需要和輸入對是相同的類型,輸入對可以映射到0個或多個輸出對。 * Context:收集Mapper輸出的<k,v>對。 * Context的write(k, v)方法:增加一個(k,v)對到context * 程序員主要編寫Map和Reduce函數.這個Map函數使用StringTokenizer函數對字符串進行分隔,通過write方法把單詞存入word中 * write方法存入(單詞,1)這樣的二元組到context中 */ publicvoid map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr =new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } publicstaticclass IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result =new IntWritable(); /** * Reducer類中的reduce方法: * void reduce(Text key, Iterable<IntWritable> values, Context context) * 中k/v來自於map函數中的context,可能經過了進一步處理(combiner),同樣通過context輸出 */ publicvoid reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum =0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } publicstaticvoid main(String[] args) throws Exception { /** * Configuration:map/reduce的j配置類,向hadoop框架描述map-reduce執行的工作 */ Configuration conf =new Configuration(); String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length !=2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job =new Job(conf, "word count"); //設置一個用戶定義的job名稱 job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); //為job設置Mapper類 job.setCombinerClass(IntSumReducer.class); //為job設置Combiner類 job.setReducerClass(IntSumReducer.class); //為job設置Reducer類 job.setOutputKeyClass(Text.class); //為job的輸出數據設置Key類 job.setOutputValueClass(IntWritable.class); //為job輸出設置value類 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //為job設置輸入路徑 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//為job設置輸出路徑 System.exit(job.waitForCompletion(true) ?0 : 1); //運行job } }
3、WordCount逐行解析
- 對於map函數的方法。
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
這里有三個參數,前面兩個Object key, Text value就是輸入的key和value,第三個參數Context context這是可以記錄輸入的key和value,例如:context.write(word, one);此外context還會記錄map運算的狀態。
- 對於reduce函數的方法。
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}
reduce函數的輸入也是一個key/value的形式,不過它的value是一個迭代器的形式Iterable<IntWritable> values,也就是說reduce的輸入是一個key對應一組的值的value,reduce也有context和map的context作用一致。
至於計算的邏輯則需要程序員編碼實現。
- 對於main函數的調用。
首先是:
Configuration conf = new Configuration();
運行MapReduce程序前都要初始化Configuration,該類主要是讀取MapReduce系統配置信息,這些信息包括hdfs還有MapReduce,也就是安裝hadoop時候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解為啥要這么做,這個是沒有深入思考MapReduce計算框架造成,我們程序員開發MapReduce時候只是在填空,在map函數和reduce函數里編寫實際進行的業務邏輯,其它的工作都是交給MapReduce框架自己操作的,但是至少我們要告訴它怎么操作啊,比如hdfs在哪里,MapReduce的jobstracker在哪里,而這些信息就在conf包下的配置文件里。
接下來的代碼是:
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
If的語句好理解,就是運行WordCount程序時候一定是兩個參數,如果不是就會報錯退出。至於第一句里的GenericOptionsParser類,它是用來解釋常用hadoop命令,並根據需要為Configuration對象設置相應的值,其實平時開發里我們不太常用它,而是讓類實現Tool接口,然后再main函數里使用ToolRunner運行程序,而ToolRunner內部會調用GenericOptionsParser。
接下來的代碼是:
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
第一行就是在構建一個job,在mapreduce框架里一個mapreduce任務也叫mapreduce作業也叫做一個mapreduce的job,而具體的map和reduce運算就是task了,這里我們構建一個job,構建時候有兩個參數,一個是conf這個就不累述了,一個是這個job的名稱。
第二行就是裝載程序員編寫好的計算程序,例如我們的程序類名就是WordCount了。這里我要做下糾正,雖然我們編寫mapreduce程序只需要實現map函數和reduce函數,但是實際開發我們要實現三個類,第三個類是為了配置mapreduce如何運行map和reduce函數,准確的說就是構建一個mapreduce能執行的job了,例如WordCount類。
第三行和第五行就是裝載map函數和reduce函數實現類了,這里多了個第四行,這個是裝載Combiner類,這個類和mapreduce運行機制有關,其實本例去掉第四行也沒有關系,但是使用了第四行理論上運行效率會更好。
接下來的代碼:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
這個是定義輸出的key/value的類型,也就是最終存儲在hdfs上結果文件的key/value的類型。
最后的代碼是:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
第一行就是構建輸入的數據文件,第二行是構建輸出的數據文件,最后一行如果job運行成功了,我們的程序就會正常退出。
