【E-commerce Log Project Part 4】Data Cleaning (ETL)


Environment
  hadoop-2.6.5

  First, why do we need data cleaning at all? The data collected through the various channels cannot be fed directly into the next stage of analysis; it has to go through missing-value cleaning, format and content cleaning, logic-error cleaning, removal of unneeded data, consistency validation across related records, and similar steps before it becomes usable data.
For the concrete tasks involved, see the article: A Brief Review of Data Cleaning (數據清洗的一些梳理).

Once you understand ETL, there are existing tools, such as the open-source Kettle, that can do this work. But you can also build it entirely yourself. ETL is really just three stages: data extraction, data cleaning, and storage of the cleaned data. Hadoop, Spark, or Kafka can all be used for the job, and the cleaning rules can be developed as required.
Here we write a MapReduce job on Hadoop to do the ETL work.

Per the architecture design, the data produced by ETL is stored in HBase, so the ETL stage breaks down into four parts:
1. Filter out dirty data
2. Resolve IP addresses with IPSeeker
3. Parse the browser user-agent information with UASparser
4. Design the rowkey

Runner:

package com.sxt.etl.mr.ald;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.sxt.common.EventLogConstants;
import com.sxt.common.GlobalConstants;
import com.sxt.util.TimeUtil;

/**
 * Runner class for the MapReduce ETL job
 *
 */
public class AnalyserLogDataRunner implements Tool 
{
    private static final Logger logger = Logger.getLogger(AnalyserLogDataRunner.class);
    private Configuration conf = null;

    public static void main(String[] args) 
    {
        try 
        {
            ToolRunner.run(new Configuration(), new AnalyserLogDataRunner(), args);
        } 
        catch (Exception e) 
        {
            logger.error("執行日志解析job異常", e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void setConf(Configuration conf) {
        conf.set("fs.defaultFS", "hdfs://node1:8020");
         //conf.set("yarn.resourcemanager.hostname", "node3");
        //conf.set("hbase.zookeeper.quorum", "node1,node2,node3");//用來連接HBase
        conf.set("hbase.zookeeper.quorum", "node104");//用來連接HBase
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        this.processArgs(conf, args);

        Job job = Job.getInstance(conf, "analyser_logdata");

        // Code needed when submitting the job locally but running it on the cluster:
        // File jarFile = EJob.createTempJar("target/classes");
        // ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        // End of local-submit / cluster-run code

        job.setJarByClass(AnalyserLogDataRunner.class);
        job.setMapperClass(AnalyserLogDataMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);
        // Configure the reducer
        // 1. Run on the cluster from a jar (requires addDependencyJars to be true, which is the default)
        // TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job);
        // 2. Run locally (requires addDependencyJars to be false)
        TableMapReduceUtil.initTableReducerJob(
                EventLogConstants.HBASE_NAME_EVENT_LOGS, 
                null, 
                job, 
                null, 
                null,
                null, 
                null, 
                false);
        job.setNumReduceTasks(0);

        // Set the input path
        this.setJobInputPaths(job);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    /**
     * Process the command-line arguments
     * 
     * @param conf
     * @param args
     */
    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-d".equals(args[i])) {
                if (i + 1 < args.length) {
                    date = args[++i];
                    break;
                }
            }
        }
        
        System.out.println("-----" + date);

        // date must be in yyyy-MM-dd format
        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            // date is missing or invalid
            date = TimeUtil.getYesterday(); // default to yesterday
            System.out.println(date);
        }
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }

    /**
     * Set the job's input path
     * 
     * @param job
     */
    private void setJobInputPaths(Job job) 
    {
        Configuration conf = job.getConfiguration();
        FileSystem fs = null;
        try 
        {
            fs = FileSystem.get(conf);
            String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
            // Path inputPath = new Path("/flume/" +
            // TimeUtil.parseLong2String(TimeUtil.parseString2Long(date),
            // "MM/dd/"));
            Path inputPath = new Path("/log/"
                    + TimeUtil.parseLong2String(
                            TimeUtil.parseString2Long(date), "yyyyMMdd")
                    + "/");
            
            if (fs.exists(inputPath)) 
            {
                FileInputFormat.addInputPath(job, inputPath);
            }
            else 
            {
                throw new RuntimeException("文件不存在:" + inputPath);
            }
        } 
        catch (IOException e)
        {
            throw new RuntimeException("設置job的mapreduce輸入路徑出現異常", e);
        } 
        finally 
        {
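            // Note: FileSystem.get() returns a cached, shared instance per configuration,
            // so closing it here can affect other code in the same JVM that still uses it.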
            if (fs != null) 
            {
                try 
                {
                    fs.close();
                } 
                catch (IOException e) 
                {
                    e.printStackTrace();
                }
            }
        }
    }

}
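
To submit the job, package the project into a jar and pass the target date with -d in yyyy-MM-dd format; as the Runner above shows, it falls back to yesterday when the argument is missing or invalid. A hypothetical invocation (the jar name wjy.jar is just a placeholder matching the attached code archive):

hadoop jar wjy.jar com.sxt.etl.mr.ald.AnalyserLogDataRunner -d 2019-06-27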

Mapper:

package com.sxt.etl.mr.ald;

import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.sxt.common.EventLogConstants;
import com.sxt.common.EventLogConstants.EventEnum;
import com.sxt.etl.util.LoggerUtil;

/**
 * Custom map class that parses the log data
 * 
 * @author root
 *
 */
public class AnalyserLogDataMapper extends Mapper<LongWritable, Text, NullWritable, Put> 
{
    private final Logger logger = Logger.getLogger(AnalyserLogDataMapper.class);
    private int inputRecords, filterRecords, outputRecords; // counters to make it easy to see how many records were filtered
    private byte[] family = Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME);
    private CRC32 crc32 = new CRC32();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    {
        this.inputRecords++;
        this.logger.debug("Analyse data of :" + value);

        try 
        {
            // Parse the log line
            Map<String, String> clientInfo = LoggerUtil.handleLog(value.toString());

            // Filter out records that failed to parse
            if (clientInfo.isEmpty()) {
                this.filterRecords++;
                return;
            }

            // Get the event name
            String eventAliasName = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME);
            EventEnum event = EventEnum.valueOfAlias(eventAliasName);
            switch (event) {
            case LAUNCH:
            case PAGEVIEW:
            case CHARGEREQUEST:
            case CHARGEREFUND:
            case CHARGESUCCESS:
            case EVENT:
                // Process the record
                this.handleData(clientInfo, event, context);
                break;
            default:
                this.filterRecords++;
                this.logger.warn("Cannot parse this event; event name: " + eventAliasName);
            }
        } 
        catch (Exception e) 
        {
            this.filterRecords++;
            this.logger.error("處理數據發出異常,數據:" + value, e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        logger.info("輸入數據:" + this.inputRecords + ";輸出數據:" + this.outputRecords + ";過濾數據:" + this.filterRecords);
    }

    /**
     * Method that does the actual record processing
     * 
     * @param clientInfo
     * @param context
     * @param event
     * @throws InterruptedException
     * @throws IOException
     */
    private void handleData(Map<String, String> clientInfo, EventEnum event, Context context) throws IOException, InterruptedException 
    {
        String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
        String memberId = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
        String serverTime = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);
        if (StringUtils.isNotBlank(serverTime))
        {
            // The server time is required
            clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT); // drop the raw user-agent string (it was already parsed into browser/os fields)
            String rowkey = this.generateRowKey(uuid, memberId, event.alias, serverTime); // serverTime-prefixed rowkey
            Put put = new Put(Bytes.toBytes(rowkey));
            for (Map.Entry<String, String> entry : clientInfo.entrySet()) 
            {
                if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) 
                {
                    put.add(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
                }
            }
            context.write(NullWritable.get(), put);
            this.outputRecords++;
        } 
        else 
        {
            this.filterRecords++;
        }
    }

    /**
     * Build the rowkey from uuid, memberId, eventAliasName, and serverTime
     * 
     * @param uuid
     * @param memberId
     * @param eventAliasName
     * @param serverTime
     * @return
     */
    private String generateRowKey(String uuid, String memberId, String eventAliasName, String serverTime) 
    {
        StringBuilder sb = new StringBuilder();
        sb.append(serverTime).append("_");
        this.crc32.reset();
        if (StringUtils.isNotBlank(uuid)) {
            this.crc32.update(uuid.getBytes());
        }
        if (StringUtils.isNotBlank(memberId)) {
            this.crc32.update(memberId.getBytes());
        }
        this.crc32.update(eventAliasName.getBytes());
        sb.append(this.crc32.getValue() % 100000000L);
        return sb.toString();
    }
}
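
The rowkey built by generateRowKey is the server timestamp, an underscore, and a CRC32 checksum of uuid + memberId + event alias reduced to at most eight digits. Leading with the timestamp keeps records from the same time range adjacent in HBase for range scans, while the CRC32 suffix separates users and events that share a timestamp. A minimal standalone sketch of the same scheme (the sample values are hypothetical, loosely based on the sample log line shown further below):

import java.util.zip.CRC32;

public class RowKeyDemo {
    public static void main(String[] args) {
        String serverTime = "1561656575201";                  // ms timestamp, as produced by TimeUtil
        String uuid = "E5631595-EDC2-4B3B-A306-B19576D74DC3"; // u_ud from the request body
        String eventAlias = "e_l";                            // event name alias
        CRC32 crc32 = new CRC32();
        crc32.update(uuid.getBytes());        // memberId is blank in this sample, so it is skipped
        crc32.update(eventAlias.getBytes());
        // rowkey = serverTime + "_" + (crc32 mod 10^8)
        System.out.println(serverTime + "_" + crc32.getValue() % 100000000L);
    }
}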

The worker class that does the actual log processing:

package com.sxt.etl.util;

import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.sxt.common.EventLogConstants;
import com.sxt.etl.util.IPSeekerExt.RegionInfo;
import com.sxt.etl.util.UserAgentUtil.UserAgentInfo;
import com.sxt.util.TimeUtil;

/**
 * Worker class that does the actual log processing
 * 
 * @author root
 *
 */
public class LoggerUtil {
    private static final Logger logger = Logger.getLogger(LoggerUtil.class);
    private static IPSeekerExt ipSeekerExt = new IPSeekerExt();

    /**
     * Process the log line logText and return the result as a map.<br/>
     * If logText does not match the expected format, an empty map is returned.
     * 
     * @param logText
     * @return
     */
    public static Map<String, String> handleLog(String logText) 
    {
        Map<String, String> clientInfo = new HashMap<String, String>();
        if (StringUtils.isNotBlank(logText)) 
        {
            //192.168.118.1^A1561656575.201^Anode101^A/log.gif?en=e_l&ver=1&pl=website&sdk=js&u_ud=E5631595-EDC2-4B3B-A306-B19576D74DC3&u_sd=C7C0D4E3-7E60-479B-AC1C-2F5305EC20D4&c_time=1561627763553&l=zh-CN&b_iev=Mozilla%2F5.0%20(Windows%20NT%206.1%3B%20Win64%3B%20x64)%20AppleWebKit%2F537.36%20(KHTML%2C%20like%20Gecko)%20Chrome%2F75.0.3770.100%20Safari%2F537.36&b_rst=1920*1080
            String[] splits = logText.trim().split(EventLogConstants.LOG_SEPARTIOR);
            if (splits.length == 4) 
            {
                // Log format: ip^Aserver time^Ahost^Arequest URI
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_IP, splits[0].trim()); // set the ip
                // Set the server time
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, String.valueOf(TimeUtil.parseNginxServerTime2Long(splits[1].trim())));
                int index = splits[3].indexOf("?");
                if (index > -1) {
                    String requestBody = splits[3].substring(index + 1); // the query string, i.e. the collected data
                    // Parse the request parameters
                    handleRequestBody(requestBody, clientInfo);
                    // Parse the userAgent
                    handleUserAgent(clientInfo);
                    // Resolve the ip address
                    handleIp(clientInfo);
                } else {
                    // Malformed record
                    clientInfo.clear();
                }
            }
        }
        return clientInfo;
    }

    /**
     * Resolve the ip address to a region
     * 
     * @param clientInfo
     */
    private static void handleIp(Map<String, String> clientInfo) {
        if (clientInfo.containsKey(EventLogConstants.LOG_COLUMN_NAME_IP)) {
            String ip = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_IP);
            RegionInfo info = ipSeekerExt.analyticIp(ip);
            if (info != null) {
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity());
            }
        }
    }

    /**
     * Parse the browser userAgent string
     * 
     * @param clientInfo
     */
    private static void handleUserAgent(Map<String, String> clientInfo) {
        if (clientInfo.containsKey(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT)) {
            UserAgentInfo info = UserAgentUtil.analyticUserAgent(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT));
            if (info != null) {
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, info.getOsName());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, info.getOsVersion());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, info.getBrowserName());
                clientInfo.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, info.getBrowserVersion());
            }
        }
    }

    /**
     * Parse the request parameters
     * 
     * @param requestBody
     * @param clientInfo
     */
    private static void handleRequestBody(String requestBody, Map<String, String> clientInfo) 
    {
        if (StringUtils.isNotBlank(requestBody)) 
        {
            String[] requestParams = requestBody.split("&");
            for (String param : requestParams) 
            {
                if (StringUtils.isNotBlank(param)) 
                {
                    int index = param.indexOf("=");
                    if (index < 0) 
                    {
                        logger.warn("沒法進行解析參數:" + param + ", 請求參數為:" + requestBody);
                        continue;
                    }

                    String key = null, value = null;
                    try 
                    {
                        key = param.substring(0, index);
                        value = URLDecoder.decode(param.substring(index + 1), "utf-8");
                    } 
                    catch (Exception e) 
                    {
                        logger.warn("解碼操作出現異常", e);
                        continue;
                    }
                    if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value))
                    {
                        clientInfo.put(key, value);
                    }
                }
            }
        }
    }
}
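
To see the parsing flow end to end without the project's helper classes, here is a minimal self-contained sketch. It assumes LOG_SEPARTIOR is the ^A control character (\u0001, as in the sample line above) and uses plain string keys in place of the EventLogConstants column names:

import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

public class LogParseDemo {
    public static void main(String[] args) throws Exception {
        // Format: ip^Aserver time^Ahost^Arequest URI
        String logText = "192.168.118.1\u00011561656575.201\u0001node101\u0001"
                + "/log.gif?en=e_l&ver=1&u_ud=E5631595-EDC2-4B3B-A306-B19576D74DC3";
        String[] splits = logText.trim().split("\u0001");
        Map<String, String> clientInfo = new HashMap<String, String>();
        if (splits.length == 4) {
            clientInfo.put("ip", splits[0].trim());
            clientInfo.put("s_time", splits[1].trim()); // the real code converts this to a long first
            int index = splits[3].indexOf("?");
            if (index > -1) {
                for (String param : splits[3].substring(index + 1).split("&")) {
                    int eq = param.indexOf('=');
                    if (eq < 0) continue; // skip parameters without '='
                    clientInfo.put(param.substring(0, eq),
                            URLDecoder.decode(param.substring(eq + 1), "utf-8"));
                }
            }
        }
        System.out.println(clientInfo); // {ip=..., s_time=..., en=e_l, ver=1, u_ud=...}
    }
}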

Create the HBase table that receives the cleaned data:

hbase(main):002:0> create 'eventlog','log'
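
The table name eventlog and column family log are assumed here to match the values of EventLogConstants.HBASE_NAME_EVENT_LOGS and EVENT_LOGS_FAMILY_NAME used by the job. After the job finishes, a quick sanity check from the HBase shell:

hbase(main):003:0> scan 'eventlog', {LIMIT => 1}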

Code reference: wjy.rar


Reference:
The open-source ETL tool Kettle

