PostgreSQL邏輯復制槽 pg_recvlogical test_decoding wal2json


   Schema   |                Name                 | Result data type |                            Argument data types                            |  Type
------------+-------------------------------------+------------------+---------------------------------------------------------------------------+--------
 pg_catalog | pg_create_logical_replication_slot  | record           | slot_name name, plugin name, OUT slot_name text, OUT xlog_position pg_lsn | normal
 pg_catalog | pg_create_physical_replication_slot | record           | slot_name name, OUT slot_name name, OUT xlog_position pg_lsn              | normal

 

1.需求

遇到一種需要將一部分表通過logical_tool用邏輯復制槽的方式同步到kaffka,之前沒有使用過,因此對邏輯復制槽進行了了解。

 

2.資料

pg_create_logical_replication_slot方法配置兩個參數:slot_name、plugin_name,例如:

pg_create_logical_replication_slot('test_logical_slot', 'wal2json');

這樣就可以通過wal2json這個工具來解析wal日志,也可以用test_decoding自帶的解析工具來做,有時間進行詳細了解····

 

--201906027有時間了:

test_decoding自帶的,直接在psql中安裝插件:create extension test_decoding;

wal2json需要下載編譯安裝:

安裝:
$ git clone https://github.com/eulerto/wal2json.git
$ cd wal2json
# Make sure your path includes the bin directory that contains the correct `pg_config`
$ PATH=/path/to/pg/bin:$PATH
$ USE_PGXS=1 make
$ USE_PGXS=1 make install

修改wal日志級別:
wal_level = logical
max_replication_slots = 5
max_wal_senders = 5

參數:
include-xids: add xid to each changeset. Default is false.
include-timestamp: add timestamp to each changeset. Default is false.
include-schemas: add schema to each change. Default is true.
include-types: add type to each change. Default is true.
include-typmod: add modifier to types that have it (eg. varchar(20) instead of varchar). Default is true.
include-type-oids: add type oids. Default is false.
include-not-null: add not null information as columnoptionals. Default is false.
pretty-print: add spaces and indentation to JSON structures. Default is false.
write-in-chunks: write after every change instead of every changeset. Default is false.
include-lsn: add nextlsn to each changeset. Default is false.
include-unchanged-toast (deprecated): add TOAST value even if it was not modified. Since TOAST values are usually large, this option could save IO and bandwidth if it is disabled. Default is true.
filter-tables: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. *.foo means table foo in all schemas and bar.* means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table "public"."Foo bar" should be specified as public.Foo\ bar.
add-tables: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from filter-tables.
format-version: defines which format to use. Default is 1.

常用參數是,pretty-print 1

修改pg訪問權限,可選:
local    replication     myuser                     trust

啟動:
$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -f -

停止直接ctrl+c

刪除:
pg_recvlogical -d postgres --slot test_slot --drop-slot


測試腳本:
$ cat /tmp/example2.sql
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
DELETE FROM table2_with_pk WHERE a < 3;

INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

 

 

3.類似工具


另外關於邏輯槽的工具,在阿里雲中看到他們用邏輯復制槽的方式進行數據庫遷移:
1.安裝他們的ali_decoding工具;
2.創建邏輯復制槽:SELECT * FROM pg_create_logical_replication_slot('replication_slot_test', 'ali_decoding');
然后配置連接參數,開始遷移。

 

4.應對集群切換

 

而客戶這邊有一些不一樣的地方,為了適應集群的切換,他們將邏輯槽所在的目錄,pglogical設置為共享目錄,當主節點被切換走了,在新的主節點重新啟動邏輯復制槽(通過守護進程實現,同時,他們還需要重啟數據庫才能讓共享目錄被識別,不能簡單的promote方式提示從節點為主節點)。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM