MapReduce 二次排序


默認情況下,Map 輸出的結果會對 Key 進行默認的排序,但是有時候需要對 Key 排序的同時再對 Value 進行排序,這時候就要用到二次排序了。下面讓我們來介紹一下什么是二次排序。

二次排序原理

        我們把二次排序主要分為以下幾個階段。

        Map 起始階段

        在Map階段,使用 job.setInputFormatClass() 定義的 InputFormat ,將輸入的數據集分割成小數據塊 split,同時 InputFormat 提供一個 RecordReader的實現。本課程中使用的是 TextInputFormat,它提供的 RecordReader 會將文本的行號作為 Key,這一行的文本作為 Value。這就是自定義 Mapper 的輸入是 < LongWritable,Text> 的原因。然后調用自定義 Mapper 的map方法,將一個個< LongWritable,Text>鍵值對輸入給 Mapper 的 map方法。

        Map 最后階段

        在 Map 階段的最后,會先調用 job.setPartitionerClass() 對這個 Mapper 的輸出結果進行分區,每個分區映射到一個Reducer。每個分區內又調用 job.setSortComparatorClass() 設置的 Key 比較函數類排序。可以看到,這本身就是一個二次排序。如果沒有通過 job.setSortComparatorClass() 設置 Key 比較函數類,則使用 Key 實現的 compareTo() 方法。我們既可以使用 IntPair 實現的 compareTo() 方法,也可以專門定義 Key 比較函數類。

        Reduce 階段

        在 Reduce 階段,reduce() 方法接受所有映射到這個 Reduce 的 map 輸出后,也是會調用 job.setSortComparatorClass()方法設置的 Key 比較函數類,對所有數據進行排序。然后開始構造一個 Key 對應的 Value 迭代器。 這時就要用到分組,使用 job.setGroupingComparatorClass()方法設置分組函數類。只要這個比較器比較的兩個 Key 相同,它們就屬於同一組,它們的 Value 放在一個 Value 迭代器,而這個迭代器的 Key 使用屬於同一個組的所有Key的第一個Key。最后就是進入 Reducer 的 reduce() 方法,reduce() 方法的輸入是所有的 Key 和它的 Value 迭代器,同樣注意輸入與輸出的類型必須與自定義的 Reducer 中聲明的一致。

        接下來我們通過數據示例,可以很直觀的了解二次排序的原理。

        輸入文件sort.txt(下載)內容為:

40  20 
40  10
40  30
40  5
30  30
30  20
30  10
30  40
50  20 
50  50
50  10
50  60

        輸出文件的內容(從小到大排序)如下:

30  10
30  20
30  30
30  40
==============================
40  5
40  10
40  20
40  30
==============================  
50  10
50  20
50  50
50  60 

 

        從輸出的結果可以看出Key實現了從小到大的排序,同時相同Key的Value也實現了從小到大的排序,這就是二次排序的結果。

二次排序的具體流程

        在 MapReduce 中,所有的 Key 是需要被比較和排序的,而且是二次,先根據 Partitioner,再根據大小。而本例中也是要比較兩次。先按照第一字段排序,然后再對第一字段相同的按照第二字段排序。根據這一點,我們可以構造一個復合類 IntPair ,它有兩個字段,先利用分區對第一字段排序,再利用分區內的比較對第二字段排序。二次排序的流程分為以下幾步。

        1、自定義 key

        所有自定義的 key 應該實現接口 WritableComparable,因為它是可序列化的並且可比較的。WritableComparable 的內部方法如下所示。

 1 //反序列化,從流中的二進制轉換成IntPair
 2 public void readFields(DataInput in) throws IOException
 3        
 4 //序列化,將IntPair轉化成使用流傳送的二進制
 5 public void write(DataOutput out)
 6 
 7 //key的比較
 8 public int compareTo(IntPair o)
 9        
10 //默認的分區類 HashPartitioner,使用此方法
11 public int hashCode()
12 
13 //默認實現
14 public boolean equals(Object right)

 

        2、自定義分區

        自定義分區函數類 FirstPartitioner,是 key 的第一次比較,完成對所有 key 的排序。

public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>

 

        在 job 中使用 setPartitionerClasss()方法設置 Partitioner。

job.setPartitionerClasss(FirstPartitioner.Class);

 

        3、Key 的比較類

        這是 Key 的第二次比較,對所有的 Key 進行排序,即同時完成IntPair中的first和second排序。該類是一個比較器,可以通過兩種方式實現。

        1) 繼承 WritableComparator。

public static class KeyComparator extends WritableComparator

 

        必須有一個構造函數,並且重載 以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

 

        2) 實現接口 RawComparator。

        上面兩種實現方式,在 Job 中,可以通過setSortComparatorClass()方法來設置Key的比較類。

job.setSortComparatorClass(KeyComparator.Class);

 

        注意:如果沒有使用自定義的 SortComparator 類,則默認使用 Key 中compareTo()方法對 Key 排序分組。

        4、定義分組類函數

        在 Reduce 階段,構造一個與 Key 相對應的 Value 迭代器的時候,只要 first 相同就屬於同一個組,放在一個 Value 迭代器。定義這個比較器,可以有兩種方式。

        1) 繼承 WritableComparator。

public static class GroupingComparator extends WritableComparator

 

        必須有一個構造函數,並且重載以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

 

        2) 實現接口 RawComparator。

        上面兩種實現方式,在 Job 中,可以通過 setGroupingComparatorClass()方法來設置分組類。

job.setGroupingComparatorClass(GroupingComparator.Class);

 

        另外注意的是,如果reduce的輸入與輸出不是同一種類型,則 Combiner和Reducer 不能共用 Reducer 類,因為 Combiner 的輸出是 reduce 的輸入。除非重新定義一個Combiner。

代碼實現

        Hadoop 的 example 包中自帶了一個 MapReduce 的二次排序算法,下面這個示例對 example 包中的二次排序源碼的改進。我們按照以下幾步完成二次排序:

        第一步:自定義IntPair類,將示例數據中的key/value封裝成一個整體作為Key,同時實現 WritableComparable 接口並重寫其方法。

 1 /**
 2 * 自己定義的key類應該實現WritableComparable接口
 3 */
 4 public  class IntPair implements WritableComparable<IntPair>{
 5     int first;//第一個成員變量
 6     int second;//第二個成員變量
 7     public void set(int left, int right){
 8         first = left;
 9         second = right;
10     }
11     public int getFirst(){
12         return first;
13     }
14     public int getSecond(){
15         return second;
16     }
17     @Override
18     //反序列化,從流中的二進制轉換成IntPair
19     public void readFields(DataInput in) throws IOException{
20         first = in.readInt();
21         second = in.readInt();
22     }
23     @Override
24     //序列化,將IntPair轉化成使用流傳送的二進制
25     public void write(DataOutput out) throws IOException{
26         out.writeInt(first);
27         out.writeInt(second);
28     }
29     @Override
30     //key的比較
31     public int compareTo(IntPair o)
32     {
33         // TODO Auto-generated method stub
34         if (first != o.first){
35             return first < o.first ? -1 : 1;
36         }else if (second != o.second){
37             return second < o.second ? -1 : 1;
38         }else{
39             return 0;
40         }
41     }
42     
43     @Override
44     public int hashCode(){
45         return first * 157 + second;
46     }
47     @Override
48     public boolean equals(Object right){
49         if (right == null)
50             return false;
51         if (this == right)
52             return true;
53         if (right instanceof IntPair){
54             IntPair r = (IntPair) right;
55             return r.first == first && r.second == second;
56         }else{
57             return false;
58         }
59     }
60 }

 

        第二步:自定義分區函數類FirstPartitioner,根據 IntPair 中的first實現分區。

1 /**
2 * 分區函數類。根據first確定Partition。
3 */
4 public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{
5         @Override
6         public int getPartition(IntPair key, IntWritable value,int numPartitions){
7             return Math.abs(key.getFirst() * 127) % numPartitions;
8         }
9 }

 

 

        第三步:自定義 SortComparator 實現 IntPair 類中的first和second排序。本課程中沒有使用這種方法,而是使用 IntPair 中的compareTo()方法實現的。

        第四步:自定義 GroupingComparator 類,實現分區內的數據分組。

  1 /**
  2 *繼承WritableComparator
  3 */
  4 public static class GroupingComparator extends WritableComparator{
  5         protected GroupingComparator(){
  6             super(IntPair.class, true);
  7         }
  8         @Override
  9         //Compare two WritableComparables.
 10         public int compare(WritableComparable w1, WritableComparable w2){
 11             IntPair ip1 = (IntPair) w1;
 12             IntPair ip2 = (IntPair) w2;
 13             int l = ip1.getFirst();
 14             int r = ip2.getFirst();
 15             return l == r ? 0 : (l < r ? -1 : 1);
 16         }
 17 }
 18 
 19         第五步:編寫 MapReduce 主程序實現二次排序。
 20 
 21 import java.io.DataInput;
 22 import java.io.DataOutput;
 23 import java.io.IOException;
 24 import java.util.StringTokenizer;
 25 import org.apache.hadoop.conf.Configuration;
 26 import org.apache.hadoop.fs.Path;
 27 import org.apache.hadoop.io.IntWritable;
 28 import org.apache.hadoop.io.LongWritable;
 29 import org.apache.hadoop.io.Text;
 30 import org.apache.hadoop.io.WritableComparable;
 31 import org.apache.hadoop.io.WritableComparator;
 32 import org.apache.hadoop.mapreduce.Job;
 33 import org.apache.hadoop.mapreduce.Mapper;
 34 import org.apache.hadoop.mapreduce.Partitioner;
 35 import org.apache.hadoop.mapreduce.Reducer;
 36 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 37 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 38 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 39 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 40 public class SecondarySort{
 41     // 自定義map
 42     public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{
 43         private final IntPair intkey = new IntPair();
 44         private final IntWritable intvalue = new IntWritable();
 45         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
 46             String line = value.toString();
 47             StringTokenizer tokenizer = new StringTokenizer(line);
 48             int left = 0;
 49             int right = 0;
 50             if (tokenizer.hasMoreTokens()){
 51                 left = Integer.parseInt(tokenizer.nextToken());
 52                 if (tokenizer.hasMoreTokens())
 53                     right = Integer.parseInt(tokenizer.nextToken());
 54                 intkey.set(left, right);
 55                 intvalue.set(right);
 56                 context.write(intkey, intvalue);
 57             }
 58         }
 59     }
 60     // 自定義reduce
 61     public static class Reduce extends Reducer< IntPair, IntWritable, Text, IntWritable>{
 62         private final Text left = new Text();      
 63         public void reduce(IntPair key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException{
 64             left.set(Integer.toString(key.getFirst()));
 65             for (IntWritable val : values){
 66                 context.write(left, val);
 67             }
 68         }
 69     }
 70     /**
 71      * @param args
 72      */
 73     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
 74         // TODO Auto-generated method stub
 75         Configuration conf = new Configuration();
 76 
 77         Job job = new Job(conf, "secondarysort");
 78         job.setJarByClass(SecondarySort.class);
 79         
 80         FileInputFormat.setInputPaths(job, new Path(args[0]));//輸入路徑
 81         FileOutputFormat.setOutputPath(job, new Path(args[1]));//輸出路徑
 82 
 83         job.setMapperClass(Map.class);// Mapper
 84         job.setReducerClass(Reduce.class);// Reducer
 85         
 86         job.setPartitionerClass(FirstPartitioner.class);// 分區函數
 87         //job.setSortComparatorClass(KeyComparator.Class);//本課程並沒有自定義SortComparator,而是使用IntPair自帶的排序
 88         job.setGroupingComparatorClass(GroupingComparator.class);// 分組函數
 89 
 90 
 91         job.setMapOutputKeyClass(IntPair.class);
 92         job.setMapOutputValueClass(IntWritable.class);
 93         
 94         job.setOutputKeyClass(Text.class);
 95         job.setOutputValueClass(IntWritable.class);
 96 
 97         job.setInputFormatClass(TextInputFormat.class);
 98         job.setOutputFormatClass(TextOutputFormat.class);
 99        
100         System.exit(job.waitForCompletion(true) ? 0 : 1);
101     }
102 }

        至此,MapReduce 的二次排序的原理和實現已經學習完畢,理解起來可能有點難度,希望大家先看代碼,把代碼跑起來以后再去看前面的原理,這樣理解起來就非常容易。

以上就是博主為大家介紹的這一板塊的主要內容,這都是博主自己的學習過程,希望能給大家帶來一定的指導作用,有用的還望大家點個支持,如果對你沒用也望包涵,有錯誤煩請指出。如有期待可關注博主以第一時間獲取更新哦,謝謝! 

 版權聲明:本文為博主原創文章,未經博主允許不得轉載。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM