Mapreduce實例——去重


 

"數據去重"主要是為了掌握和利用並行化思想來對數據進行有意義的篩選。統計大數據集上的數據種類個數、從網站日志中計算訪問地等這些看似龐雜的任務都會涉及數據去重。

MaprReduce去重流程如下圖所示:

數據去重的最終目標是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。在MapReduce流程中,map的輸出<key,value>經過shuffle過程聚集成<key,value-list>后交給reduce。我們自然而然會想到將同一個數據的所有記錄都交給一台reduce機器,無論這個數據出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以數據作為key,而對value-list則沒有要求(可以設置為空)。當reduce接收到一個<key,value-list>時就直接將輸入的key復制到輸出的key中,並將value設置成空值,然后輸出<key,value>。

實驗環境

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

實驗內容

現有一個某電商網站的數據文件,名為buyer_favorite1,記錄了用戶收藏的商品以及收藏的日期,文件buyer_favorite1中包含(用戶id,商品id,收藏日期)三個字段,數據內容以"\t"分割,由於數據很大,所以為了方便統計我們只截取它的一部分數據,內容如下:

  1. 用戶id   商品id    收藏日期  
  2. 10181   1000481   2010-04-04 16:54:31  
  3. 20001   1001597   2010-04-07 15:07:52  
  4. 20001   1001560   2010-04-07 15:08:27  
  5. 20042   1001368   2010-04-08 08:20:30  
  6. 20067   1002061   2010-04-08 16:45:33  
  7. 20056   1003289   2010-04-12 10:50:55  
  8. 20056   1003290   2010-04-12 11:57:35  
  9. 20056   1003292   2010-04-12 12:05:29  
  10. 20054   1002420   2010-04-14 15:24:12  
  11. 20055   1001679   2010-04-14 19:46:04  
  12. 20054   1010675   2010-04-14 15:23:53  
  13. 20054   1002429   2010-04-14 17:52:45  
  14. 20076   1002427   2010-04-14 19:35:39  
  15. 20054   1003326   2010-04-20 12:54:44  
  16. 20056   1002420   2010-04-15 11:24:49  
  17. 20064   1002422   2010-04-15 11:35:54  
  18. 20056   1003066   2010-04-15 11:43:01  
  19. 20056   1003055   2010-04-15 11:43:06  
  20. 20056   1010183   2010-04-15 11:45:24  
  21. 20056   1002422   2010-04-15 11:45:49  
  22. 20056   1003100   2010-04-15 11:45:54  
  23. 20056   1003094   2010-04-15 11:45:57  
  24. 20056   1003064   2010-04-15 11:46:04  
  25. 20056   1010178   2010-04-15 16:15:20  
  26. 20076   1003101   2010-04-15 16:37:27  
  27. 20076   1003103   2010-04-15 16:37:05  
  28. 20076   1003100   2010-04-15 16:37:18  
  29. 20076   1003066   2010-04-15 16:37:31  
  30. 20054   1003103   2010-04-15 16:40:14  
  31. 20054   1003100   2010-04-15 16:40:16  

要求用Java編寫MapReduce程序,根據商品id進行去重,統計用戶收藏商品中都有哪些商品被收藏。結果數據如下:

  1. 商品id  
  2. 1000481  
  3. 1001368  
  4. 1001560  
  5. 1001597  
  6. 1001679  
  7. 1002061  
  8. 1002420  
  9. 1002422  
  10. 1002427  
  11. 1002429  
  12. 1003055  
  13. 1003064  
  14. 1003066  
  15. 1003094  
  16. 1003100  
  17. 1003101  
  18. 1003103  
  19. 1003289  
  20. 1003290  
  21. 1003292  
  22. 1003326  
  23. 1010178  
  24. 1010183  
  25. 1010675  

實驗步驟

1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop。

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.在Linux本地新建/data/mapreduce2目錄。

  1. mkdir -p /data/mapreduce2  

3.切換到/data/mapreduce1目錄下,自行建立文本文件buyer_favorite1。

依然在/data/mapreduce1目錄下,使用wget命令,從

網絡下載hadoop2lib.tar.gz,下載項目用到的依賴包。

將hadoop2lib.tar.gz解壓到當前目錄下。

  1. tar -xzvf hadoop2lib.tar.gz  

4.首先在HDFS上新建/mymapreduce2/in目錄,然后將Linux本地/data/mapreduce2目錄下的buyer_favorite1文件導入到HDFS的/mymapreduce2/in目錄中。

view plain copy

  1. hadoop fs -mkdir -p /mymapreduce2/in  
  2. hadoop fs -put /data/mapreduce2/buyer_favorite1 /mymapreduce2/in  

5.新建Java Project項目,項目名為mapreduce2。

在mapreduce2項目下新建包,包名為mapreduce。

在mapreduce包下新建類,類名為Filter。

6.添加項目所需依賴的jar包

右鍵項目,新建一個文件夾,命名為:hadoop2lib,用於存放項目所需的jar包。

將/data/mapreduce2目錄下,hadoop2lib目錄中的jar包,拷貝到eclipse中mapreduce2項目的hadoop2lib目錄下。

選中所有項目hadoop2lib目錄下所有jar包,並添加到Build Path中。

7.編寫程序代碼,並描述其思路

數據去重的目的是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。我們自然想到將相同key值的所有value記錄交到一台reduce機器,讓其無論這個數據出現多少次,最終結果只輸出一次。具體就是reduce的輸出應該以數據作為key,而對value-list沒有要求,當reduce接收到一個時,就直接將key復制到輸出的key中,將value設置為空。

Map代碼

  1. public static class Map extends Mapper<Object , Text , Text , NullWritable>  
  2.     //map將輸入中的value復制到輸出數據的key上,並直接輸出  
  3.     {  
  4.     private static Text newKey=new Text();      //從輸入中得到的每行的數據的類型  
  5.     public void map(Object key,Text value,Context context) throws IOException, InterruptedException  
  6.     //實現map函數  
  7.     {             //獲取並輸出每一次的處理過程  
  8.     String line=value.toString();  
  9.     System.out.println(line);  
  10.     String arr[]=line.split("\t");  
  11.     newKey.set(arr[1]);  
  12.     context.write(newKey, NullWritable.get());  
  13.     System.out.println(newKey);  
  14.     }  
  15.     }  

map階段采用Hadoop的默認的作業輸入方式,把輸入的value用split()方法截取,截取出的商品id字段設置為key,設置value為空,然后直接輸出<key,value>。

reduce端代碼

  1. public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{  
  2.         public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException  
  3.     //實現reduce函數  
  4.     {  
  5.     context.write(key,NullWritable.get());   //獲取並輸出每一次的處理過程  
  6.     }  
  7.     }  

map輸出的<key,value>鍵值對經過shuffle過程,聚成<key,value-list>后,會交給reduce函數。reduce函數,不管每個key 有多少個value,它直接將輸入的賦值給輸出的key,將輸出的value設置為空,然后輸出<key,value>就可以了。

完整代碼

  1. package mapreduce;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.NullWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapreduce.Job;  
  9. import org.apache.hadoop.mapreduce.Mapper;  
  10. import org.apache.hadoop.mapreduce.Reducer;  
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  15. public class Filter{  
  16.     public static class Map extends Mapper<Object , Text , Text , NullWritable>{  
  17.     private static Text newKey=new Text();  
  18.     public void map(Object key,Text value,Context context) throws IOException, InterruptedException{  
  19.     String line=value.toString();  
  20.     System.out.println(line);  
  21.     String arr[]=line.split("\t");  
  22.     newKey.set(arr[1]);  
  23.     context.write(newKey, NullWritable.get());  
  24.     System.out.println(newKey);  
  25.     }  
  26.     }  
  27.     public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{  
  28.     public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{  
  29.         context.write(key,NullWritable.get());  
  30.         }  
  31.         }  
  32.         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{  
  33.         Configuration conf=new Configuration();  
  34.         System.out.println("start");  
  35.         Job job =new Job(conf,"filter");  
  36.         job.setJarByClass(Filter.class);  
  37.         job.setMapperClass(Map.class);  
  38.         job.setReducerClass(Reduce.class);  
  39.         job.setOutputKeyClass(Text.class);  
  40.         job.setOutputValueClass(NullWritable.class);  
  41.         job.setInputFormatClass(TextInputFormat.class);  
  42.         job.setOutputFormatClass(TextOutputFormat.class);  
  43.         Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");  
  44.         Path out=new Path("hdfs://localhost:9000/mymapreduce2/out");  
  45.         FileInputFormat.addInputPath(job,in);  
  46.         FileOutputFormat.setOutputPath(job,out);  
  47.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  48.         }  
  49.         }  

8.在Filter類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。

9.待執行完畢后,進入命令模式下,在HDFS中/mymapreduce2/out查看實驗結果。

  1. hadoop fs -ls /mymapreduce2/out  
  2. hadoop fs -cat /mymapreduce2/out/part-r-00000  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM