This is a traffic-operation project: an offline data-analysis project built on Hadoop.
The carrier analyzes users' web-browsing behavior from its HTTP logs and enriches the users' behavior trajectories.
The HTTP data format:
Workflow:
System architecture:
Technology choices:
Only one of the features is described here:
The rule base is populated manually, while the instance base is generated automatically by machine learning; both take the form <url, info>.
(1) Compute the URLs that account for the top 80% of total traffic. Only a small number of URLs carry especially high traffic; the vast majority carry so little that they have no reference value and should be discarded.
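The selection logic is just a cumulative cutoff over the URLs sorted by total traffic in descending order. Below is a minimal standalone sketch of that idea (the class name and sample data are made up for illustration); the MapReduce implementation that does the same thing at scale follows.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the "top 80% of traffic" cutoff over pre-aggregated per-URL totals.
public class TopkUrlSketch {
    public static void main(String[] args) {
        Map<String, Long> urlFlow = new HashMap<String, Long>();
        urlFlow.put("http://a.example.com", 900L);  // hypothetical sample data
        urlFlow.put("http://b.example.com", 500L);
        urlFlow.put("http://c.example.com", 80L);
        urlFlow.put("http://d.example.com", 20L);

        // Global traffic total.
        long globalTotal = 0;
        for (long flow : urlFlow.values()) {
            globalTotal += flow;
        }

        // Sort URLs by traffic, highest first.
        List<Map.Entry<String, Long>> sorted =
                new ArrayList<Map.Entry<String, Long>>(urlFlow.entrySet());
        Collections.sort(sorted, new Comparator<Map.Entry<String, Long>>() {
            @Override
            public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
                return Long.compare(b.getValue(), a.getValue());
            }
        });

        // Emit URLs until the accumulated traffic reaches 80% of the total;
        // everything after the cutoff is discarded.
        long accumulated = 0;
        for (Map.Entry<String, Long> e : sorted) {
            if ((double) accumulated / globalTotal >= 0.8) {
                break;
            }
            System.out.println(e.getKey() + "\t" + e.getValue());
            accumulated += e.getValue();
        }
    }
}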
FlowBean.java:
package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // Deserialization uses reflection and needs a no-arg constructor, so one is defined explicitly.
    public FlowBean() {}

    // Constructor with arguments, for convenient initialization of the object's data.
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public void set(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() { return phoneNB; }
    public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; }
    public long getUp_flow() { return up_flow; }
    public void setUp_flow(long up_flow) { this.up_flow = up_flow; }
    public long getD_flow() { return d_flow; }
    public void setD_flow(long d_flow) { this.d_flow = d_flow; }
    public long getS_flow() { return s_flow; }
    public void setS_flow(long s_flow) { this.s_flow = s_flow; }

    // Serialize the object's data into the stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialize the object's data from the stream.
    // The fields must be read back in exactly the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }

    // Sort by total flow in descending order; never returns 0, so beans with equal
    // totals are still kept as distinct keys in a TreeMap.
    @Override
    public int compareTo(FlowBean o) {
        return s_flow > o.getS_flow() ? -1 : 1;
    }
}
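As a quick local sanity check of the Writable contract (fields must be read back in exactly the order they were written), here is a small round-trip test. It is not part of the original project; it only assumes FlowBean is on the classpath.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

// Hypothetical local test: serialize a FlowBean the way Hadoop would, then read it back.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13800000000", 100, 200);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Prints the phone number followed by up_flow, d_flow and the derived s_flow (100, 200, 300).
        System.out.println(copy.getPhoneNB() + "\t" + copy);
    }
}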
TopkURLMapper.java:
package cn.itcast.hadoop.mr.llyy.topkurl;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private FlowBean bean = new FlowBean();
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");
        try {
            // Only keep records that carry an http URL (field 26) together with the
            // up-flow and down-flow fields (30 and 31).
            if (fields.length > 32 && StringUtils.isNotEmpty(fields[26])
                    && fields[26].startsWith("http")) {
                String url = fields[26];
                long up_flow = Long.parseLong(fields[30]);
                long d_flow = Long.parseLong(fields[31]);
                k.set(url);
                bean.set("", up_flow, d_flow);
                context.write(k, bean);
            }
        } catch (Exception e) {
            // Skip malformed lines instead of failing the task.
            System.out.println("skipping malformed line: " + e.getMessage());
        }
    }
}
TopkURLReducer.java:
package cn.itcast.hadoop.mr.llyy.topkurl;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLReducer extends Reducer<Text, FlowBean, Text, LongWritable> {

    // FlowBean.compareTo sorts by total flow in descending order, so the TreeMap
    // keeps the URLs ordered from highest to lowest traffic.
    private TreeMap<FlowBean, Text> treeMap = new TreeMap<>();
    private double globalCount = 0;

    // Input: <url, {bean, bean, bean, ...}>
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {

        Text url = new Text(key.toString());
        long up_sum = 0;
        long d_sum = 0;
        for (FlowBean bean : values) {
            up_sum += bean.getUp_flow();
            d_sum += bean.getD_flow();
        }
        FlowBean bean = new FlowBean("", up_sum, d_sum);

        // Each time one URL's total flow is computed, add it to the global counter;
        // once all records are processed, globalCount holds the overall traffic total.
        globalCount += bean.getS_flow();
        treeMap.put(bean, url);
    }

    // cleanup() is called once, just before the reducer task exits.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
        double tempCount = 0;

        // Walk the URLs from highest to lowest traffic and emit them until the
        // accumulated flow reaches 80% of the global total.
        for (Entry<FlowBean, Text> ent : entrySet) {
            if (tempCount / globalCount < 0.8) {
                context.write(ent.getValue(), new LongWritable(ent.getKey().getS_flow()));
                tempCount += ent.getKey().getS_flow();
            } else {
                return;
            }
        }
    }
}
TopkURLRunner.java:
package cn.itcast.hadoop.mr.llyy.topkurl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner so generic command-line options take effect.
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TopkURLRunner.class);

        job.setMapperClass(TopkURLMapper.class);
        job.setReducerClass(TopkURLReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TopkURLRunner(), args);
        System.exit(res);
    }
}
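A typical way to submit the job (the jar name and the HDFS input/output paths below are placeholders, not taken from the original project):

hadoop jar flow.jar cn.itcast.hadoop.mr.llyy.topkurl.TopkURLRunner /liuliang/srclog /liuliang/output/topkurl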
(2) Import the computed URLs into the database. This is the URL rule base; it has just two fields, url and info, where info is a label filled in manually.
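For reference, the rule table can be created with a small one-off JDBC helper like the sketch below; the connection settings, table name, and column names come from DBLoader further down, while the column types and sizes are assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical one-off helper: create the urlrule table (url, info) used as the rule base.
public class CreateUrlRuleTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://weekend01:3306/urlcontentanalyse", "root", "root");
             Statement st = conn.createStatement()) {
            // Column types and sizes are an assumption; only the column names are from the project.
            st.executeUpdate("create table if not exists urlrule ("
                    + "url varchar(1024) not null, "
                    + "info varchar(1024))");
        }
    }
}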
Import the job output above into the database with Sqoop, then read the rule base back from the database when running the next MapReduce job.
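One possible Sqoop invocation for that step (the export directory is a placeholder matching the output path used above; whether the second field ends up in info and is later overwritten during manual labeling, or is dropped beforehand, depends on how the table is prepared):

sqoop export \
  --connect jdbc:mysql://weekend01:3306/urlcontentanalyse \
  --username root --password root \
  --table urlrule \
  --export-dir /liuliang/output/topkurl \
  --input-fields-terminated-by '\t'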
DBLoader.java: a database utility class.
package cn.itcast.hadoop.mr.llyy.enhance;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;

public class DBLoader {

    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(
                    "jdbc:mysql://weekend01:3306/urlcontentanalyse", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,info from urlrule");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }
}
LogEnhanceOutputFormat.java: the default output format is TextOutputFormat, which writes everything into the standard part-r-xxxxx files; here the two kinds of results need to go into two different files, so a custom output format is implemented.
package cn.itcast.hadoop.mr.llyy.enhance;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhanceOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Use the job's configuration so the correct (cluster) file system is resolved.
        FileSystem fs = FileSystem.get(job.getConfiguration());
        FSDataOutputStream enhancedOs = fs.create(new Path("/liuliang/output/enhancedLog"));
        FSDataOutputStream tocrawlOs = fs.create(new Path("/liuliang/output/tocrawl"));

        return new LogEnhanceRecordWriter<K, V>(enhancedOs, tocrawlOs);
    }

    public static class LogEnhanceRecordWriter<K, V> extends RecordWriter<K, V> {

        private FSDataOutputStream enhancedOs = null;
        private FSDataOutputStream tocrawlOs = null;

        public LogEnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
            this.enhancedOs = enhancedOs;
            this.tocrawlOs = tocrawlOs;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Route each record to a different file depending on whether it is a
            // to-be-crawled URL or an enhanced log line.
            if (key.toString().contains("tocrawl")) {
                tocrawlOs.write(key.toString().getBytes());
            } else {
                enhancedOs.write(key.toString().getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (enhancedOs != null) {
                enhancedOs.close();
            }
            if (tocrawlOs != null) {
                tocrawlOs.close();
            }
        }
    }
}
Then extract the URL from every raw log line and look it up in the rule base. If the URL has an info label, the label is appended to the original log line; otherwise the URL is a to-be-crawled URL and "tocrawl" is appended after it. These two cases must be written to different output files.
LogEnhanceMapper.java:
package cn.itcast.hadoop.mr.llyy.enhance;

import java.io.IOException;
import java.util.HashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Reads the raw log data, extracts the URL, queries the rule base for the
 * content-analysis result of the page that URL points to, and appends the
 * result to the original log line.
 *
 * @author duanhaitao@itcast.cn
 */
// Read a raw record (47 fields): timestamp ... destip srcip ... url ... get 200 ...
// Extract the url and query the rule base to obtain the content tags:
// site category, channel category, topic words, keywords, film title, cast, director, ...
// Append the analysis result to the original log line:
//   context.write(timestamp ... destip srcip ... url ... get 200 ... site category, channel category, ...)
// If a url is not found in the rule base, emit it to the to-be-crawled list:
//   context.write(url    tocrawl)
public class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> ruleMap = new HashMap<>();

    // setup() is called once when the mapper task is initialized;
    // load the whole rule base into memory here.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        DBLoader.dbLoader(ruleMap);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");
        try {
            if (fields.length > 27 && StringUtils.isNotEmpty(fields[26])
                    && fields[26].startsWith("http")) {
                String url = fields[26];
                String info = ruleMap.get(url);
                String result = "";
                if (info != null) {
                    // Known URL: append its info tag to the original log line.
                    result = line + "\t" + info + "\n\r";
                    context.write(new Text(result), NullWritable.get());
                } else {
                    // Unknown URL: mark it as to-be-crawled.
                    result = url + "\t" + "tocrawl" + "\n\r";
                    context.write(new Text(result), NullWritable.get());
                }
            } else {
                return;
            }
        } catch (Exception e) {
            System.out.println("exception occurred in mapper.....");
        }
    }
}
LogEnhanceRunner.java:
package cn.itcast.hadoop.mr.llyy.enhance;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogEnhanceRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogEnhanceRunner.class);

        job.setMapperClass(LogEnhanceMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(LogEnhanceOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new LogEnhanceRunner(), args);
        System.exit(res);
    }
}
It is fine not to write a reducer here.
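Not setting a reducer class means the framework falls back to the default identity Reducer with a single reduce task, which passes the mapper output through unchanged, so the result is the same. The job could also be made explicitly map-only, for example with the one-liner below (a possible alternative, not in the original code; note that with several map tasks every mapper would then open the same hardcoded paths in LogEnhanceOutputFormat, so the single identity reduce used here is simpler):

// Optional alternative in LogEnhanceRunner.run(): skip the reduce phase entirely.
job.setNumReduceTasks(0);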