Schema | Name | Result data type | Argument data types | Type ------------+-------------------------------------+------------------+---------------------------------------------------------------------------+-------- pg_catalog | pg_create_logical_replication_slot | record | slot_name name, plugin name, OUT slot_name text, OUT xlog_position pg_lsn | normal pg_catalog | pg_create_physical_replication_slot | record | slot_name name, OUT slot_name name, OUT xlog_position pg_lsn | normal
1.需求
遇到一種需要將一部分表通過logical_tool用邏輯復制槽的方式同步到kaffka,之前沒有使用過,因此對邏輯復制槽進行了了解。
2.資料
pg_create_logical_replication_slot方法配置兩個參數:slot_name、plugin_name,例如:
pg_create_logical_replication_slot('test_logical_slot', 'wal2json');
這樣就可以通過wal2json這個工具來解析wal日志,也可以用test_decoding自帶的解析工具來做,有時間進行詳細了解····
--201906027有時間了:
test_decoding自帶的,直接在psql中安裝插件:create extension test_decoding;
wal2json需要下載編譯安裝:
安裝: $ git clone https://github.com/eulerto/wal2json.git $ cd wal2json # Make sure your path includes the bin directory that contains the correct `pg_config` $ PATH=/path/to/pg/bin:$PATH $ USE_PGXS=1 make $ USE_PGXS=1 make install 修改wal日志級別: wal_level = logical max_replication_slots = 5 max_wal_senders = 5 參數: include-xids: add xid to each changeset. Default is false. include-timestamp: add timestamp to each changeset. Default is false. include-schemas: add schema to each change. Default is true. include-types: add type to each change. Default is true. include-typmod: add modifier to types that have it (eg. varchar(20) instead of varchar). Default is true. include-type-oids: add type oids. Default is false. include-not-null: add not null information as columnoptionals. Default is false. pretty-print: add spaces and indentation to JSON structures. Default is false. write-in-chunks: write after every change instead of every changeset. Default is false. include-lsn: add nextlsn to each changeset. Default is false. include-unchanged-toast (deprecated): add TOAST value even if it was not modified. Since TOAST values are usually large, this option could save IO and bandwidth if it is disabled. Default is true. filter-tables: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. *.foo means table foo in all schemas and bar.* means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table "public"."Foo bar" should be specified as public.Foo\ bar. add-tables: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from filter-tables. format-version: defines which format to use. Default is 1. 常用參數是,pretty-print 1 修改pg訪問權限,可選: local replication myuser trust 啟動: $ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json $ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -f - 停止直接ctrl+c 刪除: pg_recvlogical -d postgres --slot test_slot --drop-slot 測試腳本: $ cat /tmp/example2.sql CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); BEGIN; INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table2_with_pk (b, c) VALUES('Replication', now()); DELETE FROM table2_with_pk WHERE a < 3; INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir'); -- it is not added to stream because there isn't a pk or a replica identity UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1'); SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
3.類似工具
另外關於邏輯槽的工具,在阿里雲中看到他們用邏輯復制槽的方式進行數據庫遷移:
1.安裝他們的ali_decoding工具;
2.創建邏輯復制槽:SELECT * FROM pg_create_logical_replication_slot('replication_slot_test', 'ali_decoding');
然后配置連接參數,開始遷移。
4.應對集群切換
而客戶這邊有一些不一樣的地方,為了適應集群的切換,他們將邏輯槽所在的目錄,pglogical設置為共享目錄,當主節點被切換走了,在新的主節點重新啟動邏輯復制槽(通過守護進程實現,同時,他們還需要重啟數據庫才能讓共享目錄被識別,不能簡單的promote方式提示從節點為主節點)。