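1. First, use this tool to observe how the log changes. The test_decoding plugin must be compiled and installed beforehand (go into the contrib directory, then build and install it).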
Create a slot:
SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
Get the changes recorded by the slot. First, insert some rows:
postgres=# insert into test values(1, 'test', now());
INSERT 0 1
postgres=# insert into test values(1, 'test', now());
INSERT 0 1
Peek at the changes only, without consuming them from the slot:
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+-------------------------------------------------------------------------------------------------------------------------------
0/16637F8 | 572 | BEGIN 572
0/16637F8 | 572 | table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 11:52:08.147527'
0/1663B20 | 572 | COMMIT 572
0/1663B58 | 573 | BEGIN 573
0/1663B58 | 573 | table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 11:52:09.67556'
0/1663BD8 | 573 | COMMIT 573
(6 rows)
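Each data row above follows the test_decoding text format, so it can be split into columns with a small parser. What follows is only a minimal Python sketch, not part of test_decoding itself. The names parse_change, HEADER_RE and COLUMN_RE are hypothetical, and the sketch assumes unquoted table names and the plain INSERT/DELETE layouts shown in this walkthrough (UPDATE lines that carry both an old key and a new tuple are not separated).

```python
import re

# "table <schema.table>: <ACTION>: <columns...>" as printed by test_decoding.
HEADER_RE = re.compile(r"^table (\S+): (INSERT|UPDATE|DELETE): ?(.*)$")
# One "name[type]:value" column; quoted values may contain '' as an escaped quote.
COLUMN_RE = re.compile(r"(\w+)\[([^\]]+)\]:('(?:[^']|'')*'|\S+)")


def parse_change(line):
    """Return a dict describing a row change, or None for BEGIN/COMMIT lines."""
    match = HEADER_RE.match(line.strip())
    if match is None:
        return None
    table, action, rest = match.groups()
    columns = {}
    for name, type_name, raw in COLUMN_RE.findall(rest):
        if raw.startswith("'"):
            # Strip the surrounding quotes and undo the '' escaping.
            raw = raw[1:-1].replace("''", "'")
        columns[name] = (type_name, raw)
    return {"table": table, "action": action, "columns": columns}
```

Passing the INSERT line from transaction 572 to parse_change gives a dict whose "columns" entry maps id, info and crt_time to (type, value) pairs. A DELETE line with (no-tuple-data) yields an empty column dict.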
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
regression_slot | test_decoding | logical | 13237 | postgres | f | f | | | 572 | 0/16637C0 | 0/16637F8
(1 row)
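You can see that catalog_xmin is still 572 (the xmin column itself is empty for a logical slot here), and the changes can be queried again. Once the changes have been consumed with pg_logical_slot_get_changes, the slot advances and a further call returns nothing: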
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
regression_slot | test_decoding | logical | 13237 | postgres | f | f | | | 573 | 0/1663B20 | 0/1663CF0
(1 row)
postgres=# select pg_logical_slot_get_changes('regression_slot', NULL, null);
pg_logical_slot_get_changes
-----------------------------
(0 rows)
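The difference between peeking and getting can be pictured with a toy model. The Python class below is purely illustrative: ToySlot and its methods are hypothetical names, it holds changes in a list, and it says nothing about how PostgreSQL actually stores or tracks slot positions.

```python
class ToySlot:
    """In-memory stand-in for a logical slot; NOT PostgreSQL's implementation."""

    def __init__(self):
        self._pending = []

    def record(self, change):
        # A committed change becomes visible to the slot.
        self._pending.append(change)

    def peek_changes(self):
        # Return pending changes but leave them in the slot.
        return list(self._pending)

    def get_changes(self):
        # Return pending changes and mark them as consumed.
        changes, self._pending = self._pending, []
        return changes
```

In this model, repeated peek_changes calls return the same list, while the first get_changes call empties the slot, so a second call returns an empty list, matching the (0 rows) result above.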
2. Use the pg_recvlogical tool remotely to replay the changes:
Create a slot (an existing one can of course be used instead):
pg_recvlogical --create-slot -S test_logical -h pg36 -d postgres -p 5433
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
regression_slot | test_decoding | logical | 13237 | postgres | f | f | | | 574 | 0/1663CB8 | 0/1663CF0
test_logical | test_decoding | logical | 13237 | postgres | f | f | | | 574 | 0/1663CF0 | 0/1663D28
(2 rows)
-bash-4.1$ pg_recvlogical -S test_logical -h pg36 -d postgres -p 5433 --start -f -
Password:
BEGIN 574
table public.test: DELETE: (no-tuple-data)
table public.test: DELETE: (no-tuple-data)
table public.test: DELETE: (no-tuple-data)
COMMIT 574
BEGIN 575
table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 14:03:02.985599'
COMMIT 575
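If the slot already exists and the remote pg_recvlogical is stopped, database changes made while it is stopped are not consumed; the slot retains them, and pg_recvlogical receives all of them once it is started again.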
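When pg_recvlogical is driven from a script, it can help to assemble the command line in one place. The helper below is a minimal Python sketch under that assumption. The function name recvlogical_cmd and its parameters are hypothetical; only the pg_recvlogical options themselves (-S, -h, -p, -d, -f, --create-slot, --drop-slot, --start, --startpos) come from the tool.

```python
def recvlogical_cmd(slot, host, port, dbname, action="start",
                    startpos=None, outfile="-"):
    """Build an argument list for pg_recvlogical (hypothetical helper)."""
    cmd = ["pg_recvlogical", "-S", slot, "-h", host,
           "-p", str(port), "-d", dbname]
    if action == "create":
        cmd.append("--create-slot")
    elif action == "drop":
        cmd.append("--drop-slot")
    elif action == "start":
        if startpos is not None:
            # Optional LSN from which streaming should begin.
            cmd.append("--startpos=" + startpos)
        # "-f -" writes the decoded changes to standard output.
        cmd += ["--start", "-f", outfile]
    else:
        raise ValueError("unknown action: " + action)
    return cmd
```

The returned list can be handed to subprocess.run on a machine where pg_recvlogical is installed; for example, recvlogical_cmd("test_logical", "pg36", 5433, "postgres") reproduces the streaming command used above.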
3. You can set the position from which to start receiving changes:
postgres=# select pg_current_wal_lsn();
pg_current_wal_lsn
--------------------
0/1664FB8
(1 row)
postgres=# delete from test where id = 1;
DELETE 7
postgres=# select txid_current();
txid_current
--------------
585
(1 row)
postgres=# select txid_current();
txid_current
--------------
586
(1 row)
postgres=# insert into test values(1, 'test 2', now());
INSERT 0 1
postgres=# select pg_current_wal_lsn();
pg_current_wal_lsn
--------------------
0/16652C0
(1 row)
postgres=# insert into test values(1, 'test 3', now());
INSERT 0 1
-bash-4.1$ pg_recvlogical -S test_logical -h pg36 -d postgres -p 5433 --startpos=0/16652C0 --start -f -
Password:
BEGIN 588
table public.test: INSERT: id[integer]:1 info[text]:'test 3' crt_time[timestamp without time zone]:'2019-05-27 14:37:31.246539'
COMMIT 588
You can see that only the changes from 0/16652C0 onward were received.
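An LSN such as 0/16652C0 is written as two hexadecimal numbers, the high and low 32 bits of a 64-bit WAL position, so two LSNs can be compared once they are converted to integers. The following Python sketch shows that conversion; lsn_to_int and lsn_distance are hypothetical helper names, not PostgreSQL functions.

```python
def lsn_to_int(lsn):
    """Convert an LSN string like '0/16652C0' to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lsn_distance(start, end):
    """Number of WAL bytes between two LSN strings (end minus start)."""
    return lsn_to_int(end) - lsn_to_int(start)
```

With the two values captured in this section, lsn_distance("0/1664FB8", "0/16652C0") is 776 bytes, and the first LSN compares as smaller than the second, which is why the DELETE and the 'test 2' insert made before 0/16652C0 are skipped.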
4. Drop the replication slots when they are no longer needed:
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
regression_slot | test_decoding | logical | 13237 | postgres | f | f | | | 574 | 0/1663CB8 | 0/1663CF0
test_logical | test_decoding | logical | 13237 | postgres | f | t | 13155 | | 589 | 0/1665420 | 0/1665458
(2 rows)
postgres=# select pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
postgres=# select pg_drop_replication_slot('test_logical');
pg_drop_replication_slot
--------------------------
(1 row)
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
(0 rows)
5. The wal2json output plugin can also be used to decode the log; this walkthrough uses test_decoding.
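If wal2json is used instead, each transaction arrives as JSON rather than as test_decoding text. The Python sketch below assumes wal2json's format version 1, where a transaction is an object with a "change" array whose entries carry "kind", "schema", "table", "columnnames" and "columnvalues". The function name rows_from_wal2json is hypothetical, and the sample payload is hand-written for illustration, not captured from a server.

```python
import json


def rows_from_wal2json(payload):
    """Return (kind, 'schema.table', {column: value}) for each change."""
    doc = json.loads(payload)
    rows = []
    for change in doc.get("change", []):
        names = change.get("columnnames", [])
        values = change.get("columnvalues", [])
        table = change["schema"] + "." + change["table"]
        rows.append((change["kind"], table, dict(zip(names, values))))
    return rows


# Hand-written sample in the assumed version 1 layout.
sample = """{"change": [{"kind": "insert", "schema": "public", "table": "test",
 "columnnames": ["id", "info"], "columntypes": ["integer", "text"],
 "columnvalues": [1, "test"]}]}"""
```

Calling rows_from_wal2json(sample) returns a single insert on public.test with id and info as dictionary keys. Delete entries, which carry old key information rather than column values, come back with an empty dictionary in this sketch.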