MapReduce --全排序


MapReduce全排序的方法1:

  每個map任務對自己的輸入數據進行排序,但是無法做到全局排序,需要將數據傳遞到reduce,然后通過reduce進行一次總的排序,但是這樣做的要求是只能有一個reduce任務來完成。

  並行程度不高,無法發揮分布式計算的特點。

 

MapReduce全排序的方法2:

  針對方法1的問題,現在介紹方法2來進行改進;

  使用多個partition對map的結果進行分區,且分區后的結果是有區間的,將多個分區結果拼接起來,就是一個連續的全局排序文件。

    

 

  Hadoop自帶的Partitioner的實現有兩種,一種為HashPartitioner, 默認的分區方式,計算公式 hash(key)%reducernum,另一種為TotalOrderPartitioner, 為排序作業創建分區,分區中數據的范圍需要通過分區文件來指定。

  分區文件可以人為創建,如采用等距區間,如果數據分布不均勻導致作業完成時間受限於個別reduce任務完成時間的影響。

  也可以通過抽樣器,先對數據進行抽樣,根據數據分布生成分區文件,避免數據傾斜。

  

這里實現一個通過隨機抽樣來生成分區文件,然后對數據進行全排序,根據分區文件的范圍分配到不同的reducer中。

示例代碼:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import java.io.IOException;

/**
 * Created by Edward on 2016/10/4.
 */
public class TotalSort {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //access hdfs's user
        System.setProperty("HADOOP_USER_NAME","root");

        Configuration conf = new Configuration();
        conf.set("mapred.jar", "D:\\MyDemo\\MapReduce\\Sort\\out\\artifacts\\TotalSort\\TotalSort.jar");

        FileSystem fs = FileSystem.get(conf);

        /*RandomSampler 參數說明
        * @param freq Probability with which a key will be chosen.
        * @param numSamples Total number of samples to obtain from all selected splits.
        * @param maxSplitsSampled The maximum number of splits to examine.
        */
        InputSampler.RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 10, 10);

        //設置分區文件, TotalOrderPartitioner必須指定分區文件
        Path partitionFile = new Path( "_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        Job job = Job.getInstance(conf);
        job.setJarByClass(TotalSort.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class); //數據文件默認以\t分割
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setNumReduceTasks(4);  //設置reduce任務個數,分區文件以reduce個數為基准,拆分成n段

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setPartitionerClass(TotalOrderPartitioner.class);

        FileInputFormat.addInputPath(job, new Path("/test/sort"));

        Path path = new Path("/test/wc/output");

        if(fs.exists(path))//如果目錄存在,則刪除目錄
        {
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job, path);

        //將隨機抽樣數據寫入分區文件
        InputSampler.writePartitionFile(job, sampler);

        boolean b = job.waitForCompletion(true);
        if(b)
        {
            System.out.println("OK");
        }

    }
}

 

測試數據:

1    1
2    1
3    1
4    1
5    1
6    1
7    1
8    1
9    1
10    2
11    2
12    2
13    2
14    2
15    2
16    2
17    2
18    2
19    2
20    2
...
5999    4
6000    4
6001    4
6002    4
6003    4
6004    4
6005    4
6006    4
6007    4
6008    4
6009    4
6010    4

抽樣生成的分區文件為:

# hadoop fs -text  /user/root/_partitions

 2673 (null)
 4441 (null)
 5546 (null)

生成的抽樣文件為sequence file通過 -text打開查看

生成的排序結果文件:

文件內容:

 hadoop fs -cat /test/wc/output/part-r-00000

...
2668    4
2669    4
267     3
2670    4
2671    4
2672    4

hadoop fs -cat /test/wc/output/part-r-00001

...
4431    4
4432    4
4433    4
4434    4
4435    4
4436    4
4437    4
4438    4
4439    4
444     3
4440    4

hadoop fs -cat /test/wc/output/part-r-00002

...
554
3 5540 4 5541 4 5542 4 5543 4 5544 4 5545 4

hadoop fs -cat /test/wc/output/part-r-00003

...
99
2 990 3 991 3 992 3 993 3 994 3 995 3 996 3 997 3 998 3 999 3

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM