數據去重:
原理(理解):Mapreduce程序首先應該確認<k3,v3>,根據<k3,v3>確定<k2,v2>,原始數據中出現次數超過一次的數據在輸出文件中只出現一次。Reduce的輸出是不重復的數據,也就是每一行數據作為key,即k3。而v3為空或不需要設值。根據<k3,v3>得到k2為每一行的數據,v2為空。根據MapReduce框架設值可知,k1為每行的起始位置,v1為每行的內容。因此,v1需要賦值給k2,使得原來的v1作為新的k2,從而兩個或更多文件通過在Reduce端聚合,得到去重后的數據。
數據:
file1.txt
2016-6-1 b
2016-6-2 a
2016-6-3 b
2016-6-4 d
2016-6-5 a
2016-6-6 c
2016-6-7 d
2016-6-3 c
file2.txt
2016-6-1 a
2016-6-2 b
2016-6-3 c
2016-6-4 d
2016-6-5 a
2016-6-6 b
2016-6-7 c
2016-6-3 c
*創建文件夾dedup_in並創建上述兩文件,將該文件夾上傳到HDFS中
[root@neusoft-master filecontent]# hadoop dfs -put dedup_in/ /neusoft/
[root@neusoft-master filecontent]# hadoop dfs -ls /neusoft
(1)自定義Mapper任務
1 private static class MyMapper extends Mapper<Object, Text, Text, Text>{ 2 private static Text line=new Text(); 3 @Override 4 protected void map(Object k1, Text v1, 5 Mapper<Object, Text, Text, Text>.Context context) 6 throws IOException, InterruptedException { 7 line=v1;//v1為每行數據,賦值給line 8 context.write(line, new Text("")); 9 } 10 }
(2)自定義Reduce任務
1 private static class MyReducer extends Reducer<Text, Text, Text, Text> 2 { 3 @Override 4 protected void reduce(Text k2, Iterable<Text> v2s, 5 Reducer<Text, Text, Text, Text>.Context context) 6 throws IOException, InterruptedException { 7 context.write(k2, new Text("")); 8 } 9 }
(3)主函數(組織map和reduce)
1 public static void main(String[] args) throws Exception { 2 //必須要傳遞的是自定的mapper和reducer的類,輸入輸出的路徑必須指定,輸出的類型<k3,v3>必須指定 3 //2將自定義的MyMapper和MyReducer組裝在一起 4 Configuration conf=new Configuration(); 5 String jobName=DataDeduplication.class.getSimpleName(); 6 //1首先寫job,知道需要conf和jobname在去創建即可 7 Job job = Job.getInstance(conf, jobName); 8 9 //*13最后,如果要打包運行改程序,則需要調用如下行 10 job.setJarByClass(DataDeduplication.class); 11 12 //3讀取HDFS內容:FileInputFormat在mapreduce.lib包下 13 FileInputFormat.setInputPaths(job, new Path(args[0])); 14 //4指定解析<k1,v1>的類(誰來解析鍵值對) 15 //*指定解析的類可以省略不寫,因為設置解析類默認的就是TextInputFormat.class 16 job.setInputFormatClass(TextInputFormat.class); 17 //5指定自定義mapper類 18 job.setMapperClass(MyMapper.class); 19 //6指定map輸出的key2的類型和value2的類型 <k2,v2> 20 //*下面兩步可以省略,當<k3,v3>和<k2,v2>類型一致的時候,<k2,v2>類型可以不指定 21 job.setMapOutputKeyClass(Text.class); 22 job.setMapOutputValueClass(Text.class); 23 //7分區(默認1個),排序,分組,規約 采用 默認 24 job.setCombinerClass(MyReducer.class); 25 //接下來采用reduce步驟 26 //8指定自定義的reduce類 27 job.setReducerClass(MyReducer.class); 28 //9指定輸出的<k3,v3>類型 29 job.setOutputKeyClass(Text.class); 30 job.setOutputValueClass(Text.class); 31 //10指定輸出<K3,V3>的類 32 //*下面這一步可以省 33 job.setOutputFormatClass(TextOutputFormat.class); 34 //11指定輸出路徑 35 FileOutputFormat.setOutputPath(job, new Path(args[1])); 36 37 //12寫的mapreduce程序要交給resource manager運行 38 job.waitForCompletion(true); 39 }
數據去重源代碼:

1 package Mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 15 16 public class DataDeduplication { 17 public static void main(String[] args) throws Exception { 18 //必須要傳遞的是自定的mapper和reducer的類,輸入輸出的路徑必須指定,輸出的類型<k3,v3>必須指定 19 //2將自定義的MyMapper和MyReducer組裝在一起 20 Configuration conf=new Configuration(); 21 String jobName=DataDeduplication.class.getSimpleName(); 22 //1首先寫job,知道需要conf和jobname在去創建即可 23 Job job = Job.getInstance(conf, jobName); 24 25 //*13最后,如果要打包運行改程序,則需要調用如下行 26 job.setJarByClass(DataDeduplication.class); 27 28 //3讀取HDFS內容:FileInputFormat在mapreduce.lib包下 29 FileInputFormat.setInputPaths(job, new Path(args[0])); 30 //4指定解析<k1,v1>的類(誰來解析鍵值對) 31 //*指定解析的類可以省略不寫,因為設置解析類默認的就是TextInputFormat.class 32 job.setInputFormatClass(TextInputFormat.class); 33 //5指定自定義mapper類 34 job.setMapperClass(MyMapper.class); 35 //6指定map輸出的key2的類型和value2的類型 <k2,v2> 36 //*下面兩步可以省略,當<k3,v3>和<k2,v2>類型一致的時候,<k2,v2>類型可以不指定 37 job.setMapOutputKeyClass(Text.class); 38 job.setMapOutputValueClass(Text.class); 39 //7分區(默認1個),排序,分組,規約 采用 默認 40 job.setCombinerClass(MyReducer.class); 41 //接下來采用reduce步驟 42 //8指定自定義的reduce類 43 job.setReducerClass(MyReducer.class); 44 //9指定輸出的<k3,v3>類型 45 job.setOutputKeyClass(Text.class); 46 job.setOutputValueClass(Text.class); 47 //10指定輸出<K3,V3>的類 48 //*下面這一步可以省 49 job.setOutputFormatClass(TextOutputFormat.class); 50 //11指定輸出路徑 51 FileOutputFormat.setOutputPath(job, new Path(args[1])); 52 53 //12寫的mapreduce程序要交給resource manager運行 54 job.waitForCompletion(true); 55 } 56 private static class MyMapper extends Mapper<Object, Text, Text, Text>{ 57 private static Text line=new Text(); 58 @Override 59 protected void map(Object k1, Text v1, 60 Mapper<Object, Text, Text, Text>.Context context) 61 throws IOException, InterruptedException { 62 line=v1;//v1為每行數據,賦值給line 63 context.write(line, new Text("")); 64 } 65 } 66 private static class MyReducer extends Reducer<Text, Text, Text, Text> 67 { 68 @Override 69 protected void reduce(Text k2, Iterable<Text> v2s, 70 Reducer<Text, Text, Text, Text>.Context context) 71 throws IOException, InterruptedException { 72 context.write(k2, new Text("")); 73 } 74 } 75 }
運行結果:
[root@neusoft-master filecontent]# hadoop jar DataDeduplication.jar /neusoft/dedup_in /out12
[root@neusoft-master filecontent]# hadoop dfs -text /out12/part-r-00000
結果驗證正確~
注意:HDFS的顯示形式
[root@neusoft-master filecontent]# hadoop dfs -ls hdfs://neusoft-master:9000/out12
[root@neusoft-master filecontent]# hadoop dfs -ls /out12
等價表示形式
/out12的完整表達形式hdfs://neusoft-master:9000/out12