一起學Hadoop——TotalOrderPartitioner類實現全局排序


Hadoop排序,從大的范圍來說有兩種排序,一種是按照key排序,一種是按照value排序。如果按照value排序,只需在map函數中將key和value對調,然后在reduce函數中在對調回去。從小范圍來說排序又分成部分排序,全局排序,輔助排序(二次排序)等。本文介紹如何在Hadoop中實現全局排序。
 
全局排序,就是說在一個MapReduce程序產生的輸出文件中,所有的結果都是按照某個策略進行排序的,例如降序還是升序。MapReduce只能保證一個分區內的數據是key有序的,一個分區對應一個reduce,因此只有一個reduce就保證了數據全局有序,但是這樣又不能用到Hadoop集群的優勢。
 
對於多個reduce如何保證數據的全局排序呢?通常的做法是按照key值分區,通過MapReduce的默認分區函數HashPartition將不同范圍的key發送到不同的reduce處理,例如一個文件中有key值從1到10000的數據,我們使用兩個分區,將1到5000的key發送到partition1,然后由reduce1處理,5001到10000的key發動到partition2然后由reduce2處理,reduce1中的key是按照1到5000的升序排序,reduce2中的key是按照5001到10000的升序排序,這樣就保證了整個MapReduce程序的全局排序。但是這樣做有兩個缺點:
1、當數據量大時會出現OOM。
2、會出現數據傾斜。
 
Hadoop提供TotalOrderPartitioner類用於實現全局排序的功能,並且解決了OOM和數據傾斜的問題。
TotalOrderPartitioner類提供了數據采樣器,對key值進行部分采樣,然后按照采樣結果尋找key值的最佳分割點,將key值均勻的分配到不同的分區中。
TotalOrderPartitioner 類提供了三個采樣器,分別是:
  • SplitSampler 分片采樣器,從數據分片中采樣數據,該采樣器不適合已經排好序的數據
  • RandomSampler隨機采樣器,按照設置好的采樣率從一個數據集中采樣
  • IntervalSampler間隔采樣機,以固定的間隔從分片中采樣數據,對於已經排好序的數據效果非常好。
三個采樣器都實現了K[] getSample(InputFormat<K,V> inf, Job job)方法,該方法返回的是K[]數組,數組中存放的是根據采樣結果返回的key值,即分隔點,MapRdeuce就是根據K[]數組的長度N生成N-1個分區partition數量,然后按照分割點的范圍將對應的數據發送到對應的分區中。

下面介紹使用TotalOrderPartitioner類實現全局排序的功能。代碼如下:
 Map類:
1 public class TotalSortMap extends Mapper<Text, Text, Text, IntWritable> {
2     @Override
3     protected void map(Text key, Text value,
4                        Context context) throws IOException, InterruptedException {
5         context.write(key, new IntWritable(Integer.parseInt(key.toString())));
6     }
7 }
Reduce類:
1 public class TotalSortReduce extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
2     @Override
3     protected void reduce(Text key, Iterable<IntWritable> values,
4                           Context context) throws IOException, InterruptedException {
5         for (IntWritable value : values)
6             context.write(value, NullWritable.get());
7     }
8 }

入口類:

 1 public class TotalSort extends Configured implements Tool{
 2 
 3     //實現一個Kye比較器,用於比較兩個key的大小,將key由字符串轉化為Integer,然后進行比較。
 4     public static class KeyComparator extends WritableComparator {
 5         protected KeyComparator() {
 6             super(Text.class, true);
 7         }
 8 
 9         @Override
10         public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) {
11             int num1 = Integer.parseInt(writableComparable1.toString());
12             int num2 = Integer.parseInt(writableComparable2.toString());
13 
14             return num1 - num2;
15         }
16     }
17     @Override
18     public int run(String[] args) throws Exception {
19         Configuration conf = new Configuration();
20         conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
21         Job job = Job.getInstance(conf, "Total Sort app");
22         job.setJarByClass(TotalSort.class);
23 
24         //設置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進來
25         FileInputFormat.addInputPath(job,new Path(args[0]));
26         //設置mapreduce程序的輸出路徑,MapReduce的結果都是輸入到文件中
27         FileOutputFormat.setOutputPath(job,new Path(args[1]));
28         job.setInputFormatClass(KeyValueTextInputFormat.class);
29         //設置比較器,用於比較數據的大小,然后按順序排序,該例子主要用於比較兩個key的大小
30         job.setSortComparatorClass(KeyComparator.class);
31         job.setNumReduceTasks(3);//設置reduce數量
32 
33         job.setMapOutputKeyClass(Text.class);
34         job.setMapOutputValueClass(IntWritable.class);
35         job.setOutputKeyClass(IntWritable.class);
36         job.setOutputValueClass(NullWritable.class);
37 
38         //設置保存partitions文件的路徑
39         TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
40         //key值采樣,0.01是采樣率,
41         InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);
42         //將采樣數據寫入到分區文件中
43         InputSampler.writePartitionFile(job, sampler);
44 
45         job.setMapperClass(TotalSortMap.class);
46         job.setReducerClass(TotalSortReduce.class);
47         //設置分區類。
48         job.setPartitionerClass(TotalOrderPartitioner.class);
49         return job.waitForCompletion(true) ? 0 : 1;
50     }
51     public static void main(String[] args)throws Exception{
52 
53         int exitCode = ToolRunner.run(new TotalSort(), args);
54         System.exit(exitCode);
55     }
56 }
生成測試數據的代碼如下:
1 #!/bin/bash
2 do
3 for k in $(seq 1 10000)
4 echo $RANDOM;
5 done
將上面代碼保存成create_data.sh,然后執行
sh create_data.sh > test_data.txt
會生成一個test_data.txt的文本文件,文本中的內容是一行一個隨機數字
將test_data.txt上傳到HDFS中:
hadoop fs -put test_data.txt /data/
將上面的實現全局排序的代碼打成一個jar包,然后通過shell文件執行。
執行MapReduce代碼的腳本如下:
1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar TotalSort.jar \
2 hdfs://hadoop-master:8020/data/test_data1.txt \
3 hdfs://hadoop-master:8020/total_sort_output \
4 hdfs://hadoop-master:8020/total_sort_partitions
看下運行結果,我們只需要看part-r-00000的尾10行和part-r-00001的頭10行數據,只要它們收尾相接就證明是全局有序的:

下面有幾個坑要注意,大家不要踩:

  1. 數據的輸入類型必須使用KeyValueTextInputFormat類而不是TextInputFormat類,因為hadoop采樣器是對key值采樣,而TextInputFormat的key是位置偏移量,value存放的是每行的輸入數據,對該key采樣沒有任何意義。KeyValueTextInputFormat的key存放的是輸入數據,對key采樣才能更好的划分分區。用法:
    job.setInputFormatClass(KeyValueTextInputFormat.class);
  2. 使用代碼conf.set("mapreduce.totalorderpartitioner.naturalorder", "false")設置分區的排序策略,否則是每個分區內有序,而不是全局有序。
  3. 采樣器只能是Text,Text類型:InputSampler.Sampler<Text, Text>,否則會報 Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable這個錯誤。
  4. job.setMapOutputKeyClass(Text.class)和job.setMapOutputValueClass(IntWritable.class)這兩行代碼必須在InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);這行代碼之前調用,否則會報 Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable錯誤。
  5. 調用setSortComparatorClass方法設置排序類,對key進行排序。job.setSortComparatorClass(KeyComparator.class);類似例子中的KeyComparator類。否則是按照字典序進行排序。MapReduce默認輸出的key是字符類型時,默認是按照字典序排序。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM