Hadoop Project in Practice


This is a traffic-operation (流量經營) project: an offline data-analysis project built on Hadoop.

The carrier uses HTTP logs to analyze users' browsing-behavior data and to enrich their behavior trails.

 

HTTP data format:

Workflow:

System architecture:

Technology choices:

Only one of the features is described here:

The rule library is populated by hand, while the instance library is generated automatically by machine learning; both take the form <url, info>.

(1) Find the URLs that account for the top 80% of the traffic. Only a handful of URLs carry very high traffic; the vast majority carry very little, have no analytical value, and are discarded. For example, with per-URL totals of 500, 300, 100, 60 and 40 (1000 in total), only the first two URLs are kept, since together they already cover 80% of the traffic.

FlowBean.java:

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{
    
    
    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;
    
    //During deserialization the object is created via reflection, which needs a no-arg constructor, so one is defined explicitly
    public FlowBean(){}
    
    //Convenience constructor for initializing the fields
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public void set(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }
    
    
    
    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }

    
    
    //Serialize the object's fields to the output stream
    @Override
    public void write(DataOutput out) throws IOException {

        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
        
    }

    
    //Deserialize the object's fields from the input stream
    //Fields must be read back in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {

        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
        
    }
    
    
    @Override
    public String toString() {

        return "" + up_flow + "\t" +d_flow + "\t" + s_flow;
    }

    //Sort by total flow in descending order. compareTo never returns 0, so
    //beans with equal totals are still treated as distinct keys in a TreeMap.
    @Override
    public int compareTo(FlowBean o) {
        return s_flow > o.getS_flow() ? -1 : 1;
    }
    

}
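
As a quick sanity check (my own sketch, not part of the project, assuming the Hadoop client jars and the FlowBean class above are on the classpath), the Writable contract can be exercised locally; it also shows why readFields() must mirror the field order used in write():

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13800000000", 100L, 200L);

        // Serialize: writeUTF(phoneNB) followed by three longs
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Deserialize: fields are read back in exactly the same order
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(copy);   // prints up_flow, d_flow, s_flow: 100  200  300 (tab-separated)
    }
}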

 

TopkURLMapper.java:

package cn.itcast.hadoop.mr.llyy.topkurl;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private FlowBean bean = new FlowBean();
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();

        String[] fields = StringUtils.split(line, "\t");
        try {
            // fields[26] holds the URL; fields[30] and fields[31] are the up/down flow values
            if (fields.length > 32 && StringUtils.isNotEmpty(fields[26])
                    && fields[26].startsWith("http")) {
                String url = fields[26];

                long up_flow = Long.parseLong(fields[30]);
                long d_flow = Long.parseLong(fields[31]);

                k.set(url);
                bean.set("", up_flow, d_flow);

                context.write(k, bean);
            }
        } catch (Exception e) {
            // Malformed lines are skipped
            System.out.println("skipping malformed line in TopkURLMapper: " + e.getMessage());
        }
    }

}

TopkURLReducer.java:

package cn.itcast.hadoop.mr.llyy.topkurl;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLReducer extends Reducer<Text, FlowBean, Text, LongWritable>{
    // TreeMap keyed by FlowBean: entries are ordered by FlowBean.compareTo,
    // i.e. by total flow in descending order. Note that this yields a global
    // ranking only when the job runs with a single reduce task.
    private TreeMap<FlowBean,Text> treeMap = new TreeMap<>();
    // running total of the flow of every URL handled by this reducer
    private double globalCount = 0;
    
    
    // input: <url, {bean, bean, bean, ...}>
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,Context context)
            throws IOException, InterruptedException {
        Text url = new Text(key.toString());
        long up_sum = 0;
        long d_sum = 0;
        for(FlowBean bean : values){
            
            up_sum += bean.getUp_flow();
            d_sum += bean.getD_flow();
        }
        
        FlowBean bean = new FlowBean("", up_sum, d_sum);
        //After computing one URL's total flow, add it to the global counter; once all records are processed, globalCount holds the grand total
        globalCount += bean.getS_flow();
        treeMap.put(bean,url);

    }
    
    
    //cleanup() is called once, just before the reduce task exits
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {

        Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
        double tempCount = 0;
        
        // Walk the URLs from highest to lowest total flow and emit them until
        // the cumulative flow reaches 80% of the global total
        for(Entry<FlowBean, Text> ent: entrySet){
            
            if(tempCount / globalCount < 0.8){
                
                context.write(ent.getValue(), new LongWritable(ent.getKey().getS_flow()));
                tempCount += ent.getKey().getS_flow();
                
            }else{
                return;
            }
            
        }
        
        
        
    }
    
}

TopkURLRunner.java:

package cn.itcast.hadoop.mr.llyy.topkurl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class TopkURLRunner extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();  // use the configuration prepared by ToolRunner
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(TopkURLRunner.class);
        
        job.setMapperClass(TopkURLMapper.class);
        job.setReducerClass(TopkURLReducer.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TopkURLRunner(), args);
        System.exit(res);
        
    }
    
}

(2) Load the URLs produced above into the database. This is the URL rule library; it has just two fields, url and info, where the info label is written by hand.

The output of the job above is imported into the database with Sqoop; the next MapReduce job then reads the rule library back out of the database.
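
The info column is what gets filled in by hand. A minimal JDBC sketch of that tagging step might look like the following (the UPSERT statement, the sample URL and label, and the assumption that urlrule has a unique key on url are mine, not the project's; the connection settings mirror DBLoader below):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class RuleTagger {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://weekend01:3306/urlcontentanalyse", "root", "root");
             PreparedStatement ps = conn.prepareStatement(
                     "insert into urlrule(url, info) values(?, ?) "
                     + "on duplicate key update info = values(info)")) {
            ps.setString(1, "http://www.example.com/video/123");  // hypothetical URL from the top-80% list
            ps.setString(2, "video");                             // hand-written label
            ps.executeUpdate();
        }
    }
}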

DBLoader.java: the database utility class.

package cn.itcast.hadoop.mr.llyy.enhance;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;

public class DBLoader {

    // Load the whole rule library (url -> info) from MySQL into the given map
    public static void dbLoader(HashMap<String, String> ruleMap) {

        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://weekend01:3306/urlcontentanalyse", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,info from urlrule");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }

        } catch (Exception e) {
            e.printStackTrace();
            
        } finally {
            try{
                if(res!=null){
                    res.close();
                }
                if(st!=null){
                    st.close();
                }
                if(conn!=null){
                    conn.close();
                }

            }catch(Exception e){
                e.printStackTrace();
            }
        }

    }
    
    
    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }

}

LogEnhanceOutputFormat.java: the default output format is TextOutputFormat, which writes everything into the part-* files of the output directory. Here the two kinds of results have to go to two different files, so a custom output format is needed.

package cn.itcast.hadoop.mr.llyy.enhance;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhanceOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        
        // Use the job's configuration so the correct HDFS is addressed.
        // Note: the two output paths are hard-coded, so this format only works
        // as intended when a single task writes the output.
        FileSystem fs = FileSystem.get(job.getConfiguration());
        FSDataOutputStream enhancedOs = fs.create(new Path("/liuliang/output/enhancedLog"));
        FSDataOutputStream tocrawlOs = fs.create(new Path("/liuliang/output/tocrawl"));
        
        return new LogEnhanceRecordWriter<K, V>(enhancedOs,tocrawlOs);
    }

    
    public static class LogEnhanceRecordWriter<K, V> extends RecordWriter<K, V>{
        private FSDataOutputStream enhancedOs =null;
        private FSDataOutputStream tocrawlOs =null;

        public LogEnhanceRecordWriter(FSDataOutputStream enhancedOs,FSDataOutputStream tocrawlOs){
            
            this.enhancedOs = enhancedOs;
            this.tocrawlOs = tocrawlOs;
            
        }
        
        
        // Route records by content: keys flagged with "tocrawl" go to the crawl
        // list, everything else to the enhanced log. The value is not used.
        @Override
        public void write(K key, V value) throws IOException,
                InterruptedException {

            if(key.toString().contains("tocrawl")){
                tocrawlOs.write(key.toString().getBytes());
            }else{
                enhancedOs.write(key.toString().getBytes());
            }
            
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {

            if(enhancedOs != null){
                enhancedOs.close();
            }
            if(tocrawlOs != null){
                tocrawlOs.close();
            }
        
            
            
        }
        
        
        
    }
    
    
}

 

The URL is then extracted from every raw log line and looked up in the rule library. If an info label is found, it is appended to the original log line; otherwise the URL still needs to be crawled, so "tocrawl" is appended instead. These two cases are written to different files.

LogEnhanceMapper.java:

package cn.itcast.hadoop.mr.llyy.enhance;

import java.io.IOException;
import java.util.HashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Reads the raw log data, extracts the URL, looks it up in the rule library,
 * and appends the content-analysis result for that URL to the original log line.
 * 
 * @author duanhaitao@itcast.cn
 * 
 */

// Input: raw records (47 fields): timestamp ..... destip srcip ... url ... get 200 ...
// The URL is looked up in the rule library to obtain content-recognition info:
// site category, channel category, topics, keywords, film title, cast, director, ...
// The analysis result is appended to the original log line:
// context.write( timestamp ..... destip srcip ... url ... get 200 ...
//                site category, channel category, topics, keywords, film title, cast, director, ... )
// If a URL is not found in the rule library, it is written to the to-crawl list:
// context.write( url tocrawl)
public class LogEnhanceMapper extends
        Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> ruleMap = new HashMap<>();

    // setup() is called once when the map task is initialized: load the rule library here
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        DBLoader.dbLoader(ruleMap);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();

        String[] fields = StringUtils.split(line, "\t");
        try {
            if (fields.length > 27 && StringUtils.isNotEmpty(fields[26])
                    && fields[26].startsWith("http")) {
                String url = fields[26];
                String info = ruleMap.get(url);
                String result = "";
                if (info != null) {
                    // URL found in the rule library: append its label to the log line.
                    // The trailing newline is the record separator, because the custom
                    // RecordWriter writes raw bytes without adding one.
                    result = line + "\t" + info + "\n";
                    context.write(new Text(result), NullWritable.get());
                } else {
                    // Unknown URL: send it to the to-crawl list
                    result = url + "\t" + "tocrawl" + "\n";
                    context.write(new Text(result), NullWritable.get());
                }

            } else {
                return;
            }
        } catch (Exception e) {
            System.out.println("exception occurred in LogEnhanceMapper: " + e.getMessage());
        }
    }

}

LogEnhanceRunner.java:

package cn.itcast.hadoop.mr.llyy.enhance;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogEnhanceRunner extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();  // use the configuration prepared by ToolRunner
        
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(LogEnhanceRunner.class);
        
        job.setMapperClass(LogEnhanceMapper.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        job.setOutputFormatClass(LogEnhanceOutputFormat.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new LogEnhanceRunner(),args);
        System.exit(res);
    }
    
    
}

No reducer is needed here; the job can run map-only.
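
If you want to make that explicit (my suggestion, not in the original code), switch the reduce phase off in LogEnhanceRunner.run():

// In run(), after Job.getInstance(conf):
job.setNumReduceTasks(0);   // map-only job: mapper output goes straight to LogEnhanceOutputFormat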

 

MapReduce Top-K implementation: https://blog.csdn.net/u011750989/article/details/11482805?locationNum=5

