Flume + HDFS + Hive Log Collection System


Recently I have been responsible for our product's log instrumentation and collection, and built a log collection system based on Flume + HDFS + Hive.

 

I. Log collection system architecture

The architecture diagram below sketches the log collection system: Flume plays both the agent and the collector roles, while HDFS provides persistent storage for the data.

The setup described here is a demo: there is only one flume collector and the data is only stored on HDFS. A highly available log collection architecture would of course use multiple Flume collectors for load balancing and fault tolerance (a sketch of such a sink-group configuration follows the diagram).

[Architecture diagram omitted: application logs → Flume agent → Flume collector → HDFS]
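For reference, a minimal sketch of what a load-balanced agent configuration could look like, assuming a second collector host named centos3 (hypothetical; this demo only runs centos2):

a1.sinks = skydataSink1 skydataSink2

a1.sinks.skydataSink1.type = avro
a1.sinks.skydataSink1.channel = skydataChannel
a1.sinks.skydataSink1.hostname = centos2
a1.sinks.skydataSink1.port = 4545

# hypothetical second collector host
a1.sinks.skydataSink2.type = avro
a1.sinks.skydataSink2.channel = skydataChannel
a1.sinks.skydataSink2.hostname = centos3
a1.sinks.skydataSink2.port = 4545

# both sinks behind one load-balancing sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = skydataSink1 skydataSink2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true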

 

II. Log generation

1. log4j configuration: roll to a new file every minute; if a file grows past 5 MB within that minute, roll an additional file.

<!-- Product analytics log, rolled per minute -->
        <RollingRandomAccessFile name="RollingFile_product_minute"
            fileName="${STAT_LOG_HOME}/${SERVER_NAME}_product.log"
            filePattern="${STAT_LOG_HOME}/${SERVER_NAME}_product.log.%d{yyyy-MM-dd-HH-mm}-%i">
            <PatternLayout charset="UTF-8"
                pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %level - %msg%xEx%n" />
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"
                    modulate="true" />
                <SizeBasedTriggeringPolicy size="${EVERY_FILE_SIZE}" />
            </Policies>
            <Filters>
                <ThresholdFilter level="INFO" onMatch="ACCEPT"
                    onMismatch="NEUTRAL" />
            </Filters>
        </RollingRandomAccessFile>

 

The rolled files are named according to the filePattern above, i.e. ${SERVER_NAME}_product.log.yyyy-MM-dd-HH-mm-i:

[Screenshot of rolled log files omitted]

 

2. Log content

Each entry is a JSON document; the top-level fields, in order, are: tableName, logRequest, timestamp, statBody, logResponse, resultCode, resultFailMsg.

2016-11-30 09:18:21.916 INFO - {
    "tableName": "ReportView",
    "logRequest": {
         ***
    },
    "timestamp": 1480468701432,
    "statBody": {
        ***
    },
    "logResponse": {
        ***
    },
    "resultCode": 1,
    "resultFailMsg": ""
}

 

III. Flume configuration

For the virtual machine environment, see my post http://www.cnblogs.com/xckk/p/6000881.html

For the Hadoop environment, see my other post http://www.cnblogs.com/xckk/p/6124553.html

The Flume topology here is:

centos1: flume-agent

centos2: flume-collector

 

1. Flume agent configuration (conf file)

a1.sources = skydataSource
a1.channels = skydataChannel
a1.sinks = skydataSink

a1.sources.skydataSource.type = spooldir
a1.sources.skydataSource.channels = skydataChannel
# log directory watched by the spooling directory source
a1.sources.skydataSource.spoolDir = /opt/flumeSpool
a1.sources.skydataSource.fileHeader = true
# Fully processed files get a .COMPLETED suffix, and the live .log file is still being rolled
# every minute, so both the live .log file and .COMPLETED files are ignored here
a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
a1.sources.skydataSource.basenameHeader=true
a1.sources.skydataSource.deserializer.maxLineLength=102400
# Custom interceptor: splits the JSON log into Hive-delimited fields and adds a timestamp
# header for the downstream hdfsSink; the interceptor code is shown in section V
a1.sources.skydataSource.interceptors=i1
a1.sources.skydataSource.interceptors.i1.type=com.skydata.flume_interceptor.HiveLogInterceptor2$Builder

a1.sinks.skydataSink.type = avro
a1.sinks.skydataSink.channel = skydataChannel
a1.sinks.skydataSink.hostname = centos2
a1.sinks.skydataSink.port = 4545
# With deflate compression enabled here, the flume collector's avro source must be configured to decompress accordingly
a1.sinks.skydataSink.compression-type=deflate

a1.channels.skydataChannel.type=memory
a1.channels.skydataChannel.capacity=10000
a1.channels.skydataChannel.transactionCapacity=1000

 

2. Flume collector configuration

a1.sources = avroSource
a1.channels = memChannel
a1.sinks = hdfsSink

a1.sources.avroSource.type = avro
a1.sources.avroSource.channels = memChannel
a1.sources.avroSource.bind=centos2
a1.sources.avroSource.port=4545
# must match the compression setting on the flume agent
a1.sources.avroSource.compression-type=deflate

a1.sinks.hdfsSink.type = hdfs
a1.sinks.hdfsSink.channel = memChannel
# skydata_hive_log is the Hive table; data is stored in year-month-day partitions
a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d
a1.sinks.hdfsSink.hdfs.batchSize=10000
a1.sinks.hdfsSink.hdfs.fileType=DataStream
a1.sinks.hdfsSink.hdfs.writeFormat=Text
a1.sinks.hdfsSink.hdfs.rollSize=10240000
a1.sinks.hdfsSink.hdfs.rollCount=0
a1.sinks.hdfsSink.hdfs.rollInterval=300

a1.channels.memChannel.type=memory
a1.channels.memChannel.capacity=100000
a1.channels.memChannel.transactionCapacity=10000
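Note that the %Y-%m-%d escapes in hdfs.path are resolved from each event's timestamp header, which the custom interceptor in section V sets. A hedged alternative (not used in this setup) is to let the sink use the collector's local clock instead:

# Assumption: partitioning by the collector's local time (rather than the event timestamp) is acceptable
a1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true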

 

IV. Hive table creation and partitioning

 

1. Creating the Hive table

After the CREATE TABLE statement below is executed in Hive, a new skydata_hive_log directory appears under hdfs://centos1:9000/flume/ (the statement contains a LOCATION clause).

\u0001 is the separator Hive uses to split fields; when a data file is opened with vim on Linux, this character shows up as ^A.

Since the logs are in JSON format, they have to be converted into \u0001-separated fields and partitioned by dt. This is handled by the custom Flume interceptor.

CREATE TABLE `skydata_hive_log`(
  `tableName` string,
  `logRequest` string,
  `timestamp` bigint,
  `statBody` string,
  `logResponse` string,
  `resultCode` int,
  `resultFailMsg` string
)
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\u0001'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://centos1:9000/flume/skydata_hive_log';

 

2. Partitioning the Hive table

When Flume sinks the logs to HDFS, if the Hive table has not been partitioned in advance you can end up with the logs already uploaded to the HDFS directory while the Hive table cannot load any data. That happens because the table's partitions were never created, so partitions have to be added up front; here they are added for roughly the next year.

Partition script init_flume_hive_table.sh:
for ((i=-1;i<=365;i++))
do
        dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
        echo date=$dt
        hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>>logs/init_skydata_hive_log.err
done
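As an aside, since the directory layout already follows the dt=YYYY-MM-DD convention, a hedged alternative to pre-creating a year of partitions is to let Hive register partitions that already exist on HDFS; this only picks up directories already present, so it would have to be re-run as new ones appear:

-- register any dt=... partition directories already present under the table location
MSCK REPAIR TABLE skydata_hive_log;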

 

V. Custom Flume interceptor

Create a new Maven project; the interceptor HiveLogInterceptor2 code is as follows.

package com.skydata.flume_interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.TimestampInterceptor.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;

public class HiveLogInterceptor2 implements Interceptor
{
    private static Logger logger = LoggerFactory.getLogger(HiveLogInterceptor2.class);

    public static final String HIVE_SEPARATOR = "\001";

    public void close()
    {
    }

    public void initialize()
    {
    }

    public Event intercept(Event event)
    {
        String originalLog = new String(event.getBody(), Charsets.UTF_8);
        try
        {
            String log = this.parseLog(originalLog);
            // set the timestamp header used by the hdfsSink to resolve %Y-%m-%d in hdfs.path
            long now = System.currentTimeMillis();
            Map<String, String> headers = event.getHeaders();
            headers.put(Constants.TIMESTAMP, Long.toString(now));
            event.setBody(log.getBytes(Charsets.UTF_8));
        } catch (Throwable throwable)
        {
            logger.error(("error when intercepting log [ " + originalLog + " ] "), throwable);
            return null;
        }
        return event;
    }

    public List<Event> intercept(List<Event> list)
    {
        List<Event> events = new ArrayList<Event>();
        for (Event event : list)
        {
            Event interceptedEvent = this.intercept(event);
            if (interceptedEvent != null)
            {
                events.add(interceptedEvent);
            }
        }
        return events;
    }

    private static String parseLog(String log)
    {
        List<String> logFields = new ArrayList<String>();
        String dt = log.substring(0, 10);
        String keyStr = "INFO - ";
        int index = log.indexOf(keyStr);
        String content = "";
        if (index != -1)
        {
            content = log.substring(index + keyStr.length(), log.length());
        }
        // normalize carriage returns / newlines, which differ across operating systems
        content = content.replaceAll("\r", "");
        content = content.replaceAll("\n", "\\\\" + System.getProperty("line.separator"));
        JSONObject jsonObj = JSONObject.parseObject(content);
        String tableName = jsonObj.getString("tableName");
        String logRequest = jsonObj.getString("logRequest");
        String timestamp = jsonObj.getString("timestamp");
        String statBody = jsonObj.getString("statBody");
        String logResponse = jsonObj.getString("logResponse");
        String resultCode = jsonObj.getString("resultCode");
        String resultFailMsg = jsonObj.getString("resultFailMsg");
        // join the fields with the Hive separator, appending dt for the partition column
        logFields.add(tableName);
        logFields.add(logRequest);
        logFields.add(timestamp);
        logFields.add(statBody);
        logFields.add(logResponse);
        logFields.add(resultCode);
        logFields.add(resultFailMsg);
        logFields.add(dt);
        return Joiner.on(HIVE_SEPARATOR).join(logFields);
    }

    public static class Builder implements Interceptor.Builder
    {
        public Interceptor build()
        {
            return new HiveLogInterceptor2();
        }

        public void configure(Context arg0)
        {
        }
    }
}

 

Add the following to pom.xml, build the interceptor project with Maven, and copy both the jar and its dependency jars into the ${flume-agent}/lib directory.

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <outputDirectory>
                        ${project.build.directory}
                    </outputDirectory>
                </configuration>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>true</overWriteReleases>
                            <overWriteSnapshots>true</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
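A minimal sketch of the packaging and deployment step, assuming the Flume agent is installed under /opt/flume (adjust the path to your ${flume-agent} directory):

mvn clean package
# copy-dependencies (configured above) places the dependency jars under target/lib
cp target/*.jar /opt/flume/lib/
cp target/lib/*.jar /opt/flume/lib/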

 

The log is split with the "\001" separator. After the interceptor has processed it, a log line looks like the following, where ^A is "\001":

ReportView^A{"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}}^A1480468701432^A{"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532}^A{"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"請求成功","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}}^A1^A^A2016-11-30

 

At this point, the Flume + HDFS + Hive configuration is complete.

The data can then be analysed with MapReduce or HQL.
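For instance, a minimal HQL sketch (illustrative only, not part of the original setup) that counts events per event type in one day's partition:

-- count events per event type on 2016-12-01
SELECT tableName, COUNT(*) AS cnt
FROM skydata_hive_log
WHERE dt = '2016-12-01'
GROUP BY tableName
ORDER BY cnt DESC;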

 

VI. Startup and results

 

1. Start Hadoop HDFS

See my earlier post on the Hadoop 1.2 cluster setup and configuration: http://www.cnblogs.com/xckk/p/6124553.html

2. Start flume_collector and flume_agent. Since the flume startup command takes quite a few parameters, I wrote a small start script.

start-Flume.sh

#!/bin/bash
jps -l|grep org.apache.flume.node.Application|awk '{print $1}'|xargs kill -9 >/dev/null 2>&1
cd "$(dirname "$0")"
cd ..
nohup bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 >/dev/null 2>&1 &

3. Check the data on HDFS

The collected logs have been uploaded to HDFS:

[root@centos1 bin]# rm -rf FlumeData.1480587273016.tmp 
[root@centos1 bin]# hadoop fs -ls /flume/skydata_hive_log/dt=2016-12-01/
Found 3 items
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:12 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480608753042.tmp
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:40 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453116
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:44 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453117
[root@centos1 bin]# 

 

4. Start Hive and query the data; Hive can now load the data from HDFS.

[root@centos1 lib]# hive

Logging initialized using configuration in file:/root/apache-hive-1.2.1-bin/conf/hive-log4j.properties
hive> select * from skydata_hive_log limit 2;
OK
ReportView    {"request":{},"requestBody":{"detailInfos":[],"flag":"","reportId":7092,"pageSize":0,"searchs":[],"orders":[],"pageNum":1}}    1480468701432    {"sourceId":22745,"reportId":7092,"projectId":29355,"userId":2532}    {"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"請求成功","httpCode":200,"timestamp":1480468701849},"statusCode":"OK"},"response":{}}    1        2016-12-01
ReportDesignResult    {"request":{},"requestBody":{"sourceId":22745,"detailInfos":[{"colName":"月份","flag":"0","reportId":7092,"colCode":"col_2_22745","pageSize":20,"type":"1","pageNum":1,"rcolCode":"col_25538","colType":"string","formula":"","id":25538,"position":"row","colId":181664,"dorder":1,"pColName":"月份","pRcolCode":"col_25538"},{"colName":"綜合利率(合計)","flag":"1","reportId":7092,"colCode":"col_11_22745","pageSize":20,"type":"1","pageNum":1,"rcolCode":"sum_col_25539","colType":"number","formula":"sum","id":25539,"position":"group","colId":181673,"dorder":1,"pColName":"綜合利率","pRcolCode":"col_25539"}],"flag":"bar1","reportId":7092,"reportName":"iiiissszzzV","pageSize":100,"searchs":[],"orders":[],"pageNum":1,"projectId":29355}}    1480468703586{"reportType":"bar1","sourceId":22745,"reportId":7092,"num":5,"usedFields":"月份$$綜合利率(合計)$$","projectId":29355,"userId":2532}    {"responseBody":{"statusCodeValue":200,"httpHeaders":{},"body":{"msg":"請求成功","reportId":7092,"httpCode":200,"timestamp":1480468703774},"statusCode":"OK"},"response":{}}    1        2016-12-01
Time taken: 2.212 seconds, Fetched: 2 row(s)
hive> 

 

VII. Common problems and solutions

1. FATAL: Spool Directory source skydataSource: { spoolDir: /opt/flumeSpool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.

java.nio.charset.MalformedInputException: Input length = 1

Possible causes:

1. Character encoding: the log files under spoolDir must be UTF-8 (a sketch for checking and converting the encoding is shown after this list).

2. When using the Spooling Directory Source, never read a file that is still being written to; add the following to the conf file:

a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
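A minimal sketch for checking and converting a file's encoding before dropping it into spoolDir (the file name and the GBK source encoding are placeholders; substitute whatever your files actually use):

file -i app_product.log.2016-11-30-09-18-1        # report the file's current encoding
iconv -f GBK -t UTF-8 app_product.log.2016-11-30-09-18-1 -o app_product.utf8.log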

 

2. Logs are imported into the Hadoop directory, but a Hive query returns no data: for example, there is data under hdfs://centos1:9000/flume/skydata_hive_log/dt=2016-12-01/, yet select * from skydata_hive_log returns nothing.

Possible causes:

1. The partitions were not created when the table was built. Even though Flume is configured accordingly (a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d), the table's partition structure does not exist, so once the files land on HDFS, Hive cannot read them.

Solution: create the partitions first. Put the following in an executable shell script to pre-create the partitions for this table:

for ((i=-10;i<=365;i++))
do
        dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
        echo date=$dt
        hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>>logs/init_skydata_hive_log.err
done

2. The files may still be .tmp files on HDFS, i.e. HDFS is still writing to them. Hive cannot read .tmp files, only files that have been finalized.

Solution: wait for the roll interval configured on the HDFS sink to elapse (or for the file to reach the roll size) so that the .tmp file is renamed to its final name on HDFS, then query Hive again and the data shows up (see the sketch below).
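A hedged workaround (not used in this demo) is to give in-progress files a dot prefix on the collector's HDFS sink, so Hive and MapReduce treat them as hidden files until they are rolled:

# in-progress files are written as hidden ".FlumeData.xxx.tmp" and only become visible once rolled
a1.sinks.hdfsSink.hdfs.inUsePrefix = .
a1.sinks.hdfsSink.hdfs.inUseSuffix = .tmp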

 

Produced by 秀才坤坤 (xckk).

Please credit the source when reposting.

Original post: http://www.cnblogs.com/xckk/p/6125838.html

 

