MapReduce排序,从大的范围来说有两种排序,一种是按照key排序,一种是按照value排序。如果按照value排序,只需在map函数中将key和value对调,然后在reduce函数中在对调回去。从小范围来说排序又分成部分排序,全局排序,辅助排序(二次排序)等
全局排序
全局排序就是说在一个MapReduce程序产生的输出文件中,所有的结果都是按照某个策略进行排序的,例如降序还是升序。MapReduce只能保证一个分区内的数据是key有序的,一个分区对应一个reduce,因此只有一个reduce就保证了数据全局有序,但是这样又不能用到Hadoop集群的优势。
事实上仍有一些替代方案,首先,创建一系列排好序的文件;其次,串联这些文件;最后生成一个全局排好序的文件。主要思路是使用一个partitioner来描述输出的全局排序。
对于多个reduce如何保证数据的全局排序呢?通常的做法是按照key值分区,通过MapReduce的默认分区函数HashPartition将不同范围的key发送到不同的reduce处理,例如一个文件中有key值从1到10000的数据,我们使用两个分区,将1到5000的key发送到partition1,然后由reduce1处理,5001到10000的key发动到partition2然后由reduce2处理,reduce1中的key是按照1到5000的升序排序,reduce2中的key是按照5001到10000的升序排序,这样就保证了整个MapReduce程序的全局排序。但是这样做有两个缺点:
1、当数据量大时会出现OOM(内存用完了)。
2、会出现数据倾斜。
Hadoop提供TotalOrderPartitioner类用于实现全局排序的功能,并且解决了OOM和数据倾斜的问题。
TotalOrderPartitioner类提供了数据采样器,对key值进行部分采样,然后按照采样结果寻找key值的最佳分割点,将key值均匀的分配到不同的分区中。
TotalOrderPartitioner 类提供了三个采样器,分别是:
- SplitSampler 分片采样器,从数据分片中采样数据,该采样器不适合已经排好序的数据
- RandomSampler随机采样器,按照设置好的采样率从一个数据集中采样,是一个优秀的通配采样器
- IntervalSampler间隔采样机,以固定的间隔从分片中采样数据,对于已经排好序的数据效果非常好
三个采样器都实现了K[] getSample(InputFormat<K,V> inf, Job job)方法,该方法返回的是K[]数组,数组中存放的是根据采样结果返回的key值,即分隔点,MapRdeuce就是根据K[]数组 的长度N生成N-1个分区partition数量,然后按照分割点的范围将对应的数据发送到对应的分区中。
下面介绍使用TotalOrderPartitioner类实现全局排序的功能。代码如下:
Map类:
public class MaxTempMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable>{ protected void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException { /* String line=value.toString(); String arr[]=line.split(" ");*/ context.write(key,value); } }
Reduce类:
public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{ /** * reduce */ protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int max=Integer.MIN_VALUE; for (IntWritable iw:values) { max=max>iw.get()?max:iw.get(); } context.write(key,new IntWritable(max)); } }
App类:
public class MaxTemp { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); Job job = Job.getInstance(conf); //设置job的各种属性 job.setJobName("MaxTempApp"); //作业名称 job.setJarByClass(MaxTemp.class); //搜索类 job.setInputFormatClass(SequenceFileInputFormat.class); //设置输入格式 //添加输入路径 FileInputFormat.addInputPath(job,new Path("F:\\mr\\seq")); //设置输出路径 FileOutputFormat.setOutputPath(job,new Path("F:\\mr\\seq\\out")); job.setMapperClass(MaxTempMapper.class); //mapper类 job.setReducerClass(MaxTempReducer.class); //reducer类 job.setNumReduceTasks(3); //reduce个数 job.setMapOutputKeyClass(IntWritable.class); // job.setMapOutputValueClass(IntWritable.class); // job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); // //设置全排序分区类S job.setPartitionerClass(TotalOrderPartitioner.class); //创建随机采样器 /** * freq:key被选中的概率 * numSampales 抽取样本的总数 * maxSplitsSampled 最大采样切片数 */ InputSampler.Sampler<IntWritable,IntWritable> sampler= new InputSampler.RandomSampler<IntWritable, IntWritable>(0.1,50,10); TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("file:///f:/mr/par.lst")); //将sample数据写入分区文件中 InputSampler.writePartitionFile(job,sampler); job.waitForCompletion(true); } }
会有全局的排序文件输出。