Hadoop排序,從大的范圍來說有兩種排序,一種是按照key排序,一種是按照value排序。如果按照value排序,只需在map函數中將key和value對調,然后在reduce函數中在對調回去。從小范圍來說排序又分成部分排序,全局排序,輔助排序(二次排序)等。本文介紹如何在Hadoop中實現全局排序。
全局排序,就是說在一個MapReduce程序產生的輸出文件中,所有的結果都是按照某個策略進行排序的,例如降序還是升序。MapReduce只能保證一個分區內的數據是key有序的,一個分區對應一個reduce,因此只有一個reduce就保證了數據全局有序,但是這樣又不能用到Hadoop集群的優勢。
對於多個reduce如何保證數據的全局排序呢?通常的做法是按照key值分區,通過MapReduce的默認分區函數HashPartition將不同范圍的key發送到不同的reduce處理,例如一個文件中有key值從1到10000的數據,我們使用兩個分區,將1到5000的key發送到partition1,然后由reduce1處理,5001到10000的key發動到partition2然后由reduce2處理,reduce1中的key是按照1到5000的升序排序,reduce2中的key是按照5001到10000的升序排序,這樣就保證了整個MapReduce程序的全局排序。但是這樣做有兩個缺點:
1、當數據量大時會出現OOM。
2、會出現數據傾斜。
Hadoop提供TotalOrderPartitioner類用於實現全局排序的功能,並且解決了OOM和數據傾斜的問題。
TotalOrderPartitioner類提供了數據采樣器,對key值進行部分采樣,然后按照采樣結果尋找key值的最佳分割點,將key值均勻的分配到不同的分區中。
TotalOrderPartitioner 類提供了三個采樣器,分別是:
- SplitSampler 分片采樣器,從數據分片中采樣數據,該采樣器不適合已經排好序的數據
- RandomSampler隨機采樣器,按照設置好的采樣率從一個數據集中采樣
- IntervalSampler間隔采樣機,以固定的間隔從分片中采樣數據,對於已經排好序的數據效果非常好。
三個采樣器都實現了K[] getSample(InputFormat<K,V> inf, Job job)方法,該方法返回的是K[]數組,數組中存放的是根據采樣結果返回的key值,即分隔點,MapRdeuce就是根據K[]數組的長度N生成N-1個分區partition數量,然后按照分割點的范圍將對應的數據發送到對應的分區中。
下面介紹使用TotalOrderPartitioner類實現全局排序的功能。代碼如下:
Map類:
1 public class TotalSortMap extends Mapper<Text, Text, Text, IntWritable> { 2 @Override 3 protected void map(Text key, Text value, 4 Context context) throws IOException, InterruptedException { 5 context.write(key, new IntWritable(Integer.parseInt(key.toString()))); 6 } 7 }
Reduce類:
1 public class TotalSortReduce extends Reducer<Text, IntWritable, IntWritable, NullWritable> { 2 @Override 3 protected void reduce(Text key, Iterable<IntWritable> values, 4 Context context) throws IOException, InterruptedException { 5 for (IntWritable value : values) 6 context.write(value, NullWritable.get()); 7 } 8 }
入口類:
1 public class TotalSort extends Configured implements Tool{ 2 3 //實現一個Kye比較器,用於比較兩個key的大小,將key由字符串轉化為Integer,然后進行比較。 4 public static class KeyComparator extends WritableComparator { 5 protected KeyComparator() { 6 super(Text.class, true); 7 } 8 9 @Override 10 public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) { 11 int num1 = Integer.parseInt(writableComparable1.toString()); 12 int num2 = Integer.parseInt(writableComparable2.toString()); 13 14 return num1 - num2; 15 } 16 } 17 @Override 18 public int run(String[] args) throws Exception { 19 Configuration conf = new Configuration(); 20 conf.set("mapreduce.totalorderpartitioner.naturalorder", "false"); 21 Job job = Job.getInstance(conf, "Total Sort app"); 22 job.setJarByClass(TotalSort.class); 23 24 //設置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進來 25 FileInputFormat.addInputPath(job,new Path(args[0])); 26 //設置mapreduce程序的輸出路徑,MapReduce的結果都是輸入到文件中 27 FileOutputFormat.setOutputPath(job,new Path(args[1])); 28 job.setInputFormatClass(KeyValueTextInputFormat.class); 29 //設置比較器,用於比較數據的大小,然后按順序排序,該例子主要用於比較兩個key的大小 30 job.setSortComparatorClass(KeyComparator.class); 31 job.setNumReduceTasks(3);//設置reduce數量 32 33 job.setMapOutputKeyClass(Text.class); 34 job.setMapOutputValueClass(IntWritable.class); 35 job.setOutputKeyClass(IntWritable.class); 36 job.setOutputValueClass(NullWritable.class); 37 38 //設置保存partitions文件的路徑 39 TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2])); 40 //key值采樣,0.01是采樣率, 41 InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100); 42 //將采樣數據寫入到分區文件中 43 InputSampler.writePartitionFile(job, sampler); 44 45 job.setMapperClass(TotalSortMap.class); 46 job.setReducerClass(TotalSortReduce.class); 47 //設置分區類。 48 job.setPartitionerClass(TotalOrderPartitioner.class); 49 return job.waitForCompletion(true) ? 0 : 1; 50 } 51 public static void main(String[] args)throws Exception{ 52 53 int exitCode = ToolRunner.run(new TotalSort(), args); 54 System.exit(exitCode); 55 } 56 }
生成測試數據的代碼如下:
1 #!/bin/bash 2 do 3 for k in $(seq 1 10000) 4 echo $RANDOM; 5 done
將上面代碼保存成create_data.sh,然后執行
sh create_data.sh > test_data.txt
會生成一個test_data.txt的文本文件,文本中的內容是一行一個隨機數字
將test_data.txt上傳到HDFS中:
hadoop fs -put test_data.txt /data/
將上面的實現全局排序的代碼打成一個jar包,然后通過shell文件執行。
執行MapReduce代碼的腳本如下:
1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar TotalSort.jar \ 2 hdfs://hadoop-master:8020/data/test_data1.txt \ 3 hdfs://hadoop-master:8020/total_sort_output \ 4 hdfs://hadoop-master:8020/total_sort_partitions
看下運行結果,我們只需要看part-r-00000的尾10行和part-r-00001的頭10行數據,只要它們收尾相接就證明是全局有序的:
下面有幾個坑要注意,大家不要踩:
- 數據的輸入類型必須使用KeyValueTextInputFormat類而不是TextInputFormat類,因為hadoop采樣器是對key值采樣,而TextInputFormat的key是位置偏移量,value存放的是每行的輸入數據,對該key采樣沒有任何意義。KeyValueTextInputFormat的key存放的是輸入數據,對key采樣才能更好的划分分區。用法:
job.setInputFormatClass(KeyValueTextInputFormat.class);
-
使用代碼conf.set("mapreduce.totalorderpartitioner.naturalorder", "false")設置分區的排序策略,否則是每個分區內有序,而不是全局有序。
-
采樣器只能是Text,Text類型:InputSampler.Sampler<Text, Text>,否則會報 Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable這個錯誤。
-
job.setMapOutputKeyClass(Text.class)和job.setMapOutputValueClass(IntWritable.class)這兩行代碼必須在InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);這行代碼之前調用,否則會報 Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable錯誤。
-
調用setSortComparatorClass方法設置排序類,對key進行排序。job.setSortComparatorClass(KeyComparator.class);類似例子中的KeyComparator類。否則是按照字典序進行排序。MapReduce默認輸出的key是字符類型時,默認是按照字典序排序。