MapReduce-【流量匯總案例】(一)統計手機號耗費的總上行流量、下行流量、總流量(序列化)


統計手機號耗費的總上行流量、下行流量、總流量(序列化)

統計總上行流量、總下行流量。

數據准備

輸入數據格式:

數據格式:時間戳、電話號碼、基站的物理地址、訪問網址的ip、網站域名、數據包、接包數、上行/傳流量、下行/載流量、響應碼

輸出數據格式:

1356·0436666 1116       954 2070

手機號碼 上行流量        下行流量 總流量

分析-基本思路:

Map階段:

(1)讀取一行數據,切分字段

(2)抽取手機號、上行流量、下行流量

(3)以手機號為key,bean對象為value輸出,即context.write(手機號,bean);

Reduce階段:

(1)累加上行流量和下行流量得到總流量。

(2)實現自定義的bean來封裝流量信息,並將bean作為map輸出的key來傳輸

(3)MR程序在處理數據的過程中會對數據排序(map輸出的kv對傳輸到reduce之前,會排序),排序的依據是map輸出的key

所以,我們如果要實現自己需要的排序規則,則可以考慮將排序因素放到key中,讓key實現接口:Writable

一、封裝類LiuLiangBean-實現Writable接口-序列化

package liu.liang;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Getter
@Setter
@NoArgsConstructor

/**
 * 封裝類LiuLiangBean-實現Writable接口-序列化,自定義的數據類型想要在Hadoop集群中傳遞,需要實現Hadoop的序列化框架
 */
public class LiuLiangBean implements Writable {
    //上行流量
    private long upflow;
    //下行流量
    private long downflow;
    //總流量
    private long sumflow;

    public LiuLiangBean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow+downflow;
    }

    /**  序列化-將我們要傳輸的數據序列化成字節流
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upflow);
        dataOutput.writeLong(downflow);
        dataOutput.writeLong(sumflow);
    }

    /**反序列化-從數據字節流中逐個恢復出各個字段 ,因為反射機制的需要,需要定義一個無參構造函數
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upflow = dataInput.readLong();
        this.downflow = dataInput.readLong();
        this.sumflow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return this.upflow + "\t" + this.downflow + "\t" + sumflow;
    }
}

二、分隔類-繼承Mapper類

package liu.liang;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 分隔類-繼承Mapper類
 */
public class LiuLiangMapper extends Mapper<LongWritable, Text,Text,LiuLiangBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        //轉換類型:String--》Long
        long upflow = Long.parseLong(fields[fields.length - 3]);
        long downflow = Long.parseLong(fields[fields.length - 2]);
        //電話號碼作Key,上行流量和下行流量作Value
        context.write(new Text(fields[1]),new LiuLiangBean(upflow,downflow));
    }
}

三、統計總上行流量、總下行流量類

package liu.liang;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 統計總上行流量、總下行流量類
 */
public class LiuLiangReducer extends Reducer<Text,LiuLiangBean,Text,LiuLiangBean> {
    @Override
    protected void reduce(Text key, Iterable<LiuLiangBean> values, Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for(LiuLiangBean value:values){
            sumUpFlow += value.getUpflow();
            sumDownFlow += value.getUpflow();
        }
        context.write(key,new LiuLiangBean(sumUpFlow,sumDownFlow));
    }
}

四、執行類

package liu.liang;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 執行類
 */
public class LiuLiangDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        long startTime = System.currentTimeMillis();

        args = new String[]{"D:/phone_data.txt", "D:/HDFS/p_d"};

        //1.獲取配置信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2.反射類
        job.setJarByClass(LiuLiangDriver.class);
        job.setMapperClass(LiuLiangMapper.class);
        job.setReducerClass(LiuLiangReducer.class);

        //3.Reduce輸入、輸出的K、V類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LiuLiangBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LiuLiangBean.class);

        //4.數據的輸入和輸出的指定目錄
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //5.提交job
        job.waitForCompletion(true);

        long endTime = System.currentTimeMillis();
        System.out.println("程序運行的時間為:"+(endTime-startTime));
    }
}

五、總結

注意

這里的map(LongWritable key, Text value, Context context)方法中的值:Text value 是針對一行數據進行的。

而reduce(Text key, Iterable<LiuLiangBean> values, Context context)方法中的值:Iterable<LiuLiangBean> values 是針對Map后的整個數據文件中的每一組<K、V>對,針對key的值的那部分數據進行操作的。

也就是說,這里的key唯一,因此總要用到for循環。這個區別可以根據value是否為復數(即:value/values)進行區分。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM