數據倉庫(一)——用戶行為數據采集


一、數據倉庫概念

數據倉庫( Data Warehouse ),是為企業制定決策,提供數據支持的。可以幫助企業,改進業務流程、提高產品質量等。

數據倉庫的輸入數據通常包括:業務數據用戶行為數據爬蟲數據

業務數據:就是各行業在處理事務過程中產生的數據。比如用戶在電商網站中登錄、下單、支付等過程中,需要和網站后台數據庫進行增刪改查交互,產生的數據就是業務數據。業務數據通常存儲在MySQL、Oracle等數據庫中。
img

用戶行為數據:用戶在使用產品過程中,通過埋點收集與客戶端產品交互過程中產生的數據,並發往日志服務器進行保存。比如頁面瀏覽、點擊、停留、評論、點贊、收藏等。用戶行為數據通常存儲在日志文件中。
img

爬蟲數據:通常事通過技術手段獲取其他公司網站的數據。不建議同學們這樣去做。

img

二、項目需求及架構設計

2.1 項目需求分析

img

1)采集平台

​ (1)用戶行為數據采集平台搭建

​ (2)業務數據采集平台搭建

2)離線需求

主題 子主題 指標
流量主題 各渠道流量統計 當日各渠道獨立訪客數
當日各渠道會話總數
當日各渠道會話平均瀏覽頁面數
當日各渠道會話平均停留時長
當日各渠道跳出率
路徑統計 路徑分析
用戶主題 用戶變動統計 流失用戶數
回流用戶數
用戶留存統計 新增留存率
用戶新增活躍統計 新增用戶數
活躍用戶數
用戶行為漏斗分析 首頁瀏覽人數
商品詳情頁瀏覽人數
加購人數
下單人數
支付人數
新增下單用戶統計 新增下單人數
新增支付成功人數
最近7日內連續3日下單用戶數
商品主題 *復購率統計 最近30日各品牌復購率
各品牌商品下單統計 各品牌訂單數
各品牌訂單人數
各品類商品交易統計 各品類訂單數
各品類訂單人數
購物車存量統計 各分類商品購物車存量Top3
各品牌商品收藏次數Top3
下單到支付時間間隔平均值
各省份交易統計 各省份訂單數
各省份訂單金額
優惠券主題 優惠券使用率統計 使用次數
使用人數

3)實時需求

主題 子主題 指標
流量主題 各渠道流量統計 當日各渠道獨立訪客數
當日各渠道會話總數
當日各渠道會話平均瀏覽頁面數
當日各渠道會話平均停留時長
當日各渠道跳出率
流量分時統計 當日各小時獨立訪客數
當日各小時頁面瀏覽數
當日各小時新訪客數
新老訪客流量統計 各類訪客數
各類訪客頁面瀏覽數
各類訪客平均在線時長
各類訪客平均訪問頁面數
關鍵詞統計 當日各關鍵詞評分
用戶主題 用戶變動統計 當日回流用戶數
用戶新增活躍統計 當日新增用戶數
當日活躍用戶數
用戶行為漏斗分析 當日首頁瀏覽人數
當日商品詳情頁瀏覽人數
當日加購人數
當日下單人數
當日支付成功人數
新增交易用戶統計 當日新增下單人數
當日新增支付成功人數
商品主題 *復購率統計 最近 7/30 日截至當前各品牌復購率
各品牌商品交易統計 當日各品牌訂單數
當日各品牌訂單人數
當日各品牌訂單金額
當日各品牌退單數
當日各品牌退單人數
各品類商品交易統計 當日各品類訂單數
當日各品類訂單人數
當日各品牌訂單金額
當日各品類退單數
當日各品類退單人數
各 SPU 商品交易統計 當日各 SPU 訂單數
當日各 SPU 訂單人數
當日各 SPU 訂單金額
交易主題 交易綜合統計 當日訂單總額
當日訂單數
當日訂單人數
當日退單數
當日退單人數
各省份交易統計 當日各省份訂單數
當日各省份訂單金額
優惠券主題 優惠券補貼率統計 當日優惠券補貼率
活動主題 活動補貼率統計 當日活動補貼率

4)思考題

1、項目技術如何選型?

2、框架版本如何選型(Apache、CDH、HDP)

3、服務器使用物理機還是雲主機?

4、如何確認集群規模?(假設每台服務器8T硬盤)

2.2 項目框架

2.2.1 技術選型

image-20230603110138788

2.2.2 系統數據流程設計

離線數倉:

img

實時數倉:

image-20230603110334924

2.2.3 框架版本選型

img

2.2.4 服務器選型

img

2.2.5 集群規模

img

2.2.6 集群資源規划設計

在企業中通常會搭建一套生產集群和一套測試集群。生產集群運行生產任務,測試集群用於上線前代碼編寫和測試。

1)生產集群
(1)消耗內存的分開
(2)數據傳輸數據比較緊密的放在一起(Kafka 、Zookeeper)
(3)客戶端盡量放在一到兩台服務器上,方便外部訪問
(4)有依賴關系的盡量放到同一台服務器(例如:Hive和Azkaban Executor)

1 2 3 4 5 6 7 8 9 10
nn nn dn dn dn dn dn dn dn dn
rm rm nm nm nm nm nm nm
nm nm
zk zk zk
kafka kafka kafka
Flume Flume flume
Hbase Hbase Hbase
hive hive
mysql mysql
spark spark
Azkaban Azkaban ES ES

2)測試集群服務器規划

離線數倉:

服務名稱 子服務 服務器hadoop102 服務器hadoop103 服務器hadoop104
HDFS NameNode
DataNode
SecondaryNameNode
Yarn NodeManager
Resourcemanager
Zookeeper Zookeeper Server
Flume(采集日志) Flume
Kafka Kafka
Flume(消費Kafka) Flume
Hive Hive
MySQL MySQL
Sqoop Sqoop
Presto Coordinator
Worker
Azkaban AzkabanWebServer
AzkabanExecutorServer
Spark
Kylin
HBase HMaster
HRegionServer
Superset
Atlas
Solr Jar
服務數總計 19 8 8

實時數倉:

服務名稱 子服務 服務器hadoop102 服務器hadoop103 服務器hadoop104
HDFS NameNode
DataNode
SecondaryNameNode
Yarn NodeManager
Resourcemanager
Zookeeper Zookeeper Server
Flume(采集日志) Flume
Kafka Kafka
Flume(消費Kafka日志) Flume
Flume(消費Kafka業務) Flume
Hive
MySQL MySQL
DataX
Spark
DolphinScheduler ApiApplicationServer
AlertServer
MasterServer
WorkerServer
LoggerServer
Superset Superset
Flink
ClickHouse
Redis
Hbase
服務數總計 20 11 12

三、用戶行為日志

3.1 目標數據

我們要收集和分析的數據主要包括頁面數據事件數據曝光數據啟動數據錯誤數據

3.1.1 頁面

頁面數據主要記錄一個頁面的用戶訪問情況,包括訪問時間、停留時間、頁面路徑等信息。

img

字段名稱 字段描述
page_id 頁面idhome("首頁"),category("分類頁"),discovery("發現頁"),top_n("熱門排行"),favor("收藏頁"),search("搜索頁"),good_list("商品列表頁"),good_detail("商品詳情"),good_spec("商品規格"),comment("評價"),comment_done("評價完成"),comment_list("評價列表"),cart("購物車"),trade("下單結算"),payment("支付頁面"),payment_done("支付完成"),orders_all("全部訂單"),orders_unpaid("訂單待支付"),orders_undelivered("訂單待發貨"),orders_unreceipted("訂單待收貨"),orders_wait_comment("訂單待評價"),mine("我的"),activity("活動"),login("登錄"),register("注冊");
last_page_id 上頁id
page_item_type 頁面對象類型sku_id("商品skuId"),keyword("搜索關鍵詞"),sku_ids("多個商品skuId"),activity_id("活動id"),coupon_id("購物券id");
page_item 頁面對象id
sourceType 頁面來源類型promotion("商品推廣"),recommend("算法推薦商品"),query("查詢結果商品"),activity("促銷活動");
during_time 停留時間(毫秒)
ts 跳入時間

3.1.2 事件

事件數據主要記錄應用內一個具體操作行為,包括操作類型、操作對象、操作對象描述等信息。
img

字段名稱 字段描述
action_id 動作idfavor_add("添加收藏"),favor_canel("取消收藏"),cart_add("添加購物車"),cart_remove("刪除購物車"),cart_add_num("增加購物車商品數量"),cart_minus_num("減少購物車商品數量"),trade_add_address("增加收貨地址"),get_coupon("領取優惠券");注:對於下單、支付等業務數據,可從業務數據庫獲取。
item_type 動作目標類型sku_id("商品"),coupon_id("購物券");
item 動作目標id
ts 動作時間

3.1.3 曝光

曝光數據主要記錄頁面所曝光的內容,包括曝光對象,曝光類型等信息。
img

字段名稱 字段描述
displayType 曝光類型promotion("商品推廣"),recommend("算法推薦商品"),query("查詢結果商品"),activity("促銷活動");
item_type 曝光對象類型sku_id("商品skuId"),activity_id("活動id");
item 曝光對象id
order 曝光順序

3.1.4 啟動

啟動數據記錄應用的啟動信息。

img

字段名稱 字段描述
entry 啟動入口icon("圖標"),notification("通知"),install("安裝后啟動");
loading_time 啟動加載時間
open_ad_id 開屏廣告id
open_ad_ms 廣告播放時間
open_ad_skip_ms 用戶跳過廣告時間
ts 啟動時間

3.1.5 錯誤

錯誤數據記錄應用使用
過程中的錯誤信息,包括錯誤編號錯誤信息

字段名稱 字段描述
error_code 錯誤碼
msg 錯誤信息

3.2 數據埋點

3.2.1 主流埋點方式(了解)

目前主流的埋點方式,有代碼埋點(前端/后端)可視化埋點全埋點三種。

代碼埋點是通過調用埋點SDK函數,在需要埋點的業務邏輯功能位置調用接口,上報埋點數據。例如,我們對頁面中的某個按鈕埋點后,當這個按鈕被點擊時,可以在這個按鈕對應的 OnClick 函數里面調用SDK提供的數據發送接口,來發送數據。

可視化埋點只需要研發人員集成采集 SDK,不需要寫埋點代碼,業務人員就可以通過訪問分析平台的“圈選”功能,來“圈”出需要對用戶行為進行捕捉的控件,並對該事件進行命名。圈選完畢后,這些配置會同步到各個用戶的終端上,由采集 SDK 按照圈選的配置自動進行用戶行為數據的采集和發送。

全埋點是通過在產品中嵌入SDK,前端自動采集頁面上的全部用戶行為事件,上報埋點數據,相當於做了一個統一的埋點。然后再通過界面配置哪些數據需要在系統里面進行分析。

3.2.2 埋點數據上報時機

埋點數據上報時機包括兩種方式。

方式一,在離開該頁面時,上傳在這個頁面產生的所有數據(頁面、事件、曝光、錯誤等)。優點,批處理,減少了服務器接收數據壓力。缺點,不是特別及時。

方式二,每個事件、動作、錯誤等,產生后,立即發送。優點,響應及時。缺點,對服務器接收數據壓力比較大。

本次項目采用方式一埋點。

3.2.3 埋點數據日志結構

我們的日志結構大致可分為兩類,一是普通頁面埋點日志,二是啟動日志。

普通頁面日志結構,以頁面瀏覽為單位,即一個頁面瀏覽記錄,生成一條頁面埋點日志。一條完整的頁面日志包含,一個頁面瀏覽記錄,若干個用戶在該頁面所做的動作記錄,若干個該頁面的曝光記錄,以及一個在該頁面發生的報錯記錄。除上述行為信息,頁面日志還包含了這些行為所處的各種環境信息,包括用戶信息、時間信息、地理位置信息、設備信息、應用信息、渠道信息等。

(1)普通頁面埋點日志格式

{
  "common": {                    -- 環境信息
    "ar": "230000",              -- 地區編碼
    "ba": "iPhone",              -- 手機品牌
    "ch": "Appstore",            -- 渠道
    "is_new": "1",--是否首日使用,首次使用的當日,該字段值為1,過了24:00,該字段置為0。
    "md": "iPhone 8",               -- 手機型號
    "mid": "YXfhjAYH6As2z9Iq",      -- 設備id
    "os": "iOS 13.2.9",             -- 操作系統
    "uid": "485",                   -- 會員id
    "vc": "v2.1.134"                -- app版本號
  },
"actions": [                        --動作(事件)  
    {
      "action_id": "favor_add",     -- 動作id
      "item": "3",                  -- 目標id
      "item_type": "sku_id",        -- 目標類型
      "ts": 1585744376605           -- 動作時間戳
    }
  ],
  "displays": [
    {
      "displayType": "query",        -- 曝光類型
      "item": "3",                   -- 曝光對象id
      "item_type": "sku_id",         -- 曝光對象類型
      "order": 1,                    -- 出現順序
      "pos_id": 2                    -- 曝光位置
    },
    {
      "displayType": "promotion",
      "item": "6",
      "item_type": "sku_id",
      "order": 2,
      "pos_id": 1
    },
    {
      "displayType": "promotion",
      "item": "9",
      "item_type": "sku_id",
      "order": 3,
      "pos_id": 3
    },
    {
      "displayType": "recommend",
      "item": "6",
      "item_type": "sku_id",
      "order": 4,
      "pos_id": 2
    },
    {
      "displayType": "query ",
      "item": "6",
      "item_type": "sku_id",
      "order": 5,
      "pos_id": 1
    }
  ],
  "page": {                     -- 頁面信息
    "during_time": 7648,        -- 持續時間毫秒
    "item": "3",                -- 目標id
    "item_type": "sku_id",      -- 目標類型
    "last_page_id": "login",    -- 上頁類型
    "page_id": "good_detail",   -- 頁面ID
    "sourceType": "promotion"   -- 來源類型
  },
"err":{                     -- 錯誤
"error_code": "1234",       -- 錯誤碼
    "msg": "***********"    -- 錯誤信息
},
  "ts": 1585744374423  -- 跳入時間戳
}

(2)啟動日志格式

啟動日志以啟動為單位,及一次啟動行為,生成一條啟動日志。一條完整的啟動日志包括一個啟動記錄,一個本次啟動時的報錯記錄,以及啟動時所處的環境信息,包括用戶信息、時間信息、地理位置信息、設備信息、應用信息、渠道信息等。

{
  "common": {
    "ar": "370000",
    "ba": "Honor",
    "ch": "wandoujia",
    "is_new": "1",
    "md": "Honor 20s",
    "mid": "eQF5boERMJFOujcp",
    "os": "Android 11.0",
    "uid": "76",
    "vc": "v2.1.134"
  },
  "start": {   
    "entry": "icon",          -- icon手機圖標  notice 通知   install 安裝后啟動
    "loading_time": 18803,    -- 啟動加載時間
    "open_ad_id": 7,          -- 廣告頁ID
    "open_ad_ms": 3449,       -- 廣告總共播放時間
    "open_ad_skip_ms": 1989   --  用戶跳過廣告時點
  },
"err":{                      -- 錯誤
"error_code": "1234",        -- 錯誤碼
    "msg": "***********"     -- 錯誤信息
},
  "ts": 1585744304000
}

3.3 服務器和JDK准備

分別安裝hadoop102、hadoop103、hadoop104三台主機。

相關內容參考:https://www.cnblogs.com/wkfvawl/p/15369416.html

3.4 模擬數據

3.4.1 使用說明

1)將application.yml、gmall2020-mock-log-2021-01-22.jar、path.json、logback.xml上傳到hadoop102的/opt/module/applog目錄下

(1)創建applog路徑

[atguigu@hadoop102 module]$ mkdir /opt/module/applog

(2)上傳文件application.yml到/opt/module/applog目錄

2)配置文件
(1)application.yml文件
可以根據需求生成對應日期的用戶行為日志。

[atguigu@hadoop102 applog]$ vim application.yml
修改如下內容

# 外部配置打開
logging.config: "./logback.xml"
#業務日期  注意:並不是Linux系統生成日志的日期,而是生成數據中的時間
mock.date: "2020-06-14"

#模擬數據發送模式
#mock.type: "http"
#mock.type: "kafka"
mock.type: "log"

#http模式下,發送的地址
mock.url: "http://hdp1/applog"

#kafka模式下,發送的地址
mock:
  kafka-server: "hdp1:9092,hdp2:9092,hdp3:9092"
  kafka-topic: "ODS_BASE_LOG"

#啟動次數
mock.startup.count: 200
#設備最大值
mock.max.mid: 500000
#會員最大值
mock.max.uid: 100
#商品最大值
mock.max.sku-id: 35
#頁面平均訪問時間
mock.page.during-time-ms: 20000
#錯誤概率 百分比
mock.error.rate: 3
#每條日志發送延遲 ms
mock.log.sleep: 10
#商品詳情來源  用戶查詢,商品推廣,智能推薦, 促銷活動
mock.detail.source-type-rate: "40:25:15:20"
#領取購物券概率
mock.if_get_coupon_rate: 75
#購物券最大id
mock.max.coupon-id: 3
#搜索關鍵詞  
mock.search.keyword: "圖書,小米,iphone11,電視,口紅,ps5,蘋果手機,小米盒子"

(2)path.json,該文件用來配置訪問路徑
根據需求,可以靈活配置用戶點擊路徑。

[
    {"path":["home","good_list","good_detail","cart","trade","payment"],"rate":20 },
    {"path":["home","search","good_list","good_detail","login","good_detail","cart","trade","payment"],"rate":40 },
    {"path":["home","mine","orders_unpaid","trade","payment"],"rate":10 },
    {"path":["home","mine","orders_unpaid","good_detail","good_spec","comment","trade","payment"],"rate":5 },
    {"path":["home","mine","orders_unpaid","good_detail","good_spec","comment","home"],"rate":5 },
    {"path":["home","good_detail"],"rate":10 },
    {"path":["home"  ],"rate":10 }
]

(3)logback配置文件
可配置日志生成路徑,修改內容如下

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="/opt/module/applog/log" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 將某一個包下日志單獨打印日志 -->
    <logger name="com.atgugu.gmall2020.mock.log.util.LogUtil"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error"  >
        <appender-ref ref="console" />
    </root>
</configuration>

3)生成日志
(1)進入到/opt/module/applog路徑,執行以下命令

[atguigu@hadoop102 applog]$ java -jar gmall2020-mock-log-2021-01-22.jar

(2)在/opt/module/applog/log目錄下查看生成日志

[atguigu@hadoop102 log]$ ll

3.4.2 集群日志生成腳本

在hadoop102的/home/atguigu目錄下創建bin目錄,這樣腳本可以在服務器的任何目錄執行。

[atguigu@hadoop102 ~]$ echo $PATH
/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/home/atguigu/.local/bin:/home/atguigu/bin

(1)在/home/atguigu/bin目錄下創建腳本lg.sh

[atguigu@hadoop102 bin]$ vim lg.sh

(2)在腳本中編寫如下內容

#!/bin/bash
for i in hadoop102 hadoop103; do
    echo "========== $i =========="
    ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-01-22.jar >/dev/null 2>&1 &"
done 

注:
①/opt/module/applog/為jar包及配置文件所在路徑
/dev/null代表Linux的空設備文件,所有往這個文件里面寫入的內容都會丟失,俗稱“黑洞”。

標准輸入0:從鍵盤獲得輸入 /proc/self/fd/0
標准輸出1:輸出到屏幕(即控制台) /proc/self/fd/1
錯誤輸出2:輸出到屏幕(即控制台) /proc/self/fd/2

這里意思是 1給黑洞,2給1

(3)修改腳本執行權限

[atguigu@hadoop102 bin]$ chmod u+x lg.sh

(4)將jar包及配置文件上傳至hadoop103的/opt/module/applog/路徑
(5)啟動腳本

[atguigu@hadoop102 module]$ lg.sh 

(6)分別在hadoop102、hadoop103的/opt/module/applog/log目錄上查看生成的數據

[atguigu@hadoop102 logs]$ ls
app.2021-01-22.log
[atguigu@hadoop103 logs]$ ls
app.2020-01-22.log

四、數據采集模塊

數據采集模塊

相關內容參考:https://www.cnblogs.com/wkfvawl/p/15369416.html

4.1 Hadoop 安裝

集群規划

服務器hadoop102 服務器hadoop103 服務器hadoop104
HDFS NameNodeDataNode DataNode DataNodeSecondaryNameNode
Yarn NodeManager ResourcemanagerNodeManager NodeManager

注意:盡量使用離線方式安裝
安裝過程參見:https://www.cnblogs.com/wkfvawl/p/15369416.html

4.2 Zookeeper 安裝

集群規划

服務器hadoop102 服務器hadoop103 服務器hadoop104
Zookeeper Zookeeper Zookeeper Zookeeper

安裝過程參見:https://www.cnblogs.com/wkfvawl/p/15539847.html

4.3 Kafka 安裝

集群規划

服務器hadoop102 服務器hadoop103 服務器hadoop104
Kafka Kafka Kafka Kafka

安裝過程參見:https://www.cnblogs.com/wkfvawl/p/15579066.html

4.3.1 創建Kafka Topic

進入到/opt/module/kafka/目錄下創建日志主題

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper  hadoop102:2181,hadoop103:2181,hadoop104:2181 --create  --replication-factor 1 --partitions 1 --topic topic_log

**4.3.2 **項目經驗之Kafka機器數量計算

Kafka機器數量(經驗公式)= 2 *(峰值生產速度 * 副本數 / 100)+ 1

先拿到峰值生產速度,再根據設定的副本數,就能預估出需要部署Kafka的數量。

1)峰值生產速度

峰值生產速度可以壓測得到。

2)副本數

副本數默認是1個,在企業里面2-3個都有,2個居多。

副本多可以提高可靠性,但是會降低網絡傳輸效率。

比如我們的峰值生產速度是50M/s。副本數為2。

Kafka機器數量 = 2 *(50 * 2 / 100)+ 1 = 3台

4.4 采集日志Flume

采集日志Flume主要需要完成的任務為將日志從落盤文件中采集出來,傳輸給消息中間件Kafka集群,這期間要數據不丟失,程序出現故障死機后可以快速重啟,對日志進行初步分類,分別發往不同的Kafka Topic,方便后續對日志數據進行分別處理。

4.4.1 Flume安裝

按照規划,需要采集的用戶行為日志文件分布在hadoop102,hadoop103兩台日志服務器,故需要在hadoop102,hadoop103兩台節點配置日志采集Flume。日志采集Flume需要采集日志文件內容,並對日志格式(JSON)進行校驗,然后將校驗通過的日志發送到Kafka。

服務器hadoop102 服務器hadoop103 服務器hadoop104
Flume(采集日志) Flume Flume

安裝過程參見:https://www.cnblogs.com/wkfvawl/p/15603589.html

4.4.2 Flume組件選型

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的優勢

  • TailDir Source:斷點續傳、多目錄。Flume1.6以前需要自己自定義Source記錄每次讀取文件位置,實現斷點續傳。不會丟數據,但是有可能會導致數據重復。

  • Exec Source:可以實時搜集數據,但是在Flume不運行或者Shell命令出錯的情況下,數據將會丟失。

  • Spooling Directory Source:監控目錄,支持斷點續傳。

(2)batchSize大小如何設置?

答:Event 1K左右時,500-1000合適(默認為100)

2)Channel
采用Kafka Channel

img

省去了Sink,提高了效率。KafkaChannel數據存儲在Kafka里面,所以數據是存儲在磁盤中。

注意在Flume1.7以前,Kafka Channel很少有人使用,因為發現parseAsFlumeEvent這個配置起不了作用。也就是無論parseAsFlumeEvent配置為true還是false,都會轉為Flume Event。這樣的話,造成的結果是,會始終都把Flume的headers中的信息混合着內容一起寫入Kafka的消息中,這顯然不是我所需要的,我只是需要把內容寫入即可。

4.4.3 Flume 配置

1)Flume配置分析

img

image-20230603131110546

Flume直接讀log日志的數據,log日志的格式是app.yyyy-mm-dd.log。采集的數據可能不完整,需要用攔截器進行攔截,但這里只能進行簡單的數據清洗。

2)Flume的具體配置如下:

(1)在/opt/module/flume/conf目錄下創建file-flume-kafka.conf文件

[atguigu@hadoop102 conf]$ vim file-flume-kafka.conf

在文件配置如下內容

# 為各組件命名
a1.sources = r1
a1.channels = c1

# 描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder

# 描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# 綁定source和channel以及sink和channel的關系
a1.sources.r1.channels = c1

注意:com.atguigu.flume.interceptor.ETLInterceptor是自定義的攔截器的全類名。需要根據用戶自定義的攔截器做相應修改。

4.4.4 Flume攔截器

ETL攔截器主要用於過濾JSON數據不完整的日志。

1)創建Maven工程flume-interceptor

2)創建包名:com.atguigu.flume.interceptor

3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

注意:scope中provided的含義是編譯時用該jar包。打包時時不用。因為集群上已經存在flume的jar包。只是本地編譯時用一下。

4)在com.atguigu.flume.interceptor包下創建JSONUtils類

package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {
    /*
     * 通過異常判斷是否是json字符串
     * 是:返回true  不是:返回false
    * */
    public static boolean isJSONValidate(String log){
        try {
            JSON.parse(log);
            return true;
        }catch (JSONException e){ 
            return false;
        }
    }
}

5)在com.atguigu.flume.interceptor包下創建ETLInterceptor類

package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }
    @Override
    public Event intercept(Event event) {
        // 1、獲取body當中的數據並轉成字符串
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        // 2、判斷字符串是否是一個合法的json,是:返回當前event;不是:返回null
        if (JSONUtils.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }
    @Override
    public List<Event> intercept(List<Event> list) {

        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }
        return list;
    }
    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
    @Override
    public void close() {
    }
}

6)打包

img

7)需要先將打好的包放入到hadoop102的/opt/module/flume/lib文件夾下面。

[atguigu@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

8)分發Flume到hadoop103、hadoop104

[atguigu@hadoop102 module]$ xsync flume/

9)分別在hadoop102、hadoop103上啟動Flume

[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

[atguigu@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

4.4.5 測試Flume-Kafka通道

(1)生成日志

[atguigu@hadoop102 ~]$ lg.sh

(2)消費Kafka數據,觀察控制台是否有數據獲取到

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

說明:如果獲取不到數據,先檢查Kafka、Flume、Zookeeper是否都正確啟動。再檢查Flume的攔截器代碼是否正常。

4.4.6 日志采集Flume啟動停止腳本

(1)在/home/atguigu/bin目錄下創建腳本f1.sh

[atguigu@hadoop102 bin]$ vim f1.sh

在腳本中填寫如下內容

#! /bin/bash
case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------啟動 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"
        done
};;    
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

說明1:nohup,該命令可以在你退出帳戶/關閉終端之后繼續運行相應的進程。nohup就是不掛起的意思,不掛斷地運行命令。
說明2:awk 默認分隔符為空格
說明3:$2是在“”雙引號內部會被解析為腳本的第二個參數,但是這里面想表達的含義是awk的第二個值,所以需要將他轉義,用$2表示。
說明4:xargs 表示取出前面命令運行的結果,作為后面命令的輸入參數。
(2)增加腳本執行權限

[atguigu@hadoop102 bin]$ chmod u+x f1.sh

(3)f1集群啟動腳本

[atguigu@hadoop102 module]$ f1.sh start

(4)f1集群停止腳本

[atguigu@hadoop102 module]$ f1.sh stop

4.5 消費Kafka數據Flume

集群規划

服務器hadoop102 服務器hadoop103 服務器hadoop104
Flume(消費Kafka) Flume

4.5.1 項目經驗之Flume組件選型

1)FileChannel和MemoryChannel區別

MemoryChannel傳輸數據速度更快,但因為數據保存在JVM的堆內存中,Agent進程掛掉會導致數據丟失,適用於對數據質量要求不高的需求。

FileChannel傳輸速度相對於Memory慢,但數據安全保障高,Agent進程掛掉也可以從失敗中恢復數據。

選型:

金融類公司、對錢要求非常准確的公司通常會選擇FileChannel

傳輸的是普通日志信息(京東內部一天丟100萬-200萬條,這是非常正常的),通常選擇MemoryChannel。

2)FileChannel優化

通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。
官方說明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據。
img

3)Sink:HDFS Sink

(1)HDFS存入大量小文件,有什么影響?
元數據層面:每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在Namenode內存中。所以小文件過多,會占用Namenode服務器大量內存,影響Namenode性能和使用壽命

計算層面:默認情況下MR會對每個小文件啟用一個Map任務計算,非常影響計算性能。同時也影響磁盤尋址時間。

(2)HDFS小文件處理
官方默認的這三個參數配置寫入HDFS后會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基於以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合作用,效果如下:

①文件在達到128M時會滾動生成新文件
②文件創建超3600秒時會滾動生成新文件

4.5.2 消費者Flume配置

1)Flume配置分析
img

加上時間戳攔截器,用來解決零點漂移問題

為什么會發生零點漂移?

比如2022-01-24 23:59:59生成的日志文件,然后數據經過第一層的flume采集,加上kafka的緩沖,然后到 集群的另一台上的第二層的flume的時候,時間肯定就會到2022-01-25 00:00:XX了,這樣一來,如果采用當前系統時間作為timestamp的話,2022-01-24 的日志數據就會上傳到hdfs上的2022-01-25 的目錄下。因為Kafka Source會為其加上該header,value為當前系統的時間戳Kafka Source會為其加上該header,value為當前系統的時間戳

所以我們需要再第二層flume里面寫這么一個加時間戳的攔截器,把日志文件里面的時間添加到event的header里面

將消費日志層Flume Agent程序部署在hadoop104上,實現hadoop102、hadoop103負責日志的生成和采集,hadoop104負責日志的消費存儲。在實際生成環境中盡量做到將不同的任務部署在不同的節點服務器上。

2)Flume的具體配置如下:
(1)在hadoop104的/opt/module/flume/conf目錄下創建kafka-flume-hdfs.conf文件

[atguigu@hadoop104 conf]$ vim kafka-flume-hdfs.conf

在文件配置如下內容

## 組件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# kafka scource每次從kafka topic中拉取的event個數
a1.sources.r1.batchSize = 5000
# 拉取數據批次間隔為2000毫秒
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder

## channel1
a1.channels.c1.type = file
# file channel傳輸數據的斷點信息目錄
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
# file channel傳輸數據的存儲位置
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/


## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

#控制生成的小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## 控制輸出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

## 拼裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

4.5.3 Flume時間戳攔截器

由於Flume默認會用Linux系統時間,作為輸出到HDFS路徑的時間。如果數據是23:59分產生的。Flume消費Kafka里面的數據時,有可能已經是第二天了,那么這部門數據會被發往第二天的HDFS路徑。我們希望的是根據日志里面的實際時間,發往HDFS的路徑,所以下面攔截器作用是獲取日志中的實際時間。

解決的思路:攔截json日志,通過fastjson框架解析json,獲取實際時間ts。將獲取的ts時間寫入攔截器header頭,header的key必須是timestamp,因為Flume框架會根據這個key的值識別為時間,寫入到HDFS。
1)在com.atguigu.flume.interceptor包下創建TimeStampInterceptor類

package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }
    @Override
    public Event intercept(Event event) {
        // 將日志攔下,取出header里面的key , 取出body里面的對應的日志時間;  將ts的值賦值給header的key  timestamp
        // 1 獲取header頭
        Map<String, String> headers = event.getHeaders();
        // 2 獲取body中的ts
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("ts");
        // 3 將ts賦值給timestamp
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }
    @Override
    public void close() {
    }
    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}

2)重新打包
3)需要先將打好的包放入到hadoop102的/opt/module/flume/lib文件夾下面。

[atguigu@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

4)分發Flume到hadoop103、hadoop104

[atguigu@hadoop102 module]$ xsync flume/

4.5.4 消費者Flume啟動停

在腳本中填寫如下內容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " --------啟動 $i 消費flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " --------停止 $i 消費flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
        done

};;
esac

(2)增加腳本執行權限

[atguigu@hadoop102 bin]$ chmod u+x f2.sh

(3)f2集群啟動腳本

[atguigu@hadoop102 module]$ f2.sh start

(4)f2集群停止腳本

[atguigu@hadoop102 module]$ f2.sh stop

4.5.5 項目經驗之Flume內存優化

1)問題描述:如果啟動消費Flume拋出如下異常

ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解決方案步驟
(1)在hadoop102服務器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop103、hadoop104服務器

[atguigu@hadoop102 conf]$ xsync flume-env.sh

3)Flume內存參數設置及優化

JVM heap一般設置為4G或更高

-Xmx與-Xms最好設置一致,減少內存抖動帶來的性能影響,如果設置不一致容易導致頻繁fullgc。

-Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大允許的尺寸,按需分配。如果不設置一致,容易在初始化時,由於內存不夠,頻繁觸發fullgc。

4.6 采集通道啟動/停止腳本

(1)在/home/atguigu/bin目錄下創建腳本cluster.sh

[atguigu@hadoop102 bin]$ vim cluster.sh

在腳本中填寫如下內容

#!/bin/bash

case $1 in
"start"){
        echo ================== 啟動 集群 ==================

        #啟動 Zookeeper集群
        zk.sh start

        #啟動 Hadoop集群
        hdp.sh start

        #啟動 Kafka采集集群
        kf.sh start

        #啟動 Flume采集集群
        f1.sh start

        #啟動 Flume消費集群
        f2.sh start

        };;
"stop"){
        echo ================== 停止 集群 ==================

        #停止 Flume消費集群
        f2.sh stop

        #停止 Flume采集集群
        f1.sh stop

        #停止 Kafka采集集群
        kf.sh stop

        #停止 Hadoop集群
        hdp.sh stop

        #停止 Zookeeper集群
        zk.sh stop

};;
esac

(2)增加腳本執行權限

[atguigu@hadoop102 bin]$ chmod u+x cluster.sh    

(3)cluster集群啟動腳本

[atguigu@hadoop102 module]$ cluster.sh start

(4)cluster集群停止腳本

[atguigu@hadoop102 module]$ cluster.sh stop

產生日志

[atguigu@hadoop102 module]$ lg.sh

查看HDFS

img


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM