hadoop源碼閱讀


1、hadoop源碼下載

下載地址:https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/  

 

2、我們看一下hadoop源碼中提供的一個程序WordCount

 1 /**
 2  * Licensed to the Apache Software Foundation (ASF) under one
 3  * or more contributor license agreements.  See the NOTICE file
 4  * distributed with this work for additional information
 5  * regarding copyright ownership.  The ASF licenses this file
 6  * to you under the Apache License, Version 2.0 (the
 7  * "License"); you may not use this file except in compliance
 8  * with the License.  You may obtain a copy of the License at
 9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.examples;
19 
20 import java.io.IOException;
21 import java.util.StringTokenizer;
22 
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.io.IntWritable;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.mapreduce.Job;
28 import org.apache.hadoop.mapreduce.Mapper;
29 import org.apache.hadoop.mapreduce.Reducer;
30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
32 import org.apache.hadoop.util.GenericOptionsParser;
33 
34 public class WordCount {
35 
36     // Mapper<Object,Text,Text,IntWritable>
37     // Object是輸入的key的類型
38     // Text是輸入的value的類型
39     // Text是輸出的key的類型
40     // IntWritable是輸出的value的類型
41   public static class TokenizerMapper 
42        extends Mapper<Object, Text, Text, IntWritable>{
43     
44     private final static IntWritable one = new IntWritable(1);
45     private Text word = new Text();
46       
47     public void map(Object key, Text value, Context context
48                     ) throws IOException, InterruptedException {
49       StringTokenizer itr = new StringTokenizer(value.toString());
50       while (itr.hasMoreTokens()) {
51         word.set(itr.nextToken());
52         context.write(word, one);
53       }
54     }
55   }
56   
57   public static class IntSumReducer 
58        extends Reducer<Text,IntWritable,Text,IntWritable> {
59     private IntWritable result = new IntWritable();
60 
61     public void reduce(Text key, Iterable<IntWritable> values, 
62                        Context context
63                        ) throws IOException, InterruptedException {
64       int sum = 0;
65       for (IntWritable val : values) {
66         sum += val.get();
67       }
68       result.set(sum);
69       context.write(key, result);
70     }
71   }
72 
73   public static void main(String[] args) throws Exception {
74     Configuration conf = new Configuration();
75     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
76     if (otherArgs.length < 2) {
77       System.err.println("Usage: wordcount <in> [<in>...] <out>");
78       System.exit(2);
79     }
80     Job job = Job.getInstance(conf, "word count");
81     job.setJarByClass(WordCount.class);
82     job.setMapperClass(TokenizerMapper.class);
83     job.setCombinerClass(IntSumReducer.class);
84     job.setReducerClass(IntSumReducer.class);
85     job.setOutputKeyClass(Text.class);
86     job.setOutputValueClass(IntWritable.class);
87     for (int i = 0; i < otherArgs.length - 1; ++i) {
88       FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
89     }
90     FileOutputFormat.setOutputPath(job,
91       new Path(otherArgs[otherArgs.length - 1]));
92     System.exit(job.waitForCompletion(true) ? 0 : 1);
93   }
94 }

 

Map()階段

 其中的42-55行,MapReduce程序需要繼承org.apache.hadoop.mapreduce.Mapper 這個類,並在這個類中的繼承類中自定義實現Map()方法

其中 org.apache.hadoop.mapreduce.Mapper 要求的參數有四個(keyIn、valueIn、keyOut、valueOut),即Map()任務的輸入和輸出都是< key,value >對的形式

源代碼此處各個參數的意義是:

  1. Object:輸入< key, value >對的 key 值,此處為文本數據的起始位置的偏移量。在大部分程序下這個參數可以直接使用 Long 類型,源碼此處使用Object做了泛化。
  2. Text:輸入< key, value >對的 value 值,此處為一段具體的文本數據。
  3. Text:輸出< key, value >對的 key 值,此處為一個單詞。
  4. IntWritable:輸出< key, value >對的 value 值,此處固定為 1 。IntWritable 是 Hadoop 對 Integer 的進一步封裝,使其可以進行序列。
1     private final static IntWritable one = new IntWritable(1);
2     private Text word = new Text();

此處定義了兩個變量:

one:類型為Hadoop定義的 IntWritable 類型,其本質就是序列化的 Integer ,one 變量的值恆為 1 。 
word:因為在WordCount程序中,Map 端的任務是對輸入數據按照單詞進行切分,每個單詞為 Text 類型。

1    public void map(Object key, Text value, Context context
2                     ) throws IOException, InterruptedException {
3       StringTokenizer itr = new StringTokenizer(value.toString());
4       while (itr.hasMoreTokens()) {
5         word.set(itr.nextToken());
6         context.write(word, one);
7       }
8     }

這段代碼為Map端的核心,定義了Map Task 所需要執行的任務的具體邏輯實現。 
map() 方法的參數為 Object key, Text value, Context context,其中:

  1. key: 輸入數據在原數據中的偏移量。 
  2. value:具體的數據數據,此處為一段字符串。 
  3. context:用於暫時存儲 map() 處理后的結果。

方法內部首先把輸入值轉化為字符串類型,並且對Hadoop自帶的分詞器 StringTokenizer 進行實例化用於存儲輸入數據。之后對輸入數據從頭開始進行切分,把字符串中的每個單詞切分成< key, value >對的形式,如:< hello , 1>、< world, 1> …

Reduce()階段

1 public static class IntSumReducer 
2        extends Reducer<Text,IntWritable,Text,IntWritable> {}

import org.apache.hadoop.mapreduce.Reducer 類的參數也是四個(keyIn、valueIn、keyOut、valueOut),即Reduce()任務的輸入和輸出都是< key,value >對的形式。

源代碼中此處的各個參數的含義:

  1. Text:輸入< key, value >對的key值,此處為一個單詞
  2. IntWritable:輸入< key, value >對的value值。
  3. Text:輸出< key, value >對的key值,此處為一個單詞
  4. IntWritable:輸出< key, value >對,此處為相同單詞詞頻累加之后的值。實際上就是一個數字。
 1   public void reduce(Text key, Iterable<IntWritable> values, 
 2                        Context context
 3                        ) throws IOException, InterruptedException {
 4       int sum = 0;
 5       for (IntWritable val : values) {
 6         sum += val.get();
 7       }
 8       result.set(sum);
 9       context.write(key, result);
10     }

Reduce()函數的三個參數:

  1. Text:輸入< key, value >對的key值,也就是一個單詞
  2. value:這個地方值得注意,在前面說到了,在MapReduce任務中,除了我們自定義的map()和reduce()之外,在從map 刀reduce 的過程中,系統會自動進行combine、shuffle、sort等過程對map task的輸出進行處理,因此reduce端的輸入數據已經不僅僅是簡單的< key, value >對的形式,而是一個一系列key值相同的序列化結構,如:< hello,1,1,2,2,3…>。因此,此處value的值就是單詞后面出現的序列化的結構:(1,1,1,2,2,3…….)
  3. context:臨時存儲reduce端產生的結果

因此在reduce端的代碼中,對value中的值進行累加,所得到的結果就是對應key值的單詞在文本中所出現的詞頻。

 

main()函數

 1 public static void main(String[] args) throws Exception {
 2     Configuration conf = new Configuration();    
 3         // 獲取我們在執行這個任務時傳入的參數,如輸入數據所在路徑、輸出文件的路徑的等
 4     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 5         //因為此任務正常運行至少要給出輸入和輸出文件的路徑,因此如果傳入的參數少於兩個,程序肯定無法運行。
 6     if (otherArgs.length < 2) {
 7       System.err.println("Usage: wordcount <in> [<in>...] <out>");
 8       System.exit(2);
 9     }
10     Job job = Job.getInstance(conf, "word count");  // 實例化job,傳入參數,job的名字叫 word count
11     job.setJarByClass(WordCount.class);  //使用反射機制,加載程序
12     job.setMapperClass(TokenizerMapper.class);  //設置job的map階段的執行類
13     job.setCombinerClass(IntSumReducer.class);  //設置job的combine階段的執行類
14     job.setReducerClass(IntSumReducer.class);  //設置job的reduce階段的執行類
15     job.setOutputKeyClass(Text.class);  //設置程序的輸出的key值的類型
16     job.setOutputValueClass(IntWritable.class);  //設置程序的輸出的value值的類型
17     for (int i = 0; i < otherArgs.length - 1; ++i) {
18       FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
19     }  //獲取我們給定的參數中,輸入文件所在路徑
20     FileOutputFormat.setOutputPath(job,
21       new Path(otherArgs[otherArgs.length - 1]));  //獲取我們給定的參數中,輸出文件所在路徑
22     System.exit(job.waitForCompletion(true) ? 0 : 1);  //等待任務完成,任務完成之后退出程序
23   }
24 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM