Due to business requirements, we need to sync PostgreSQL data in real time; we chose to do this with Flink CDC.
Architecture diagram:
Prerequisite steps:
1. Modify the configuration file postgresql.conf
# Change the WAL level to logical
wal_level = logical             # minimal, replica, or logical
# Increase the maximum number of replication slots (default 10); by default Flink CDC uses one slot per table
max_replication_slots = 20      # max number of replication slots
# Increase the maximum number of WAL sender processes (default 10); keep this in line with the slots setting above
max_wal_senders = 20            # max number of walsender processes
# Terminate replication connections that have been inactive for longer than the specified time; this can be set a bit higher (default 60s)
wal_sender_timeout = 180s       # in milliseconds; 0 disables
wal_level is the only setting that must be changed; the others are optional. If you are syncing more than 10 tables, raise them to suitable values.
Changes to postgresql.conf only take effect after the PostgreSQL service is restarted, so this is usually done during off-peak hours.
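After the restart, you can quickly confirm that the new values are in effect using standard PostgreSQL SHOW commands (a minimal sanity-check sketch):
-- Verify the settings after the restart
SHOW wal_level;
SHOW max_replication_slots;
SHOW max_wal_senders;
SHOW wal_sender_timeout;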
2. Create a user and grant it replication privileges
-- Create a new PostgreSQL user (cdc_user is a placeholder name; the bare word "user" is reserved in PostgreSQL)
CREATE USER cdc_user WITH PASSWORD 'pwd';
-- Grant the user replication privileges
ALTER ROLE cdc_user replication;
-- Grant the user permission to connect to the database
grant CONNECT ON DATABASE test to cdc_user;
-- Grant the user SELECT on all existing tables in the public schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
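To double-check the grants, you can query the system catalogs (a small sketch using the cdc_user and test placeholders from above):
-- Confirm the role exists, can log in, and has the REPLICATION attribute
SELECT rolname, rolcanlogin, rolreplication FROM pg_roles WHERE rolname = 'cdc_user';
-- Confirm the role can connect to the target database
SELECT has_database_privilege('cdc_user', 'test', 'CONNECT');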
3. Publish the tables
-- Set puballtables to true for existing publications
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
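If you do not want to publish every table, the publication can instead be limited to the specific tables to be captured (a sketch; public.test is the sample table used later in this post, and dbz_publication matches the publication name Debezium's pgoutput setup expects by default):
-- Publish only the tables that will be captured
CREATE PUBLICATION dbz_publication FOR TABLE public.test;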
4. Change the table's replica identity so that updates and deletes carry the old values
-- Change the replica identity to include the previous values on update and delete
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Check the replica identity (a value of f means the change took effect)
select relreplident from pg_class where relname='test0425';
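For reference, relreplident is a single-character flag: d = default (primary key), n = nothing, f = full row, i = index. If you later want to put the sample table back to the default behaviour (a sketch):
-- Revert the replica identity to the default (primary key only)
ALTER TABLE test0425 REPLICA IDENTITY DEFAULT;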
OK, at this point the setup is complete; all of the steps above are required.
Commonly used PostgreSQL commands (for reference)
-- Create a new PostgreSQL user
CREATE USER ODPS_ETL WITH PASSWORD 'odpsETL@2021';
-- Grant the user replication privileges
ALTER ROLE ODPS_ETL replication;
-- Grant the user permission to connect to the database
grant CONNECT ON DATABASE test to ODPS_ETL;
-- Set puballtables to true for existing publications
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
-- Grant SELECT on a single table
grant select on TABLE aa to ODPS_ETL;
-- Grant read/write privileges on all tables in the public schema
grant select,insert,update,delete ON ALL TABLES IN SCHEMA public to bd_test;
-- Grant SELECT on all existing tables in the current database's public schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;
-- Grant SELECT on tables created in the public schema in the future
alter default privileges in schema public grant select on tables to ODPS_ETL;
-- Change the replica identity to include the previous values on update and delete
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Check the replica identity
select relreplident from pg_class where relname='test0425';
-- Check replication slot usage
SELECT * FROM pg_replication_slots;
-- Drop a replication slot
SELECT pg_drop_replication_slot('zd_org_goods_solt');
-- Count current connections per user
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;
-- Set a user's maximum number of connections
alter role odps_etl connection limit 200;
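Another query that is handy in practice is checking how far each replication slot lags behind the current WAL position (a sketch, assuming PostgreSQL 10 or later, where pg_current_wal_lsn() and confirmed_flush_lsn are available):
-- Bytes of WAL that each slot still has to consume
SELECT slot_name, active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;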
5. Now for the code.
Maven dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- Note: the example below also relies on the Flink Table API (flink-table-api-java-bridge / flink-table-planner-blink),
     Flink's JDBC connector (flink-connector-jdbc), and a MySQL JDBC driver; those dependencies are not listed here. -->
Java code
package flinkTest.connect;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PgsqlToMysqlTest {
    public static void main(String[] args) {
        // Flink table environment settings
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Streaming execution environment
        StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        exeEnv.setParallelism(1);
        // Table execution environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings);
        // Source table DDL
        String sourceDDL =
                "CREATE TABLE pgsql_source (\n" +
                        " id int,\n" +
                        " name STRING,\n" +
                        " py_code STRING,\n" +
                        " seq_no int,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'postgres-cdc',\n" +
                        " 'hostname' = '***',\n" +
                        " 'port' = '5432',\n" +
                        " 'username' = 'bd_test',\n" +
                        " 'password' = '***',\n" +
                        " 'database-name' = 'bd_test',\n" +
                        " 'schema-name' = 'public',\n" +
                        " 'debezium.snapshot.mode' = 'never',\n" +
                        " 'decoding.plugin.name' = 'pgoutput',\n" +
                        " 'debezium.slot.name' = 'test',\n" +
                        " 'table-name' = 'test'\n" +
                        ")";
        // Sink table DDL
        String sinkDDL =
                "CREATE TABLE mysql_sink (\n" +
                        " id int,\n" +
                        " name STRING,\n" +
                        " py_code STRING,\n" +
                        " seq_no int,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'url' = 'jdbc:mysql://ip:3306/test_db?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8',\n" +
                        " 'username' = 'bd_test',\n" +
                        " 'password' = '***',\n" +
                        " 'table-name' = 'test'\n" +
                        ")";
        // Sync logic: read from the CDC source and write into the MySQL sink
        String transformSQL =
                "INSERT INTO mysql_sink " +
                        "SELECT id,name,py_code,seq_no,description " +
                        "FROM pgsql_source";
        // Register the source table
        tableEnv.executeSql(sourceDDL);
        // Register the sink table
        tableEnv.executeSql(sinkDDL);
        // Submit the INSERT job
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        // Console output (optional)
        // tableResult.print();
    }
}
Table structures:
-- PostgreSQL table structure
CREATE TABLE "public"."test" (
  "id" int4 NOT NULL,
  "name" varchar(50) COLLATE "pg_catalog"."default" NOT NULL,
  "py_code" varchar(50) COLLATE "pg_catalog"."default",
  "seq_no" int4 NOT NULL,
  "description" varchar(200) COLLATE "pg_catalog"."default",
  CONSTRAINT "pk_zd_business_type" PRIMARY KEY ("id")
);

-- MySQL table structure
CREATE TABLE `test` (
  `id` int(11) NOT NULL DEFAULT '0' COMMENT 'ID',
  `name` varchar(50) DEFAULT NULL COMMENT 'Name',
  `py_code` varchar(50) DEFAULT NULL COMMENT 'Mnemonic code',
  `seq_no` int(11) DEFAULT NULL COMMENT 'Sort order',
  `description` varchar(200) DEFAULT NULL COMMENT 'Remarks',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6. Now you can operate on the source table: inserts, updates, and deletes will be synced to MySQL. A few sample statements for testing are shown below.
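For example, running these statements against the PostgreSQL side (a minimal sketch using the public.test structure above; the values are arbitrary) should be reflected in the MySQL test table shortly afterwards:
-- Insert, update, and delete a test row on the PostgreSQL side
INSERT INTO public.test (id, name, py_code, seq_no, description) VALUES (1, 'demo', 'dm', 1, 'cdc test row');
UPDATE public.test SET name = 'demo-updated' WHERE id = 1;
DELETE FROM public.test WHERE id = 1;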
WITH parameters
| Parameter | Description | Required | Data type | Notes |
| --- | --- | --- | --- | --- |
| connector | Source table type. | Yes | STRING | Fixed value: postgres-cdc. |
| hostname | IP address or hostname of the PostgreSQL database. | Yes | STRING | None. |
| username | Username for the PostgreSQL database service. | Yes | STRING | None. |
| password | Password for the PostgreSQL database service. | Yes | STRING | None. |
| database-name | Database name. | Yes | STRING | Regular expressions are supported to read data from multiple databases. |
| schema-name | PostgreSQL schema name. | Yes | STRING | Regular expressions are supported to read data from multiple schemas. |
| table-name | PostgreSQL table name. | Yes | STRING | Regular expressions are supported to read data from multiple tables. |
| port | Port of the PostgreSQL database service. | No | INTEGER | Default value: 5432. |
| decoding.plugin.name | Name of the PostgreSQL logical decoding plugin. | No | STRING | Determined by the plugin installed on the PostgreSQL server (for example pgoutput or wal2json). Note: if you use Alibaba Cloud RDS PostgreSQL, you need to enable the logical decoding (wal2json) feature; see the documentation on logical decoding (wal2json). |
| debezium.* | Debezium property parameters. | No | STRING | Fine-grained control over the Debezium client's behaviour, e.g. 'debezium.snapshot.mode' = 'never'; see the Debezium configuration properties for details. Note: it is recommended to set the debezium.slot.name parameter for each table to avoid the error PSQLException: ERROR: replication slot "debezium" is active for PID 974. |
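As the table notes, database-name, schema-name, and table-name accept regular expressions, so one source definition can cover several physical tables. A sketch of what that could look like (the table name pgsql_source_multi, the slot name test_multi_slot, and the pattern test.* are illustrative, and the matched tables are assumed to share the declared columns):
-- 'table-name' = 'test.*' is a regular expression matching several tables
CREATE TABLE pgsql_source_multi (
  id INT,
  name STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '***',
  'port' = '5432',
  'username' = 'bd_test',
  'password' = '***',
  'database-name' = 'bd_test',
  'schema-name' = 'public',
  'table-name' = 'test.*',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.slot.name' = 'test_multi_slot'
);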
Type mapping
