關於shuffle的過程圖。
一:概述shuffle
Shuffle是mapreduce的核心,鏈接map與reduce的中間過程。
Mapp負責過濾分發,而reduce則是歸並整理,從mapp輸出到reduce的輸入的這個過程稱為shuffle過程。
二:map端的shuffle
1.map結果的輸出
map的處理結果首先存放在一個環形的緩沖區。
這個緩沖區的內存是100M,是map存放結果的地方。如果數據量較大,超過了一定的量(默認80M),將會發生溢寫過程。
在mapred-site.xml中設置內存的大小
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>100</value>
</property>
在mapred-site.xml中設置內存溢寫的閾值
<property>
<name>mapreduce.task.io.sort.spill.percent</name>
<value>0.8</value>
</property>
2.溢寫過程(這個過程是一個階段,不是一個簡單的寫的過程)
溢寫是系統在后台單獨開一個線程去操辦。
溢寫過程包括:分區partitioner,排序sort,溢寫spill to disk,合並merge。
3.分區
分區分的是80%的內存。
因為reduce可能有不同的任務,所以會對80M的內存進行分區,將map的輸出結果放入的對應的reduce分區中。
4.排序
默認是按照key排序。
當分區完成之后,對每一個分區的數據進行排序。
排序發生在數據到達80M的時候。(2017.12.24,剛剛想了一下,應該是這個時候)
5.溢寫
排序之后,將內存的數據寫入硬盤。留出內存方便map的新的輸出結果。
6.合並
如果是第一次寫入硬盤則不需要考慮合並問題,但是在大數據的情況下,前面已經存在大量的spill文件的時候,這時候需要將它們進行合並。
將各個分區合並之后,對每一個分區的數據再進行一次排序。(2017.12.24,這個比較重要,注意點是各個分區合並)
使用歸並的方式進行合並,歸並算法。
實現comparator比較器,進行比較。
形成一個文件。
三:reduce端的shuffle
1.步驟
對於reduce端的shuffle,和map端的shuffle步驟相同。但是有一個特別的步驟,分組。
2.復制
當reduce開啟任務后,不斷的在各個節點復制需要的數據。
3.合並(內含排序)
復制數據的時候,把可以存放進內存的就把數據存放在內存中,當達到一定的時候,啟動merge,將數據寫進硬盤。
如果map數據大於內存需要存放的限制,直接寫入硬盤,當達到一定的數量后將其合並為一個文件。
這時候,reduce開啟任務需要的數據在內存中和在硬盤中,最終形成一個全局文件。
4.分組
《hadoop,1》
《hadoop,1》
《yarn,1》
《hadoop,1》
《hdfs,1》
《yarn,1》
將相同的key放在一起,使用comparable完成比較。
結果為:
《hadoop,list(1,1,1)》
《yarn,list(1,1)》
《hdfs,list(1)》
四:關於Comparator的理解
不管是排序還是分組,都需要自定義排序器comparable
Comparator類繼承WritableComparator
而WritableComparator完成接口RawComparator
在RawComparator中:
五:shuffle處的優化
1.combine的優化
這是map段的reduce。
好處就是提前進行一次reduce,注意點是每個map進行一次reduce之后,數據量合並變小。
問題:是否還需要reduce?
回答:這個是map段的reduce,正真的reduce是許多map的一個匯總,所以是需要的。(2017.12.24,想法不知道對不對,希望以后進行仔細研究)
2.下面列舉需要修改的程序
3.輸出結果
4.關於壓縮方面的優化
這個優化也屬於map段的一個優化部分。
但是優化的方式是修改配置項。
注意點:
會出現的問題:
六:屬於分區的一個思路
shuffle中程序:
說明:
這個根據reduce實際需求決定。
根據測試決定合理的reduce數目。
七:shuffle最終總結、
包括優化部分,可以將shuffle分為五個部分。
map端:分區
排序
合並combine
壓縮
reduce端:分組
八:完整的程序
1 package com.senior.bigdata; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Mapper.Context; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.util.Tool; 18 import org.apache.hadoop.util.ToolRunner; 19 20 public class OptimizeOfWordCountMR extends Configured implements Tool{ 21 //Mapper 22 public static class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ 23 private Text mapoutputkey=new Text(); 24 private static final IntWritable mapoutputvalue=new IntWritable(1); 25 @Override 26 protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { 27 String lineValue=value.toString(); 28 String[] strs=lineValue.split(" "); 29 for(String str:strs){ 30 mapoutputkey.set(str); 31 context.write(mapoutputkey, mapoutputvalue); 32 // System.out.println(mapoutputkey+"<---->"+mapoutputvalue); 33 } 34 } 35 36 } 37 38 //combiner 39 public static class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{ 40 private IntWritable outputvalue=new IntWritable(); 41 @Override 42 protected void reduce(Text text, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException { 43 int sum=0; 44 // System.out.println("key="+text); 45 for(IntWritable value:values){ 46 sum+=value.get(); 47 // System.out.print(value.get()); 48 } 49 // System.out.println(); 50 outputvalue.set(sum); 51 context.write(text, outputvalue); 52 } 53 54 } 55 56 //Reducer 57 public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ 58 private IntWritable outputvalue=new IntWritable(); 59 @Override 60 protected void reduce(Text text, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException { 61 int sum=0; 62 // System.out.println("key==="+text); 63 for(IntWritable value:values){ 64 // System.out.print(value.get()); 65 sum+=value.get(); 66 } 67 // System.out.println(); 68 outputvalue.set(sum); 69 context.write(text, outputvalue); 70 } 71 72 } 73 74 //Driver 75 public int run(String[] args)throws Exception{ 76 Configuration conf=this.getConf(); 77 Job job=Job.getInstance(conf,this.getClass().getSimpleName()); 78 job.setJarByClass(OptimizeOfWordCountMR.class); 79 //input 80 Path inpath=new Path(args[0]); 81 FileInputFormat.addInputPath(job, inpath); 82 83 //output 84 Path outpath=new Path(args[1]); 85 FileOutputFormat.setOutputPath(job, outpath); 86 87 //map 88 job.setMapperClass(WordCountMapper.class); 89 job.setMapOutputKeyClass(Text.class); 90 job.setMapOutputValueClass(IntWritable.class); 91 92 //shuffle 93 job.setCombinerClass(WordCountCombiner.class); //combiner 94 95 //reduce 96 job.setReducerClass(WordCountReducer.class); 97 job.setOutputKeyClass(Text.class); 98 job.setOutputValueClass(IntWritable.class); 99 100 //submit 101 boolean isSucess=job.waitForCompletion(true); 102 return isSucess?0:1; 103 } 104 105 //main 106 public static void main(String[] args)throws Exception{ 107 Configuration conf=new Configuration(); 108 //compress 109 conf.set("mapreduce.map.output.compress", "true"); 110 conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec"); 111 args=new String[]{ 112 "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/input", 113 "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/output5" 114 }; 115 int status=ToolRunner.run(new OptimizeOfWordCountMR(), args); 116 System.exit(status); 117 } 118 119 }