MapReduce全排序的方法1:
每個map任務對自己的輸入數據進行排序,但是無法做到全局排序,需要將數據傳遞到reduce,然后通過reduce進行一次總的排序,但是這樣做的要求是只能有一個reduce任務來完成。
並行程度不高,無法發揮分布式計算的特點。
MapReduce全排序的方法2:
針對方法1的問題,現在介紹方法2來進行改進;
使用多個partition對map的結果進行分區,且分區后的結果是有區間的,將多個分區結果拼接起來,就是一個連續的全局排序文件。
Hadoop自帶的Partitioner的實現有兩種,一種為HashPartitioner, 默認的分區方式,計算公式 hash(key)%reducernum,另一種為TotalOrderPartitioner, 為排序作業創建分區,分區中數據的范圍需要通過分區文件來指定。
分區文件可以人為創建,如采用等距區間,如果數據分布不均勻導致作業完成時間受限於個別reduce任務完成時間的影響。
也可以通過抽樣器,先對數據進行抽樣,根據數據分布生成分區文件,避免數據傾斜。
這里實現一個通過隨機抽樣來生成分區文件,然后對數據進行全排序,根據分區文件的范圍分配到不同的reducer中。
示例代碼:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.InputSampler; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; import java.io.IOException; /** * Created by Edward on 2016/10/4. */ public class TotalSort { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //access hdfs's user System.setProperty("HADOOP_USER_NAME","root"); Configuration conf = new Configuration(); conf.set("mapred.jar", "D:\\MyDemo\\MapReduce\\Sort\\out\\artifacts\\TotalSort\\TotalSort.jar"); FileSystem fs = FileSystem.get(conf); /*RandomSampler 參數說明 * @param freq Probability with which a key will be chosen. * @param numSamples Total number of samples to obtain from all selected splits. * @param maxSplitsSampled The maximum number of splits to examine. */ InputSampler.RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 10, 10); //設置分區文件, TotalOrderPartitioner必須指定分區文件 Path partitionFile = new Path( "_partitions"); TotalOrderPartitioner.setPartitionFile(conf, partitionFile); Job job = Job.getInstance(conf); job.setJarByClass(TotalSort.class); job.setInputFormatClass(KeyValueTextInputFormat.class); //數據文件默認以\t分割 job.setMapperClass(Mapper.class); job.setReducerClass(Reducer.class); job.setNumReduceTasks(4); //設置reduce任務個數,分區文件以reduce個數為基准,拆分成n段 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setPartitionerClass(TotalOrderPartitioner.class); FileInputFormat.addInputPath(job, new Path("/test/sort")); Path path = new Path("/test/wc/output"); if(fs.exists(path))//如果目錄存在,則刪除目錄 { fs.delete(path,true); } FileOutputFormat.setOutputPath(job, path); //將隨機抽樣數據寫入分區文件 InputSampler.writePartitionFile(job, sampler); boolean b = job.waitForCompletion(true); if(b) { System.out.println("OK"); } } }
測試數據:
1 1
2 1
3 1
4 1
5 1
6 1
7 1
8 1
9 1
10 2
11 2
12 2
13 2
14 2
15 2
16 2
17 2
18 2
19 2
20 2
...
5999 4
6000 4
6001 4
6002 4
6003 4
6004 4
6005 4
6006 4
6007 4
6008 4
6009 4
6010 4
抽樣生成的分區文件為:
# hadoop fs -text /user/root/_partitions
2673 (null)
4441 (null)
5546 (null)
生成的抽樣文件為sequence file通過 -text打開查看
生成的排序結果文件:
文件內容:
hadoop fs -cat /test/wc/output/part-r-00000
... 2668 4 2669 4 267 3 2670 4 2671 4 2672 4
hadoop fs -cat /test/wc/output/part-r-00001
... 4431 4 4432 4 4433 4 4434 4 4435 4 4436 4 4437 4 4438 4 4439 4 444 3 4440 4
hadoop fs -cat /test/wc/output/part-r-00002
...
554 3 5540 4 5541 4 5542 4 5543 4 5544 4 5545 4
hadoop fs -cat /test/wc/output/part-r-00003
...
99 2 990 3 991 3 992 3 993 3 994 3 995 3 996 3 997 3 998 3 999 3