Flink 實踐教程-入門(6):讀取 PG 數據寫入 ClickHouse


作者:騰訊雲流計算 Oceanus 團隊

流計算 Oceanus 簡介  

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。
本文將向您詳細介紹如何獲取 PostgreSQL 表數據,並使用字符串函數進行轉換,最后將數據輸出到 ClickHouse 中。

 

操作視頻

前置准備

創建流計算 Oceanus 集群

進入流計算 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔 創建獨享集群 [2]。

創建 PostgreSQL 實例

進入 PostgreSQL 控制台 [3],點擊左上角【新建】創建實例,具體參考 創建 PostgreSQL 實例 [4]。 

數據准備:

進入實例數據庫,創建 test1 表,並手動插入數據。

-- 建表語句create table public.test1 ( id INT, str_one VARCHAR(50), str_two VARCHAR(50), str_thr VARCHAR(50), PRIMARY key(id));-- 插入語句INSERT INTO public.test1 VALUES (1, 'hello world', 'b', 'Oceanus-1');INSERT INTO public.test1 VALUES (2, 'good job', 'c', 'Oceanus-2');INSERT INTO public.test1 VALUES (3, 'hello oceanus', 'd', 'Oceanus-3');
筆者這里使用 DBeaver 進行外網連接,更多連接方式參考官網文檔 連接 PostgreSQL 實例 [5]

創建 ClickHouse 集群

進入 ClickHouse 控制台 [6],點擊左上角【新建集群】,完成 ClickHouse 集群創建,具體可參考 ClickHouse 快速入門 [7]。創建 ClickHouse 表:  登陸 ClickHouse 集群(登入方式參考 ClickHouse 快速入門 [7]),並建表。

CREATE TABLE default.pg_to_ck on cluster default_cluster ( id Int8, str_one String, str_two String, str_thr String, Sign Int8 )ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/pg_to_ck', '{replica}',Sign)ORDER BY (id);
注:流計算 Oceanus 集群、PostgreSQL 實例、ClickHouse 集群需在同一 VPC 下。

 

流計算 Oceanus 作業

1. 創建 Source

-- PostgreSQL CDC Source。CREATE TABLE PostgreSourceTable ( id INT, str_one VARCHAR, str_two VARCHAR, str_thr VARCHAR, PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的數據庫表定義了主鍵, 則這里也需要定義) WITH ( 'connector' = 'postgres-cdc', -- 必須為 'postgres-cdc' 'hostname' = '10.0.0.236', -- 數據庫的 IP 'port' = '5432', -- 數據庫的訪問端口 'username' = 'root', -- 數據庫訪問使用的用戶名(需要提供 REPLICATION 權限, 日志級別必須大於等於 logical, 且設置后需要重啟實例) 'password' = 'xxxxxxxxxxx', -- 數據庫訪問使用的密碼 'database-name' = 'postgres', -- 需要同步的數據庫名 'schema-name' = 'public', -- 需要同步的數據庫模式 (Schema) 'table-name' = 'test1' -- 需要同步的數據表名);

2. 創建 Sink

-- ClickHouse Sink (不完全支持upsert,詳見說明文檔)。配合 flink-connector-clickhouse 使用。CREATE TABLE clickhouse_sink ( id INT, str_one VARCHAR, str_two VARCHAR, str_thr VARCHAR, PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的數據庫表定義了主鍵, 則這里也需要定義) WITH ( 'connector' = 'clickhouse', -- connector 類型為 clickhouse 'url' = 'clickhouse://10.0.0.178:8123', -- 指定數據庫鏈接 url 'database-name' = 'default', -- 需要寫入的 clickhouse 庫名 'table-name' = 'pg_to_ck', -- 需要寫入的 clickhouse 表名 'table.collapsing.field' = 'Sign' -- 采用 CollapsingMergeTree 引擎的 clickhouse 表,Collapsing 列字段的名稱);

3. 編寫業務 SQL

INSERT INTO clickhouse_sinkSELECT  id,--INITCAP:將 str_one 中的單詞轉為大寫開頭,例如 INITCAP('i have a dream') 返回 'I Have A Dream'。 INITCAP(str_one) AS str_one,--TO_BASE64:將 string 表示的字符串編碼為 Base64 字符串。 TO_BASE64(str_two) AS str_two,--REPLACE:將 string1 字符串中所有的 string2 替換為 string3。例如 REPLACE('banana', 'a', 'A') 返回 'bAnAnA'。 REPLACE(str_thr,'Oceanus','Hello Oceanus') AS str_thr FROM PostgreSourceTable;
這里我們使用 Flink 1.13 集群,舊版 Flink 集群需選擇相應的內置 Connector

 

總結

  1. 使用 Postgres-CDC 連接器:

  • 用於同步的 Postgres 用戶至少需要開啟 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 權限。可以進入 PostgreSQL 數據庫進行授權操作。

CREATE ROLE debezium_user REPLICATION LOGIN;GRANT USAGE ON DATABASE database_name TO debezium_user;GRANT USAGE ON SCHEMA schema_name TO debezium_user;GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;
  • 日志級別必須大於等於 logical, 且設置后需要重啟實例。進入數據庫實例,單擊【參數設置】,單擊【WAL】,修改【wal_level】的【參數運行值】為 "logical"。修改成功后點擊右上角【重啟】。

  1. 更多字符串操作函數請參考流計算 Oceanus 官方文檔 字符串函數[8]。

     

參考鏈接

[1] 流計算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview  

[2] 創建獨享集群:https://cloud.tencent.com/document/product/849/48298  

[3] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index  

[4] 創建 PostgreSQL 實例:https://cloud.tencent.com/document/product/409/56961  

[5] 連接 PostgreSQL 實例:https://cloud.tencent.com/document/product/409/40429  

[6] ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou  

[7] ClickHouse 快速入門:https://cloud.tencent.com/document/product/1299/49824  

[8] 流計算 Oceanus 字符串函數:https://cloud.tencent.com/document/product/849/18073

 

關注“騰訊雲大數據”公眾號,技術交流、最新活動、服務專享一站 Get~

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓

 

 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM