在进行流量排序之前,先要明白排序是发生在map阶段,排序之后(排序结束后map阶段才会显示100%完成)才会到reduce阶段(事实上reduce也会排序),.此外排序之前要已经完成了手机流量的统计工作,即把第一次mr的结果作为本次排序的输入.也就是说读取的数据格式为 手机号 上行流量 下行流量 总流量
1,map阶段,读取并封装流量信息,不同的是context.write()时key必须是封装的实体类,而不再是手机号
1 /**
2 * 输入key 行号 3 * 输入value 流量信息 4 * 输出key 封装了流量信息的FlowBean 5 * 输出value 手机号 6 * @author tele 7 * 8 */
9 public class FlowSortMapper extends Mapper<LongWritable,Text,FlowBean,Text>{ 10 FlowBean flow = new FlowBean(); 11 Text v = new Text(); 12 //读取的内容格式 手机号 上行流量 下行流量 总流量
13 @Override 14 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) 15 throws IOException, InterruptedException { 16
17 //1.读取
18 String line = value.toString(); 19
20 //2.切割
21 String[] split = line.split("\t"); 22 String upFlow = split[1]; 23 String downFlow = split[2]; 24 String phoneNum = split[0]; 25
26 //3.封装流量信息
27 flow.set(Long.parseLong(upFlow),Long.parseLong(downFlow)); 28
29 v.set(phoneNum); 30
31 //4.写出
32 context.write(flow,v); 33
34 } 35 }
2.map之后会根据key进行排序,因此如果要实现自定义排序,必须让定义的bean实现WritableComparable接口,并重写其中的compare方法,我们只需要告诉MapReduce根据什么排序,升序还是降序就可以了
具体的排序过程由MapReduce完成
1 public class FlowBean implements WritableComparable<FlowBean>{ 2 private long upFlow; 3 public long getUpFlow() { 4 return upFlow; 5 } 6 public void setUpFlow(long upFlow) { 7 this.upFlow = upFlow; 8 } 9 public long getDownFlow() { 10 return downFlow; 11 } 12 public void setDownFlow(long downFlow) { 13 this.downFlow = downFlow; 14 } 15 public long getSumFlow() { 16 return sumFlow; 17 } 18 public void setSumFlow(long sumFlow) { 19 this.sumFlow = sumFlow; 20 } 21 private long downFlow; 22 private long sumFlow; 23
24 /**
25 * 反序列化时需要通过反射调用空参构造方法.必须有空参构造 26 */
27 public FlowBean() { 28 super(); 29 } 30
31 public FlowBean(long upFlow, long downFlow) { 32 super(); 33 this.upFlow = upFlow; 34 this.downFlow = downFlow; 35 this.sumFlow = upFlow + downFlow; 36 } 37
38 public void set(long upFlow, long downFlow) { 39 this.upFlow = upFlow; 40 this.downFlow = downFlow; 41 this.sumFlow = upFlow + downFlow; 42 } 43
44
45 /**
46 * 序列化与反序列化顺序必须一致 47 */
48
49
50 //序列化
51 @Override 52 public void write(DataOutput output) throws IOException { 53 output.writeLong(upFlow); 54 output.writeLong(downFlow); 55 output.writeLong(sumFlow); 56
57 } 58
59
60 //反序列化
61 @Override 62 public void readFields(DataInput input) throws IOException { 63 upFlow = input.readLong(); 64 downFlow = input.readLong(); 65 sumFlow = input.readLong(); 66 } 67
68 /**
69 * reduce context.write()会调用此方法 70 */
71 @Override 72 public String toString() { 73 return upFlow + "\t" + downFlow + "\t" + sumFlow; 74 } 75
76
77 @Override 78 public int compareTo(FlowBean o) { 79 // -1表示不交换位置,即降序,1表示交换位置,升序
80 return this.sumFlow > o.getSumFlow() ? -1:1; 81 } 82
83 }
3.reduce阶段,map阶段会对输出的value根据key进行分组,具有相同key的value会被划分到一组,这样reduce阶段执行一次reduce()读取一组,由于map阶段输出的key是定义的FlowBean,因此key是唯一的,从而
每组只有一个值,即Iterable<Text> value中只有一个值,也就是只有一个手机号
1 /**
2 * 输出的格式仍然为 手机号 上行流量 下行流量 总流量 3 * @author tele 4 * 5 */
6 public class FlowSortReducer extends Reducer<FlowBean,Text,Text,FlowBean>{ 7 /**
8 * reduce阶段读入的仍然是一组排好序的数据 9 * 前面map阶段输出的结果已根据key(FlowBean)进行分组,但由于此处key的唯一 10 * 所以一组只有一个数据,即 Iterable<Text> value 中只有一个值 11 */
12 @Override 13 protected void reduce(FlowBean key, Iterable<Text> value, Reducer<FlowBean, Text, Text, FlowBean>.Context context) 14 throws IOException, InterruptedException { 15
16 //输出
17 Text phone = value.iterator().next(); 18 context.write(phone,key); 19
20
21 } 22 }
下面进行debug,在map(),reduce()方法的开始与结束均打上断点,在FlowBean的compareTo()中也打上断点
map读取的内容
写出,注意key是FlowBean对象
接下来是排序,可以看到排序时map仍然不是100%,也就是说map阶段进行了排序(reduce阶段也会进行排序)
排序之后进入reduce阶段,reduce时write会调用FlowBean的toString()把结果输出到磁盘上
reduce除了归并排序之外,在执行write时同样会进行一次排序,执行第一组的write,(会调用FlowBean的toString()).但接下来还会去执行compareTo方法,此时在磁盘上生成的是临时目录,并且生成的part000文件是0KB,在执行完第二组的write之后才会真正把第一组数据写出到磁盘上
part000此时有了数据
这样看来我们重写的compareTo方法无论在map阶段还是reduce阶段都被调用了