Log Analysis: A Complete Log Analysis Case Using Shell


I. Requirements Analysis

 1. A log file is generated every day (it must be uploaded to HDFS on a schedule).
 2. Fields in the log file to analyze: client IP, access time, request URL, response status, traffic.
 3. Inputs on hand: "yesterday's" log file and logclean.jar.
 4. Required metrics:
    a. PV (page views)
    b. number of registrations
    c. number of distinct IPs
    d. bounce rate (jumpprob)
    e. two-jump rate (two_jumpprob)
    (As implemented by the Hive queries in section III, the bounce rate is the count of IPs with a single record divided by the total record count, and the two-jump rate is the count of IPs with two or more records divided by the total record count.)

II. Data Analysis

1. Data collection: upload to HDFS on a schedule with a shell script
2. Data cleaning: filter out unwanted records and format fields such as the timestamp (a worked before/after example follows this section)
3. Data analysis: Hive with a single-level partition (date)
4. Data export: sqoop
5. Frameworks used: shell script, HDFS, MapReduce, Hive, Sqoop, MySQL
Expected result:
  pv    register  ip    jumpprob    two_jumpprob
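
A worked example of the cleaning step (a sketch; the raw line below is a hypothetical sample in the same format as the log parsed in section V, and the transformation follows the LogClean mapper there: static-resource requests are dropped, the timestamp is reformatted to yyyyMMddHHmmss, and the request method and trailing " HTTP/1.1" are stripped):

     raw:      27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /forum.php HTTP/1.1" 200 1127
     cleaned:  27.19.74.143\t20130530173820\tforum.php    (fields: ip, day, url, separated by \t)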

III. Implementation

1. Automatically upload to HDFS
     $HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_INPUT_PATH > /dev/null 2>&1
     $HADOOP_HOME/bin/hdfs dfs -mkdir -p $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
     $HADOOP_HOME/bin/hdfs dfs -put $LOG_PATH  $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
2. Data cleaning (use MapReduce to filter out dirty records and unneeded static-resource requests, strip the double quotes, and convert the date)
     $HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_OUTPUT_PATH > /dev/null 2>&1
     $HADOOP_HOME/bin/yarn jar $JAR_PATH $ENTRANCE $HDFS_INPUT_PATH/$yesterday $HDFS_OUTPUT_PATH/date=$yesterday
3. Create the log database and partitioned external table in Hive, and register the cleaned files as a partition
     $HIVE_HOME/bin/hive -e "create database if not exists $HIVE_DATABASE" > /dev/null 2>&1
     $HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "create external table if not exists $HIVE_TABLE(
     ip string,day string,url string) partitioned by (date string)
     row format delimited fields terminated by '\t' location '$HDFS_OUTPUT_PATH' "
     $HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "alter table $HIVE_TABLE add partition (date='$yesterday')"
4. Analyze the data and export it to MySQL with sqoop (a condensed sqoop command follows this list)
     pv:
        create table if not exists pv_tb(pv string) row format delimited fields terminated by '\t';
        insert overwrite table pv_tb select count(1) from weblog_clean where date='2016_11_13';
     register:
        create table if not exists register_tb(register string) row format delimited fields terminated by '\t';
        insert overwrite table register_tb select count(1) from weblog_clean where date='2016_11_13' and instr(url,'member.php?mod=register') > 0;
     ip:
        create table if not exists ip_tb(ip string) row format delimited fields terminated by '\t';
        insert overwrite table ip_tb select count(distinct ip) from weblog_clean where date='2016_11_13';
     jumpprob:
        create table if not exists jumpprob_tb(jump double) row format delimited fields terminated by '\t';
        insert overwrite table jumpprob_tb
        select ghip.singleip/aip.ips from (select count(1) singleip from(select count(ip) ips from weblog_clean where date='2016_11_13' group by ip having ips <2) gip) ghip,
        (select count(ip) ips from weblog_clean where date='2016_11_13') aip;
     two_jumpprob:
        create table if not exists two_jumpprob_tb(jump double) row format delimited fields terminated by '\t';
        insert overwrite table two_jumpprob_tb
        select ghip.singleip/aip.ips from (select count(1) singleip from(select count(ip) ips from weblog_clean where date='2016_11_13' group by ip having ips >=2) gip) ghip,
        (select count(ip) ips from weblog_clean where date='2016_11_13') aip;
     merge table  # note: the metric tables above are created separately, which runs faster than this merged query but uses more storage
        create table if not exists log_result(pv string,register string,ip string,jumpprob double,two_jumpprob double ) row format delimited fields terminated by '\t';
        insert overwrite table log_result
        select log_pv.pv,log_register.register,log_ip.ip,log_jumpprob.jumpprob,log_two_jumpprob.two_jumpprob from (select count(1) pv from weblog_clean where date='2016_11_13') log_pv,
        (select count(1) register from weblog_clean where date='2016_11_13' and instr(url,'member.php?mod=register') > 0) log_register,
        (select count(distinct ip) ip from weblog_clean where date='2016_11_13') log_ip,
        (select ghip.singleip/aip.ips jumpprob from (select count(1) singleip from(select count(ip) ips from weblog_clean where date='2016_11_13' group by ip having ips <2) gip) ghip,
        (select count(ip) ips from weblog_clean where date='2016_11_13') aip) log_jumpprob,
        (select ghip.singleip/aip.ips two_jumpprob from (select count(1) singleip from(select count(ip) ips from weblog_clean where date='2016_11_13' group by ip having ips >=2) gip) ghip,
        (select count(ip) ips from weblog_clean where date='2016_11_13') aip) log_two_jumpprob;
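
The sqoop export for step 4 is issued in the full script in section VI; condensed here as a sketch (the variables are defined in that script):

        $SQOOP_HOME/bin/sqoop export --connect $SQOOP_JDBC \
          --username $MYSQL_USERNAME --password $MYSQL_PASSWORD \
          --table $HIVE_RSTABLE --export-dir $EXPORT_DIR \
          --num-mappers $NUM_MAPPERS --input-fields-terminated-by '\t'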

IV. Results

mysql> select * from weblog_result;
       +--------+----------+-------+----------+--------------+
       | pv     | register | ip    | jumpprob | two_jumpprob |
       +--------+----------+-------+----------+--------------+
       | 169857 | 28       | 10411 |     0.02 |         0.04 |
       +--------+----------+-------+----------+--------------+
       1 row in set (0.00 sec)

V. logclean.jar (cleans the log fields: converts the date format, removes the double quotes, and strips the request method, leading "/", and trailing " HTTP/1.1" from the URL)

package org.apache.hadoop.log.project;

import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class LogClean extends Configured implements Tool {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int res = ToolRunner.run(conf, new LogClean(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "logclean");
        // make the job runnable from a packaged jar

        job.setJarByClass(LogClean.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(args[0]), getConf());
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        
        boolean success = job.waitForCompletion(true);
        if(success){
            System.out.println("Clean process success!");
        }
        else{
            System.out.println("Clean process failed!");
        }
        return 0;
    }

    static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        LogParser logParser = new LogParser();
        Text outputValue = new Text();

        protected void map(
                LongWritable key,
                Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            final String[] parsed = logParser.parse(value.toString());

            // step 1: filter out requests for static resources
            if (parsed[2].startsWith("GET /static/")
                    || parsed[2].startsWith("GET /uc_server")) {
                return;
            }
            // step 2: strip the leading request method ("GET /" or "POST /")
            if (parsed[2].startsWith("GET /")) {
                parsed[2] = parsed[2].substring("GET /".length());
            } else if (parsed[2].startsWith("POST /")) {
                parsed[2] = parsed[2].substring("POST /".length());
            }
            // step 3: strip the trailing " HTTP/1.1"
            if (parsed[2].endsWith(" HTTP/1.1")) {
                parsed[2] = parsed[2].substring(0, parsed[2].length()
                        - " HTTP/1.1".length());
            }
            // step 4: keep only the first three fields (ip, time, url)
            outputValue.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
            context.write(key, outputValue);
        }
    }

    static class MyReducer extends
            Reducer<LongWritable, Text, Text, NullWritable> {
        protected void reduce(
                LongWritable k2,
                java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        };
    }

    /*
     * Log parser
     */
    static class LogParser {
        public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
                "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
                "yyyyMMddHHmmss");

        public static void main(String[] args) throws ParseException {
            final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
            LogParser parser = new LogParser();
            final String[] array = parser.parse(S1);
            System.out.println("樣例數據: " + S1);
            System.out.format(
                    "解析結果:  ip=%s, time=%s, url=%s, status=%s, traffic=%s",
                    array[0], array[1], array[2], array[3], array[4]);
        }

        /**
         * Parse the English-format time string from the log
         * 
         * @param string
         * @return
         */
        private Date parseDateFormat(String string) {
            Date parse = null;
            try {
                parse = FORMAT.parse(string);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return parse;
        }

        /**
         * Parse one line of the log
         * 
         * @param line
         * @return an array of 5 elements: ip, time, url, status, traffic
         */
        public String[] parse(String line) {
            String ip = parseIP(line);
            String time = parseTime(line);
            String url = parseURL(line);
            String status = parseStatus(line);
            String traffic = parseTraffic(line);

            return new String[] { ip, time, url, status, traffic };
        }

        private String parseTraffic(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1)
                    .trim();
            String traffic = trim.split(" ")[1];
            return traffic;
        }

        private String parseStatus(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1)
                    .trim();
            String status = trim.split(" ")[0];
            return status;
        }

        private String parseURL(String line) {
            final int first = line.indexOf("\"");
            final int last = line.lastIndexOf("\"");
            String url = line.substring(first + 1, last);
            return url;
        }

        private String parseTime(String line) {
            final int first = line.indexOf("[");
            final int last = line.indexOf("+0800]");
            String time = line.substring(first + 1, last).trim();
            Date date = parseDateFormat(time);
            return dateformat1.format(date);
        }

        private String parseIP(String line) {
            String ip = line.split("- -")[0].trim();
            return ip;
        }
    }
}
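
One way to package logclean.jar and sanity-check the parser before submitting the job (a sketch, not from the original post; the src/ and classes/ directories are hypothetical, and $HADOOP_HOME is assumed to be set as in section VI):

     mkdir -p classes
     # compile against the Hadoop client classpath
     javac -cp "$($HADOOP_HOME/bin/hadoop classpath)" -d classes src/org/apache/hadoop/log/project/LogClean.java
     jar cf logclean.jar -C classes .
     # run the sample in LogParser.main to check field extraction
     java -cp "classes:$($HADOOP_HOME/bin/hadoop classpath)" 'org.apache.hadoop.log.project.LogClean$LogParser'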

VI. The complete shell script. Prepare logclean.jar (the MapReduce program used for log cleaning) and "yesterday's" log file in advance, and check their paths.

#!/bin/bash

cat <<eot
#############################################################################
##########################     Bless  All  Beings     ##########################
                                  _oo0oo_                                    
                                 088888880                                   
                                 88" . "88                                   
                                 (| -_- |)                                   
                                  0\ = /0                                    
                               ___/'---'\___                                 
                             .' \\\\|     |// '.                             
                            / \\\\|||  :  |||// \\                           
                           /_ ||||| -:- |||||- \\                            
                          |   | \\\\\\  -  /// |   |                         
                          | \_|  ''\---/''  |_/ |                            
                          \  .-\__  '-'  __/-.  /                            
                        ___'. .'  /--.--\  '. .'___                          
                     ."" '<  '.___\_<|>_/___.' >'  "".                       
                    | | : '-  \'.;'\ _ /';.'/ - ' : | |                      
                    \  \ '_.   \_ __\ /__ _/   .-' /  /                      
                ====='-.____'.___ \_____/___.-'____.-'=====                  
                                  '=---='                                    
  
  
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                      Buddha bless    iii    no bugs ever
eot
## get yesterday's date
yesterday=`date -d '-1 day' +'%Y_%m_%d'`
echo $yesterday
############
## define ##
############
HADOOP_HOME=/opt/cdh-5.6.3/hadoop-2.5.0-cdh5.3.6
HIVE_HOME=/opt/cdh-5.6.3/hive-0.13.1-cdh5.3.6
SQOOP_HOME=/opt/cdh-5.6.3/sqoop-1.4.5-cdh5.2.6
HIVE_DATABASE=weblog
HIVE_TABLE=weblog_clean
HIVE_RSTABLE=weblog_result
MYSQL_USERNAME=root
MYSQL_PASSWORD=root
EXPORT_DIR=/user/hive/warehouse/weblog.db/weblog_result
NUM_MAPPERS=1
#########################
##  get logfile path   ##
#########################
LOG_PATH=/home/liuwl/opt/datas/weblog/access_$yesterday.log
JAR_PATH=/home/liuwl/opt/datas/logclean.jar
ENTRANCE=org.apache.hadoop.log.project.LogClean
HDFS_INPUT_PATH=/weblog/source
HDFS_OUTPUT_PATH=/weblog/clean
SQOOP_JDBC=jdbc:mysql://hadoop09-linux-01.ibeifeng.com:3306/$HIVE_DATABASE
############################
## upload logfile to hdfs ##
############################
echo "start to upload logfile"
#$HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_INPUT_PATH > /dev/null 2>&1
HSFiles=`$HADOOP_HOME/bin/hdfs dfs -ls $HDFS_INPUT_PATH/$yesterday`
if [ -z "$HSFiles" ]; then
$HADOOP_HOME/bin/hdfs dfs -mkdir -p $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
$HADOOP_HOME/bin/hdfs dfs -put $LOG_PATH  $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
echo "upload ok"
else
echo "exists"
fi
###########################
## clean the source file ##
###########################
echo "start to clean logfile"
HCFiles=`$HADOOP_HOME/bin/hdfs dfs -ls $HDFS_OUTPUT_PATH`
if [ -z "$HCFiles" ]; then
$HADOOP_HOME/bin/yarn jar $JAR_PATH $ENTRANCE $HDFS_INPUT_PATH/$yesterday $HDFS_OUTPUT_PATH/date=$yesterday
echo "clean ok"
fi
###########################
## create the hive table ##
###########################
echo "start to create the hive table"
$HIVE_HOME/bin/hive -e "create database if not exists $HIVE_DATABASE" > /dev/null 2>&1
$HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "create external table if not exists $HIVE_TABLE(ip string,day string,url string) partitioned by (date string) row format delimited fields terminated by '\t' location '$HDFS_OUTPUT_PATH' "
echo "add patition to hive table"
$HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "alter table $HIVE_TABLE add partition (date='$yesterday')"
##################################
## create the hive result table ##
##################################
echo "start to create the hive reslut table"
$HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "create table if not exists $HIVE_RSTABLE(pv string,register string,ip string,jumpprob double,two_jumpprob double ) row format delimited fields terminated by '\t';"
#################
## insert data ##
#################
echo "start to insert data"
HTFiles=`$HADOOP_HOME/bin/hdfs dfs -ls $EXPORT_DIR`
if [ -z "$HTFiles" ]; then
$HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "insert overwrite table $HIVE_RSTABLE select log_pv.pv,log_register.register,log_ip.ip,log_jumpprob.jumpprob,log_two_jumpprob.two_jumpprob from (select count(1) pv from $HIVE_TABLE where date='$yesterday') log_pv,(select count(1) register from $HIVE_TABLE where date='$yesterday' and instr(url,'member.php?mod=register') > 0) log_register,(select count(distinct ip) ip from $HIVE_TABLE where date='$yesterday') log_ip,(select ghip.singleip/aip.ips jumpprob from (select count(1) singleip from(select count(ip) ips from $HIVE_TABLE where date='$yesterday' group by ip having ips <2) gip) ghip,(select count(ip) ips from $HIVE_TABLE where date='$yesterday') aip) log_jumpprob,(select ghip.singleip/aip.ips two_jumpprob from (select count(1) singleip from(select count(ip) ips from $HIVE_TABLE where date='$yesterday' group by ip having ips >=2) gip) ghip,(select count(ip) ips from $HIVE_TABLE where date='$yesterday') aip) log_two_jumpprob"
fi
###################################
## create the mysql result table ##
###################################
mysql -u$MYSQL_USERNAME -p$MYSQL_PASSWORD -e "
create database if not exists $HIVE_DATABASE DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
use $HIVE_DATABASE;
create table if not exists $HIVE_RSTABLE(pv varchar(20) not null,register varchar(20) not null,ip varchar(20) not null,jumpprob double(6,2) not null,two_jumpprob double(6,2) not null) DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci; 
truncate table $HIVE_RSTABLE;
quit"
#######################################
## export hive result table to mysql ##
#######################################
echo "start to export hive result table to mysql"
$SQOOP_HOME/bin/sqoop export --connect $SQOOP_JDBC --username $MYSQL_USERNAME --password $MYSQL_PASSWORD --table $HIVE_RSTABLE --export-dir $EXPORT_DIR --num-mappers $NUM_MAPPERS --input-fields-terminated-by '\t'
echo "shell finished"

