Flink 實踐教程-入門(5):寫入 ClickHouse


作者:騰訊雲流計算 Oceanus 團隊

流計算 Oceanus 簡介 

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。本文將為您詳細介紹如何使用 Datagen Connector 模擬生成客戶視頻點擊量數據,並利用滾動窗口函數對每分鍾內客戶的視頻點擊量進行聚合分析,最后將數據輸出到 ClickHouse 的流程。

操作視頻


前置准備

創建流計算 Oceanus 集群

進入流計算 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,完成流計算 Oceanus 集群的創建。具體可參考流計算 Oceanus 官方文檔創建獨享集群 [2]。

創建 ClickHouse 集群

進入 ClickHouse 控制台[3],點擊左上角【新建集群】,完成 ClickHouse 集群的創建。具體可參考 ClickHouse 快速入門 [4]。

注意:創建流計算 Oceanus 集群和 ClickHouse 集群時所選的 VPC 必須相同。

創建 ClickHouse 表:

1.進入與 ClickHouse 集群同 VPC 的某一台 CVM 下,安裝 ClickHouse 客戶端(下載該客戶端需連通外網),具體操作步驟參考 ClickHouse 快速入門 [4]。

# 下載 ClickHouse-Client 命令
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm

# 安裝客戶端
rpm -ivh *.rpm

# 使用 tcp 端口登陸 ClickHouse 集群,IP 地址可通過控制台查看
clickhouse-client -hxxx.xxx.xxx.xxx --port 9000

  

2.登陸 ClickHouse 集群,建表。

CREATE TABLE default.datagen_to_ck on cluster default_cluster (
win_start     TIMESTAMP,
win_end       TIMESTAMP,
user_id       String,
amount_total  Int16,
Sign          Int8  )
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/datagen_to_ck', '{replica}',Sign)
ORDER BY (win_start,win_end,user_id);

  

流計算 Oceanus 作業

1. 創建 Source

CREATE TABLE random_source ( 
    user_id   VARCHAR,
    amount    INT,
    pre_time  AS CURRENT_TIMESTAMP,
    WATERMARK FOR pre_time AS pre_time - INTERVAL '3' SECOND
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second' = '5',            -- 每秒產生的數據條數
  'fields.user_id.length' = '1',      -- 隨機字符串的長度
  'fields.amount.kind' = 'random',    -- 無界的隨機數
  'fields.amount.min' = '1',          -- 隨機數的最小值
  'fields.amount.max' = '10'          -- 隨機數的最大值
);

  

2. 創建 Sink

CREATE TABLE clickhouse (
    win_start     TIMESTAMP(3),
    win_end       TIMESTAMP(3),
    user_id       VARCHAR,
    amount_total  BIGINT,
    PRIMARY KEY (win_start,win_end,user_id) NOT ENFORCED -- 如果要同步的數據庫表定義了主鍵, 則這里也需要定義
) WITH (
    -- 指定數據庫連接參數
    'connector' = 'clickhouse',
    'url' = 'clickhouse://10.0.0.178:8123',
    -- 如果ClickHouse集群未配置賬號密碼可以不指定
    --'username' = 'root',
    --'password' = 'root',
    'database-name' = 'default',
    'table-name' = 'datagen_to_ck',
    'table.collapsing.field' = 'Sign'   -- CollapsingMergeTree 類型列字段的名稱
);

  

3. 編寫業務 SQL

INSERT INTO clickhouse
SELECT
TUMBLE_START(pre_time,INTERVAL '1' MINUTE) AS win_start,
TUMBLE_END(pre_time,INTERVAL '1' MINUTE) AS win_end,
user_id,
CAST(SUM(amount) AS BIGINT) AS amount_total
FROM random_source
GROUP BY TUMBLE(pre_time,INTERVAL '1' MINUTE),user_id;

  

4. 選擇 Connector

點擊【作業參數】,在【內置 Connector】選擇 flink-connector-clickhouse,點擊【保存】>【發布草稿】運行作業。

新版 Flink 1.13 集群不需要用戶自己選擇內置 Connector

 

總結

本示例使用 datagen Connecor 模擬產生隨機數據,使用 TUMBLE WINDOW(滾動窗口)統計各用戶(user_id)每分鍾的視頻點擊量(amount_total),然后將數據存儲在 ClickHouse 中。 更多時間窗口函數示例請參考流計算 Oceanus 官方文檔[5]。

 

參考鏈接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 創建獨享集群:https://cloud.tencent.com/document/product/849/48298

[3] ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou  

[4] ClickHouse 快速入門:https://cloud.tencent.com/document/product/1299/49824  

[5] Oceanus 窗口函數官方文檔:https://cloud.tencent.com/document/product/849/18077  

關注“騰訊雲大數據”公眾號,技術交流、最新活動、服務專享一站 Get~

 

 

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓

 

 

 
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM