Hadoop Case Study (8): Auxiliary Sort and Secondary Sort (GroupingComparator)

1. Requirements

Given the following order data:

Order ID    Product ID    Amount
0000001     Pdt_01        222.8
0000001     Pdt_05        25.8
0000002     Pdt_03        522.8
0000002     Pdt_04        122.4
0000002     Pdt_05        722.4
0000003     Pdt_01        222.8
0000003     Pdt_02        33.8

The task is to find the most expensive product in each order.

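2. Data preparation

Input file GroupingComparator.txt. Each line holds the order ID, product ID, and amount; in the actual file the fields are separated by a tab, because the mapper splits on "\t".

0000001    Pdt_01    222.8
0000002    Pdt_05    722.4
0000001    Pdt_05    25.8
0000003    Pdt_01    222.8
0000003    Pdt_01    33.8
0000002    Pdt_03    522.8
0000002    Pdt_04    122.4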

Expected output, one file per reducer (the partitioner below sends an order to partition order_id % 3):

part-r-00000
3    222.8

part-r-00001
1    222.8

part-r-00002
2    722.4

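3. Analysis

(1) Use the combination of order ID and amount as the key. The order records read in the map phase are then partitioned by order ID, sorted by order ID and, within one order, by amount in descending order, and sent to the reducers.

(2) On the reduce side, use a GroupingComparator to gather the key-value pairs that share an order ID into one group. Because each group is already sorted by amount in descending order, its first key is the maximum.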
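The two steps above can be modeled in plain Java, without any Hadoop classes. The sketch below is only an illustration of the idea: the `Order` class and `maxPricePerOrder` method are hypothetical names, and an in-memory sort stands in for the shuffle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "sort by (id, price desc), then take the first per id".
class SecondarySortSketch {

    // Hypothetical stand-in for the composite key (order id + price).
    static final class Order {
        final int orderId;
        final double price;

        Order(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }
    }

    static Map<Integer, Double> maxPricePerOrder(List<Order> orders) {
        List<Order> sorted = new ArrayList<>(orders);
        // Step 1: order id ascending, then price descending.
        sorted.sort((a, b) -> a.orderId != b.orderId
                ? Integer.compare(a.orderId, b.orderId)
                : Double.compare(b.price, a.price));

        // Step 2: the first record seen for each order id is its maximum.
        Map<Integer, Double> firstPerGroup = new LinkedHashMap<>();
        for (Order o : sorted) {
            firstPerGroup.putIfAbsent(o.orderId, o.price);
        }
        return firstPerGroup;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order(1, 222.8), new Order(2, 722.4), new Order(1, 25.8),
                new Order(3, 222.8), new Order(3, 33.8), new Order(2, 522.8),
                new Order(2, 122.4));
        maxPricePerOrder(orders)
                .forEach((id, price) -> System.out.println(id + "\t" + price));
    }
}
```

In the real job the sort is done by the framework during the shuffle, and the "first per group" step is what the grouping comparator makes possible in the reducer.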

 

4. Implementation

Define the order key class, OrderBean

package com.xyg.mapreduce.order;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

    private int order_id; // order id
    private double price; // price

    public OrderBean() {
        super();
    }

    public OrderBean(int order_id, double price) {
        super();
        this.order_id = order_id;
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        price = in.readDouble();
    }

    @Override
    public String toString() {
        return order_id + "\t" + price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    // Secondary sort: order id ascending, then price descending
    @Override
    public int compareTo(OrderBean o) {
        int result = Integer.compare(order_id, o.getOrder_id());

        if (result == 0) {
            // Same order: the higher price sorts first.
            // Double.compare returns 0 for equal prices, which keeps compareTo consistent.
            result = Double.compare(o.getPrice(), price);
        }

        return result;
    }
}
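Hadoop moves keys between map and reduce tasks by calling `write` and later `readFields`, so both methods must handle the fields in the same order. The sketch below checks that round trip with plain `java.io` streams. `OrderKey` is a hypothetical copy of OrderBean's two fields without the Hadoop interface, so it runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class OrderKeyRoundTrip {

    // Hypothetical stand-in for OrderBean (same fields, same field order).
    static final class OrderKey implements Comparable<OrderKey> {
        int orderId;
        double price;

        OrderKey() {}

        OrderKey(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }

        void write(DataOutput out) throws IOException {
            out.writeInt(orderId);
            out.writeDouble(price);
        }

        void readFields(DataInput in) throws IOException {
            orderId = in.readInt();    // must be read in the order it was written
            price = in.readDouble();
        }

        @Override
        public int compareTo(OrderKey o) {
            int byId = Integer.compare(orderId, o.orderId);
            return byId != 0 ? byId : Double.compare(o.price, price);
        }
    }

    // Serializes the key to bytes and rebuilds a fresh instance from them.
    static OrderKey roundTrip(OrderKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            OrderKey copy = new OrderKey();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        OrderKey copy = roundTrip(new OrderKey(2, 722.4));
        System.out.println(copy.orderId + "\t" + copy.price);
        // Negative result: within one order, the higher price sorts first.
        System.out.println(new OrderKey(2, 722.4).compareTo(new OrderKey(2, 122.4)));
    }
}
```

The no-argument constructor matters for the same reason it does in OrderBean: the receiving side creates an empty instance first and then fills it through `readFields`.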

Write the mapper, OrderMapper

package com.xyg.mapreduce.order;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    // Reused for every record; context.write serializes it on each call
    OrderBean k = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1 Read one line
        String line = value.toString();

        // 2 Split on tabs: order id, product id, amount
        String[] fields = line.split("\t");

        // 3 Populate the key
        k.setOrder_id(Integer.parseInt(fields[0]));
        k.setPrice(Double.parseDouble(fields[2]));

        // 4 Emit the key; all the information is in the key, so the value is empty
        context.write(k, NullWritable.get());
    }
}
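To make the parsing step concrete, here is a small sketch of what the mapper extracts from one input line. The helper names `orderIdOf` and `priceOf` are made up for this illustration, and the sample line assumes the tab-separated layout the mapper expects.

```java
class LineParseSketch {

    // First field: the order id. Integer.parseInt drops the leading zeros.
    static int orderIdOf(String line) {
        return Integer.parseInt(line.split("\t")[0]);
    }

    // Third field: the amount. The product id (second field) is not used.
    static double priceOf(String line) {
        return Double.parseDouble(line.split("\t")[2]);
    }

    public static void main(String[] args) {
        String line = "0000001\tPdt_01\t222.8";
        System.out.println(orderIdOf(line) + "\t" + priceOf(line));
    }
}
```

Because the order id is stored as an int, the job's output shows 1, 2, and 3 rather than 0000001, 0000002, and 0000003.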

Write the reducer, OrderReducer

package com.xyg.mapreduce.order;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {

        // reduce is called once per group (one order id). On entry, key is the
        // first key of the group, which is the highest-priced record of the order.
        context.write(key, NullWritable.get());
    }
}

Write the driver, OrderDriver

package com.xyg.mapreduce.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

    public static void main(String[] args) throws Exception {

        // 1 Get the configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar by the driver class
        job.setJarByClass(OrderDriver.class);

        // 3 Set the mapper and reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // 4 Set the map output key and value types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5 Set the final output key and value types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Set the reduce-side grouping comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // 8 Set the partitioner
        job.setPartitionerClass(OrderPartitioner.class);

        // 9 Set the number of reduce tasks
        job.setNumReduceTasks(3);

        // 10 Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}


Write the partitioner, OrderPartitioner

package com.xyg.mapreduce.order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {
        // Partition by order id only, so every record of an order reaches the same reducer.
        // The mask clears the sign bit so the result is never negative.
        return (key.getOrder_id() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
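As a quick check of the partitioning expression, the sketch below evaluates it outside Hadoop for the three order ids in the sample data with three reduce tasks. `partitionFor` is a hypothetical helper that copies the expression from `getPartition`.

```java
class PartitionSketch {

    // Same expression as OrderPartitioner.getPartition, taken out of Hadoop.
    static int partitionFor(int orderId, int numReduceTasks) {
        return (orderId & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (int orderId = 1; orderId <= 3; orderId++) {
            // Reducer N writes its results to the file part-r-0000N.
            System.out.printf("order %d -> part-r-%05d%n",
                    orderId, partitionFor(orderId, 3));
        }
        // prints:
        // order 1 -> part-r-00001
        // order 2 -> part-r-00002
        // order 3 -> part-r-00000
    }
}
```

Partitioning on the order id alone, rather than on the whole key, is what guarantees that records with the same order id but different prices meet in one reducer.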

Write the grouping comparator, OrderGroupingComparator

package com.xyg.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        // true: have WritableComparator create OrderBean instances to deserialize keys into
        super(OrderBean.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;

        // Keys with the same order id belong to the same reduce group; price is ignored
        return Integer.compare(aBean.getOrder_id(), bBean.getOrder_id());
    }
}
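The effect of the grouping comparator can be seen in a simplified model of reduce-side grouping. This is not Hadoop's actual implementation: it only assumes that the reducer walks the sorted keys and starts a new group whenever the grouping comparator reports a difference from the previous key. The names `Key`, `BY_ORDER_ID`, `BY_FULL_KEY`, and `firstKeyOfEachGroup` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class GroupingSketch {

    // Hypothetical stand-in for OrderBean.
    static final class Key {
        final int orderId;
        final double price;

        Key(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }
    }

    // Groups like OrderGroupingComparator: only the order id counts.
    static final Comparator<Key> BY_ORDER_ID =
            (a, b) -> Integer.compare(a.orderId, b.orderId);

    // Groups like the sort order itself: id and price both count.
    static final Comparator<Key> BY_FULL_KEY = (a, b) -> a.orderId != b.orderId
            ? Integer.compare(a.orderId, b.orderId)
            : Double.compare(b.price, a.price);

    // A new group starts whenever the current key differs from the previous
    // one according to the grouping comparator; each group yields its first key.
    static List<Key> firstKeyOfEachGroup(List<Key> sortedKeys, Comparator<Key> grouping) {
        List<Key> firsts = new ArrayList<>();
        Key previous = null;
        for (Key k : sortedKeys) {
            if (previous == null || grouping.compare(previous, k) != 0) {
                firsts.add(k);
            }
            previous = k;
        }
        return firsts;
    }

    public static void main(String[] args) {
        // Keys of order 2 as they would arrive, already sorted by price descending.
        List<Key> sorted = List.of(
                new Key(2, 722.4), new Key(2, 522.8), new Key(2, 122.4));

        System.out.println(firstKeyOfEachGroup(sorted, BY_ORDER_ID).size()); // 1
        System.out.println(firstKeyOfEachGroup(sorted, BY_FULL_KEY).size()); // 3
    }
}
```

With grouping on the full key, each record would form its own group and the reducer would write all three records of order 2. Grouping on the order id collapses them into one group, so only the first key, 722.4, is written.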

