Hadoop學習之路(二十七)MapReduce的API使用(四)


第一題

下面是三種商品的銷售數據 

要求:根據以上數據,用 MapReduce 統計出如下數據:

1、每種商品的銷售總金額,並降序排序

2、每種商品銷售額最多的三周

 

 

第二題:MapReduce 題

現有如下數據文件需要處理:

格式:CSV

數據樣例:

user_a,location_a,2018-01-01 08:00:00,60

user_a,location_a,2018-01-01 09:00:00,60

user_a,location_b,2018-01-01 10:00:00,60

user_a,location_a,2018-01-01 11:00:00,60

字段:用戶 ID,位置 ID,開始時間,停留時長(分鍾)

數據意義:某個用戶在某個位置從某個時刻開始停留了多長時間

處理邏輯: 對同一個用戶,在同一個位置,連續的多條記錄進行合並

合並原則:開始時間取最早的,停留時長加和

要求:請編寫 MapReduce 程序實現

其他:只有數據樣例,沒有數據。

UserLocationMR.java

  1 /**
  2 測試數據:
  3 user_a    location_a    2018-01-01 08:00:00    60
  4 user_a    location_a    2018-01-01 09:00:00    60
  5 user_a    location_a    2018-01-01 11:00:00    60
  6 user_a    location_a    2018-01-01 12:00:00    60
  7 user_a    location_b    2018-01-01 10:00:00    60
  8 user_a    location_c    2018-01-01 08:00:00    60
  9 user_a    location_c    2018-01-01 09:00:00    60
 10 user_a    location_c    2018-01-01 10:00:00    60
 11 user_b    location_a    2018-01-01 15:00:00    60
 12 user_b    location_a    2018-01-01 16:00:00    60
 13 user_b    location_a    2018-01-01 18:00:00    60
 14 
 15 
 16 結果數據:
 17 user_a    location_a    2018-01-01 08:00:00    120
 18 user_a    location_a    2018-01-01 11:00:00    120
 19 user_a    location_b    2018-01-01 10:00:00    60
 20 user_a    location_c    2018-01-01 08:00:00    180
 21 user_b    location_a    2018-01-01 15:00:00    120
 22 user_b    location_a    2018-01-01 18:00:00    60
 23 
 24 
 25  */
 26 public class UserLocationMR {
 27 
 28     public static void main(String[] args) throws Exception {
 29         // 指定hdfs相關的參數
 30         Configuration conf = new Configuration();
 31         //        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
 32         //        System.setProperty("HADOOP_USER_NAME", "hadoop");
 33 
 34         Job job = Job.getInstance(conf);
 35         // 設置jar包所在路徑
 36         job.setJarByClass(UserLocationMR.class);
 37 
 38         // 指定mapper類和reducer類
 39         job.setMapperClass(UserLocationMRMapper.class);
 40         job.setReducerClass(UserLocationMRReducer.class);
 41 
 42         // 指定maptask的輸出類型
 43         job.setMapOutputKeyClass(UserLocation.class);
 44         job.setMapOutputValueClass(NullWritable.class);
 45         // 指定reducetask的輸出類型
 46         job.setOutputKeyClass(UserLocation.class);
 47         job.setOutputValueClass(NullWritable.class);
 48 
 49         job.setGroupingComparatorClass(UserLocationGC.class);
 50 
 51         // 指定該mapreduce程序數據的輸入和輸出路徑
 52         Path inputPath = new Path("D:\\武文\\second\\input");
 53         Path outputPath = new Path("D:\\武文\\second\\output2");
 54         FileSystem fs = FileSystem.get(conf);
 55         if (fs.exists(outputPath)) {
 56             fs.delete(outputPath, true);
 57         }
 58         FileInputFormat.setInputPaths(job, inputPath);
 59         FileOutputFormat.setOutputPath(job, outputPath);
 60 
 61         // 最后提交任務
 62         boolean waitForCompletion = job.waitForCompletion(true);
 63         System.exit(waitForCompletion ? 0 : 1);
 64     }
 65 
 66     private static class UserLocationMRMapper extends Mapper<LongWritable, Text, UserLocation, NullWritable> {
 67 
 68         UserLocation outKey = new UserLocation();
 69 
 70         /**
 71          * value = user_a,location_a,2018-01-01 12:00:00,60
 72          */
 73         @Override
 74         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 75 
 76             String[] split = value.toString().split(",");
 77 
 78             outKey.set(split);
 79 
 80             context.write(outKey, NullWritable.get());
 81         }
 82     }
 83 
 84     private static class UserLocationMRReducer extends Reducer<UserLocation, NullWritable, UserLocation, NullWritable> {
 85 
 86         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 87 
 88         UserLocation outKey = new UserLocation();
 89 
 90         /**
 91          * user_a    location_a    2018-01-01 08:00:00    60
 92          * user_a    location_a    2018-01-01 09:00:00    60
 93          * user_a    location_a    2018-01-01 11:00:00    60
 94          * user_a    location_a    2018-01-01 12:00:00    60
 95          */
 96         @Override
 97         protected void reduce(UserLocation key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
 98 
 99             int count = 0;
100             for (NullWritable nvl : values) {
101                 count++;
102                 // 如果是這一組key-value中的第一個元素時,直接賦值給outKey對象。基礎對象
103                 if (count == 1) {
104                     // 復制值
105                     outKey.set(key);
106                 } else {
107 
108                     // 有可能連續,有可能不連續,  連續則繼續變量, 否則輸出
109                     long current_timestamp = 0;
110                     long last_timestamp = 0;
111                     try {
112                         // 這是新遍歷出來的記錄的時間戳
113                         current_timestamp = sdf.parse(key.getTime()).getTime();
114                         // 這是上一條記錄的時間戳 和 停留時間之和
115                         last_timestamp = sdf.parse(outKey.getTime()).getTime() + outKey.getDuration() * 60 * 1000;
116                     } catch (ParseException e) {
117                         e.printStackTrace();
118                     }
119 
120                     // 如果相等,證明是連續記錄,所以合並
121                     if (current_timestamp == last_timestamp) {
122 
123                         outKey.setDuration(outKey.getDuration() + key.getDuration());
124 
125                     } else {
126 
127                         // 先輸出上一條記錄
128                         context.write(outKey, nvl);
129 
130                         // 然后再次記錄當前遍歷到的這一條記錄
131                         outKey.set(key);
132                     }
133                 }
134             }
135             // 最后無論如何,還得輸出最后一次
136             context.write(outKey, NullWritable.get());
137         }
138     }
139 }
View Code

UserLocation.java

  1 public class UserLocation implements WritableComparable<UserLocation> {
  2 
  3     private String userid;
  4     private String locationid;
  5     private String time;
  6     private long duration;
  7 
  8     @Override
  9     public String toString() {
 10         return userid + "\t" + locationid + "\t" + time + "\t" + duration;
 11     }
 12 
 13     public UserLocation() {
 14         super();
 15     }
 16     
 17     public void set(String[] split){
 18         this.setUserid(split[0]);
 19         this.setLocationid(split[1]);
 20         this.setTime(split[2]);
 21         this.setDuration(Long.parseLong(split[3]));
 22     }
 23     
 24     public void set(UserLocation ul){
 25         this.setUserid(ul.getUserid());
 26         this.setLocationid(ul.getLocationid());
 27         this.setTime(ul.getTime());
 28         this.setDuration(ul.getDuration());
 29     }
 30 
 31     public UserLocation(String userid, String locationid, String time, long duration) {
 32         super();
 33         this.userid = userid;
 34         this.locationid = locationid;
 35         this.time = time;
 36         this.duration = duration;
 37     }
 38 
 39     public String getUserid() {
 40         return userid;
 41     }
 42 
 43     public void setUserid(String userid) {
 44         this.userid = userid;
 45     }
 46 
 47     public String getLocationid() {
 48         return locationid;
 49     }
 50 
 51     public void setLocationid(String locationid) {
 52         this.locationid = locationid;
 53     }
 54 
 55     public String getTime() {
 56         return time;
 57     }
 58 
 59     public void setTime(String time) {
 60         this.time = time;
 61     }
 62 
 63     public long getDuration() {
 64         return duration;
 65     }
 66 
 67     public void setDuration(long duration) {
 68         this.duration = duration;
 69     }
 70 
 71     @Override
 72     public void write(DataOutput out) throws IOException {
 73         // TODO Auto-generated method stub
 74         out.writeUTF(userid);
 75         out.writeUTF(locationid);
 76         out.writeUTF(time);
 77         out.writeLong(duration);
 78     }
 79 
 80     @Override
 81     public void readFields(DataInput in) throws IOException {
 82         // TODO Auto-generated method stub
 83         this.userid = in.readUTF();
 84         this.locationid = in.readUTF();
 85         this.time = in.readUTF();
 86         this.duration = in.readLong();
 87     }
 88 
 89     /**
 90      * 排序規則
 91      * 
 92      * 按照 userid  locationid  和  time 排序  都是 升序
 93      */
 94     @Override
 95     public int compareTo(UserLocation o) {
 96 
 97         int diff_userid = o.getUserid().compareTo(this.getUserid());
 98         if(diff_userid == 0){
 99             
100             int diff_location = o.getLocationid().compareTo(this.getLocationid());
101             if(diff_location == 0){
102                 
103                 int diff_time = o.getTime().compareTo(this.getTime());
104                 if(diff_time == 0){
105                     return 0;
106                 }else{
107                     return diff_time > 0 ? -1 : 1;
108                 }
109                 
110             }else{
111                 return diff_location > 0 ? -1 : 1;
112             }
113             
114         }else{
115             return diff_userid > 0 ? -1 : 1;
116         }
117     }
118 }
View Code

UserLocationGC.java

 1 public class UserLocationGC extends WritableComparator{
 2     
 3     public UserLocationGC(){
 4         super(UserLocation.class, true);
 5     }
 6 
 7     @Override
 8     public int compare(WritableComparable a, WritableComparable b) {
 9 
10         UserLocation ul_a = (UserLocation)a;
11         UserLocation ul_b = (UserLocation)b;
12 
13         int diff_userid = ul_a.getUserid().compareTo(ul_b.getUserid());
14         if(diff_userid == 0){
15             
16             int diff_location = ul_a.getLocationid().compareTo(ul_b.getLocationid());
17             if(diff_location == 0){
18                 
19                 return 0;
20                 
21             }else{
22                 return diff_location > 0 ? -1 : 1;
23             }
24             
25         }else{
26             return diff_userid > 0 ? -1 : 1;
27         }
28     }
29 }
View Code

 

 

 

 

第三題:MapReduce 題--倒排索引

概念: 倒排索引(Inverted Index),也常被稱為反向索引、置入檔案或反向檔案,是一種索引方法, 被用來存儲在全文搜索下某個單詞在一個文檔或者一組文檔中的存儲位置的映射。它是文檔 檢索系統中最常用的數據結構。了解詳情可自行百度

有兩份數據:

mapreduce-4-1.txt

huangbo love xuzheng
huangxiaoming love baby huangxiaoming love yangmi
liangchaowei love liujialing
huangxiaoming xuzheng huangbo wangbaoqiang

 

mapreduce-4-2.txt

hello huangbo
hello xuzheng
hello huangxiaoming

題目一:編寫 MapReduce 求出以下格式的結果數據:統計每個關鍵詞在每個文檔中當中的 第幾行出現了多少次 例如,huangxiaoming 關鍵詞的格式:

huangixaoming mapreduce-4-1.txt:2,2; mapreduce-4-1.txt:4,1;mapreduce-4-2.txt:3,1

 

以上答案的意義:

關鍵詞 huangxiaoming 在第一份文檔 mapreduce-4-1.txt 中的第 2 行出現了 2 次
關鍵詞 huangxiaoming 在第一份文檔 mapreduce-4-1.txt 中的第 4 行出現了 1 次
關鍵詞 huangxiaoming 在第二份文檔 mapreduce-4-2.txt 中的第 3 行出現了 1 次

題目二:編寫 MapReduce 程序求出每個關鍵詞在每個文檔出現了多少次,並且按照出現次 數降序排序

例如:

huangixaoming mapreduce-4-1.txt,3;mapreduce-4-2.txt,1

以上答案的含義: 表示關鍵詞 huangxiaoming 在第一份文檔 mapreduce-4-1.txt 中出現了 3 次,在第二份文檔mapreduce-4-2.txt 中出現了 1 次

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM