高可用Hadoop平台-實戰


1.概述

  今天繼續《高可用的Hadoop平台》系列,今天開始進行小規模的實戰下,前面的准備工作完成后,基本用於統計數據的平台都擁有了,關於導出統計結果的文章留到后面贅述。今天要和大家分享的案例是一個基於電商網站的用戶行為分析,這里分析的指標包含以下指標:

  • 統計每日PV
  • 每日注冊用戶
  • 每日IP
  • 跳出用戶

  其他指標可以參考上述4個指標進行拓展,下面我們開始今天的分析之旅。

2.流程

  首先,在開發之前我們需要注意哪些問題?我們不能盲目的按照自己的意願去開發項目,這樣到頭來得不到產品的認可,我們的工作又得返工。下面結合自身的工作,說說開發的具體流程:

  1. 需求產生過程及遇到問題和注意事項
  2. 產品組提出:報表需求+日志規范
  3. 報表研發流程(數據處理流程):網絡日志(一般有專門的節點來存儲日志)=>pull(拉取日志)=>本地存儲(數據組可操作的服務器節點)=>預處理(清洗或過濾,存入到hive表)=>HDFS文件系統數據存儲(統計的結果都存放在HDFS文件系統中)=>導出(將統計結構導出)=>Mysql數據庫存儲(或其他數據庫,供前端人員展示)

  在日志拉取過程,所欲問題和注意事項:如果日志量不大,我們可以直接使用python腳本或shell腳本直接將日志上傳到HDFS,若是海量數據,那我們可以使用 flume 進行上傳。具體選擇那種上傳方式取決於實際的業務,可按需選擇。

  注:若使用腳本上傳,需考慮腳本的可讀性和可維護性。

  在日志預處理過程中需要注意事項:對字段進行翻譯,反編譯,解析等操作,以確保存入到 hive 表的是有效的有用的信息。

  另外,在實際開發中,得和產品充分溝通過后,我們在開工;不然,到最后會引發一些不必要得麻煩。

  開發流程圖和之前介紹《網站日志統計案例分析與實現》得流程圖類似,這里直接拿過來使用,如下圖所示:

3.開發

  開發流程我們清楚來,需要統計得指標任務也明確了,接下來,我們開始進行編碼階段。首先,這里我贅述得是安裝流程圖得過程來的,若是在實際開發過程中,可根據實際情況來定,可以先獨立的開發后面的模塊,預留接口功能。不作限制,按需開發。

3.1上傳日志

  這里由於我本地只能連接到測試的集群上,而集群拉取的測試數據量很少,這里我就直接用 shell 腳本上傳了。內容如下所示:

#! /bin/bash
# get date param yesterday=$1

hadoop dfs -put /hdfs/logs/day/$1 /hdfs/logs/day/

  然后上傳腳本使用 crontab 來定時調度。

3.2清洗日志

  我們在確定 HDFS 存有數據后,對上傳的日志進行清洗(或過濾),抽起對統計指標有用的數據源,並將數據源重定向到 HDFS 目錄。,下面給出部分清洗代碼,內容如下:

Map類:

/**
 * 
 */
package cn.hdfs.mapreducer;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import cn.jpush.hdfs.utils.LogParserFactory;

/**
 * @author dengjie
 * @date 2015年4月1日
 * @description TODO
 */
public class LogMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    final String[] parsed = LogParserFactory.parse(value.toString());

    // 過濾掉靜態信息
    if (parsed[2].startsWith("GET /static/") || parsed[2].startsWith("GET /uc_server")) {
        return;
    }

    // 過掉開頭的特定格式字符串
    if (parsed[2].startsWith("GET /")) {
        parsed[2] = parsed[2].substring("GET /".length());
    } else if (parsed[2].startsWith("POST /")) {
        parsed[2] = parsed[2].substring("POST /".length());
    }

    // 過濾結尾的特定格式字符串
    if (parsed[2].endsWith(" HTTP/1.1")) {
        parsed[2] = parsed[2].substring(0, parsed[2].length() - " HTTP/1.1".length());
    }

    String str = "";
    for (int i = 0; i < parsed.length; i++) {
        if (i == (parsed.length - 1)) {
        str += parsed[i];
        } else {
        str += parsed[i] + ",";
        }
    }

    context.write(key, new Text(str));
    }

}

Reduce類:

/**
 * 
 */
package cn.hdfs.mapreducer;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author dengjie
 * @date 2015年4月1日
 * @description TODO
 */
public class LogReducer extends Reducer<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    for (Text v : values) {
        context.write(v, NullWritable.get());
    }
    }

}

LogParserFactory類:

/**
 * 
 */
package cn.hdfs.utils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/**
 * @author dengjie
 * @date 2015年4月1日
 * @description TODO
 */
public class LogParserFactory {
    public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    public static final SimpleDateFormat dateformat = new SimpleDateFormat("yyyyMMddHHmmss");

    /**
     * 解析英文時間字符串
     * 
     * @param string
     * @return
     * @throws ParseException
     */
    private static Date parseDateFormat(String string) {
    Date parse = null;
    try {
        parse = FORMAT.parse(string);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return parse;
    }

    /**
     * 解析日志的行記錄
     * 
     * @param line
     * @return 數組含有5個元素,分別是ip、時間、url、狀態、流量
     */
    public static String[] parse(String line) {
    String ip = parseIP(line);
    String time = parseTime(line);
    String url = parseURL(line);
    String status = parseStatus(line);
    String traffic = parseTraffic(line);

    return new String[] { ip, time, url, status, traffic };
    }

    private static String parseTraffic(String line) {
    final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
    String traffic = trim.split(" ")[1];
    return traffic;
    }

    private static String parseStatus(String line) {
    final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
    String status = trim.split(" ")[0];
    return status;
    }

    private static String parseURL(String line) {
    final int first = line.indexOf("\"");
    final int last = line.lastIndexOf("\"");
    String url = line.substring(first + 1, last);
    return url;
    }

    private static String parseTime(String line) {
    final int first = line.indexOf("[");
    final int last = line.indexOf("+0800]");
    String time = line.substring(first + 1, last).trim();
    Date date = parseDateFormat(time);
    return dateformat.format(date);
    }

    private static String parseIP(String line) {
    String ip = line.split("- -")[0].trim();
    return ip;
    }
}

Main函數:

/**
 * 
 */
package cn.hdfs.main;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.jpush.hdfs.mapreducer.LogMapper;
import cn.jpush.hdfs.mapreducer.LogReducer;
import cn.jpush.hdfs.utils.ConfigUtils;

/**
 * @author dengjie
 * @date 2015年4月1日
 * @description 將清洗后的日志重新存放指定的hdfs上
 */
public class LogCleanMR extends Configured implements Tool {

    @SuppressWarnings("deprecation")
    public int run(String[] args) throws Exception {

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://cluster1");
    conf.set("dfs.nameservices", "cluster1");
    conf.set("dfs.ha.namenodes.cluster1", "nna,nns");
    conf.set("dfs.namenode.rpc-address.cluster1.nna", "10.211.55.26:9000");
    conf.set("dfs.namenode.rpc-address.cluster1.nns", "10.211.55.27:9000");
    conf.set("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    final Job job = new Job(conf, LogCleanMR.class.getSimpleName());
    job.setJarByClass(LogCleanMR.class);
    job.setMapperClass(LogMapper.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(LogReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.setInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    int status = job.waitForCompletion(true) ? 0 : 1;
    return status;
    }

    public static void main(String[] args) throws Exception {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
    args = new String[] { ConfigUtils.HDFS.LOGDFS_PATH, String.format("/hdfs/logs/redirect/day/%s", sdf.format(new Date())) };
    int res = ToolRunner.run(new Configuration(), new LogCleanMR(), args);
    System.exit(res);
    }

}

  清洗工作到這里就已經完成了,下面開始統計指標任務的開發。

3.3統計指標

  在這里,由於 Java API 代碼設計到實際的業務邏輯,我就直接使用 Hive SQL 來演示了統計結果了,若干有同學需要使用 Java API 來開發 Hive 應用,可參考《高可用Hadoop平台-集成Hive HAProxy》這篇博客,里面有講到如何使用 Java API 來操作 Hive。下面我們使用 Hive SQL 來進行統計。內容如下:

建表:

CREATE EXTERNAL TABLE portal(ip string, atime string, url string,status int,traffic int)PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/hdfs/logdfs/portal'

創建分區:

ALTER TABLE portal ADD PARTITION(logdate='2015_01_02') 

加載數據:

LOCATION '/hdfs/logdfs/portal/2015_01_02'

  注:LOCATION 關鍵字后面的路徑是指定清洗后的的hdfs路徑

  下面創建臨時統計表,各表如下所示:

創建每日PV表: 

CREATE TABLE pv_2015_01_02 AS SELECT COUNT(1) AS PV FROM logdfs WHERE logdate='2015_01_02';

創建注冊用戶表:

CREATE TABLE register_2015_01_02 AS SELECT COUNT(1) AS REGUSER FROM logdfs WHERE logdate='2015_01_02' AND INSTR(url,'signup')>0;

創建IP表:

CREATE TABLE ip_2015_01_02 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate='2015_01_02') tmp

創建跳出用戶表:

CREATE TABLE jumper_2015_01_02 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM logdfs WHERE logdate='2015_01_02' GROUP BY ip HAVING times=1) e;

  最后我們將所有的結果匯總到一張 Hive 表,命令如下所示:

CREATE TABLE logdfs_2015_01_02 ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' AS SELECT '2015_01_02', a.pv, b.reguser, c.ip, d.jumper FROM pv_2015_01_02 a JOIN reguser_2015_01_02 b ON 1=1 JOIN ip_2015_01_02 c ON 1=1 JOIN jumper_2015_01_02 d ON 1=1 ;

  關於 JOIN ... ON 用法不熟悉的同學,可以參考《Hive基本操作》這篇文章。

4.總結

  這樣,我們對使用 Hive 基於 HDFS 平台進行數據分析統計的流程就完成了,這里也許會發現一個問題,操作 Hive SQL 命令出錯率是很高的,后面帶我將業務從 Java API 分離出來后,我會將操作 Hive 的 Java API 貼在這篇博客的后面。至於如何將統計的結果導出,后面會花一篇博客來贅述導出的流程。

5.結束語

  這篇博客就和大家分享到這里,如果實際研究過程中有什么疑問,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM