---------------------------------------------------------------------------------------------------------------
[版權申明:本文系作者原創,轉載請注明出處]
文章出處:http://blog.csdn.net/sdksdk0/article/details/51628874
作者:朱培
---------------------------------------------------------------------------------------------------------------
本文是結合hadoop中的mapreduce來對用戶數據進行分析,統計用戶的手機號碼、上行流量、下行流量、總流量的信息,同時可以按照總流量大小對用戶進行分組排序等。是一個非常簡潔易用的hadoop項目,主要用戶進一步加強對MapReduce的理解及實際應用。文末提供源數據采集文件和系統源碼。
本案例非常適合hadoop初級人員學習以及想入門大數據、雲計算、數據分析等領域的朋友進行學習。
一、待分析的數據源
以下是一個待分析的文本文件,里面有非常多的用戶瀏覽信息,保擴用戶手機號碼,上網時間,機器序列號,訪問的IP,訪問的網站,上行流量,下行流量,總流量等信息。這里只截取一小段,具體文件在文末提供下載鏈接。
二、基本功能實現
private long upFlow; private long dFlow; private long sumFlow;然后就是各種右鍵生成get,set方法,還要toString(),以及生成構造函數,(千萬記得要生成一個空的構造函數,不然后面進行分析的時候會報錯)。
package cn.tf.flow; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; public class FlowBean implements WritableComparable<FlowBean>{ private long upFlow; private long dFlow; private long sumFlow; public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getdFlow() { return dFlow; } public void setdFlow(long dFlow) { this.dFlow = dFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public FlowBean(long upFlow, long dFlow) { super(); this.upFlow = upFlow; this.dFlow = dFlow; this.sumFlow = upFlow+dFlow; } @Override public void readFields(DataInput in) throws IOException { upFlow=in.readLong(); dFlow=in.readLong(); sumFlow=in.readLong(); } @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dFlow); out.writeLong(sumFlow); } public FlowBean() { super(); } @Override public String toString() { return upFlow + "\t" + dFlow + "\t" + sumFlow; } @Override public int compareTo(FlowBean o) { return this.sumFlow>o.getSumFlow() ? -1:1; } }
然后就是這個統計的代碼了,新建一個FlowCount.java.在這個類里面,我直接把Mapper和Reduce寫在同一個類里面了,如果按規范的要求應該是要分開寫的。
public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 拿到這行的內容轉成string String line = value.toString(); String[] fields = StringUtils.split(line, "\t"); try { if (fields.length > 3) { // 獲得手機號及上下行流量字段值 String phone = fields[1]; long upFlow = Long.parseLong(fields[fields.length - 3]); long dFlow = Long.parseLong(fields[fields.length - 2]); // 輸出這一行的處理結果,key為手機號,value為流量信息bean context.write(new Text(phone), new FlowBean(upFlow, dFlow)); } else { return; } } catch (Exception e) { } } }
在reduce中隊數據進行整理,統計
public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long upSum = 0; long dSum = 0; for (FlowBean bean : values) { upSum += bean.getUpFlow(); dSum += bean.getdFlow(); } FlowBean resultBean = new FlowBean(upSum, dSum); context.write(key, resultBean); } }
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowCount.class); job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); }
bin/hadoop fs -mkdir -p /flow/data bin/hadoop fs -put HTTP_20130313143750.dat /flow/data bin/hadoop jar ../lx/flow.jar
bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCount /flow/data /flow/output

在這整過過程中,我們是有yarnchild的進程在執行的,如下圖所示:當整個過程執行完畢之后yarnchild也會自動退出。

三、按總流量從大到小排序
如果你上面這個基本操作以及完成了的話,按總流量排序就非常簡單了。我們新建一個FlowCountSort.java.
全部代碼如下:
package cn.tf.flow; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCountSort { public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line=value.toString(); String[] fields=StringUtils.split(line,"\t"); String phone=fields[0]; long upSum=Long.parseLong(fields[1]); long dSum=Long.parseLong(fields[2]); FlowBean sumBean=new FlowBean(upSum,dSum); context.write(sumBean, new Text(phone)); } } public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{ //進來的“一組”數據就是一個手機的流量bean和手機號 @Override protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(values.iterator().next(), key); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowCountSort.class); job.setMapperClass(FlowCountSortMapper.class); job.setReducerClass(FlowCountSortReducer.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }
這個主要就是使用了FlowBean.java中的代碼來實現的,主要是繼承了WritableComparable<FlowBean>接口來實現,然后重寫了compareTo()方法。
@Override public int compareTo(FlowBean o) { return this.sumFlow>o.getSumFlow() ? -1:1; }按照同樣的方法對這個文件打成jar包,然后使用hadoop的相關語句進行執行就可以了。
bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort /flow/output /flow/sortoutput結果圖:
四、按用戶號碼區域進行分類
流量匯總之后的結果需要按照省份輸出到不同的結果文件中,需要解決兩個問題:
1、如何讓mr的最終結果產生多個文件: 原理:MR中的結果文件數量由reduce
task的數量絕對,是一一對應的 做法:在代碼中指定reduce task的數量
2、如何讓手機號進入正確的文件 原理:讓不同手機號數據發給正確的reduce task,就進入了正確的結果文件
要自定義MR中的分區partition的機制(默認的機制是按照kv中k的hashcode%reducetask數)
做法:自定義一個類來干預MR的分區策略——Partitioner的自定義實現類
主要代碼與前面的排序是非常類似的,只要在main方法中添加如下兩行代碼就可以了。
//指定自定義的partitioner job.setPartitionerClass(ProvincePartioner.class); job.setNumReduceTasks(5);
這里我們需要新建一個ProvincePartioner.java來處理號碼分類的邏輯。
public class ProvincePartioner extends Partitioner<Text, FlowBean>{ private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>(); static { provinceMap.put("135", 0); provinceMap.put("136", 1); provinceMap.put("137", 2); provinceMap.put("138", 3); } @Override public int getPartition(Text key, FlowBean value, int numPartitions) { String prefix = key.toString().substring(0, 3); Integer partNum = provinceMap.get(prefix); if(partNum == null) partNum=4; return partNum; } }
執行方法和前面也是一樣的。從執行的流程中我們可以看到這里啟動了5個reduce task,因為我這里數據量比較小,所以只啟動了一個map task。
到這里,整個用戶流量分析系統就全部結束了。關於大數據的更多內容,歡迎關注。點擊左上角頭像下方“點擊關注".感謝您的支持!
數據源下載地址:http://download.csdn.net/detail/sdksdk0/9545935
源碼項目地址:https://github.com/sdksdk0/HDFS_MapReduce