019 mapreduce的核心--shuffle理解,以及在shuffle中的優化


關於shuffle的過程圖。

  

 

一:概述shuffle

  Shuffle是mapreduce的核心,鏈接map與reduce的中間過程。

  Mapp負責過濾分發,而reduce則是歸並整理,從mapp輸出到reduce的輸入的這個過程稱為shuffle過程。

 

二:map端的shuffle 

1.map結果的輸出

  map的處理結果首先存放在一個環形的緩沖區。

  這個緩沖區的內存是100M,是map存放結果的地方。如果數據量較大,超過了一定的量(默認80M),將會發生溢寫過程。

  在mapred-site.xml中設置內存的大小

    <property>

      <name>mapreduce.task.io.sort.mb</name>

      <value>100</value>

    </property>

  在mapred-site.xml中設置內存溢寫的閾值  

    <property>

      <name>mapreduce.task.io.sort.spill.percent</name>

      <value>0.8</value>

    </property>

   

2.溢寫過程(這個過程是一個階段,不是一個簡單的寫的過程)  

  溢寫是系統在后台單獨開一個線程去操辦。

  溢寫過程包括:分區partitioner,排序sort,溢寫spill to disk,合並merge。

  

 

3.分區

  分區分的是80%的內存。

  因為reduce可能有不同的任務,所以會對80M的內存進行分區,將map的輸出結果放入的對應的reduce分區中。

  

 

 

4.排序

  默認是按照key排序。

  當分區完成之后,對每一個分區的數據進行排序。

  排序發生在數據到達80M的時候。(2017.12.24,剛剛想了一下,應該是這個時候)

 

5.溢寫

  排序之后,將內存的數據寫入硬盤。留出內存方便map的新的輸出結果。

 

6.合並

  如果是第一次寫入硬盤則不需要考慮合並問題,但是在大數據的情況下,前面已經存在大量的spill文件的時候,這時候需要將它們進行合並。

  將各個分區合並之后,對每一個分區的數據再進行一次排序。(2017.12.24,這個比較重要,注意點是各個分區合並)

   使用歸並的方式進行合並,歸並算法。

  實現comparator比較器,進行比較。

  形成一個文件。

 

三:reduce端的shuffle

1.步驟

  對於reduce端的shuffle,和map端的shuffle步驟相同。但是有一個特別的步驟,分組。

 

2.復制

  當reduce開啟任務后,不斷的在各個節點復制需要的數據。

 

3.合並(內含排序)

  復制數據的時候,把可以存放進內存的就把數據存放在內存中,當達到一定的時候,啟動merge,將數據寫進硬盤。

  如果map數據大於內存需要存放的限制,直接寫入硬盤,當達到一定的數量后將其合並為一個文件。

  這時候,reduce開啟任務需要的數據在內存中和在硬盤中,最終形成一個全局文件。

 

4.分組

  《hadoop,1》

  《hadoop,1》

  《yarn,1》

  《hadoop,1》

  《hdfs,1》

  《yarn,1》

  將相同的key放在一起,使用comparable完成比較。

  結果為:

    《hadoop,list(1,1,1)》

    《yarn,list(1,1)》

    《hdfs,list(1)》

 

四:關於Comparator的理解

  不管是排序還是分組,都需要自定義排序器comparable

  Comparator類繼承WritableComparator

  

  而WritableComparator完成接口RawComparator

   

  在RawComparator中:

   

  

 

五:shuffle處的優化

1.combine的優化

  這是map段的reduce。

  好處就是提前進行一次reduce,注意點是每個map進行一次reduce之后,數據量合並變小。

  問題:是否還需要reduce?

  回答:這個是map段的reduce,正真的reduce是許多map的一個匯總,所以是需要的。(2017.12.24,想法不知道對不對,希望以后進行仔細研究)

  

 

2.下面列舉需要修改的程序

  

  

 

3.輸出結果

  

  

 

4.關於壓縮方面的優化

  這個優化也屬於map段的一個優化部分。

  但是優化的方式是修改配置項。

  

  注意點:

  

  會出現的問題:

  

 

 六:屬於分區的一個思路

  shuffle中程序:

  

  說明:

    這個根據reduce實際需求決定。

    根據測試決定合理的reduce數目。

 

七:shuffle最終總結、

  包括優化部分,可以將shuffle分為五個部分。

  map端:分區

      排序

      合並combine

      壓縮

  reduce端:分組

 

八:完整的程序

  1 package com.senior.bigdata;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Mapper.Context;
 14 import org.apache.hadoop.mapreduce.Reducer;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.util.Tool;
 18 import org.apache.hadoop.util.ToolRunner;
 19 
 20 public class OptimizeOfWordCountMR extends Configured implements Tool{
 21     //Mapper
 22     public static class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
 23         private Text mapoutputkey=new Text();
 24         private static final IntWritable mapoutputvalue=new IntWritable(1);
 25         @Override
 26         protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
 27             String lineValue=value.toString();
 28             String[] strs=lineValue.split(" ");
 29             for(String str:strs){
 30                 mapoutputkey.set(str);
 31                 context.write(mapoutputkey, mapoutputvalue);
 32 //                System.out.println(mapoutputkey+"<---->"+mapoutputvalue);
 33             }
 34         }
 35         
 36     }
 37     
 38     //combiner
 39     public static class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
 40         private IntWritable outputvalue=new IntWritable();
 41         @Override
 42         protected void reduce(Text text, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
 43             int sum=0;
 44 //            System.out.println("key="+text);
 45             for(IntWritable value:values){
 46                 sum+=value.get();
 47 //                System.out.print(value.get());
 48             }
 49 //            System.out.println();
 50             outputvalue.set(sum);
 51             context.write(text, outputvalue);
 52         }
 53         
 54     }
 55     
 56     //Reducer
 57     public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
 58         private IntWritable outputvalue=new IntWritable();
 59         @Override
 60         protected void reduce(Text text, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
 61             int sum=0;
 62 //            System.out.println("key==="+text);
 63             for(IntWritable value:values){
 64 //                System.out.print(value.get());
 65                 sum+=value.get();
 66             }
 67 //            System.out.println();
 68             outputvalue.set(sum);
 69             context.write(text, outputvalue);
 70         }
 71         
 72     }
 73     
 74     //Driver
 75     public int run(String[] args)throws Exception{
 76         Configuration conf=this.getConf();
 77         Job job=Job.getInstance(conf,this.getClass().getSimpleName());
 78         job.setJarByClass(OptimizeOfWordCountMR.class);
 79         //input
 80         Path inpath=new Path(args[0]);
 81         FileInputFormat.addInputPath(job, inpath);
 82         
 83         //output
 84         Path outpath=new Path(args[1]);
 85         FileOutputFormat.setOutputPath(job, outpath);
 86         
 87         //map
 88         job.setMapperClass(WordCountMapper.class);
 89         job.setMapOutputKeyClass(Text.class);
 90         job.setMapOutputValueClass(IntWritable.class);
 91         
 92         //shuffle
 93         job.setCombinerClass(WordCountCombiner.class);    //combiner
 94         
 95         //reduce
 96         job.setReducerClass(WordCountReducer.class);
 97         job.setOutputKeyClass(Text.class);
 98         job.setOutputValueClass(IntWritable.class);
 99         
100         //submit
101         boolean isSucess=job.waitForCompletion(true);
102         return isSucess?0:1;
103     }
104     
105     //main
106     public static void main(String[] args)throws Exception{
107         Configuration conf=new Configuration();
108         //compress
109         conf.set("mapreduce.map.output.compress", "true");
110         conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
111         args=new String[]{
112                 "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/input",
113                 "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/output5"
114         };
115         int status=ToolRunner.run(new OptimizeOfWordCountMR(), args);
116         System.exit(status);
117     }
118 
119 }

 

      

 

 

 

 

 

  

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM