PostgreSQL通過解析日志,獲取數據庫增量變化,pg_recvlogical


1.首先用該工具來看我們的日志變化,需要先將test_decoding插件編譯並安裝(進入contrib,編譯安裝即可)

 

創建一個slot

SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding’);

 

獲取slot記錄的變更日志:

postgres=# insert into test values(1, 'test', now());

INSERT 0 1

postgres=# insert into test values(1, 'test', now());

INSERT 0 1

 

只獲取變化,不消費slot

postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);

    lsn    | xid |                                                             data

-----------+-----+-------------------------------------------------------------------------------------------------------------------------------

 0/16637F8 | 572 | BEGIN 572

 0/16637F8 | 572 | table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 11:52:08.147527'

 0/1663B20 | 572 | COMMIT 572

 0/1663B58 | 573 | BEGIN 573

 0/1663B58 | 573 | table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 11:52:09.67556'

 0/1663BD8 | 573 | COMMIT 573

(6 rows)

postgres=# select * from pg_replication_slots ;

    slot_name    |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

 regression_slot | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          572 | 0/16637C0   | 0/16637F8

(1 row)

 

可以看到xmin還是572,且能再次查詢到變化:

 

postgres=# select * from pg_replication_slots ;

    slot_name    |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

 regression_slot | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          573 | 0/1663B20   | 0/1663CF0

(1 row)

 

postgres=# select pg_logical_slot_get_changes('regression_slot', NULL, null); pg_logical_slot_get_changes

-----------------------------

(0 rows)

 

 

2.遠程使用pg_recvlogical工具來回放:

 

創建slot,當然也可以使用原來有的:

pg_recvlogical --create-slot -S test_logical -h pg36 -d postgres -p 5433

 

postgres=# select * from pg_replication_slots ;

    slot_name    |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

 regression_slot | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          574 | 0/1663CB8   | 0/1663CF0

 test_logical    | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          574 | 0/1663CF0   | 0/1663D28

(2 rows)

 

-bash-4.1$ pg_recvlogical  -S test_logical -h pg36 -d postgres -p 5433 --start -f -

Password:

BEGIN 574

table public.test: DELETE: (no-tuple-data)

table public.test: DELETE: (no-tuple-data)

table public.test: DELETE: (no-tuple-data)

COMMIT 574

 

BEGIN 575

table public.test: INSERT: id[integer]:1 info[text]:'test' crt_time[timestamp without time zone]:'2019-05-27 14:03:02.985599'

COMMIT 575

 

如果slot已經建立好了,而遠端的pg_recvlogical停止,停止的這段時間產生的數據庫變化是不會被消費的,啟動后可以接收到所有的變化。

 

3.可以設置從什么時候開始接收回放日志:

postgres=# select pg_current_wal_lsn();

 pg_current_wal_lsn

--------------------

 0/1664FB8

(1 row)

 

postgres=# delete from test where id = 1;

DELETE 7

postgres=# select txid_current();

 txid_current

--------------

          585

(1 row)

 

postgres=# select txid_current();

 txid_current

--------------

          586

(1 row)

 

postgres=#

postgres=#

postgres=# insert into test values(1, 'test 2', now());

INSERT 0 1

postgres=# select pg_current_wal_lsn();

 pg_current_wal_lsn

--------------------

 0/16652C0

(1 row)

 

postgres=# insert into test values(1, 'test 3', now());

INSERT 0 1

 

-bash-4.1$ pg_recvlogical  -S test_logical -h pg36 -d postgres -p 5433 --startpos=0/16652C0 --start -f -

Password:

BEGIN 588

table public.test: INSERT: id[integer]:1 info[text]:'test 3' crt_time[timestamp without time zone]:'2019-05-27 14:37:31.246539'

COMMIT 588

 

可以看到,只是從0/16652C0開始進行接收的

 

 

4.不用的時候,刪除復制槽

postgres=# select * from pg_replication_slots ;

    slot_name    |    plugin     | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

 regression_slot | test_decoding | logical   |  13237 | postgres | f         | f      |            |      |          574 | 0/1663CB8   | 0/1663CF0

 test_logical    | test_decoding | logical   |  13237 | postgres | f         | t      |      13155 |      |          589 | 0/1665420   | 0/1665458

(2 rows)

 

postgres=# select pg_drop_replication_slot('regression_slot');

 pg_drop_replication_slot

--------------------------

 

(1 row)

 

postgres=# select pg_drop_replication_slot('test_logical');

 pg_drop_replication_slot

--------------------------

 

(1 row)

 

postgres=# select * from pg_replication_slots ;

 slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

(0 rows)

 

4.可以使用wal2json工具來解析日志,目前使用的是test_decoding


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM