1、hadoop源碼下載
下載地址:https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/
2、我們看一下hadoop源碼中提供的一個程序WordCount
1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.examples; 19 20 import java.io.IOException; 21 import java.util.StringTokenizer; 22 23 import org.apache.hadoop.conf.Configuration; 24 import org.apache.hadoop.fs.Path; 25 import org.apache.hadoop.io.IntWritable; 26 import org.apache.hadoop.io.Text; 27 import org.apache.hadoop.mapreduce.Job; 28 import org.apache.hadoop.mapreduce.Mapper; 29 import org.apache.hadoop.mapreduce.Reducer; 30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 32 import org.apache.hadoop.util.GenericOptionsParser; 33 34 public class WordCount { 35 36 // Mapper<Object,Text,Text,IntWritable> 37 // Object是輸入的key的類型 38 // Text是輸入的value的類型 39 // Text是輸出的key的類型 40 // IntWritable是輸出的value的類型 41 public static class TokenizerMapper 42 extends Mapper<Object, Text, Text, IntWritable>{ 43 44 private final static IntWritable one = new IntWritable(1); 45 private Text word = new Text(); 46 47 public void map(Object key, Text value, Context context 48 ) throws IOException, InterruptedException { 49 StringTokenizer itr = new StringTokenizer(value.toString()); 50 while (itr.hasMoreTokens()) { 51 word.set(itr.nextToken()); 52 context.write(word, one); 53 } 54 } 55 } 56 57 public static class IntSumReducer 58 extends Reducer<Text,IntWritable,Text,IntWritable> { 59 private IntWritable result = new IntWritable(); 60 61 public void reduce(Text key, Iterable<IntWritable> values, 62 Context context 63 ) throws IOException, InterruptedException { 64 int sum = 0; 65 for (IntWritable val : values) { 66 sum += val.get(); 67 } 68 result.set(sum); 69 context.write(key, result); 70 } 71 } 72 73 public static void main(String[] args) throws Exception { 74 Configuration conf = new Configuration(); 75 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 76 if (otherArgs.length < 2) { 77 System.err.println("Usage: wordcount <in> [<in>...] <out>"); 78 System.exit(2); 79 } 80 Job job = Job.getInstance(conf, "word count"); 81 job.setJarByClass(WordCount.class); 82 job.setMapperClass(TokenizerMapper.class); 83 job.setCombinerClass(IntSumReducer.class); 84 job.setReducerClass(IntSumReducer.class); 85 job.setOutputKeyClass(Text.class); 86 job.setOutputValueClass(IntWritable.class); 87 for (int i = 0; i < otherArgs.length - 1; ++i) { 88 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); 89 } 90 FileOutputFormat.setOutputPath(job, 91 new Path(otherArgs[otherArgs.length - 1])); 92 System.exit(job.waitForCompletion(true) ? 0 : 1); 93 } 94 }
Map()階段
其中的42-55行,MapReduce程序需要繼承org.apache.hadoop.mapreduce.Mapper 這個類,並在這個類中的繼承類中自定義實現Map()方法
其中 org.apache.hadoop.mapreduce.Mapper 要求的參數有四個(keyIn、valueIn、keyOut、valueOut),即Map()任務的輸入和輸出都是< key,value >對的形式
源代碼此處各個參數的意義是:
- Object:輸入< key, value >對的 key 值,此處為文本數據的起始位置的偏移量。在大部分程序下這個參數可以直接使用 Long 類型,源碼此處使用Object做了泛化。
- Text:輸入< key, value >對的 value 值,此處為一段具體的文本數據。
- Text:輸出< key, value >對的 key 值,此處為一個單詞。
- IntWritable:輸出< key, value >對的 value 值,此處固定為 1 。IntWritable 是 Hadoop 對 Integer 的進一步封裝,使其可以進行序列。
1 private final static IntWritable one = new IntWritable(1); 2 private Text word = new Text();
此處定義了兩個變量:
one:類型為Hadoop定義的 IntWritable 類型,其本質就是序列化的 Integer ,one 變量的值恆為 1 。
word:因為在WordCount程序中,Map 端的任務是對輸入數據按照單詞進行切分,每個單詞為 Text 類型。
1 public void map(Object key, Text value, Context context 2 ) throws IOException, InterruptedException { 3 StringTokenizer itr = new StringTokenizer(value.toString()); 4 while (itr.hasMoreTokens()) { 5 word.set(itr.nextToken()); 6 context.write(word, one); 7 } 8 }
這段代碼為Map端的核心,定義了Map Task 所需要執行的任務的具體邏輯實現。
map() 方法的參數為 Object key, Text value, Context context,其中:
- key: 輸入數據在原數據中的偏移量。
- value:具體的數據數據,此處為一段字符串。
- context:用於暫時存儲 map() 處理后的結果。
方法內部首先把輸入值轉化為字符串類型,並且對Hadoop自帶的分詞器 StringTokenizer 進行實例化用於存儲輸入數據。之后對輸入數據從頭開始進行切分,把字符串中的每個單詞切分成< key, value >對的形式,如:< hello , 1>、< world, 1> …
Reduce()階段
1 public static class IntSumReducer 2 extends Reducer<Text,IntWritable,Text,IntWritable> {}
import org.apache.hadoop.mapreduce.Reducer 類的參數也是四個(keyIn、valueIn、keyOut、valueOut),即Reduce()任務的輸入和輸出都是< key,value >對的形式。
源代碼中此處的各個參數的含義:
- Text:輸入< key, value >對的key值,此處為一個單詞
- IntWritable:輸入< key, value >對的value值。
- Text:輸出< key, value >對的key值,此處為一個單詞
- IntWritable:輸出< key, value >對,此處為相同單詞詞頻累加之后的值。實際上就是一個數字。
1 public void reduce(Text key, Iterable<IntWritable> values, 2 Context context 3 ) throws IOException, InterruptedException { 4 int sum = 0; 5 for (IntWritable val : values) { 6 sum += val.get(); 7 } 8 result.set(sum); 9 context.write(key, result); 10 }
Reduce()函數的三個參數:
- Text:輸入< key, value >對的key值,也就是一個單詞
- value:這個地方值得注意,在前面說到了,在MapReduce任務中,除了我們自定義的map()和reduce()之外,在從map 刀reduce 的過程中,系統會自動進行combine、shuffle、sort等過程對map task的輸出進行處理,因此reduce端的輸入數據已經不僅僅是簡單的< key, value >對的形式,而是一個一系列key值相同的序列化結構,如:< hello,1,1,2,2,3…>。因此,此處value的值就是單詞后面出現的序列化的結構:(1,1,1,2,2,3…….)
- context:臨時存儲reduce端產生的結果
因此在reduce端的代碼中,對value中的值進行累加,所得到的結果就是對應key值的單詞在文本中所出現的詞頻。
main()函數
1 public static void main(String[] args) throws Exception { 2 Configuration conf = new Configuration(); 3 // 獲取我們在執行這個任務時傳入的參數,如輸入數據所在路徑、輸出文件的路徑的等 4 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 5 //因為此任務正常運行至少要給出輸入和輸出文件的路徑,因此如果傳入的參數少於兩個,程序肯定無法運行。 6 if (otherArgs.length < 2) { 7 System.err.println("Usage: wordcount <in> [<in>...] <out>"); 8 System.exit(2); 9 } 10 Job job = Job.getInstance(conf, "word count"); // 實例化job,傳入參數,job的名字叫 word count 11 job.setJarByClass(WordCount.class); //使用反射機制,加載程序 12 job.setMapperClass(TokenizerMapper.class); //設置job的map階段的執行類 13 job.setCombinerClass(IntSumReducer.class); //設置job的combine階段的執行類 14 job.setReducerClass(IntSumReducer.class); //設置job的reduce階段的執行類 15 job.setOutputKeyClass(Text.class); //設置程序的輸出的key值的類型 16 job.setOutputValueClass(IntWritable.class); //設置程序的輸出的value值的類型 17 for (int i = 0; i < otherArgs.length - 1; ++i) { 18 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); 19 } //獲取我們給定的參數中,輸入文件所在路徑 20 FileOutputFormat.setOutputPath(job, 21 new Path(otherArgs[otherArgs.length - 1])); //獲取我們給定的參數中,輸出文件所在路徑 22 System.exit(job.waitForCompletion(true) ? 0 : 1); //等待任務完成,任務完成之后退出程序 23 } 24 }