MapReduce分區和排序


一、排序

排序:

需求:根據用戶每月使用的流量按照使用的流量多少排序

接口-->WritableCompareable

    排序操作在hadoop中屬於默認的行為。默認按照字典殊勛排序。
    
排序的分類:

    1)部分排序
    
    2)全排序
    
    3)輔助排序
    
    4)二次排序
    
Combiner 合並

    父類Reducer
    局部匯總 ,減少網絡傳輸量 ,進而優化程序。
    
    注意:求平均值?
    
    3  5  7  2  6
    
    mapper: (3 + 5 + 7)/3 = 5
            (2 + 6)/2 = 4
            
    reducer:(5+4)/2
    
    只能應用在不影響最終業務邏輯的情況下

二、分區和排序實例

1.Mapper類

package com.css.flowsort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1.獲取一行數據
        String line = value.toString();
        // 2.切割
        String[] fields = line.split("\t");
        // 3.取出關鍵字段
        long upFlow = Long.parseLong(fields[1]);
        long dfFlow = Long.parseLong(fields[2]);
        // 4.寫出到reducer階段
        context.write(new FlowBean(upFlow, dfFlow), new Text(fields[0]));
    }
}

2.Reducer類

package com.css.flowsort;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{

    @Override
    protected void reduce(FlowBean key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {     
        context.write(value.iterator().next(), key);
    }
}

3.封裝類

package com.css.flowsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// 封裝類 直接完成排序
public class FlowBean implements WritableComparable<FlowBean> {

    // 定義屬性
    private long upFlow;
    private long dfFlow;
    private long flowSum;
    
    // 無參構造
    public FlowBean() {        
    }
    
    // 有參構造
    public FlowBean(long upFlow,long dfFlow){
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = upFlow + dfFlow;
    }
    
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDfFlow() {
        return dfFlow;
    }

    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }
    
    // 反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowSum = in.readLong();
    }

    // 序列化
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowSum);
    }
    
    @Override
    public String toString() {
        return upFlow + "\t" + dfFlow + "\t" + flowSum;
    }

    // 排序
    @Override
    public int compareTo(FlowBean o) {
        // 倒序
        return this.flowSum > o.getFlowSum() ? -1 : 1;
    }
}

4.自定義分區類

package com.css.flowsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowSortPartitioner extends Partitioner<FlowBean, Text>{

    // 根據手機號前三位進行分區
    @Override
    public int getPartition(FlowBean key, Text value, int numPartitions) {
        // 獲取手機號前三位
        String phoneNum = value.toString().substring(0, 3);
        // 分區
        int partitioner = 4;
        if ("135".equals(phoneNum)) {
            return 0;
        }else if ("137".equals(phoneNum)) {
            return 1;
        }else if ("138".equals(phoneNum)) {
            return 2;
        }else if ("139".equals(phoneNum)) {
            return 3;
        }
        return partitioner;
    }
}

5.Driver類

package com.css.flowsort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.獲取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        // 2.獲取jar包
        job.setJarByClass(FlowSortDriver.class);
        
        // 3.獲取自定義的mapper與reducer類
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);
        
        // 4.設置map輸出的數據類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        
        // 5.設置reduce輸出的數據類型(最終的數據類型)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        //添加自定義分區
        job.setPartitionerClass(FlowSortPartitioner.class);
        job.setNumReduceTasks(5);
        
        // 6.設置輸入存在的路徑與處理后的結果路徑
        FileInputFormat.setInputPaths(job, new Path("c:/flow1024/in"));
        FileOutputFormat.setOutputPath(job, new Path("c:/flow1024/out1"));
        
        // 7.提交任務
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}

6.輸入的文件part-r-00000

13480253104    120    1320    1440
13502468823    735    11349    12084
13510439658    1116    954    2070
13560436326    1136    94    1230
13560436666    1136    94    1230
13560439658    918    4938    5856
13602846565    198    910    1108
13660577991    660    690    1350
13719199419    240    0    240
13726130503    299    681    980
13726238888    2481    24681    27162
13760778710    120    120    240
13822544101    264    0    264
13884138413    4116    1432    5548
13922314466    3008    3720    6728
13925057413    11058    4243    15301
13926251106    240    0    240
13926435656    132    1512    1644
15013685858    369    338    707
15889002119    938    380    1318
15920133257    316    296    612
18212575961    1527    2106    3633
18320173382    9531    212    9743

7.如果第5步Driver類中的紅色部分去掉,則輸出全局排序后的文件part-r-00000

13726238888    2481    24681    27162
13925057413    11058    4243    15301
13502468823    735    11349    12084
18320173382    9531    212    9743
13922314466    3008    3720    6728
13560439658    918    4938    5856
13884138413    4116    1432    5548
18212575961    1527    2106    3633
13510439658    1116    954    2070
13926435656    132    1512    1644
13480253104    120    1320    1440
13660577991    660    690    1350
15889002119    938    380    1318
13560436326    1136    94    1230
13560436666    1136    94    1230
13602846565    198    910    1108
13726130503    299    681    980
15013685858    369    338    707
15920133257    316    296    612
13822544101    264    0    264
13760778710    120    120    240
13719199419    240    0    240
13926251106    240    0    240

8.如果第5步Driver類中的紅色部分不去掉,則輸出分區加排序后的文件

(1)part-r-00000
13502468823    735    11349    12084
13560439658    918    4938    5856
13510439658    1116    954    2070
13560436666    1136    94    1230
13560436326    1136    94    12302)part-r-00001
13726238888    2481    24681    27162
13726130503    299    681    980
13760778710    120    120    240
13719199419    240    0    2403)part-r-00002
13884138413    4116    1432    5548
13822544101    264    0    2644)part-r-00003
13925057413    11058    4243    15301
13922314466    3008    3720    6728
13926435656    132    1512    1644
13926251106    240    0    2405)part-r-00004
18320173382    9531    212    9743
18212575961    1527    2106    3633
13480253104    120    1320    1440
13660577991    660    690    1350
15889002119    938    380    1318
13602846565    198    910    1108
15013685858    369    338    707
15920133257    316    296    612

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM