在進行流量排序之前,先要明白排序是發生在map階段,排序之后(排序結束后map階段才會顯示100%完成)才會到reduce階段(事實上reduce也會排序),.此外排序之前要已經完成了手機流量的統計工作,即把第一次mr的結果作為本次排序的輸入.也就是說讀取的數據格式為 手機號 上行流量 下行流量 總流量
1,map階段,讀取並封裝流量信息,不同的是context.write()時key必須是封裝的實體類,而不再是手機號
1 /**
2 * 輸入key 行號 3 * 輸入value 流量信息 4 * 輸出key 封裝了流量信息的FlowBean 5 * 輸出value 手機號 6 * @author tele 7 * 8 */
9 public class FlowSortMapper extends Mapper<LongWritable,Text,FlowBean,Text>{ 10 FlowBean flow = new FlowBean(); 11 Text v = new Text(); 12 //讀取的內容格式 手機號 上行流量 下行流量 總流量
13 @Override 14 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) 15 throws IOException, InterruptedException { 16
17 //1.讀取
18 String line = value.toString(); 19
20 //2.切割
21 String[] split = line.split("\t"); 22 String upFlow = split[1]; 23 String downFlow = split[2]; 24 String phoneNum = split[0]; 25
26 //3.封裝流量信息
27 flow.set(Long.parseLong(upFlow),Long.parseLong(downFlow)); 28
29 v.set(phoneNum); 30
31 //4.寫出
32 context.write(flow,v); 33
34 } 35 }
2.map之后會根據key進行排序,因此如果要實現自定義排序,必須讓定義的bean實現WritableComparable接口,並重寫其中的compare方法,我們只需要告訴MapReduce根據什么排序,升序還是降序就可以了
具體的排序過程由MapReduce完成
1 public class FlowBean implements WritableComparable<FlowBean>{ 2 private long upFlow; 3 public long getUpFlow() { 4 return upFlow; 5 } 6 public void setUpFlow(long upFlow) { 7 this.upFlow = upFlow; 8 } 9 public long getDownFlow() { 10 return downFlow; 11 } 12 public void setDownFlow(long downFlow) { 13 this.downFlow = downFlow; 14 } 15 public long getSumFlow() { 16 return sumFlow; 17 } 18 public void setSumFlow(long sumFlow) { 19 this.sumFlow = sumFlow; 20 } 21 private long downFlow; 22 private long sumFlow; 23
24 /**
25 * 反序列化時需要通過反射調用空參構造方法.必須有空參構造 26 */
27 public FlowBean() { 28 super(); 29 } 30
31 public FlowBean(long upFlow, long downFlow) { 32 super(); 33 this.upFlow = upFlow; 34 this.downFlow = downFlow; 35 this.sumFlow = upFlow + downFlow; 36 } 37
38 public void set(long upFlow, long downFlow) { 39 this.upFlow = upFlow; 40 this.downFlow = downFlow; 41 this.sumFlow = upFlow + downFlow; 42 } 43
44
45 /**
46 * 序列化與反序列化順序必須一致 47 */
48
49
50 //序列化
51 @Override 52 public void write(DataOutput output) throws IOException { 53 output.writeLong(upFlow); 54 output.writeLong(downFlow); 55 output.writeLong(sumFlow); 56
57 } 58
59
60 //反序列化
61 @Override 62 public void readFields(DataInput input) throws IOException { 63 upFlow = input.readLong(); 64 downFlow = input.readLong(); 65 sumFlow = input.readLong(); 66 } 67
68 /**
69 * reduce context.write()會調用此方法 70 */
71 @Override 72 public String toString() { 73 return upFlow + "\t" + downFlow + "\t" + sumFlow; 74 } 75
76
77 @Override 78 public int compareTo(FlowBean o) { 79 // -1表示不交換位置,即降序,1表示交換位置,升序
80 return this.sumFlow > o.getSumFlow() ? -1:1; 81 } 82
83 }
3.reduce階段,map階段會對輸出的value根據key進行分組,具有相同key的value會被划分到一組,這樣reduce階段執行一次reduce()讀取一組,由於map階段輸出的key是定義的FlowBean,因此key是唯一的,從而
每組只有一個值,即Iterable<Text> value中只有一個值,也就是只有一個手機號
1 /**
2 * 輸出的格式仍然為 手機號 上行流量 下行流量 總流量 3 * @author tele 4 * 5 */
6 public class FlowSortReducer extends Reducer<FlowBean,Text,Text,FlowBean>{ 7 /**
8 * reduce階段讀入的仍然是一組排好序的數據 9 * 前面map階段輸出的結果已根據key(FlowBean)進行分組,但由於此處key的唯一 10 * 所以一組只有一個數據,即 Iterable<Text> value 中只有一個值 11 */
12 @Override 13 protected void reduce(FlowBean key, Iterable<Text> value, Reducer<FlowBean, Text, Text, FlowBean>.Context context) 14 throws IOException, InterruptedException { 15
16 //輸出
17 Text phone = value.iterator().next(); 18 context.write(phone,key); 19
20
21 } 22 }
下面進行debug,在map(),reduce()方法的開始與結束均打上斷點,在FlowBean的compareTo()中也打上斷點
map讀取的內容
寫出,注意key是FlowBean對象
接下來是排序,可以看到排序時map仍然不是100%,也就是說map階段進行了排序(reduce階段也會進行排序)
排序之后進入reduce階段,reduce時write會調用FlowBean的toString()把結果輸出到磁盤上
reduce除了歸並排序之外,在執行write時同樣會進行一次排序,執行第一組的write,(會調用FlowBean的toString()).但接下來還會去執行compareTo方法,此時在磁盤上生成的是臨時目錄,並且生成的part000文件是0KB,在執行完第二組的write之后才會真正把第一組數據寫出到磁盤上
part000此時有了數據
這樣看來我們重寫的compareTo方法無論在map階段還是reduce階段都被調用了