MapReduce中的全局排序


    MapReduce排序,從大的范圍來說有兩種排序,一種是按照key排序,一種是按照value排序。如果按照value排序,只需在map函數中將key和value對調,然后在reduce函數中在對調回去。從小范圍來說排序又分成部分排序全局排序輔助排序(二次排序)

全局排序

    全局排序就是說在一個MapReduce程序產生的輸出文件中,所有的結果都是按照某個策略進行排序的,例如降序還是升序。MapReduce只能保證一個分區內的數據是key有序的,一個分區對應一個reduce,因此只有一個reduce就保證了數據全局有序,但是這樣又不能用到Hadoop集群的優勢。

      事實上仍有一些替代方案,首先,創建一系列排好序的文件;其次,串聯這些文件;最后生成一個全局排好序的文件。主要思路是使用一個partitioner來描述輸出的全局排序。

      對於多個reduce如何保證數據的全局排序呢?通常的做法是按照key值分區,通過MapReduce的默認分區函數HashPartition將不同范圍的key發送到不同的reduce處理,例如一個文件中有key值從1到10000的數據,我們使用兩個分區,將1到5000的key發送到partition1,然后由reduce1處理,5001到10000的key發動到partition2然后由reduce2處理,reduce1中的key是按照1到5000的升序排序,reduce2中的key是按照5001到10000的升序排序,這樣就保證了整個MapReduce程序的全局排序。但是這樣做有兩個缺點:

 

            1、當數據量大時會出現OOM(內存用完了)。

 

      2、會出現數據傾斜。

      Hadoop提供TotalOrderPartitioner類用於實現全局排序的功能,並且解決了OOM和數據傾斜的問題。

      TotalOrderPartitioner類提供了數據采樣器,對key值進行部分采樣,然后按照采樣結果尋找key值的最佳分割點,將key值均勻的分配到不同的分區中。

      TotalOrderPartitioner 類提供了三個采樣器,分別是:

    1. SplitSampler 分片采樣器,從數據分片中采樣數據,該采樣器不適合已經排好序的數據       
    2. RandomSampler隨機采樣器,按照設置好的采樣率從一個數據集中采樣,是一個優秀的通配采樣器
    3. IntervalSampler間隔采樣機,以固定的間隔從分片中采樣數據,對於已經排好序的數據效果非常好

          三個采樣器都實現了K[] getSample(InputFormat<K,V> inf, Job job)方法,該方法返回的是K[]數組,數組中存放的是根據采樣結果返回的key值,即分隔點,MapRdeuce就是根據K[]數組            的長度N生成N-1個分區partition數量,然后按照分割點的范圍將對應的數據發送到對應的分區中。

      下面介紹使用TotalOrderPartitioner類實現全局排序的功能。代碼如下:

      Map類:

public class MaxTempMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable>{

    protected void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException {

     /*   String line=value.toString();
        String arr[]=line.split(" ");*/
        context.write(key,value);
    }

}

 

          Reduce類:

public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
    /**
     * reduce
     */
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int max=Integer.MIN_VALUE;
        for (IntWritable iw:values) {
            max=max>iw.get()?max:iw.get();
        }
        context.write(key,new IntWritable(max));
    }

}

       App類:

public class MaxTemp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(conf);

        //設置job的各種屬性
        job.setJobName("MaxTempApp");                        //作業名稱
        job.setJarByClass(MaxTemp.class);                 //搜索類
        job.setInputFormatClass(SequenceFileInputFormat.class); //設置輸入格式

        //添加輸入路徑
        FileInputFormat.addInputPath(job,new Path("F:\\mr\\seq"));
        //設置輸出路徑
        FileOutputFormat.setOutputPath(job,new Path("F:\\mr\\seq\\out"));




        job.setMapperClass(MaxTempMapper.class);             //mapper類
        job.setReducerClass(MaxTempReducer.class);           //reducer類

        job.setNumReduceTasks(3);                       //reduce個數


        job.setMapOutputKeyClass(IntWritable.class);           //
        job.setMapOutputValueClass(IntWritable.class);  //

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);     //


        //設置全排序分區類S
        job.setPartitionerClass(TotalOrderPartitioner.class);
        //創建隨機采樣器
        /**
         * freq:key被選中的概率
         * numSampales 抽取樣本的總數
         * maxSplitsSampled 最大采樣切片數
         */
        InputSampler.Sampler<IntWritable,IntWritable> sampler=
                new InputSampler.RandomSampler<IntWritable, IntWritable>(0.1,50,10);

        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("file:///f:/mr/par.lst"));
        //將sample數據寫入分區文件中
        InputSampler.writePartitionFile(job,sampler);

        job.waitForCompletion(true);
    }
}

              會有全局的排序文件輸出。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM