Traffic Summary Program Case Study
1. Custom Output
Compute each phone number's total upstream traffic, downstream traffic, and combined traffic (serialization)
1) Requirement: compute the total upstream traffic, downstream traffic, and combined traffic consumed by each phone number
2) Data preparation: phone_date.txt
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200
84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200
15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200
13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
13560436666 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
Input record format:
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
(relevant fields: phone number, upstream traffic, downstream traffic)
Output record format:
13560436666 1116 954 2070
(phone number, upstream traffic, downstream traffic, total traffic)
3) Analysis
Basic approach:
Map phase:
(1) Read one line of input and split it into fields
(2) Extract the phone number, the upstream traffic, and the downstream traffic
(3) Emit the phone number as the key and a bean object as the value, i.e. context.write(phoneNumber, bean);
Reduce phase:
(1) Accumulate the upstream and downstream traffic for each phone number to obtain the total traffic.
(2) Implement a custom bean to encapsulate the traffic fields; when a custom sort order is needed (sections 3 and 4 below), the bean travels as the map output key.
(3) The MR framework sorts the data while processing it (the map output kv pairs are sorted before they reach the reducer), and the sort criterion is the map output key.
Therefore, if we want to impose our own sort order, we can move the sort criteria into the key and have the key implement the WritableComparable interface,
then override the key's compareTo method.
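As a concrete check against the sample data above: 13560439658 appears in two records (upstream/downstream 1116/954 and 918/4938), so its reduce call receives two beans and emits 2034, 5892, and 7926, which matches the results shown in step (6) below.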
4) Write the mapreduce program
(1) Write the traffic-summary bean object
package com.xyg.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// The bean must implement Writable so Hadoop can serialize it between map and reduce
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Deserialization instantiates the bean reflectively through the
    // no-arg constructor, so it must be present
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method. Note: fields must be read in exactly the
     * order they were written
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
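Before wiring the bean into a job, a quick way to sanity-check the Writable implementation is a local round trip. The harness below is my own illustration (FlowBeanRoundTripTest is not part of the original program): it serializes a bean into an in-memory buffer and reads it back, confirming that readFields consumes fields in exactly the order write produced them.

package com.xyg.mr.flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTripTest {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(1116, 954);

        // Serialize into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize from the same bytes
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(original); // 1116	954	2070
        System.out.println(copy);     // identical output if the field order matches
    }
}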
(2) Write the mapreduce main program
package com.xyg.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1 Convert the line to a string
            String line = value.toString();
            // 2 Split into fields
            String[] fields = line.split("\t");
            // 3 Extract the phone number
            String phoneNum = fields[1];
            // 4 Extract the upstream and downstream traffic
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
            // 5 Emit the record
            context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_downFlow = 0;
            // 1 Iterate over all beans, accumulating upstream and downstream traffic separately
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_downFlow += bean.getDownFlow();
            }
            // 2 Wrap the totals in a result bean
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1 Obtain the configuration and the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Specify the local path of this program's jar
        job.setJarByClass(FlowCount.class);

        // 3 Specify the mapper/reducer business classes for this job
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4 Specify the kv types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 Specify the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 Specify the input directory and the output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job configuration and the jar containing its classes to yarn
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
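For debugging before cluster deployment, the same driver can typically be run with Hadoop's local job runner by overriding two standard configuration keys. A minimal sketch, assuming the input/output arguments point at local filesystem paths:

Configuration configuration = new Configuration();
// Standard Hadoop keys: run map/reduce in-process against the local filesystem
configuration.set("mapreduce.framework.name", "local");
configuration.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(configuration);
// ...the rest of the driver is unchanged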
(3) Package the program into a jar and copy it to the hadoop cluster.
(4) Start the hadoop cluster.
(5) Run the flowcount program
[root@node21 ~]$ hadoop jar flowcount.jar com.xyg.mr.flowsum.FlowCount /user/root/flowcount/input/ /user/root/flowcount/output
(6) Inspect the results
[root@node21 ~]$ hadoop fs -cat /user/root/flowcount/output/part-r-00000
13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	1116	954	2070
13560439658	2034	5892	7926
13602846565	1938	2910	4848
...
2. Custom Partitioning
Output the statistics to different files according to the phone number's home province (Partitioner)
0) Requirement: output the statistics to different files according to the phone number's home province (partitioning)
1) Data preparation: phone_date.txt
2) Analysis
(1) Mapreduce groups the map output kv pairs by key and dispatches them to the different reduce tasks. The default dispatch rule: partition by the key's hashcode % the number of reduce tasks (see the sketch after this list).
(2) To dispatch (group) by our own rule, we must replace the data-dispatch (grouping) component, the Partitioner,
with a custom CustomPartitioner that extends the abstract class Partitioner.
(3) In the job driver, register the custom partitioner: job.setPartitionerClass(CustomPartitioner.class)
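For reference, the default dispatch rule from (1) boils down to the following; Hadoop's built-in HashPartitioner is essentially this (the bit mask keeps a negative hashCode from producing a negative index):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result lands in [0, numReduceTasks)
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}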
3) Building on requirement 1, add a partitioner class
package com.xyg.mr.partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * K2, V2 correspond to the map output kv types
 * @author Administrator
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 1 Take the first three digits of the phone number
        String preNum = key.toString().substring(0, 3);

        // Partition 4 collects every prefix not matched below
        int partition = 4;

        // 2 Decide which province the prefix belongs to
        if ("136".equals(preNum)) {
            partition = 0;
        } else if ("137".equals(preNum)) {
            partition = 1;
        } else if ("138".equals(preNum)) {
            partition = 2;
        } else if ("139".equals(preNum)) {
            partition = 3;
        }

        return partition;
    }
}
4) In the driver, register the custom partitioner and set a matching number of reduce tasks
public static void main(String[] args) throws Exception {
    // 1 Obtain the configuration and the job instance
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);

    // 2 Specify the local path of this program's jar
    job.setJarByClass(FlowCount.class);

    // 3 Register the custom partitioner
    job.setPartitionerClass(ProvincePartitioner.class);

    // 4 Set a matching number of reduce tasks
    job.setNumReduceTasks(5);

    // 5 Specify the mapper/reducer business classes for this job
    job.setMapperClass(FlowCountMapper.class);
    job.setReducerClass(FlowCountReducer.class);

    // 6 Specify the kv types of the mapper output
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);

    // 7 Specify the kv types of the final output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    // 8 Specify the input directory and the output directory
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 9 Submit the job configuration and the jar containing its classes to yarn
    boolean result = job.waitForCompletion(true);
    System.exit(result ? 0 : 1);
}
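Note that the configured number of reduce tasks must cover every index the partitioner can return: with indexes 0 through 4 above, setNumReduceTasks(5) yields exactly five output files. A smaller value typically makes the job fail with an illegal-partition error, while setting it to 1 bypasses the custom partitioner entirely.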
5) Package the program into a jar and copy it to the hadoop cluster.
6) Start the hadoop cluster.
7) Run the flowcountPartitionser program
[root@node21 ~]$ hadoop jar flowcountPartitionser.jar com.xyg.mr.partitioner.FlowCount /user/root/flowcount/input /user/root/flowcount/output
8) Inspect the results
[root@node21 ~]$ hadoop fs -lsr /
/user/root/flowcount/output/part-r-00000
/user/root/flowcount/output/part-r-00001
/user/root/flowcount/output/part-r-00002
/user/root/flowcount/output/part-r-00003
/user/root/flowcount/output/part-r-00004
3. Custom Full Sort
Sort the statistics by total traffic in descending order (full sort)
0) Requirement: take the results produced by requirement 1 and sort them again by total traffic.
1) Data preparation: phone_date.txt
2) Analysis
(1) Run the program in two passes: the first pass computes the per-phone totals as before, and the second pass sorts that result
(2) In the sort pass, emit context.write(totalTraffic, phoneNumber), i.e. the bean carrying the total becomes the map output key
(3) Have FlowBean implement the WritableComparable interface and override its compareTo method
@Override
public int compareTo(FlowBean o) {
    // Descending order: larger totals sort first. Long.compare returns 0
    // for equal totals, keeping the comparison contract consistent; beans
    // with equal totals then arrive in the same reduce group, so the
    // reducer must emit every phone number in a group (see below).
    return Long.compare(o.getSumFlow(), this.sumFlow);
}
3) The FlowBean object, extended from requirement 1 with comparison support:

package com.xyg.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Deserialization instantiates the bean reflectively through the
    // no-arg constructor, so it must be present
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Allows a single bean instance to be reused across map calls
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method. Note: fields must be read in exactly the
     * order they were written
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // Descending order: larger totals sort first
        return Long.compare(o.getSumFlow(), this.sumFlow);
    }
}
4) The map method is optimized to reuse a single bean object, the reduce method simply writes out each result, and the driver only needs its input and output reconfigured:
package com.xyg.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSort {

    static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        FlowBean bean = new FlowBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1 The input is the output of the previous aggregation job,
            //   already holding each phone number's total traffic
            String line = value.toString();

            // 2 Split the line and extract the phone number, upstream and downstream traffic
            String[] fields = line.split("\t");
            String phoneNbr = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);

            // 3 Populate the reusable objects
            bean.set(upFlow, downFlow);
            v.set(phoneNbr);

            // 4 Emit with the bean as key so the shuffle sorts by total traffic
            context.write(bean, v);
        }
    }

    static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean bean, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Phones whose totals compare equal arrive in the same group,
            // so emit every phone number in the group
            for (Text phone : values) {
                context.write(phone, bean);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // 1 Obtain the configuration and the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Specify the local path of this program's jar
        job.setJarByClass(FlowCountSort.class);

        // 3 Specify the mapper/reducer business classes for this job
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // 4 Specify the kv types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5 Specify the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 Specify the input directory and the output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        Path outPath = new Path(args[1]);
        // FileSystem fs = FileSystem.get(configuration);
        // if (fs.exists(outPath)) {
        //     fs.delete(outPath, true);
        // }
        FileOutputFormat.setOutputPath(job, outPath);

        // 7 Submit the job configuration and the jar containing its classes to yarn
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
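The commented-out block above hints at a common pitfall: FileOutputFormat refuses to run if the output directory already exists. A minimal sketch of the cleanup, using the standard FileSystem API (it also requires import org.apache.hadoop.fs.FileSystem):

// Delete a pre-existing output directory so reruns don't fail
// with FileAlreadyExistsException
FileSystem fs = FileSystem.get(configuration);
if (fs.exists(outPath)) {
    fs.delete(outPath, true); // true = recursive
}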
5) Package the program into a jar and copy it to the hadoop cluster.
6) Start the hadoop cluster.
7) Run the flowcountsort program
[root@node21 module]$ hadoop jar flowcountsort.jar com.xyg.mr.sort.FlowCountSort /user/root/flowcount/output /user/root/flowcount/output_sort
8) Inspect the results
[root@node21 module]$ hadoop fs -cat /user/root/flowcount/output_sort/part-r-00000
13502468823 7335 110349 117684
13925057413 11058 48243 59301
13726238888 2481 24681 27162
13726230503 2481 24681 27162
18320173382 9531 2412 11943
4. Custom Partial Sort
Sort within each province's output file (partial sort)
1) Requirement: within each province's output file, the phone numbers must be sorted by total traffic.
2) Analysis: building on requirement 3, we only need to add a custom partitioner class.
3) Hands-on
(1) Add the custom partitioner class
package com.xyg.reduce.flowsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Here the map output key is the FlowBean used for sorting, so the
// phone-number prefix is taken from the VALUE instead
public class FlowSortPartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean key, Text value, int numPartitions) {
        // Partition 0 collects every prefix not matched below; the returned
        // index must stay below the configured number of reduce tasks (5)
        int partition = 0;
        String preNum = value.toString().substring(0, 3);
        if ("136".equals(preNum)) {
            partition = 1;
        } else if ("137".equals(preNum)) {
            partition = 2;
        } else if ("138".equals(preNum)) {
            partition = 3;
        } else if ("139".equals(preNum)) {
            partition = 4;
        }
        return partition;
    }
}
(2) Register the partitioner class in the driver
job.setPartitionerClass(FlowSortPartitioner.class);
job.setNumReduceTasks(5);
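Each of the five output files is now sorted internally by descending total traffic: the shuffle sorts by the FlowBean key within every partition, while the partitioner decides which file a given phone number lands in.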
