- Grouping: values that share the same key are collected into one group.
Example: given the input and output below, the first column of the output contains no duplicate values, and the second column is the maximum of the second input column among rows whose first column is the same.
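The original post shows the sample data as a screenshot. Based on the key pairs referenced later in the grouping-comparator comments, the tab-separated input (the /neusoft/twoint file used below) and the expected output are presumably:

Input:
3	3
3	2
3	1
2	2
2	1
1	1

Expected output:
1	1
2	2
3	3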

Analysis: first decide on <k3,v3>. There are two ways to choose k3:
Method 1: use both columns together as k3.
Method 2: use the two columns as k3 and v3 respectively. In that case k2 is the first column and v2 is the second column; the reducer then turns <k2,v2s> into <k3,v3> by taking the maximum value in v2s as v3.
Part 1: accomplishing the task with method 2
(1) Custom Mapper
private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    IntWritable k2 = new IntWritable();
    IntWritable v2 = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String[] splited = value.toString().split("\t");
        k2.set(Integer.parseInt(splited[0]));
        v2.set(Integer.parseInt(splited[1]));
        context.write(k2, v2);
    }
}
(2) Custom Reducer
// Records are sorted and grouped by k2 (3 groups here, so the reduce function is called 3 times; it is called once per group)
// Groups: 3-{3,2,1}, 2-{2,1}, 1-{1}
private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    IntWritable v3 = new IntWritable();

    @Override
    protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v2 : v2s) {
            if (v2.get() > max) {
                max = v2.get();
            }
        }
        // Taking one maximum per group yields the result sequence
        v3.set(max);
        context.write(k2, v3);
    }
}
(3) Assembling the MapReduce job
public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
    job.setJarByClass(GroupTest.class);
    // 1. Input path
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    // 2. Custom mapper
    //job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(MyMapper.class);
    //job.setMapOutputKeyClass(Text.class);
    //job.setMapOutputValueClass(TrafficWritable.class);

    // 3. Custom reducer
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    // 4. Output path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //job.setOutputFormatClass(TextOutputFormat.class); // format the output and write it to disk

    job.waitForCompletion(true);
}
Putting it together, the complete code is as follows:
package Mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupTest {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
        job.setJarByClass(GroupTest.class);
        // 1. Input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 2. Custom mapper
        //job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(TrafficWritable.class);

        // 3. Custom reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //job.setOutputFormatClass(TextOutputFormat.class); // format the output and write it to disk

        job.waitForCompletion(true);
    }

    private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        IntWritable k2 = new IntWritable();
        IntWritable v2 = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");
            k2.set(Integer.parseInt(splited[0]));
            v2.set(Integer.parseInt(splited[1]));
            context.write(k2, v2);
        }
    }

    // Records are sorted and grouped by k2 (3 groups here, so reduce is called 3 times; once per group)
    // Groups: 3-{3,2,1}, 2-{2,1}, 1-{1}
    private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        IntWritable v3 = new IntWritable();

        @Override
        protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
                Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable v2 : v2s) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            // Taking one maximum per group yields the result sequence
            v3.set(max);
            context.write(k2, v3);
        }
    }
}
(4) Running and testing the code
- [root@neusoft-master filecontent]# hadoop jar GroupTest.jar /neusoft/twoint /out9
- [root@neusoft-master filecontent]# hadoop jar -text /out9/part-r-00000
(Note: the second command is not valid, since hadoop jar expects a jar file; the correct way to view the result is the following.)
- [root@neusoft-master filecontent]# hadoop dfs -text /out9/part-r-00000
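The result screenshot from the original post is not reproduced here; for the presumed sample input above, /out9/part-r-00000 should contain:

1	1
2	2
3	3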

Part 2: accomplishing the task with method 1
Both columns together serve as k3 and there is no separate v3; by the same logic, k2 is also the pair of columns.
However, with the default grouping, the dataset above is split into 6 groups, so rows that share the same first-column value can no longer be combined to take the maximum.
Therefore we use MapReduce's custom grouping mechanism so that rows with the same first-column value fall into one group, giving the correct grouping (see the sketch below).
MapReduce provides job.setGroupingComparatorClass(cls) for this, where cls is the custom grouping class.
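As a sketch, using the sample keys listed in the comparator comments below:

Default grouping (full TwoInt key, compareTo on both columns) -> 6 groups, 6 reduce calls:
{3,3}  {3,2}  {3,1}  {2,2}  {2,1}  {1,1}

Custom grouping (compare the first column only) -> 3 groups, 3 reduce calls:
3 -> {3,2,1}    2 -> {2,1}    1 -> {1}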

(1) As the source code shows, the class needs to implement the RawComparator interface. The custom grouping code is as follows:
// Grouping comparison -- custom grouping
private static class MyGroupingComparator implements RawComparator {
    public int compare(Object o1, Object o2) {
        return 0; // object-based compare; the byte-level compare below does the real work
    }

    // b1/b2 are the serialized byte representations of the two keys being compared;
    // s1 is the starting offset of the key in b1 and l1 is its length (8 bytes here).
    // Each TwoInt key serializes to 8 bytes (t1 and t2 are 4 bytes each),
    // so the sample k2 data (6 rows) amounts to 48 bytes in total.
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // compareBytes compares byte by byte; k2 holds two columns, but only the first
        // column (the first 4 bytes, i.e. t1) takes part in the comparison and the
        // second column is ignored.
        // Keys: {3,3},{3,2},{3,1},{2,2},{2,1},{1,1}
        // Comparing only the first number of each pair (the t1 field of TwoInt)
        // groups them into 3 groups: {3,(3,2,1)} {2,(2,1)} {1,(1)};
        // the maximum is then taken from v2s in the reducer.
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
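To make the byte arithmetic concrete, here is a minimal standalone sketch (not part of the original post; the class name GroupingByteSketch and the serialize helper are illustrative) showing that comparing only the first l - 4 bytes of two serialized TwoInt keys ignores t2 entirely:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.WritableComparator;

public class GroupingByteSketch {
    // Mirror TwoInt.write(): t1 then t2, 4 bytes each, 8 bytes total
    static byte[] serialize(int t1, int t2) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(t1);
        out.writeInt(t2);
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] a = serialize(3, 3);
        byte[] b = serialize(3, 1);
        // Compare only the first (8 - 4) = 4 bytes, i.e. the t1 field:
        // the result is 0, so {3,3} and {3,1} land in the same reduce group
        int cmp = WritableComparator.compareBytes(a, 0, a.length - 4, b, 0, b.length - 4);
        System.out.println(cmp); // prints 0
    }
}

Run with the Hadoop client jars on the classpath; it prints 0, confirming that keys differing only in t2 compare as equal for grouping purposes.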
(2) Invocation in the main function
// Without the custom grouping class below, k2's compareTo method is used for the comparison;
// once a grouping class is defined, the custom grouping class is used instead
job.setGroupingComparatorClass(MyGroupingComparator.class);
(3) From the meaning of the comparison function's parameters, it follows that v2 should be of type IntWritable rather than NullWritable: v2s is the collection of second-column values for each group, which the reducer needs in order to take the maximum.
The Mapper is as follows:
private static class MyMapper extends Mapper<LongWritable, Text, TwoInt, IntWritable> {
    // v2 here must be IntWritable rather than NullWritable
    TwoInt K2 = new TwoInt();
    IntWritable v2 = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String[] splited = value.toString().split("\t");
        K2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
        v2.set(Integer.parseInt(splited[1])); // the second column must be compared, so assign it to v2
        context.write(K2, v2);
    }
}
(4) k3 and v3 are the reducer's output types, both IntWritable. But given the v2 values, how do we compute, for each grouped key, the maximum among its values?
private static class MyReducer extends Reducer<TwoInt, IntWritable, IntWritable, IntWritable> { // k2, v2s, k3, v3
    IntWritable k3 = new IntWritable();
    IntWritable v3 = new IntWritable();

    @Override
    protected void reduce(TwoInt k2, Iterable<IntWritable> v2s,
            Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v2 : v2s) {
            if (v2.get() > max) {
                max = v2.get();
            }
        }
        // Taking one maximum per group yields the result sequence
        v3.set(max);
        k3.set(k2.t1); // the first column of k2 becomes k3, since k2 is of type TwoInt
        context.write(k3, v3);
    }
}
The final code is as follows:
package Mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Group2Test {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), Group2Test.class.getSimpleName());
        job.setJarByClass(Group2Test.class);
        // 1. Input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 2. Custom mapper
        job.setMapperClass(MyMapper.class);
        // k2/v2 differ from k3/v3 here, so the map output types must be declared explicitly
        job.setMapOutputKeyClass(TwoInt.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Without the custom grouping class below, k2's compareTo method is used for the comparison;
        // since a grouping class is defined, the custom grouping class is used instead
        job.setGroupingComparatorClass(MyGroupingComparator.class);

        // 3. Custom reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    // Grouping comparison -- custom grouping
    private static class MyGroupingComparator implements RawComparator {
        public int compare(Object o1, Object o2) {
            return 0; // object-based compare; the byte-level compare below does the real work
        }

        // b1/b2 are the serialized byte representations of the two keys being compared;
        // s1 is the starting offset of the key in b1 and l1 is its length (8 bytes here).
        // Each TwoInt key serializes to 8 bytes (t1 and t2 are 4 bytes each),
        // so the sample k2 data (6 rows) amounts to 48 bytes in total.
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // compareBytes compares byte by byte; only the first column (the first 4 bytes,
            // i.e. t1) takes part in the comparison and the second column is ignored.
            // Keys: {3,3},{3,2},{3,1},{2,2},{2,1},{1,1}
            // Comparing only t1 groups them into 3 groups: {3,(3,2,1)} {2,(2,1)} {1,(1)};
            // the maximum is then taken from v2s in the reducer.
            return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
        }
    }

    private static class MyMapper extends Mapper<LongWritable, Text, TwoInt, IntWritable> {
        // v2 here must be IntWritable rather than NullWritable
        TwoInt K2 = new TwoInt();
        IntWritable v2 = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");
            K2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
            v2.set(Integer.parseInt(splited[1]));
            context.write(K2, v2);
        }
    }

    private static class MyReducer extends Reducer<TwoInt, IntWritable, IntWritable, IntWritable> { // k2, v2s, k3, v3
        IntWritable k3 = new IntWritable();
        IntWritable v3 = new IntWritable();

        @Override
        protected void reduce(TwoInt k2, Iterable<IntWritable> v2s,
                Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable v2 : v2s) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            // Taking one maximum per group yields the result sequence
            v3.set(max);
            k3.set(k2.t1); // the first column of k2 becomes k3, since k2 is of type TwoInt
            context.write(k3, v3);
        }
    }

    private static class TwoInt implements WritableComparable<TwoInt> {
        public int t1;
        public int t2;

        public void write(DataOutput out) throws IOException {
            out.writeInt(t1);
            out.writeInt(t2);
        }

        public void set(int t1, int t2) {
            this.t1 = t1;
            this.t2 = t2;
        }

        public void readFields(DataInput in) throws IOException {
            this.t1 = in.readInt();
            this.t2 = in.readInt();
        }

        public int compareTo(TwoInt o) {
            if (this.t1 == o.t1) { // when the first column is equal, sort ascending by the second column
                return this.t2 - o.t2;
            }
            return this.t1 - o.t1; // otherwise sort ascending by the first column
        }

        @Override
        public String toString() {
            return t1 + "\t" + t2;
        }
    }
}
Test run results:
[root@neusoft-master filecontent]# hadoop dfs -text /out9/part-r-00000

[root@neusoft-master filecontent]# hadoop dfs -text /out10/part-r-00000

The results are correct.
END~
