指標統計:基於流計算 Oceanus(Flink) 實現實時 UVPV 統計


作者:吳雲濤,騰訊 CSIG 高級工程師
導語 | 最近梳理了一下如何用 Flink 來實現實時的 UV、PV 指標的統計,並和公司內微視部門的同事交流。然后針對該場景做了簡化,並發現使用 Flink SQL 來 實現這些指標的統計會更加便捷。

一 解決方案描述

1.1 概述

本方案結合本地自建 Kafka 集群、騰訊雲流計算 Oceanus(Flink)、雲數據庫 Redis 對博客、購物等網站 UV、PV 指標進行實時可視化分析。分析指標包含網站的獨立訪客數量(UV )、產品的點擊量(PV)、轉化率(轉化率 = 成交次數 / 點擊量)等。相關概念介紹:UV(Unique Visitor):獨立訪客數量。訪問您網站的一台客戶端為一個訪客,如用戶對同一頁面訪問了 5 次,那么該頁面的 UV 只加 1,因為 UV 統計的是去重后的用戶數而不是訪問次數。PV(Page View):點擊量或頁面瀏覽量。如用戶對同一頁面訪問了 5 次,那么該頁面的 PV 會加 5。

 


1.2 方案架構及優勢

根據以上實時指標統計場景,設計了如下架構圖:

 

涉及產品列表:

  • 本地數據中心(IDC)的自建 Kafka 集群

  • 私有網絡 VPC

  • 專線接入/雲聯網/VPN連接/對等連接

  • 流計算 Oceanus (Flink)

  • 雲數據庫 Redis

二 前置准備

購買所需的騰訊雲資源,並打通網絡。自建的 Kafka 集群需根據集群所在區域需采用 VPN 連接、專線連接或對等連接的方式來實現網絡互通互聯。

2.1 創建私有網絡 VPC

私有網絡(VPC)是一塊在騰訊雲上自定義的邏輯隔離網絡空間,在構建 Oceanus 集群、Redis 組件等服務時選擇的網絡建議選擇同一個 VPC,網絡才能互通。否則需要使用對等連接、NAT 網關、VPN 等方式打通網絡。私有網絡創建步驟請參考幫助文檔

2.2 創建 Oceanus 集群

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。

在 Oceanus 控制台的【集群管理】->【新建集群】頁面創建集群,選擇地域、可用區、VPC、日志、存儲,設置初始密碼等。VPC 及子網使用剛剛創建好的網絡。創建完后 Flink 的集群如下:

 


2.3 創建 Redis 集群

Redis 控制台的【新建實例】頁面創建集群,選擇與其他組件同一地域,同區域的同一私有網絡 VPC,這里還選擇同一子網。

 

2.4 配置自建 Kafka 集群

2.4.1 修改自建 Kafka 集群配置

自建 Kafka 集群連接時 bootstrap-servers 參數常常使用 hostname 而不是 ip 來連接。但用自建 Kafka 集群連接騰訊雲上的 Oceanus 集群為全托管集群, Oceanus 集群的節點上無法解析自建集群的 hostname 與 ip 的映射關系,所以需要改監聽器地址由 hostname 為 ip 地址連接的形式。將 config/server.properties 配置文件中 advertised.listeners 參數配置為IP地址。示例:

# 0.10.X及以后版本
advertised.listeners=PLAINTEXT://10.1.0.10:9092
# 0.10.X之前版本
advertised.host.name=PLAINTEXT://10.1.0.10:9092

修改后重啟 Kafka 集群。

! 若在雲上使用到自建的zookeeper地址,也需要將zk配置中的hostname修改IP地址形式。

2.4.2 模擬發送數據到topic

本案例使用topic為topic為 uvpv-demo。1)Kafka 客戶端進入自建 Kafka 集群節點,啟動 Kafka 客戶端,模擬發送數據。

./bin/kafka-console-producer.sh --broker-list 10.1.0.10:9092 --topic uvpv-demo
>{"record_type":0, "user_id": 2, "client_ip": "100.0.0.2", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
>{"record_type":0, "user_id": 3, "client_ip": "100.0.0.3", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
>{"record_type":1, "user_id": 2, "client_ip": "100.0.0.1", "product_id": 101, "create_time": "2021-09-08 16:20:00"}

2)使用腳本發送腳本一:Java 代碼參考:https://cloud.tencent.com/document/product/597/54834腳本二:Python 腳本。參考之前案例中 python 腳本進行適當修改即可:《視頻直播:實時數據可視化分析》

2.5 打通自建 IDC 集群到騰訊雲網絡通信

自建 Kafka 集群聯通騰訊雲網絡,可通過以下前 3 種方式打通自建 IDC 到騰訊雲的網絡通信。

  • 專線接入

    適用於本地數據中心 IDC 與騰訊雲網絡打通。

     

  • 雲聯網

    適用於本地數據中心 IDC 與騰訊雲網絡打通,也可用於雲上不同地域間私有網絡 VPC 打通。

     

  • VPN連接

    適用於本地數據中心 IDC 與騰訊雲網絡打通。

     

  • 對等連接+ NAT網關

    適合雲上不同地域間私有網絡 VPC 打通,不適合本地 IDC 到騰訊雲網絡。

 


本方案中使用了 VPN 連接的方式,實現本地 IDC 和雲上網絡的通信。參考鏈接:建立 VPC 到 IDC 的連接(路由表)根據方案繪制了下面的網絡架構圖:

 

三 方案實現

 

3.1 業務目標

利用流計算 Oceanus 實現網站 UV、PV、轉化率指標的實時統計,這里只列取以下3種統計指標:

  • 網站的獨立訪客數量 UV。Oceanus 處理后在 Redis 中通過 set 類型存儲獨立訪客數量,同時也達到了對同一訪客的數據去重的目的。

  • 網站商品頁面的點擊量 PV。Oceanus 處理后在 Redis 中使用 list 類型存儲頁面點擊量。

  • 轉化率(轉化率 = 成交次數 / 點擊量)。Oceanus 處理后在 Redis 中用 String 存儲即可。

3.2 源數據格式

Kafka topic:uvpv-demo(瀏覽記錄)

字段
類型
含義
record_type int 客戶號
user_id varchar 客戶ip地址
client_ip varchar 房間號
product_id Int 進入房間時間
create_time timestamp 創建時間

Kafka 內部采用 json 格式存儲,數據格式如下:

# 瀏覽記錄
{
 "record_type":0,  # 0 表示瀏覽記錄
 "user_id": 6,
 "client_ip": "100.0.0.6",
 "product_id": 101,
 "create_time": "2021-09-06 16:00:00"
}

# 購買記錄
{
 "record_type":1, # 1 表示購買記錄
 "user_id": 6,
 "client_ip": "100.0.0.8",
 "product_id": 101,
 "create_time": "2021-09-08 18:00:00"
}

3.3 編寫 Flink SQL 作業

示例中實現了 UV、PV 和轉化率3個指標的獲取邏輯,並寫入 Sink 端。

1、定義 Source

CREATE TABLE `input_web_record` (
  `record_type` INT,
  `user_id` INT,
  `client_ip` VARCHAR,
  `product_id` INT,
  `create_time` TIMESTAMP,
  `times` AS create_time,
  WATERMARK FOR times AS times - INTERVAL '10' MINUTE
) WITH (
    'connector' = 'kafka',   -- 可選 'kafka','kafka-0.11'. 注意選擇對應的內置 Connector
    'topic' = 'uvpv-demo',  
    'scan.startup.mode' = 'earliest-offset',
    --'properties.bootstrap.servers' = '82.157.27.147:9092',
    'properties.bootstrap.servers' = '10.1.0.10:9092',  
    'properties.group.id' = 'WebRecordGroup',  -- 必選參數, 一定要指定 Group ID
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 結構解析異常
    'json.fail-on-missing-field' = 'false'   -- 如果設置為 true, 則遇到缺失字段會報錯 設置為 false 則缺失字段設置為 null
);

2、定義 Sink

-- UV sink
CREATE TABLE `output_uv` (  
`userids`   STRING,
`user_id` STRING
) WITH (
 'connector' = 'redis',          
 'command' = 'sadd',              -- 使用集合保存uv(支持命令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379',  -- redis連接地址,集群模式多個節點使用'',''分隔。
 -- 'additional-key' = '<key>',   -- 用於指定hset和zadd的key。hset、zadd必須設置。
 'password' = 'yourpassword'  
);

-- PV sink
CREATE TABLE `output_pv` (  
`pagevisits`   STRING,
`product_id` STRING,
`hour_count` BIGINT
) WITH (
 'connector' = 'redis',          
 'command' = 'lpush',              -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379',   -- redis連接地址,集群模式多個節點使用'',''分隔。
 -- 'additional-key' = '<key>',   -- 用於指定hset和zadd的key。hset、zadd必須設置。
 'password' = 'yourpassword'  
);

-- 轉化率 sink
CREATE TABLE `output_conversion_rate` (  
`conversion_rate`   STRING,
`rate` STRING
) WITH (
 'connector' = 'redis',        
 'command' = 'set',              -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379', -- redis連接地址,集群模式多個節點使用'',''分隔。
 -- 'additional-key' = '<key>', -- 用於指定hset和zadd的key。hset、zadd必須設置。
 'password' = 'yourpassword'  
);

3、業務邏輯

-- 加工得到 UV 指標,統計所有時間內的 UV
INSERT INTO output_uv
SELECT
 'userids' AS `userids`,
CAST(user_id AS string) AS user_id
FROM input_web_record ;

-- 加工並得到 PV 指標,統計每 10 分鍾內的 PV
INSERT INTO output_pv
SELECT
 'pagevisits' AS pagevisits,
CAST(product_id AS string) AS product_id,
SUM(product_id) AS hour_count
FROM input_web_record WHERE record_type = 0
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id,
user_id;

-- 加工並得到轉化率指標,統計每 10 分鍾內的轉化率
INSERT INTO output_conversion_rate
SELECT
 'conversion_rate' AS conversion_rate,
CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string)
FROM (SELECT * FROM input_web_record where record_type = 1) AS a
GROUP BY  
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id;

3.4 結果驗證

通常情況,會通過 Web 網站來展示統計到的 UV、PV 指標,這里為了簡單直接在Redis 控制台登錄進行查詢:

 

userids: 存儲 UV
pagevisits: 存儲 PVconversion_rate: 存儲轉化率,即購買商品次數/總頁面點擊量。

四 總結

通過自建 Kafka 集群采集數據,在流計算 Oceanus (Flink) 中實時進行字段累加、窗口聚合等操作,將加工后的數據存儲在雲數據庫Redis,統計到實時刷新的 UV、PV 等指標。這個方案在 Kafka json 格式設計時為了簡便易懂做了簡化處理,將瀏覽記錄和產品購買記錄都放在了同一個 topic 中,重點通過打通自建 IDC 和騰訊雲產品間的網絡來展現整個方案。針對超大規模的 UV 去重,微視的同事采用了 Redis hyperloglog 方式來實現 UV 統計。相比直接使用 set 類型方式有極小的內存空間占用的優點,詳情見鏈接:https://cloud.tencent.com/developer/article/1889162

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓

 

 

 關注“騰訊雲大數據”公眾號,技術交流、最新活動、服務專享一站 Get~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM