Flink CDC: Reading PostgreSQL in Real Time


Our business requires real-time synchronization of PostgreSQL data, and we chose the Flink CDC approach.

Architecture diagram:

Prerequisite steps:

1. Edit the configuration file postgresql.conf

# Change the WAL level to logical
wal_level = logical # minimal, replica, or logical

# Increase the maximum number of replication slots (default 10); by default Flink CDC uses one slot per table
max_replication_slots = 20 # max number of replication slots

# Increase the maximum number of WAL sender processes (default 10); keep this in line with the slot setting above
max_wal_senders = 20 # max number of walsender processes
# Terminate replication connections that have been inactive longer than this; it can be raised a bit (default 60s)
wal_sender_timeout = 180s # in milliseconds; 0 disables

Changing wal_level is mandatory; the other parameters can be changed as needed. If you will synchronize more than 10 tables, raise them to suitable values.

Changing postgresql.conf requires restarting the PostgreSQL service for the settings to take effect, so this is usually done during an off-peak period.
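
After the restart, you can verify that the new values are active, for example with a quick check like the following (a minimal sketch):

-- Confirm the settings after restarting PostgreSQL
SHOW wal_level;              -- expect 'logical'
SHOW max_replication_slots;  -- expect 20
SHOW max_wal_senders;        -- expect 20
SHOW wal_sender_timeout;     -- expect the 180s value (displayed in PostgreSQL's own units)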

 

2. Create a user and grant it replication privileges

-- Create a new PostgreSQL user ('user' and 'pwd' are placeholders; note that USER is a reserved word, so use a real role name)
CREATE USER user WITH PASSWORD 'pwd';

-- Grant the user replication privileges
ALTER ROLE user replication;

-- Allow the user to connect to the database
grant CONNECT ON DATABASE test to user;

-- Grant the user SELECT on all existing tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
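
To double-check that the role was created with the expected attributes (a sketch; 'user' is the placeholder role name from above):

-- Verify the role exists, can log in, and has the REPLICATION attribute
SELECT rolname, rolcanlogin, rolreplication
FROM pg_roles
WHERE rolname = 'user';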

 

3. Publish the tables

-- Set existing publications to publish all tables
update pg_publication set puballtables=true where pubname is not null;
-- Create a publication for all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
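
If you do not want to publish every table, a publication can also be limited to specific tables instead of FOR ALL TABLES (a sketch, using the example table public.test that appears later in this post; dbz_publication is the publication name Debezium looks for by default):

-- Publish only selected tables
CREATE PUBLICATION dbz_publication FOR TABLE public.test;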

 

4. Change the table's replica identity so that UPDATE and DELETE events include the old values

-- Change the replica identity so that UPDATE and DELETE events carry the previous values
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means FULL and confirms the change; 'd' = default, 'n' = nothing, 'i' = index)
select relreplident from pg_class where relname='test0425';

OK, at this point the setup is complete. All of the steps above are required.

 

Common PostgreSQL commands (for reference)

-- Create a new PostgreSQL user
CREATE USER ODPS_ETL WITH PASSWORD 'odpsETL@2021';
-- Grant the user replication privileges
ALTER ROLE ODPS_ETL replication;
-- Allow the user to connect to the database
grant CONNECT ON DATABASE test to ODPS_ETL;
-- Toggle the publish-all-tables flag on existing publications
update pg_publication set puballtables=true where pubname is not null;
-- Create a publication for all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
-- Grant SELECT on a single table
grant select on TABLE aa to ODPS_ETL;
-- Grant read/write privileges on all tables in the public schema
grant select,insert,update,delete ON  ALL TABLES IN SCHEMA public to bd_test;
-- Grant SELECT on all existing tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;
-- Grant SELECT on tables created in the public schema in the future
alter default privileges in schema public grant select on tables to ODPS_ETL;
-- Change the replica identity so that UPDATE and DELETE events carry the previous values
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Check the replica identity
select relreplident from pg_class where relname='test0425';
-- Check replication slot usage
SELECT * FROM pg_replication_slots;
-- Drop a replication slot
SELECT pg_drop_replication_slot('zd_org_goods_solt');
-- Count current connections per user
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;
-- Set a user's maximum number of connections
alter role odps_etl connection limit 200;
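
One more query that is often handy when a job falls behind: a rough check of how much WAL each replication slot is retaining (a minimal sketch; the pg_wal_* functions require PostgreSQL 10 or later):

-- Estimate how much WAL each replication slot is holding back
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;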

 

5. Now for the code.

Maven dependencies (note that the job below also needs the Flink Table API/planner modules, the flink-connector-jdbc sink connector, and a MySQL JDBC driver on the classpath; they are not listed here)

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.13.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>1.1.0</version>
        </dependency>

 

Java code

package flinkTest.connect;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PgsqlToMysqlTest {
    public static void main(String[] args) {
        // Configure the Flink table environment settings
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // Get the Flink streaming execution environment
        StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        exeEnv.setParallelism(1);

        // Create the table execution environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings);

        // Build the source DDL (PostgreSQL CDC source)
        String sourceDDL =
                "CREATE TABLE pgsql_source (\n" +
                        " id int,\n" +
                        " name STRING,\n" +
                        " py_code STRING,\n" +
                        " seq_no int,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'postgres-cdc',\n" +
                        " 'hostname' = '***',\n" +
                        " 'port' = '5432',\n" +
                        " 'username' = 'bd_test',\n" +
                        " 'password' = '***',\n" +
                        " 'database-name' = 'bd_test',\n" +
                        " 'schema-name' = 'public',\n" +
                        " 'debezium.snapshot.mode' = 'never',\n" +
                        " 'decoding.plugin.name' = 'pgoutput',\n" +
                        " 'debezium.slot.name' = 'test',\n" +
                        " 'table-name' = 'test'\n" +
                        ")";

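        // Build the sink DDL (MySQL table written through the JDBC connector)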
        String sinkDDL =
                "CREATE TABLE mysql_sink (\n" +
                        " id int,\n" +
                        " name STRING,\n" +
                        " py_code STRING,\n" +
                        " seq_no int,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'url' = 'jdbc:mysql://ip:3306/test_db?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8',\n" +
                        " 'username' = 'bd_test',\n" +
                        " 'password' = '***',\n" +
                        " 'table-name' = 'test'\n" +
                        ")";

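        // Build the INSERT ... SELECT that continuously copies the CDC stream into the sink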
        String transformSQL =
                "INSERT INTO mysql_sink " +
                        "SELECT id,name,py_code,seq_no,description " +
                        "FROM pgsql_source";

        // Execute the source table DDL
        tableEnv.executeSql(sourceDDL);
        // Execute the sink table DDL
        tableEnv.executeSql(sinkDDL);
        // Execute the transformation SQL; this submits the streaming job
        TableResult tableResult = tableEnv.executeSql(transformSQL);

        // Print the result to the console (optional)
//        tableResult.print();
    }
}

 

Table structures:

-- PostgreSQL table structure
CREATE TABLE "public"."test" (
  "id" int4 NOT NULL,
  "name" varchar(50) COLLATE "pg_catalog"."default" NOT NULL,
  "py_code" varchar(50) COLLATE "pg_catalog"."default",
  "seq_no" int4 NOT NULL,
  "description" varchar(200) COLLATE "pg_catalog"."default",
  CONSTRAINT "pk_zd_business_type" PRIMARY KEY ("id")
)
;

-- MySQL table structure
CREATE TABLE `test` (
  `id` int(11) NOT NULL DEFAULT '0' COMMENT 'ID',
  `name` varchar(50) DEFAULT NULL COMMENT 'Name',
  `py_code` varchar(50) DEFAULT NULL COMMENT 'Mnemonic code',
  `seq_no` int(11) DEFAULT NULL COMMENT 'Sort order',
  `description` varchar(200) DEFAULT NULL COMMENT 'Remarks',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

 

6. You can now run insert, update, and delete statements against the source table and watch them replicate to the sink, as sketched below.
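
A minimal smoke test, assuming the public.test table defined above; each statement should show up in the MySQL sink shortly after it runs:

-- Insert, update, and delete a row on the PostgreSQL side and watch the MySQL sink follow
INSERT INTO public.test (id, name, py_code, seq_no, description)
VALUES (999, 'cdc-demo', 'demo', 1, 'cdc smoke test');
UPDATE public.test SET name = 'cdc-demo-updated' WHERE id = 999;
DELETE FROM public.test WHERE id = 999;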

 

WITH parameters

 
Each entry below lists the parameter, its description, whether it is required, its data type, and any notes.

connector: the source table type. Required. STRING. Fixed value: postgres-cdc.
hostname: IP address or hostname of the Postgres database. Required. STRING.
username: username for the Postgres database service. Required. STRING.
password: password for the Postgres database service. Required. STRING.
database-name: database name. Required. STRING. Regular expressions are supported, so data can be read from multiple databases.
schema-name: Postgres schema name. Required. STRING. Regular expressions are supported, so data can be read from multiple schemas.
table-name: Postgres table name. Required. STRING. Regular expressions are supported, so data can be read from multiple tables.
port: port of the Postgres database service. Optional. INTEGER. Default: 5432.
decoding.plugin.name: name of the Postgres logical decoding plug-in. Optional. STRING. Determined by the plug-in installed on the Postgres server. Supported plug-ins:
  • decoderbufs (default)
  • wal2json
  • wal2json_rds
  • wal2json_streaming
  • wal2json_rds_streaming
  • pgoutput

Note: If you are using Alibaba Cloud RDS PostgreSQL, you need to enable the logical decoding (wal2json) feature; see the documentation on logical decoding (wal2json).

debezium.*: Debezium properties. Optional. STRING. Gives finer-grained control over the behaviour of the Debezium client, for example 'debezium.snapshot.mode' = 'never'; see the Debezium configuration properties for details.

Note: It is recommended to set a dedicated debezium.slot.name for each table to avoid errors such as PSQLException: ERROR: replication slot "debezium" is active for PID 974.

Type mapping

The mapping between Postgres CDC and Flink field types (table not reproduced here).