作者:騰訊雲流計算 Oceanus 團隊
流計算 Oceanus 簡介
流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。本文將為您詳細介紹如何使用 Datagen Connector 模擬生成客戶視頻點擊量數據,並利用滾動窗口函數對每分鍾內客戶的視頻點擊量進行聚合分析,最后將數據輸出到 ClickHouse 的流程。
創建流計算 Oceanus 集群
進入流計算 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,完成流計算 Oceanus 集群的創建。具體可參考流計算 Oceanus 官方文檔創建獨享集群 [2]。
創建 ClickHouse 集群
進入 ClickHouse 控制台[3],點擊左上角【新建集群】,完成 ClickHouse 集群的創建。具體可參考 ClickHouse 快速入門 [4]。
注意:創建流計算 Oceanus 集群和 ClickHouse 集群時所選的 VPC 必須相同。
創建 ClickHouse 表:
1.進入與 ClickHouse 集群同 VPC 的某一台 CVM 下,安裝 ClickHouse 客戶端(下載該客戶端需連通外網),具體操作步驟參考 ClickHouse 快速入門 [4]。
# 下載 ClickHouse-Client 命令 wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpm wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm # 安裝客戶端 rpm -ivh *.rpm # 使用 tcp 端口登陸 ClickHouse 集群,IP 地址可通過控制台查看 clickhouse-client -hxxx.xxx.xxx.xxx --port 9000
2.登陸 ClickHouse 集群,建表。
CREATE TABLE default.datagen_to_ck on cluster default_cluster (
win_start TIMESTAMP,
win_end TIMESTAMP,
user_id String,
amount_total Int16,
Sign Int8 )
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/datagen_to_ck', '{replica}',Sign)
ORDER BY (win_start,win_end,user_id);
流計算 Oceanus 作業
1. 創建 Source
CREATE TABLE random_source (
user_id VARCHAR,
amount INT,
pre_time AS CURRENT_TIMESTAMP,
WATERMARK FOR pre_time AS pre_time - INTERVAL '3' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- 每秒產生的數據條數
'fields.user_id.length' = '1', -- 隨機字符串的長度
'fields.amount.kind' = 'random', -- 無界的隨機數
'fields.amount.min' = '1', -- 隨機數的最小值
'fields.amount.max' = '10' -- 隨機數的最大值
);
2. 創建 Sink
CREATE TABLE clickhouse (
win_start TIMESTAMP(3),
win_end TIMESTAMP(3),
user_id VARCHAR,
amount_total BIGINT,
PRIMARY KEY (win_start,win_end,user_id) NOT ENFORCED -- 如果要同步的數據庫表定義了主鍵, 則這里也需要定義
) WITH (
-- 指定數據庫連接參數
'connector' = 'clickhouse',
'url' = 'clickhouse://10.0.0.178:8123',
-- 如果ClickHouse集群未配置賬號密碼可以不指定
--'username' = 'root',
--'password' = 'root',
'database-name' = 'default',
'table-name' = 'datagen_to_ck',
'table.collapsing.field' = 'Sign' -- CollapsingMergeTree 類型列字段的名稱
);
3. 編寫業務 SQL
INSERT INTO clickhouse SELECT TUMBLE_START(pre_time,INTERVAL '1' MINUTE) AS win_start, TUMBLE_END(pre_time,INTERVAL '1' MINUTE) AS win_end, user_id, CAST(SUM(amount) AS BIGINT) AS amount_total FROM random_source GROUP BY TUMBLE(pre_time,INTERVAL '1' MINUTE),user_id;
4. 選擇 Connector
點擊【作業參數】,在【內置 Connector】選擇 flink-connector-clickhouse,點擊【保存】>【發布草稿】運行作業。
新版 Flink 1.13 集群不需要用戶自己選擇內置 Connector
總結
本示例使用 datagen Connecor 模擬產生隨機數據,使用 TUMBLE WINDOW(滾動窗口)統計各用戶(user_id)每分鍾的視頻點擊量(amount_total),然后將數據存儲在 ClickHouse 中。 更多時間窗口函數示例請參考流計算 Oceanus 官方文檔[5]。
參考鏈接
[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[2] 創建獨享集群:https://cloud.tencent.com/document/product/849/48298
[3] ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou
[4] ClickHouse 快速入門:https://cloud.tencent.com/document/product/1299/49824
[5] Oceanus 窗口函數官方文檔:https://cloud.tencent.com/document/product/849/18077
