MapReduce實例(數據去重)


數據去重:

      原理(理解):Mapreduce程序首先應該確認<k3,v3>,根據<k3,v3>確定<k2,v2>,原始數據中出現次數超過一次的數據在輸出文件中只出現一次。Reduce的輸出是不重復的數據,也就是每一行數據作為key,即k3。而v3為空或不需要設值。根據<k3,v3>得到k2為每一行的數據,v2為空。根據MapReduce框架設值可知,k1為每行的起始位置,v1為每行的內容。因此,v1需要賦值給k2,使得原來的v1作為新的k2,從而兩個或更多文件通過在Reduce端聚合,得到去重后的數據。

 數據:

file1.txt

2016-6-1 b
2016-6-2 a
2016-6-3 b
2016-6-4 d
2016-6-5 a
2016-6-6 c
2016-6-7 d
2016-6-3 c

file2.txt

2016-6-1 a
2016-6-2 b
2016-6-3 c
2016-6-4 d
2016-6-5 a
2016-6-6 b
2016-6-7 c
2016-6-3 c

*創建文件夾dedup_in並創建上述兩文件,將該文件夾上傳到HDFS中

[root@neusoft-master filecontent]# hadoop dfs -put dedup_in/ /neusoft/

[root@neusoft-master filecontent]# hadoop dfs -ls /neusoft

 

 

(1)自定義Mapper任務   

 1     private static class MyMapper extends Mapper<Object, Text, Text, Text>{
 2         private static Text line=new Text();
 3         @Override
 4         protected void map(Object k1, Text v1,
 5                 Mapper<Object, Text, Text, Text>.Context context)
 6                 throws IOException, InterruptedException {
 7             line=v1;//v1為每行數據,賦值給line
 8             context.write(line, new Text(""));
 9         }
10     }

(2)自定義Reduce任務

1 private static class MyReducer extends Reducer<Text, Text, Text, Text>
2     {
3         @Override
4         protected void reduce(Text k2, Iterable<Text> v2s,
5                 Reducer<Text, Text, Text, Text>.Context context)
6                 throws IOException, InterruptedException {
7             context.write(k2, new Text(""));
8         }
9     }

(3)主函數(組織map和reduce)

 1 public static void main(String[] args) throws Exception {
 2         //必須要傳遞的是自定的mapper和reducer的類,輸入輸出的路徑必須指定,輸出的類型<k3,v3>必須指定
 3                 //2將自定義的MyMapper和MyReducer組裝在一起
 4                 Configuration conf=new Configuration();
 5                 String jobName=DataDeduplication.class.getSimpleName();
 6                 //1首先寫job,知道需要conf和jobname在去創建即可
 7                 Job job = Job.getInstance(conf, jobName);
 8                 
 9                 //*13最后,如果要打包運行改程序,則需要調用如下行
10                 job.setJarByClass(DataDeduplication.class);
11                 
12                 //3讀取HDFS內容:FileInputFormat在mapreduce.lib包下
13                 FileInputFormat.setInputPaths(job, new Path(args[0]));
14                 //4指定解析<k1,v1>的類(誰來解析鍵值對)
15                 //*指定解析的類可以省略不寫,因為設置解析類默認的就是TextInputFormat.class
16                 job.setInputFormatClass(TextInputFormat.class);
17                 //5指定自定義mapper類
18                 job.setMapperClass(MyMapper.class);
19                 //6指定map輸出的key2的類型和value2的類型  <k2,v2>
20                 //*下面兩步可以省略,當<k3,v3>和<k2,v2>類型一致的時候,<k2,v2>類型可以不指定
21                 job.setMapOutputKeyClass(Text.class);
22                 job.setMapOutputValueClass(Text.class);
23                 //7分區(默認1個),排序,分組,規約 采用 默認
24                 job.setCombinerClass(MyReducer.class);
25                 //接下來采用reduce步驟
26                 //8指定自定義的reduce類
27                 job.setReducerClass(MyReducer.class);
28                 //9指定輸出的<k3,v3>類型
29                 job.setOutputKeyClass(Text.class);
30                 job.setOutputValueClass(Text.class);
31                 //10指定輸出<K3,V3>的類
32                 //*下面這一步可以省
33                 job.setOutputFormatClass(TextOutputFormat.class);
34                 //11指定輸出路徑
35                 FileOutputFormat.setOutputPath(job, new Path(args[1]));
36                 
37                 //12寫的mapreduce程序要交給resource manager運行
38                 job.waitForCompletion(true);
39     }

數據去重源代碼:

 1 package Mapreduce;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Job;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
15 
16 public class DataDeduplication {
17     public static void main(String[] args) throws Exception {
18         //必須要傳遞的是自定的mapper和reducer的類,輸入輸出的路徑必須指定,輸出的類型<k3,v3>必須指定
19                 //2將自定義的MyMapper和MyReducer組裝在一起
20                 Configuration conf=new Configuration();
21                 String jobName=DataDeduplication.class.getSimpleName();
22                 //1首先寫job,知道需要conf和jobname在去創建即可
23                 Job job = Job.getInstance(conf, jobName);
24                 
25                 //*13最后,如果要打包運行改程序,則需要調用如下行
26                 job.setJarByClass(DataDeduplication.class);
27                 
28                 //3讀取HDFS內容:FileInputFormat在mapreduce.lib包下
29                 FileInputFormat.setInputPaths(job, new Path(args[0]));
30                 //4指定解析<k1,v1>的類(誰來解析鍵值對)
31                 //*指定解析的類可以省略不寫,因為設置解析類默認的就是TextInputFormat.class
32                 job.setInputFormatClass(TextInputFormat.class);
33                 //5指定自定義mapper類
34                 job.setMapperClass(MyMapper.class);
35                 //6指定map輸出的key2的類型和value2的類型  <k2,v2>
36                 //*下面兩步可以省略,當<k3,v3>和<k2,v2>類型一致的時候,<k2,v2>類型可以不指定
37                 job.setMapOutputKeyClass(Text.class);
38                 job.setMapOutputValueClass(Text.class);
39                 //7分區(默認1個),排序,分組,規約 采用 默認
40                 job.setCombinerClass(MyReducer.class);
41                 //接下來采用reduce步驟
42                 //8指定自定義的reduce類
43                 job.setReducerClass(MyReducer.class);
44                 //9指定輸出的<k3,v3>類型
45                 job.setOutputKeyClass(Text.class);
46                 job.setOutputValueClass(Text.class);
47                 //10指定輸出<K3,V3>的類
48                 //*下面這一步可以省
49                 job.setOutputFormatClass(TextOutputFormat.class);
50                 //11指定輸出路徑
51                 FileOutputFormat.setOutputPath(job, new Path(args[1]));
52                 
53                 //12寫的mapreduce程序要交給resource manager運行
54                 job.waitForCompletion(true);
55     }
56     private static class MyMapper extends Mapper<Object, Text, Text, Text>{
57         private static Text line=new Text();
58         @Override
59         protected void map(Object k1, Text v1,
60                 Mapper<Object, Text, Text, Text>.Context context)
61                 throws IOException, InterruptedException {
62             line=v1;//v1為每行數據,賦值給line
63             context.write(line, new Text(""));
64         }
65     }
66     private static class MyReducer extends Reducer<Text, Text, Text, Text>
67     {
68         @Override
69         protected void reduce(Text k2, Iterable<Text> v2s,
70                 Reducer<Text, Text, Text, Text>.Context context)
71                 throws IOException, InterruptedException {
72             context.write(k2, new Text(""));
73         }
74     }
75 }
數據去重

 運行結果:

[root@neusoft-master filecontent]# hadoop jar DataDeduplication.jar /neusoft/dedup_in /out12

[root@neusoft-master filecontent]# hadoop dfs -text /out12/part-r-00000

結果驗證正確~

 

注意:HDFS的顯示形式

[root@neusoft-master filecontent]# hadoop dfs -ls hdfs://neusoft-master:9000/out12

[root@neusoft-master filecontent]# hadoop dfs -ls  /out12 

等價表示形式

       /out12的完整表達形式hdfs://neusoft-master:9000/out12

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM