按照k2排序,要求k2必須是可以比較的,即必須實現WritableComparable接口。
但是如果還想讓別的字段(比如v2中的一些字段)參與排序怎麼辦?
這時需要重新定義k2,把需要參與排序的字段都放到k2中。
這塊用代碼實現:
假如數據現在的結構是(兩列之間以制表符分隔):
3 3
3 2
3 1
2 2
2 1
1 1
看代碼:
1 import java.io.DataInput; 2 import java.io.DataOutput; 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.NullWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.io.WritableComparable; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 16 17 public class TwoIntSortApp { 18 19 public static void main(String[] args) throws Exception { 20 Job job = Job.getInstance(new Configuration(), TwoIntSortApp.class.getSimpleName()); 21 job.setJarByClass(TwoIntSortApp.class); 22 FileInputFormat.setInputPaths(job, args[0]); 23 24 job.setMapperClass(TwoIntSortMapper.class); 25 job.setMapOutputKeyClass(TwoInt.class); 26 job.setMapOutputValueClass(NullWritable.class); 27 28 job.setReducerClass(TwoIntSortReducer.class); 29 job.setOutputKeyClass(TwoInt.class); 30 job.setOutputValueClass(NullWritable.class); 31 32 FileOutputFormat.setOutputPath(job, new Path(args[1])); 33 job.waitForCompletion(true); 34 } 35 36 public static class TwoIntSortMapper extends Mapper<LongWritable, Text, TwoInt, NullWritable>{ 37 TwoInt k2 = new TwoInt(); 38 @Override 39 protected void map(LongWritable key, Text value, 40 Mapper<LongWritable, Text, TwoInt, NullWritable>.Context context) 41 throws IOException, InterruptedException { 42 String[] splited = value.toString().split("\t"); 43 k2.set(splited[0],splited[1]); 44 context.write(k2, NullWritable.get()); 45 System.out.println("Mapper-----第一個數:"+k2.first+" 第二個數:"+k2.second); 46 } 47 } 48 49 public static class TwoIntSortReducer extends Reducer<TwoInt, NullWritable, TwoInt, NullWritable>{ 50 int i=1; 51 @Override 52 protected void reduce(TwoInt k2, Iterable<NullWritable> arg1, 53 
Reducer<TwoInt, NullWritable, TwoInt, NullWritable>.Context context) 54 throws IOException, InterruptedException { 55 context.write(k2,NullWritable.get()); 56 System.out.println("調用次數"+(i++)); 57 System.out.println("Reducer-----第一個數:"+k2.first+" 第二個數:"+k2.second); 58 } 59 } 60 61 public static class TwoInt implements WritableComparable<TwoInt>{ 62 int first; 63 int second; 64 public void write(DataOutput out) throws IOException { 65 out.writeInt(first); 66 out.writeInt(second); 67 } 68 69 public void set(String s1,String s2){ 70 this.first = Integer.parseInt(s1); 71 this.second = Integer.parseInt(s2); 72 } 73 74 public void readFields(DataInput in) throws IOException { 75 this.first = in.readInt(); 76 this.second = in.readInt(); 77 78 } 79 80 public int compareTo(TwoInt o) { 81 int r1 = this.first - o.first; 82 if(r1 < 0){ 83 return -1; 84 }else if(r1 > 0){ 85 return 1; 86 } 87 int r2 = this.second - o.second; 88 return (r2 < 0 ? -1 : (r2 > 0 ? 1 : 0)); 89 } 90 91 @Override 92 public String toString() { 93 return this.first+"\t"+this.second; 94 } 95 } 96 }
//==============================================================
在job上設置Combiner類和自定義的分組比較器:
// Register the reducer class as the combiner.
job.setCombinerClass(TwoIntSortReducer.class);
// Register the custom grouping comparator. This call must be on its own line;
// when it shares a line with the comment above it is commented out.
job.setGroupingComparatorClass(MyGroupingCompartor.class);
1 public static class MyGroupingCompartor extends WritableComparator{ 2 @Override 3 public int compare(WritableComparable a, WritableComparable b) { 4 TwoInt aa = (TwoInt)a; 5 TwoInt bb = (TwoInt)b; 6 return aa.first-bb.first<0?-1:(aa.first-bb.first>0?1:0);//只要是第一列相同的就認為是一個分組. 7 /* 8 * 1 1 9 * 2 1 10 * 2 2 11 * 3 1 12 * 3 2 13 * 3 3 14 * 這樣就分成了三組 15 */ 16 } 17 }
