MapReduce排序,從大的范圍來說有兩種排序,一種是按照key排序,一種是按照value排序。如果按照value排序,只需在map函數中將key和value對調,然后在reduce函數中在對調回去。從小范圍來說排序又分成部分排序,全局排序,輔助排序(二次排序)等
全局排序
全局排序就是說在一個MapReduce程序產生的輸出文件中,所有的結果都是按照某個策略進行排序的,例如降序還是升序。MapReduce只能保證一個分區內的數據是key有序的,一個分區對應一個reduce,因此只有一個reduce就保證了數據全局有序,但是這樣又不能用到Hadoop集群的優勢。
事實上仍有一些替代方案,首先,創建一系列排好序的文件;其次,串聯這些文件;最后生成一個全局排好序的文件。主要思路是使用一個partitioner來描述輸出的全局排序。
對於多個reduce如何保證數據的全局排序呢?通常的做法是按照key值分區,通過MapReduce的默認分區函數HashPartition將不同范圍的key發送到不同的reduce處理,例如一個文件中有key值從1到10000的數據,我們使用兩個分區,將1到5000的key發送到partition1,然后由reduce1處理,5001到10000的key發動到partition2然后由reduce2處理,reduce1中的key是按照1到5000的升序排序,reduce2中的key是按照5001到10000的升序排序,這樣就保證了整個MapReduce程序的全局排序。但是這樣做有兩個缺點:
1、當數據量大時會出現OOM(內存用完了)。
2、會出現數據傾斜。
Hadoop提供TotalOrderPartitioner類用於實現全局排序的功能,並且解決了OOM和數據傾斜的問題。
TotalOrderPartitioner類提供了數據采樣器,對key值進行部分采樣,然后按照采樣結果尋找key值的最佳分割點,將key值均勻的分配到不同的分區中。
TotalOrderPartitioner 類提供了三個采樣器,分別是:
- SplitSampler 分片采樣器,從數據分片中采樣數據,該采樣器不適合已經排好序的數據
- RandomSampler隨機采樣器,按照設置好的采樣率從一個數據集中采樣,是一個優秀的通配采樣器
- IntervalSampler間隔采樣機,以固定的間隔從分片中采樣數據,對於已經排好序的數據效果非常好
三個采樣器都實現了K[] getSample(InputFormat<K,V> inf, Job job)方法,該方法返回的是K[]數組,數組中存放的是根據采樣結果返回的key值,即分隔點,MapRdeuce就是根據K[]數組 的長度N生成N-1個分區partition數量,然后按照分割點的范圍將對應的數據發送到對應的分區中。
下面介紹使用TotalOrderPartitioner類實現全局排序的功能。代碼如下:
Map類:
public class MaxTempMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable>{ protected void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException { /* String line=value.toString(); String arr[]=line.split(" ");*/ context.write(key,value); } }
Reduce類:
public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{ /** * reduce */ protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int max=Integer.MIN_VALUE; for (IntWritable iw:values) { max=max>iw.get()?max:iw.get(); } context.write(key,new IntWritable(max)); } }
App類:
public class MaxTemp { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); Job job = Job.getInstance(conf); //設置job的各種屬性 job.setJobName("MaxTempApp"); //作業名稱 job.setJarByClass(MaxTemp.class); //搜索類 job.setInputFormatClass(SequenceFileInputFormat.class); //設置輸入格式 //添加輸入路徑 FileInputFormat.addInputPath(job,new Path("F:\\mr\\seq")); //設置輸出路徑 FileOutputFormat.setOutputPath(job,new Path("F:\\mr\\seq\\out")); job.setMapperClass(MaxTempMapper.class); //mapper類 job.setReducerClass(MaxTempReducer.class); //reducer類 job.setNumReduceTasks(3); //reduce個數 job.setMapOutputKeyClass(IntWritable.class); // job.setMapOutputValueClass(IntWritable.class); // job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); // //設置全排序分區類S job.setPartitionerClass(TotalOrderPartitioner.class); //創建隨機采樣器 /** * freq:key被選中的概率 * numSampales 抽取樣本的總數 * maxSplitsSampled 最大采樣切片數 */ InputSampler.Sampler<IntWritable,IntWritable> sampler= new InputSampler.RandomSampler<IntWritable, IntWritable>(0.1,50,10); TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("file:///f:/mr/par.lst")); //將sample數據寫入分區文件中 InputSampler.writePartitionFile(job,sampler); job.waitForCompletion(true); } }
會有全局的排序文件輸出。