MapReduce Grouping


  •  Grouping: values that share the same key are collected into one group.

Example: given the input and output below, the first column of the output contains no duplicate values, and its second column is the maximum of the second-column values over all input rows that share that first-column value.

        Input:          Output:
        3   3           1   1
        3   2           2   2
        3   1           3   3
        2   2
        2   1
        1   1

Analysis: first decide what <k3,v3> should be. There are two ways to choose k3:

Method 1: both columns together serve as k3.

Method 2: the two columns become k3 and v3 respectively. What are k2 and v2 in that case? The first column is k2 and the second column is v2; the conversion to <k3,v3> then happens in reduce, where v3 is taken as the maximum value in each group's v2s.

Part 1: accomplishing the task with Method 2

(1) Custom Mapper

private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
    IntWritable k2 = new IntWritable();
    IntWritable v2 = new IntWritable();
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // each input line holds two tab-separated integers
        String[] splited = value.toString().split("\t");
        k2.set(Integer.parseInt(splited[0])); // first column -> k2
        v2.set(Integer.parseInt(splited[1])); // second column -> v2
        context.write(k2, v2);
    }
}

(2) Custom Reducer

//Records are sorted and then grouped by k2; there are 3 groups here, so the reduce
//function is called 3 times, once per group
//Groups: 3-{3,2,1}, 2-{2,1}, 1-{1}

private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
    IntWritable v3 = new IntWritable();
    @Override
    protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v2 : v2s) {
            if (v2.get() > max) {
                max = v2.get();
            }
        }
        // each group yields one maximum, producing the result sequence
        v3.set(max);
        context.write(k2, v3);
    }
}

(3) Assembling the MapReduce job

public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
    job.setJarByClass(GroupTest.class);
    // 1. set the input path
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    // 2. set the mapper
    //job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(MyMapper.class);
    // the map output types equal the final output types, so these may be omitted
    //job.setMapOutputKeyClass(IntWritable.class);
    //job.setMapOutputValueClass(IntWritable.class);

    // 3. set the reducer
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    // 4. set the output path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //job.setOutputFormatClass(TextOutputFormat.class); // formats the output and writes it to disk

    job.waitForCompletion(true);
}
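Since taking a maximum is associative and commutative, and MyReducer's input and output pairs are both <IntWritable, IntWritable>, the same reducer class can optionally double as a combiner that pre-aggregates map output before the shuffle. This line is an optional addition, not part of the original job setup:

    job.setCombinerClass(MyReducer.class); // optional: compute per-mapper maxima before the shuffle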

Putting it together, the complete code is as follows:

package Mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupTest {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
        job.setJarByClass(GroupTest.class);
        // 1. set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 2. set the mapper
        job.setMapperClass(MyMapper.class);

        // 3. set the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. set the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
        IntWritable k2 = new IntWritable();
        IntWritable v2 = new IntWritable();
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // each input line holds two tab-separated integers
            String[] splited = value.toString().split("\t");
            k2.set(Integer.parseInt(splited[0]));
            v2.set(Integer.parseInt(splited[1]));
            context.write(k2, v2);
        }
    }

    // Records are sorted and then grouped by k2; 3 groups here, so reduce is called 3 times
    // Groups: 3-{3,2,1}, 2-{2,1}, 1-{1}
    private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
        IntWritable v3 = new IntWritable();
        @Override
        protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
                Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable v2 : v2s) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            // each group yields one maximum
            v3.set(max);
            context.write(k2, v3);
        }
    }
}
MapReduce code for finding the maximum (Method 2)

(4) Running and testing

  •   [root@neusoft-master filecontent]# hadoop jar GroupTest.jar /neusoft/twoint /out9
  •   [root@neusoft-master filecontent]# hadoop dfs -text /out9/part-r-00000

The second command should print one maximum per key:

        1   1
        2   2
        3   3

Part 2: accomplishing the task with Method 1

      Both columns together serve as k3, with no separate v3; by the same reasoning, k2 is also the two-column pair.

      With default grouping, however, the data set above falls into 6 groups ({3,3},{3,2},{3,1},{2,2},{2,1},{1,1} are all distinct keys), so rows sharing the same first-column value never meet in one reduce call and no maximum can be taken across them.

      The fix is MapReduce's custom grouping mechanism, which puts all rows with the same first-column value into one group, giving the grouping we need.

      MapReduce provides job.setGroupingComparatorClass(cls) for this, where cls is the custom grouping comparator class.
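
For reference, the interface in org.apache.hadoop.io that the grouping class must satisfy looks like this:

public interface RawComparator<T> extends java.util.Comparator<T> {
    // compares two objects directly from their serialized byte representations
    int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}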

      

      (1) As the source shows, the grouping class must implement the RawComparator interface. The custom grouping code is as follows:

//grouping comparator: custom grouping
private static class MyGroupingComparator implements RawComparator {
    public int compare(Object o1, Object o2) {
        return 0; // the object-based overload is not used for grouping here
    }
    // b1, b2: the serialized byte representations of the two keys being compared
    // s1: the starting offset of the first key within b1
    // l1: the serialized length of the first key; 8 here, since each TwoInt key
    //     is 8 bytes (t1 and t2 are 4 bytes each; the 6-row data set's keys total 48 bytes)
    // bytes are compared in order, so comparing only the first 4 bytes compares only t1
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // compareBytes compares raw bytes; passing l1-4 and l2-4 (i.e. 4 bytes)
        // compares the first column and ignores the second
        // keys {3,3},{3,2},{3,1},{2,2},{2,1},{1,1} compared on t1 give
        // 3 groups: {3:(3,2,1)}, {2:(2,1)}, {1:(1)}; reduce then takes each group's max from v2s
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
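An equivalent and arguably more readable approach (a sketch, not from the original post; FirstColumnGroupingComparator is a hypothetical name) is to extend WritableComparator and compare deserialized TwoInt keys by t1 only:

private static class FirstColumnGroupingComparator extends WritableComparator {
    protected FirstColumnGroupingComparator() {
        super(TwoInt.class, true); // true: create key instances so raw bytes can be deserialized
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // group solely by the first column; the second column is ignored
        return Integer.compare(((TwoInt) a).t1, ((TwoInt) b).t1);
    }
}

It registers the same way, via job.setGroupingComparatorClass(FirstColumnGroupingComparator.class). The byte-level version above skips deserialization entirely, which is exactly the point of RawComparator; this variant trades that speed for clarity.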

      (2) Register it in main:

//Without a custom grouping class, grouping falls back to k2's compareTo, which compares both
//columns; once the class below is set, it alone decides which keys share one reduce call
//(sorting is unaffected and still uses compareTo, so values within a group arrive in t2 order)
        job.setGroupingComparatorClass(MyGroupingComparator.class);

     (3) Given what the comparator's arguments mean, v2 must be of type IntWritable rather than NullWritable: v2s is the collection of second-column values, from which the maximum is taken.

      The Mapper is as follows:

private static class MyMapper extends
        Mapper<LongWritable, Text, TwoInt, IntWritable> {
    // v2 must be IntWritable here, not NullWritable
    TwoInt k2 = new TwoInt();
    IntWritable v2 = new IntWritable();
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String[] splited = value.toString().split("\t");
        k2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
        v2.set(Integer.parseInt(splited[1])); // the second column is what reduce maximizes, so emit it as v2
        context.write(k2, v2);
    }
}

     (4) k3 and v3 are the reduce output types, both IntWritable. How do we compute, from the incoming v2s, the maximum among the values that share a key?

private static class MyReducer extends
        Reducer<TwoInt, IntWritable, IntWritable, IntWritable> { // k2, v2s, k3, v3
    IntWritable k3 = new IntWritable();
    IntWritable v3 = new IntWritable();
    @Override
    protected void reduce(
            TwoInt k2,
            Iterable<IntWritable> v2s,
            Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v2 : v2s) {
            if (v2.get() > max) {
                max = v2.get();
            }
        }
        // each group yields one maximum
        v3.set(max);
        k3.set(k2.t1); // the first column of k2 becomes k3, since k2 is a TwoInt
        context.write(k3, v3);
    }
}
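Because sorting and grouping are configured separately, a common secondary-sort variant (a sketch of an alternative, not what this post's code does) reverses the t2 order in TwoInt.compareTo, so that the first value the reducer sees in each group is already the maximum and the loop disappears:

// in TwoInt: first column ascending, second column descending
public int compareTo(TwoInt o) {
    if (this.t1 != o.t1) {
        return Integer.compare(this.t1, o.t1);
    }
    return Integer.compare(o.t2, this.t2); // reversed, so larger t2 sorts first
}

// in reduce: the head of v2s is the group's maximum
v3.set(v2s.iterator().next().get());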

The final code is as follows:

package Mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Group2Test {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(),
                Group2Test.class.getSimpleName());
        job.setJarByClass(Group2Test.class);
        // 1. set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 2. set the mapper
        job.setMapperClass(MyMapper.class);
        // k2/v2 differ from k3/v3 here, so the map output types must be declared explicitly
        job.setMapOutputKeyClass(TwoInt.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Without the custom grouping class below, grouping would fall back to k2's
        // compareTo, which compares both columns; with it, only the first column groups
        job.setGroupingComparatorClass(MyGroupingComparator.class);

        // 3. set the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. set the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    // grouping comparator: custom grouping
    private static class MyGroupingComparator implements RawComparator {
        public int compare(Object o1, Object o2) {
            return 0; // the object-based overload is not used for grouping here
        }
        // b1/b2: serialized bytes of the two keys; s1/s2: starting offsets; l1/l2: lengths
        // each TwoInt key is 8 bytes (t1 and t2 are 4 bytes each), so comparing
        // l1-4 = 4 bytes compares the first column only
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // keys {3,3},{3,2},{3,1},{2,2},{2,1},{1,1} compared on t1 give 3 groups:
            // {3:(3,2,1)}, {2:(2,1)}, {1:(1)}; reduce then takes each group's max
            return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
        }
    }

    private static class MyMapper extends
            Mapper<LongWritable, Text, TwoInt, IntWritable> {
        // v2 must be IntWritable here, not NullWritable
        TwoInt k2 = new TwoInt();
        IntWritable v2 = new IntWritable();
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");
            k2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
            v2.set(Integer.parseInt(splited[1]));
            context.write(k2, v2);
        }
    }

    private static class MyReducer extends
            Reducer<TwoInt, IntWritable, IntWritable, IntWritable> { // k2, v2s, k3, v3
        IntWritable k3 = new IntWritable();
        IntWritable v3 = new IntWritable();
        @Override
        protected void reduce(
                TwoInt k2,
                Iterable<IntWritable> v2s,
                Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable v2 : v2s) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            // each group yields one maximum
            v3.set(max);
            k3.set(k2.t1); // the first column of k2 becomes k3
            context.write(k3, v3);
        }
    }

    private static class TwoInt implements WritableComparable<TwoInt> {
        public int t1;
        public int t2;

        public void write(DataOutput out) throws IOException {
            out.writeInt(t1);
            out.writeInt(t2);
        }

        public void set(int t1, int t2) {
            this.t1 = t1;
            this.t2 = t2;
        }

        public void readFields(DataInput in) throws IOException {
            this.t1 = in.readInt();
            this.t2 = in.readInt();
        }

        public int compareTo(TwoInt o) {
            if (this.t1 == o.t1) { // first column equal: second column ascending
                return Integer.compare(this.t2, o.t2); // compare() avoids the overflow risk of subtraction
            }
            return Integer.compare(this.t1, o.t1); // otherwise: first column ascending
        }

        @Override
        public String toString() {
            return t1 + "\t" + t2;
        }
    }
}
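One caveat worth noting (an addition, not from the original post): this job is correct with the default single reducer, but with several reducers the default HashPartitioner would partition on TwoInt's hashCode, which is never overridden, so rows sharing a first column could land on different reducers. A sketch of a partitioner keyed on the first column, under the hypothetical name FirstColumnPartitioner:

import org.apache.hadoop.mapreduce.Partitioner;

// route every key with the same first column to the same reduce partition
private static class FirstColumnPartitioner extends Partitioner<TwoInt, IntWritable> {
    @Override
    public int getPartition(TwoInt key, IntWritable value, int numPartitions) {
        return (key.t1 & Integer.MAX_VALUE) % numPartitions; // mask keeps the index non-negative
    }
}

// in main: job.setPartitionerClass(FirstColumnPartitioner.class);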
Complete Method 1 code for finding the maximum

Run and check the results:

[root@neusoft-master filecontent]# hadoop dfs -text /out9/part-r-00000

[root@neusoft-master filecontent]# hadoop dfs -text /out10/part-r-00000

Both jobs print the same result:

        1   1
        2   2
        3   3

The results are correct.

 

 END~

 

       

 

