shuffle英文翻譯:洗牌。
在mapreduce中間階段,作用有緩存,排序和分區。緩存的大小可以更改,在mapreduce-site.xml配置:
<name>io.sort</name><value>1000</value>,單位是M,默認的緩存大小是100M。下面根據shuffle的圖形詳細說一下shuffle的作用。
Map階段將結果輸出到shuffle緩存中,如果緩存不夠,則暫時存於本機的磁盤上(分區和排序存儲),根據數據量的大小可能存在多個文件,當Map完成后,shuffle將這些文件的數據匯總到總的文件中(分區和排序,有幾個分區就有幾個文件,文件帶索引),磁盤文件匯總階段是combiner在管理。
Reduce階段將這些分區和排序好的數據根據reduceTask處理的分區編號拿到集群上的屬於自己應該處理的文件歸並並排序后進行處理。
接下來就是怎么用代碼實現分區和排序?
排序:Map階段處理數據的時候,shuffle默認是根據key的hashcode值來排序,如果我們要自定義的話就需要去實現hadoop的排序接口WritableComparable<FlowSortBean>,會讓你重寫方法:
@Override public int compareTo(FlowSortBean o) { //按照從大到小排序,如果本對象大,則返回-1,本對象小,則返回1. //默認1為升序排列,-1為降。 return this.getSumFlow()>o.getSumFlow()?-1:1; }
注意在map階段輸出的時候,你如果要根據這個類的某個屬性來排序的話,map中的輸出的key必須是此對象,shuffle是根據map輸出的key值排序的。
分區:自定義一個類來集成父類Partitioner,重寫getPartition方法,根據map輸出的key值或者value值return分區號對應不同的reduceTask。
package com.xws.flowCountSortAndPartitionerMapreduce; import java.util.HashMap; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class PhonePartitioner extends Partitioner<FlowSortBean, Text>{ private static HashMap<String,Integer> phoneMap = new HashMap<String,Integer>(); /** * 獲得分類,如果是135開頭的,則為分區1的數據 */ static{ phoneMap.put("135", 0); phoneMap.put("136", 1); phoneMap.put("137", 2); phoneMap.put("138", 3); } @Override public int getPartition(FlowSortBean fsb, Text phone, int arg2) { String key = phone.toString().substring(0, 3); Integer value = phoneMap.get(key); if(value==null){ value=4; } return value; } }
另外,還需要在job調用階段設置自定義的排序類以及分區個數。
//指定自定義的partitioner以及partitioner的個數 job.setPartitionerClass(PhonePartitioner.class); job.setNumReduceTasks(5);
Map和reuduce的代碼如下:在流量統計的基礎上既實現了排序,也實現了分區。
package com.xws.flowCountSortAndPartitionerMapreduce; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //com.xws.flowCountSortAndPartitionerMapreduce.FlowSortPartitionerCount public class FlowSortPartitionerCount { /** * 我們將數據已經匯總之后進行排序 * * @author root * */ public static class FlowCountMapper extends Mapper<LongWritable, Text, FlowSortBean, Text> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowSortBean, Text>.Context context) throws IOException, InterruptedException { // 獲取本行數據 String line = value.toString(); // 將數據拆分 String[] fileds = StringUtils.split(line, "\t"); String phone = fileds[0]; long upFlow = Long.parseLong(fileds[1]); long dFlow = Long.parseLong(fileds[2]); context.write(new FlowSortBean(upFlow, dFlow),new Text(phone)); } } /** * reduce將shuffle階段排序好的數據直接輸出 * @author root * */ public static class FlowCountReducer extends Reducer<FlowSortBean, Text, Text, FlowSortBean> { @Override protected void reduce(FlowSortBean key, Iterable<Text> values, Reducer<FlowSortBean, Text, Text, FlowSortBean>.Context context) throws IOException, InterruptedException { context.write(values.iterator().next(), key); } } public static void main(String[] args) throws Exception { Job job = Job.getInstance(); // 設置工作類 job.setJarByClass(FlowSortPartitionerCount.class); // 設置map和reduce類 job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); // 設置map輸出結果類型 job.setMapOutputKeyClass(FlowSortBean.class); job.setMapOutputValueClass(Text.class); // 設置輸出結果類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowSortBean.class); //指定自定義的partitioner以及partitioner的個數 job.setPartitionerClass(PhonePartitioner.class); job.setNumReduceTasks(5); // 設置輸入輸出文件路徑 FileInputFormat.setInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 打印過程信息 boolean flag = job.waitForCompletion(true); System.exit(flag ? 0 : 1); } }
結果文件如下:
part-r-00000
part-r-00000
part-r-00000
part-r-00000
part-r-00000