和遠哥一起了解Hadoop的MapReduce是如何運行的


Hadoop越來越火,而Hadoop里面有個核心的玩意,那就是MapReduce,它在Hadoop的並行計算中承擔很重要的作用,也是在Hadoop下做程序開發時,必須要了解的,下面我們就MapRecude的一個簡單例子WordCount來做一下深入的了解和分析。

 

先跟遠哥一起先了解一下什么是MapReduce吧。

首先MapReduce它是兩個英文單詞組成的,Map表示映射,Reduce表示化簡,它是一種編程模型,用於大規模數據集(大於1TB)的並行運算,主要思想來自函數式編程。

在Hadoop中,MapReduce過程分三個步驟:Map(主要是分解並行的任務)、Combine(主要是為了提高Reduce的效率)和Reduce(把處理后的結果再匯總起來) 。

 

關於如何搭建Hadoop運行環境,可以閱讀我的另外一篇博文:http://www.cnblogs.com/taven/archive/2012/08/12/2634145.html
 

好了,我們先看一下運行一個Hadoop作業的啟動代碼:

 

        Job job =  new Job(conf, "word count");
        job.setJarByClass(WordCount. class);
        job.setMapperClass( TokenizerMapper. class);
        job.setCombinerClass( IntSumReducer. class);
        job.setReducerClass( IntSumReducer. class);
        job.setOutputKeyClass(Text. class);
        job.setOutputValueClass(IntWritable. class);
        FileInputFormat.addInputPath(job,  new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,  new Path(otherArgs[1]));
        
        System.exit(job.waitForCompletion( true) ? 0 : 1);

 

看到沒,在運行一個Hadoop作業之前,先得指定 MapperClass 、CombinerClass 、ReducerClass .

 

假設我們交給Hadoop去分析的一個文本內容為:

lixy csy lixy zmde nitamade hehe


realy amoeba woyou weibo hehe

  

好了,提供的內容很簡單,就是3行文本,第1行文本包含n個單詞,第2行是空的,第3行也包含n個單詞,單詞與單詞之間用空格隔開,下面我們來看看MapperClass 是如何實現的,又是如何運行的呢?看看 TokenizerMapper 的代碼: 

 

public  class TokenizerMapper  extends Mapper<Object, Text, Text, IntWritable> {

     private  final  static IntWritable one =  new IntWritable(1);
     private Text word =  new Text();

     public  void map(Object key, Text value, Context context)     throws IOException, InterruptedException {
        
        System.out.println( "TokenizerMapper.map...");
        System.out.println( "Map key:"+key.toString()+ " Map value:"+value.toString());
        
        StringTokenizer itr =  new StringTokenizer(value.toString());
        
         while (itr.hasMoreTokens()) {
            String tmp = itr.nextToken();
            word.set(tmp);
            
            context.write(word, one);
            System.out.println( "tmp:"+tmp+ " one:"+one);
            
        }
        
        System.out.println( "context:"+context.toString());

    }
}

 注:這里遠哥要說一下“IntWritable one = new IntWritable(1);”的用意,因為我們不管一個單詞會出現幾次,只要出現,我們就計算1次,所以“context.write(word, one)”這行代碼將一個單詞寫入的時候,值永遠是1;

 

在運行的時候,根據你文件中內容的情況,上面的 map(Object key, Text value, Context context) 方法可能會被調用多次,將本例子提供的文件內容執行后,控制台輸出內容如下(為了方便閱讀,我添加了一些換行):

TokenizerMapper.map...
Map key:0 Map value: lixy csy lixy zmde nitamade hehe
tmp:lixy one:1
tmp:csy one:1
tmp:lixy one:1
tmp:zmde one:1
tmp:nitamade one:1
tmp:hehe one:1
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3

TokenizerMapper.map...
Map key:34 Map value:
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3

TokenizerMapper.map...
Map key:36 Map value: realy amoeba woyou weibo hehe
tmp:realy one:1
tmp:amoeba one:1
tmp:woyou one:1
tmp:weibo one:1
tmp:hehe one:1
context:org.apache.hadoop.mapreduce.Mapper$Context@1af0b4a3

IntSumReducer.reduce...
val.get():1
Reduce key: amoeba Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: csy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1

IntSumReducer.reduce...
val.get():1
val.get():1
Reduce key: hehe Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:2

IntSumReducer.reduce...
val.get():1
val.get():1
Reduce key: lixy Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:2

IntSumReducer.reduce...
val.get():1
Reduce key: nitamade Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: realy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: weibo Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: woyou Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: zmde Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@d5d4de6 Result:1

 

 從TokenizerMapper的 map(Object key, Text value, Context context) 調用的信息輸出情況可以分析出,文件內容中有兩行,所以該方法一共調用了2次(因為TextInputFormat類型的,都是按行處理)。

每一行的內容會在value參數中傳進來,也就是說每一行的內容都對應了一個key,這個key為此行的開頭位置在本文件中的所在位置(所以第1行的key是0,第2行的key是34,第3行的key是36),一般為數字的。

在這個map方法中,我們可以加入一些自己的處理邏輯,比如根據空格來取得每個單詞,然后我們需要將處理后的結果,寫入到 context 參數中,便於hadoop處理完后續的處理邏輯。(這里我們需要注意的是“IntWritable one”變量都是數值1)

 

上面看了map的過程,接下來我們再看reduce的過程,先看看 IntSumReducer 的代碼:

 

public  class IntSumReducer  extends    Reducer<Text, IntWritable, Text, IntWritable> {
    
     private IntWritable result =  new IntWritable();

     public  void reduce(Text key, Iterable<IntWritable> values, Context context)  throws IOException, InterruptedException {

        System.out.println( "IntSumReducer.reduce...");

        
         int sum = 0;
         for (IntWritable val : values) {
            sum += val.get();
            System.out.println( "val.get():" + val.get());
        }

        result.set(sum);

        context.write(key, result);

        System.out.println( "Reduce key:" + key.toString() +  " Reduce result:"    + result.get());

        System.out.println( "Reduce Context:" + context +  " Result:" + result);

    }
}

 

  執行調用后,控制台輸出內容如下:

 

IntSumReducer.reduce...
val.get():1
Reduce key: amoeba Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: csy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1

IntSumReducer.reduce...
val.get():2
Reduce key: hehe Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:2

IntSumReducer.reduce...
val.get():2
Reduce key: lixy Reduce result:2
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:2

IntSumReducer.reduce...
val.get():1
Reduce key: nitamade Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: realy Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: weibo Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: woyou Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1

IntSumReducer.reduce...
val.get():1
Reduce key: zmde Reduce result:1
Reduce Context:org.apache.hadoop.mapreduce.Reducer$Context@6c04ab2f Result:1 

 

通過執行 reduce(Text key, Iterable<IntWritable> values, Context context) 方法,奇跡發生了,hadoop傳到這里的參數,已經去重了。什么意思呢?就是說,參數key里面是單詞名稱,如果一個單詞出現2次,那么參數values里面就會2個值,但是key只有1次。像“lixy”這個單詞在第一行出現了2次,那么這里的key只出現1次,但是后面的values會有2個IntWritable,並且值都是1,這個為1的值其實就是你在map的時候,自己定的。

 

這個例子只是簡單的說明了MapReduce的一個簡單使用,實際上他的功能遠不只這些,還可以實現一些對數據庫中數據的排序、統計等等,特別是對一些非常非常龐大的數據表。當你有一個1T的文本內容,需要統計里面每個單詞分別出現多少次的時候,一台計算機去計算,會需要很長時間的,有可能光加載就要很長時間,但是如果你交給hadoop,並且配了幾台機器一起跑的話,hadoop能把這1T的文本內容分成很多個小段,分發到不同的物理機器上,並行執行你的MapReduce邏輯,然后將幾台物理機器上處理完成的內容匯總后給你,整個過程是分片、並行處理完成的,效率大大提高。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM