Hadoop案例(九)流量匯總案例


流量匯總程序案例

1.自定義輸出

統計手機號耗費的總上行流量、下行流量、總流量(序列化)

1)需求: 統計每一個手機號耗費的總上行流量、下行流量、總流量

2)數據准備 phone_date.txt

    13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
    13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
    13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
    13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
    18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    視頻網站    15    12    1527    2106    200
    84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
    13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
    15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
    13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
    13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站點統計    24    9    6960    690    200
    15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
    15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站點統計    3    3    1938    180    200
    13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
    13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
    13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    綜合門戶    15    12    1938    2910    200
    13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
    13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    綜合門戶    57    102    7335    110349    200
    18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
    13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
    13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
    13560436666    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
    13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200

輸入數據格式:      

1363157993055     13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99        18    15    1116        954        200
                   手機號碼                                                                   上行流量    下行流量

 輸出數據格式

13560436666     1116              954         2070
手機號碼        上行流量        下行流量        總流量

3)分析

基本思路:

Map階段:

(1)讀取一行數據,切分字段

(2)抽取手機號、上行流量、下行流量

(3)以手機號為key,bean對象為value輸出,即context.write(手機號,bean);

Reduce階段:

(1)累加上行流量和下行流量得到總流量。

(2)實現自定義的bean來封裝流量信息,並將bean作為map輸出的key來傳輸

(3)MR程序在處理數據的過程中會對數據排序(map輸出的kv對傳輸到reduce之前,會排序),排序的依據是map輸出的key

所以,我們如果要實現自己需要的排序規則,則可以考慮將排序因素放到key中,讓key實現接口:WritableComparable。

然后重寫key的compareTo方法。

4)編寫mapreduce程序

(1)編寫流量統計的bean對象

package com.xyg.mr.flowsum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// bean對象要實例化
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化時,需要反射調用空參構造函數,所以必須有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /**
     * 序列化方法
     * 
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 反序列化方法 
注意反序列化的順序和序列化的順序完全一致
     * 
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

(2)編寫mapreduce主程序

package com.xyg.mr.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1 將一行內容轉成string
            String ling = value.toString();

            // 2 切分字段
            String[] fields = ling.split("\t");

            // 3 取出手機號碼
            String phoneNum = fields[1];

            // 4 取出上行流量和下行流量
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            // 5 寫出數據
            context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_downFlow = 0;

            // 1 遍歷所用bean,將其中的上行流量,下行流量分別累加
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_downFlow += bean.getDownFlow();
            }

            // 2 封裝對象
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1 獲取配置信息,或者job對象實例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路徑
        job.setJarByClass(FlowCount.class);

        // 2 指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 指定mapper輸出數據的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 指定最終輸出的數據的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

(3)將程序打成jar包,然后拷貝到hadoop集群中。

(4)啟動hadoop集群(3)將程序打成jar包,然后拷貝到hadoop集群中。

(5)執行flowcount程序

[root@node21 ~]$ hadoop jar flowcount.jar com.xyg.mr.flowsum.FlowCount /user/root/flowcount/input/ /user/root/flowcount/output

(6)查看結果

[root@node21 ~]$ hadoop fs -cat /user/root/flowcount/output/part-r-00000

13480253104 FlowBean [upFlow=180, downFlow=180, sumFlow=360]

13502468823 FlowBean [upFlow=7335, downFlow=110349, sumFlow=117684]

13560436666 FlowBean [upFlow=1116, downFlow=954, sumFlow=2070]

13560439658 FlowBean [upFlow=2034, downFlow=5892, sumFlow=7926]

13602846565 FlowBean [upFlow=1938, downFlow=2910, sumFlow=4848]

。。。

2.自定義分區

將統計結果按照手機歸屬地不同省份輸出到不同文件中(Partitioner)

0)需求:將統計結果按照手機歸屬地不同省份輸出到不同文件中(分區)

1)數據准備  phone_date.txt

2)分析

(1)Mapreduce中會將map輸出的kv對,按照相同key分組,然后分發給不同的reducetask。默認的分發規則為:根據key的hashcode%reducetask數來分發

(2)如果要按照我們自己的需求進行分組,則需要改寫數據分發(分組)組件Partitioner

自定義一個CustomPartitioner繼承抽象類:Partitioner

(3)在job驅動中,設置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class)

3)在需求1的基礎上,增加一個分區類

package com.xyg.mr.partitioner;
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * K2 V2 對應的是map輸出kv類型
* @author Administrator
*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 獲取電話號碼的前三位
        String preNum = key.toString().substring(0, 3);
        
        int partition = 4;
        
        // 2 判斷是哪個省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}

2)在驅動函數中增加自定義數據分區設置和reduce task設置

public static void main(String[] args) throws Exception {
        // 1 獲取配置信息,或者job對象實例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路徑
        job.setJarByClass(FlowCount.class);

        // 8 指定自定義數據分區
        job.setPartitionerClass(ProvincePartitioner.class);
        
        // 9 同時指定相應數量的reduce task
        job.setNumReduceTasks(5); 
        
        // 2 指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 指定mapper輸出數據的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 指定最終輸出的數據的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

3)將程序打成jar包,然后拷貝到hadoop集群中。

4)啟動hadoop集群

5)執行flowcountPartitionser程序

[root@node21 ~]$ hadoop jar flowcountPartitionser.jar com.xyg.mr.partitioner.FlowCount /user/root/flowcount/input /user/root/flowcount/output

6)查看結果

[root@node21 ~]]$ hadoop fs -lsr /

/user/root/flowcount/output/part-r-00000

/user/root/flowcount/output/part-r-00001

/user/root/flowcount/output/part-r-00002

/user/root/flowcount/output/part-r-00003

/user/root/flowcount/output/part-r-00004

3.自定義全排序

將統計結果按照總流量倒序排序(全排序)

0)需求  根據需求1產生的結果再次對總流量進行排序。

1)數據准備      phone_date.txt

2)分析

(1)把程序分兩步走,第一步正常統計總流量,第二步再把結果進行排序

(2)context.write(總流量,手機號)

(3)FlowBean實現WritableComparable接口重寫compareTo方法

@Override
public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
package com.xyg.mr.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 反序列化時,需要反射調用空參構造函數,所以必須有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /**
     * 序列化方法
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 反序列化方法 注意反序列化的順序和序列化的順序完全一致
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // 倒序排列,從大到小
        return this.sumFlow > o.getSumFlow() ? -1 : 1;
    }
}

4)Map方法優化為一個對象,reduce方法則直接輸出結果即可,驅動函數根據輸入輸出重寫配置即可。3)FlowBean對象在在需求1基礎上增加了比較功能

package com.xyg.mr.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSort {
    static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
        FlowBean bean = new FlowBean();
        Text v = new Text();
        
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            
            // 1 拿到的是上一個統計程序輸出的結果,已經是各手機號的總流量信息
            String line = value.toString();
            
            // 2 截取字符串並獲取電話號、上行流量、下行流量
            String[] fields = line.split("\t");
            String phoneNbr = fields[0];
            
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);
            
            // 3 封裝對象
            bean.set(upFlow, downFlow);
            v.set(phoneNbr);
            
            // 4 輸出
            context.write(bean, v);
        }
    }
    
    static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
        
        @Override
        protected void reduce(FlowBean bean, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(values.iterator().next(), bean);
        }
    }
    
    public static void main(String[] args) throws Exception {
        // 1 獲取配置信息,或者job對象實例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路徑
        job.setJarByClass(FlowCountSort.class);

        // 2 指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // 3 指定mapper輸出數據的kv類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 4 指定最終輸出的數據的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        
        Path outPath = new Path(args[1]);
//        FileSystem fs = FileSystem.get(configuration);
//        if (fs.exists(outPath)) {
//            fs.delete(outPath, true);
//        }
        FileOutputFormat.setOutputPath(job, outPath);

        // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

5)將程序打成jar包,然后拷貝到hadoop集群中。

6)啟動hadoop集群5)將程序打成jar包,然后拷貝到hadoop集群中。

7)執行flowcountsort程序

[root@node21 module]$ hadoop jar flowcountsort.jar com.xyg.mr.sort.FlowCountSort /user/root/flowcount/output /user/root/flowcount/output_sort

8)查看結果

[root@node21 module]$ hadoop fs -cat /user/flowcount/output_sort/part-r-00000

13502468823 7335 110349 117684

13925057413 11058 48243 59301

13726238888 2481 24681 27162

13726230503 2481 24681 27162

18320173382 9531 2412 11943

4.自定義局部排序

不同省份輸出文件內部排序(部分排序)

1)需求   要求每個省份手機號輸出的文件中按照總流量內部排序。

2)分析   基於需求3,增加自定義分區類即可。

3)案例實操

(1)增加自定義分區類

package com.xyg.reduce.flowsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowSortPartitioner extends Partitioner<FlowBean, Text> {

@Override
public int getPartition(FlowBean key, Text value, int numPartitions) {

int partition = 0;
String preNum = value.toString().substring(0, 3);

if (" ".equals(preNum)) {
partition = 5;
} else {
if ("136".equals(preNum)) {
partition = 1;
} else if ("137".equals(preNum)) {
partition = 2;
} else if ("138".equals(preNum)) {
partition = 3;
} else if ("139".equals(preNum)) {
partition = 4;
}
}
return partition;
 }
}

(2)在驅動類中添加分區類

job.setPartitionerClass(FlowSortPartitioner.class);
job.setNumReduceTasks(5);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM