Sorting data by two fields in MapReduce


MapReduce sorts records by k2, so k2 must be comparable, i.e. it must implement the WritableComparable interface.

But what if other fields (for example, some fields of v2) should also take part in the sort?

Then k2 has to be redefined: every field that should take part in the sort goes into k2.
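To see why the extra fields have to move into k2, here is a plain-Java illustration (no Hadoop involved) using the same sample rows as the input below. `KeyOnlySortDemo` and its methods are hypothetical names made up for this sketch. Sorting on the first column alone only fixes the order of the keys; Java's stable `List.sort` then leaves rows that share a first column in their input order, which stands in for the fact that MapReduce does not order the values within a key at all. Comparing the first column and then the second orders both fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo class, not part of the original job.
public class KeyOnlySortDemo {

    // Parses column i of a "first<TAB>second" line.
    static int col(String line, int i) {
        return Integer.parseInt(line.split("\t")[i]);
    }

    // Orders rows by the first column only. List.sort is stable, so rows that share
    // a first column stay in input order: the second column is never looked at.
    public static List<String> sortByFirstOnly(List<String> lines) {
        List<String> copy = new ArrayList<>(lines);
        copy.sort(Comparator.comparingInt(line -> col(line, 0)));
        return copy;
    }

    // Orders rows by the first column, then the second: both fields act as the key.
    public static List<String> sortByBoth(List<String> lines) {
        List<String> copy = new ArrayList<>(lines);
        copy.sort(Comparator.<String>comparingInt(line -> col(line, 0))
                .thenComparingInt(line -> col(line, 1)));
        return copy;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("3\t3", "3\t2", "3\t1", "2\t2", "2\t1", "1\t1");
        System.out.println(sortByFirstOnly(input));
        System.out.println(sortByBoth(input));
    }
}
```

For these rows, sorting by the first column only gives 1 1, 2 2, 2 1, 3 3, 3 2, 3 1, while sorting by both columns gives 1 1, 2 1, 2 2, 3 1, 3 2, 3 3.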

Here is how to implement this in code:

Suppose the input currently looks like this (two tab-separated columns):

3	3
3	2
3	1
2	2
2	1
1	1

The code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoIntSortApp {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), TwoIntSortApp.class.getSimpleName());
        job.setJarByClass(TwoIntSortApp.class);
        FileInputFormat.setInputPaths(job, args[0]);

        job.setMapperClass(TwoIntSortMapper.class);
        job.setMapOutputKeyClass(TwoInt.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(TwoIntSortReducer.class);
        job.setOutputKeyClass(TwoInt.class);
        job.setOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Parses "first<TAB>second" lines and emits both numbers together as the key.
    public static class TwoIntSortMapper extends Mapper<LongWritable, Text, TwoInt, NullWritable> {
        TwoInt k2 = new TwoInt(); // reused for every record; context.write serializes it
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, TwoInt, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");
            k2.set(splited[0], splited[1]);
            context.write(k2, NullWritable.get());
            System.out.println("Mapper----- first: " + k2.first + " second: " + k2.second);
        }
    }

    // Writes each incoming key once; with the default grouping, every distinct key is one call.
    public static class TwoIntSortReducer extends Reducer<TwoInt, NullWritable, TwoInt, NullWritable> {
        int i = 1;
        @Override
        protected void reduce(TwoInt k2, Iterable<NullWritable> arg1,
                Reducer<TwoInt, NullWritable, TwoInt, NullWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(k2, NullWritable.get());
            System.out.println("reduce() call #" + (i++));
            System.out.println("Reducer----- first: " + k2.first + " second: " + k2.second);
        }
    }

    // Composite key: both columns take part in the sort.
    public static class TwoInt implements WritableComparable<TwoInt> {
        int first;
        int second;

        public void set(String s1, String s2) {
            this.first = Integer.parseInt(s1);
            this.second = Integer.parseInt(s2);
        }

        public void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }

        public void readFields(DataInput in) throws IOException {
            this.first = in.readInt();
            this.second = in.readInt();
        }

        // Ascending by first, then by second. Integer.compare avoids the overflow
        // that subtracting two ints can produce for large or negative values.
        public int compareTo(TwoInt o) {
            int r1 = Integer.compare(this.first, o.first);
            if (r1 != 0) {
                return r1;
            }
            return Integer.compare(this.second, o.second);
        }

        // The default HashPartitioner picks a reducer from hashCode(); the inherited
        // identity hash is not guaranteed to match for equal keys, so derive it from
        // the fields. Using only `first` keeps all keys with the same first column in
        // one partition, which the grouping comparator relies on with several reducers.
        @Override
        public int hashCode() {
            return first;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof TwoInt)) {
                return false;
            }
            TwoInt other = (TwoInt) obj;
            return first == other.first && second == other.second;
        }

        @Override
        public String toString() {
            return this.first + "\t" + this.second;
        }
    }
}
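Because `write` and `readFields` only depend on `java.io.DataOutput` and `java.io.DataInput`, the key logic can be checked locally without a cluster. The sketch below copies `TwoInt` into a hypothetical `TwoIntPlain` that implements plain `Comparable` instead of Hadoop's `WritableComparable` (so it compiles without Hadoop on the classpath), serializes each sample row and reads it back, roughly as the shuffle does, then sorts the keys. Assuming a single reducer, the result should match what the job writes: one line per distinct key, ascending by the first column and then the second.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TwoIntLocalCheck {

    // Hypothetical copy of TwoInt that implements java.lang.Comparable instead of
    // Hadoop's WritableComparable, so it compiles without Hadoop on the classpath.
    static class TwoIntPlain implements Comparable<TwoIntPlain> {
        int first;
        int second;

        void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }

        void readFields(DataInput in) throws IOException {
            first = in.readInt();
            second = in.readInt();
        }

        @Override
        public int compareTo(TwoIntPlain o) {
            int r = Integer.compare(first, o.first);
            return r != 0 ? r : Integer.compare(second, o.second);
        }

        @Override
        public String toString() {
            return first + "\t" + second;
        }
    }

    // Writes the key to bytes and reads it into a fresh object, roughly what
    // happens to a map output key on its way to the reducer.
    static TwoIntPlain roundTrip(TwoIntPlain k) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            k.write(new DataOutputStream(bytes));
            TwoIntPlain copy = new TwoIntPlain();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Parses "first<TAB>second" lines, round-trips and sorts the keys, then keeps
    // one line per distinct key, since the reducer writes k2 once per reduce() call.
    public static List<String> simulateFirstJob(List<String> lines) {
        List<TwoIntPlain> keys = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            TwoIntPlain k = new TwoIntPlain();
            k.first = Integer.parseInt(f[0]);
            k.second = Integer.parseInt(f[1]);
            keys.add(roundTrip(k));
        }
        Collections.sort(keys);
        List<String> out = new ArrayList<>();
        TwoIntPlain prev = null;
        for (TwoIntPlain k : keys) {
            if (prev == null || prev.compareTo(k) != 0) {
                out.add(k.toString());
            }
            prev = k;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("3\t3", "3\t2", "3\t1", "2\t2", "2\t1", "1\t1");
        simulateFirstJob(input).forEach(System.out::println);
    }
}
```

For the sample input this prints the six rows in the order 1 1, 2 1, 2 2, 3 1, 3 2, 3 3; since all six keys are distinct, each one corresponds to its own reduce() call.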

 //==============================================================

Next, set a Combiner class on the job and register a custom grouping comparator:

        
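        // The reducer can double as the combiner: its input and output types are both (TwoInt, NullWritable).
        job.setCombinerClass(TwoIntSortReducer.class); // set the Combiner class
        job.setGroupingComparatorClass(MyGroupingCompartor.class); // set the custom grouping comparator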

  

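    // Needs: import org.apache.hadoop.io.WritableComparable;
    //        import org.apache.hadoop.io.WritableComparator;
    public static class MyGroupingCompartor extends WritableComparator {
        // Register the key class and let the parent create key instances, so the
        // raw byte comparison can deserialize TwoInt keys before calling compare()
        // below; without this constructor the comparator fails at runtime.
        public MyGroupingCompartor() {
            super(TwoInt.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TwoInt aa = (TwoInt) a;
            TwoInt bb = (TwoInt) b;
            /*
             * Keys with the same first column are treated as one group:
             * 1    1
             * 2    1
             * 2    2
             * 3    1
             * 3    2
             * 3    3
             * This yields three groups.
             */
            return Integer.compare(aa.first, bb.first);
        }
    }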
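The effect of the grouping comparator can also be simulated in plain Java. In this sketch (the class `GroupingSimulation` is hypothetical, and the combiner is ignored), keys are sorted on both columns, then adjacent keys are merged into one group whenever their first columns match, which is the decision `MyGroupingCompartor` makes. Each group stands for one `reduce()` call. As far as I understand Hadoop's new-API reduce loop, the key object only advances while the values are iterated; since `TwoIntSortReducer` writes `k2` without touching the values, it should emit only the first (smallest) key of each group. That last step is an assumption about Hadoop's behaviour, modelled here rather than tested against a cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation class, not part of the original job.
public class GroupingSimulation {

    // Sort order of the composite key: first column, then second column.
    static final Comparator<int[]> SORT =
            Comparator.<int[]>comparingInt(k -> k[0]).thenComparingInt(k -> k[1]);

    // Grouping order: first column only, the same decision MyGroupingCompartor makes.
    static final Comparator<int[]> GROUP = Comparator.comparingInt(k -> k[0]);

    // Sorts the keys, then starts a new group whenever GROUP says a key differs
    // from the key right before it. Each group stands for one reduce() call.
    public static List<List<int[]>> group(List<int[]> keys) {
        List<int[]> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<List<int[]>> groups = new ArrayList<>();
        int[] prev = null;
        for (int[] k : sorted) {
            if (prev == null || GROUP.compare(prev, k) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(k);
            prev = k;
        }
        return groups;
    }

    // Assumed reducer output: TwoIntSortReducer never iterates its values, so the
    // key it writes is taken to be the first (smallest) key of each group.
    public static List<String> reducerOutput(List<int[]> keys) {
        List<String> out = new ArrayList<>();
        for (List<int[]> g : group(keys)) {
            int[] first = g.get(0);
            out.add(first[0] + "\t" + first[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> keys = Arrays.asList(
                new int[] {3, 3}, new int[] {3, 2}, new int[] {3, 1},
                new int[] {2, 2}, new int[] {2, 1}, new int[] {1, 1});
        System.out.println("reduce() calls: " + group(keys).size());
        reducerOutput(keys).forEach(System.out::println);
    }
}
```

With the sample data the simulation reports three reduce() calls and writes 1 1, 2 1 and 3 1.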

 

