PostgreSQL/LightDB logical replication explained, with an active-active solution


  Logical replication exists because physical replication is block-based: each instance maintains its own data blocks, so block-level replication cannot reach beyond the instance, and logical (row-level) replication has to be used instead. Even a kernel-integrated HTAP engine uses logical replication when synchronizing between its row store and column store. Logical replication fits many scenarios, such as partial data synchronization, DW integration, feeding big-data platforms or ES, stream processing, cache refresh and so on; in all of these, CDC is essential.

  Beyond that, because PostgreSQL does not support flashback query, logical replication can also be used to decode the changes recorded in the WAL and generate the data needed for rollback.

  Native logical replication is available starting with Postgres 10.

  Logical replication is also called row-level replication or CDC, so things like vacuum and index updates are not included. It is mainly used for dual-master setups and for synchronizing to kafka/redis/gp and similar targets. Because it is built on the replication protocol, semi-synchronous operation is possible in theory, and throughput can reach about 2/3 of streaming replication. By default DDL and sequences are not replicated (pglogical can be configured to handle them). Does it support multi-master? (Could the source skip decoding and let the target database decode instead? Technically yes, but decoding has to be done against the catalog as it was at that point in time in order to reconstruct the definitions precisely, which makes this cumbersome; oracle logminer/OGG/xstream support decoding in three different places precisely because the data dictionary has to be shipped along.) Can DDL be supported? (BDR supports it.)

The architecture of logical replication

   The components involved in logical replication are: replication slots (pg_replication_slots), subscriptions (pg_subscription), replication origins (pg_replication_origin), the decoding plugin, and publications (pg_publication, pg_publication_tables, pg_publication_rel). The consumer of logical replication does not have to be a subscription; it can be something else such as pg_recvlogical. Subscriptions and pg_subscription exist so that logical replication between PG instances works out of the box. Performance-wise, wal_level=logical and wal_level=replica are essentially equivalent (based on ltbench/pgbench tests with 100 concurrent connections).
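  The following psql sketch shows how these catalogs can be inspected on an existing setup; the queries only read standard system views, but the publications, subscriptions and slots themselves are assumed to exist already.

-- Publisher side: publications and the tables they cover.
SELECT pubname, puballtables FROM pg_publication;
SELECT pubname, schemaname, tablename FROM pg_publication_tables;

-- Publisher side: replication slots held by logical consumers.
SELECT slot_name, plugin, slot_type, active, restart_lsn FROM pg_replication_slots;

-- Subscriber side: subscriptions and the origins they created.
SELECT subname, subenabled, subpublications FROM pg_subscription;
SELECT roident, roname FROM pg_replication_origin;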

  In the PG architecture, logical decoding takes place in the walsender, not in the consumer. (Oracle, by contrast, can decode on the source host or on another instance; that other instance must hold the catalog and stay consistent with the catalog_xmin maintained in the replication slot, otherwise the changes cannot be decoded. The WAL carries the oid and block id, and PG WAL records are full records, so both apply and decoding can be very fast.) A note on replication slots: introduced in postgresql 9.4, they mainly provide an automated way to ensure that the primary does not remove WAL segments before all standbys have received them, and does not remove rows that could cause recovery conflicts, even while a standby is disconnected. To keep the needed WAL from being deleted, no WAL file after the slot's restart_lsn is removed (wal_keep_segments is a manually chosen bound, whereas a slot is an automatically maintained bound with no upper limit, so if the downstream consumer of a slot disconnects, WAL can pile up until the disk is full). As shown below:
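  Because a slot pins WAL from its restart_lsn onward, it is worth monitoring how much WAL each slot is holding back; a minimal monitoring query (run on the publisher):

-- A large retained_wal value on an inactive slot means the consumer has
-- disconnected and WAL is piling up on disk.
SELECT slot_name,
       active,
       restart_lsn,
       confirmed_flush_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;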

  For every replication slot that has a subscription or other consumer, there is a corresponding walsender process (the same as with streaming replication), which waits in real time to be woken up by the wal writer or bgwriter, via a signal, to read the just-committed xlog.

  Start a pg_recvlogical consumer:

[zjh@hs-10-20-30-193 ~]$ nohup lt_recvlogical -p25432 -Uzjh -d postgres --slot test_for_recvlogical --start -f - &
[1] 237063
[zjh@hs-10-20-30-193 ~]$ nohup: ignoring input and appending output to 'nohup.out'

[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ 
[zjh@hs-10-20-30-193 ~]$ tail -fn 100 nohup.out 
BEGIN 2594732
table public.users: INSERT: user_id[integer]:5 user_name[character varying]:'data5' gender[integer]:null salary[numeric]:null dept_id[integer]:null create_date[timestamp without time zone]:null update_date[timestamp without time zone]:'2022-04-10 08:30:42.870449'
COMMIT 2594732

  First, look at the lightdb log, as follows:

2022-04-23 08:29:58.124755C mysub zjh@postgres ::1(54602) walsender idle 00000[2022-04-23 08:29:58 UTC] 0 [83341] LOG:  starting logical decoding for slot "mysub"
2022-04-23 08:29:58.124755C mysub zjh@postgres ::1(54602) walsender idle 00000[2022-04-23 08:29:58 UTC] 0 [83341] DETAIL:  Streaming transactions committing after 19/7F15448, reading WAL from 19/7F15410.
2022-04-23 08:29:58.124815C mysub zjh@postgres ::1(54602) walsender idle 00000[2022-04-23 08:29:58 UTC] 0 [83341] LOG:  logical decoding found consistent point at 19/7F15410
2022-04-23 08:29:58.124815C mysub zjh@postgres ::1(54602) walsender idle 00000[2022-04-23 08:29:58 UTC] 0 [83341] DETAIL:  There are no running transactions.

  When the walsender process starts, it uses the replication slot information saved last time to determine the last confirmed position, computes the LSN it must rewind to (if that happens to fall exactly on a boundary, the three positions will be identical), and restarts decoding from there.

  Next, the state of the corresponding walsender process, as follows:

     libc.so.6!__epoll_wait_nocancel    
>    WaitEventSetWaitBlock(set = 0x1841828, cur_timeout = 30000, occurred_events = 0x7ffd43ffa980, nevents = 1)    C++ (gdb)
     WaitEventSetWait(set = 0x1841828, timeout = 30000, occurred_events = 0x7ffd43ffa980, nevents = 1, wait_event_info = 100663303)    C++ (gdb)
     WaitLatchOrSocket(latch = 0x7f9fcafdb2ec, wakeEvents = 43, sock = 11, timeout = 30000, wait_event_info = 100663303)    C++ (gdb)  ## wait until the latch is set or the socket becomes readable
     WalSndWaitForWal(loc = 1882311624)    C++ (gdb)  ## loops internally and only returns once the WAL position is greater than loc
     logical_read_xlog_page(state = 0x18f2030, targetPagePtr = 1882308608, reqLen = 3016, targetRecPtr = 1882311600, cur_page = 0x18fdc18 "\006\321\005")    C++ (gdb)
     ReadPageInternal(state = 0x18f2030, pageptr = 1882308608, reqLen = 3016)    C++ (gdb)
     XLogReadRecord(state = 0x18f2030, errormsg = 0x7ffd43ffab58)    C++ (gdb)
     XLogSendLogical    C++ (gdb)
     WalSndLoop(send_data = 0x891d26 <XLogSendLogical>)    C++ (gdb)   ## keeps waiting; once WAL is available, calls XLogSendLogical through the send_data function pointer to start writing logically decoded data
     StartLogicalReplication(cmd = 0x18c1b58)    C++ (gdb)
     exec_replication_command(cmd_string = 0x1812418 "START_REPLICATION SLOT \"test_for_recvlogical\" LOGICAL 0/0")    C++ (gdb)
     PostgresMain(argc = 1, argv = 0x18407b0, dbname = 0x1840720 "postgres", username = 0x1840708 "zjh")    C++ (gdb)
     BackendRun(port = 0x183c740)    C++ (gdb)
     BackendStartup(port = 0x183c740)    C++ (gdb)
     ServerLoop    C++ (gdb)
     PostmasterMain(argc = 3, argv = 0x180cf00)    C++ (gdb)
     main(argc = 3, argv = 0x180cf00)    C++ (gdb)

  After a signal is received (some signals are coalesced when raised multiple times, some are not, depending on the signal type and how it is handled; they can be raised from many places, for example under asynchronous commit XLogSetAsyncXactLSN() calls SetLatch(), and under synchronous commit XLogFlush()->WalSndWakeup() calls SetLatch()), a signal handler such as procsignal_sigusr1_handler sets the corresponding latch. WaitEventSetWaitBlock then returns from its blocking wait, and WalSndWaitForWal continues its internal loop; only once the latest flushed LSN is greater than loc does it go through the log-read logic and return to XLogSendLogical. As follows:

[zjh@hs-10-20-30-193 bin]$ pstack 221212
#0  0x00007ffb6ad5a740 in __read_nocancel () from /lib64/libpthread.so.0
#1  0x000000000055fc89 in XLogRead (buf=0x20e23e8 "\006\321\005", segsize=16777216, tli=1, startptr=67075424256, count=8192) at xlogutils.c:916
#2  0x0000000000560147 in read_local_xlog_page (state=0x20e0750, targetPagePtr=67075424256, reqLen=1427, targetRecPtr=67075419144, cur_page=0x20e23e8 "\006\321\005") at xlogutils.c:1125
#3  0x000000000055cd04 in ReadPageInternal (state=0x20e0750, pageptr=67075424256, reqLen=1427) at xlogreader.c:626
#4  0x000000000055c7fc in XLogReadRecord (state=0x20e0750, errormsg=0x7ffcf0e6e4d8) at xlogreader.c:440
#5  0x0000000000866e6d in pg_logical_slot_get_changes_guts (fcinfo=0x20d8458, confirm=true, binary=false) at logicalfuncs.c:287
#6  0x0000000000867024 in pg_logical_slot_get_changes (fcinfo=0x20d8458) at logicalfuncs.c:365
#7  0x00000000006ef307 in ExecMakeTableFunctionResult (setexpr=0x20ce980, econtext=0x20ce850, argContext=0x20d8340, expectedDesc=0x20d6368, randomAccess=false) at execSRF.c:234

  The first part of the flow is basically identical to streaming replication. It then enters LogicalDecodingProcessRecord, which dispatches to the appropriate callback according to the rmgr id of the xlog record.

   

  Finally, the decoding callbacks are invoked to do the actual processing.

  Whether via streaming replication or via the SQL interface (select pg_create_logical_replication_slot('xxx','wal2json'); SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);), the real entry point of logical decoding is the call to LogicalDecodingProcessRecord(ctx, ctx->reader) made after each XLog record has been obtained through XLogReadRecord().
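  A minimal end-to-end sketch of the SQL interface, using the built-in test_decoding plugin (the table t1 is assumed to exist and is purely illustrative):

-- Create a logical slot; from this point the slot pins WAL for decoding.
SELECT pg_create_logical_replication_slot('regression_slot', 'test_decoding');

-- Produce a change to decode.
INSERT INTO t1 VALUES (1);

-- peek_changes shows the decoded output without advancing the slot;
-- get_changes consumes everything up to the current WAL position.
SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);

-- Drop the slot when done, otherwise it keeps WAL from being recycled.
SELECT pg_drop_replication_slot('regression_slot');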

  ReorderBufferCommit acts like a top-level transaction that wraps the individual change records and subtransactions inside it. Functionally, the reorder buffer takes the WAL records parsed out of the WAL and re-sorts them by transaction commit order, so that transactions can be decoded and emitted one by one. That is:

   Each plugin's callbacks are responsible for sending the decoded results to the client, inserting them into a database, or saving them to whatever file they choose.

  Note: when a large transaction is encountered, large amounts of data are spilled to temporary files, as follows:

[zjh@hs-10-20-30-193 ~]$ pstack 175125
#0  0x00007ff55c211b90 in __memcpy_ssse3_back () from /lib64/libc.so.6
#1  0x00000000008b40ed in BufFileWriteCommon (file=0xeec718, ptr=0x20938e8, size=24, is_transient=true) at buffile.c:2129
#2  0x00000000008b3cf0 in BufFileWriteTransient (file=0xeec718, ptr=0x20938e8, size=24) at buffile.c:1910
#3  0x000000000086fb4f in ReorderBufferWriteData (file=0xeec718, ptr=0x20938e8, size=24, txn=0xee86e8) at reorderbuffer.c:2893
#4  0x000000000086f7fa in ReorderBufferSerializeChange_tde (rb=0xef6828, txn=0xee86e8, file=0xeec718, change=0x25a6308) at reorderbuffer.c:2788
#5  0x000000000086f08c in ReorderBufferSerializeTXN (rb=0xef6828, txn=0xee86e8) at reorderbuffer.c:2517
#6  0x000000000086ee5e in ReorderBufferCheckMemoryLimit (rb=0xef6828) at reorderbuffer.c:2442
#7  0x000000000086c56d in ReorderBufferQueueChange (rb=0xef6828, xid=2594767, lsn=5129826400, change=0x6a86538) at reorderbuffer.c:664
#8  0x0000000000862cfd in DecodeMultiInsert (ctx=0xee67c8, buf=0x7ffd3e44ecd0) at decode.c:972
#9  0x0000000000861c64 in DecodeHeap2Op (ctx=0xee67c8, buf=0x7ffd3e44ecd0) at decode.c:375
#10 0x00000000008615e1 in LogicalDecodingProcessRecord (ctx=0xee67c8, record=0xee6a60) at decode.c:122

 

The origin in postgresql wal

  

 

  An origin records which source a logically replicated change came from; it is used in logical decoding plugins to filter out changes from unwanted sources.

  

    Origins are maintained in pg_replication_origin. They can be created automatically by a subscription or created manually.

--publication
postgres=# \d
       List of relations
 Schema | Name | Type  | Owner  
--------+------+-------+--------
 public | t1   | table | movead
(1 row)
postgres=# create publication pub1 for all tables ;
CREATE PUBLICATION

--------------------------------------------------
--subscription
postgres=# create subscription sub1 connection 'host=xxxxxxxx port=5432 dbname=postgres user=movead' publication pub1;
NOTICE:  created replication slot "sub1" on publisher
CREATE SUBSCRIPTION
postgres=#

postgres=# select * from pg_replication_origin;
 roident |   roname    
---------+-------------
       1 | pg_16389

postgres=# select pg_replication_origin_create('test_origin');   -- creates a replication origin with the given external name and returns the internal ID assigned to it
 pg_replication_origin_create 
------------------------------
                            2  -- the returned value is the origin id
(1 row)

postgres=# select * from pg_replication_origin;
 roident |   roname    
---------+-------------
       1 | pg_16389
       2 | test_origin
(2 rows)
postgres=#

  For an origin created by a subscription, you can insert data on the pub side and then examine the WAL to see that the records have been tagged with the origin.

rmgr: Transaction len (rec/tot):     65/    65, tx:        519, lsn: 0/03000068, prev 0/03000028, desc: COMMIT 2020-04-16 17:09:01.989257 CST; origin: node 1, lsn 0/0, at 2020-04-16 17:05:49.767511 CST


rmgr: Transaction len (rec/tot):     65/    65, tx:        520, lsn: 0/03000128, prev 0/030000E8, desc: COMMIT 2020-04-16 17:09:09.327941 CST; origin: node 2, lsn 0/156BB28, at 2020-04-16 17:09:09.268948 CST

  The origin id is identified by roident:

postgres=#  \d pg_replication_origin
    Table "pg_catalog.pg_replication_origin"
 Column  | Type | Collation | Nullable | Default 
---------+------+-----------+----------+---------
 roident | oid  |           | not null | 
 roname  | text | C         | not null | 

  For a manually created origin, the session must be bound to the origin by calling the pg_replication_origin_session_setup() API.

postgres=# select pg_replication_origin_session_setup('test_origin'); -- marks the current session as replaying from the given origin, allowing replay progress to be tracked; it can only be used when no origin is currently selected, and is undone with pg_replication_origin_session_reset
 pg_replication_origin_session_setup 
-------------------------------------
(1 row)
postgres=# insert into t1 values(100);select pg_current_wal_lsn();   -- the session must be bound to an origin for the WAL records to be tagged with it
INSERT 0 1
 pg_current_wal_lsn 
--------------------
 0/4000230
(1 row)

Source-code implementation on the subscriber side


  The built-in test_decoding plugin can be used to parse and make use of the Origin.

  

   For general use, wal2json and pglogical are the decoders to consider, but both are component-level building blocks rather than finished products. Without in-house development capability, Debezium can be considered for basic use.

Best practices for logical replication

  1. Subscribe at least at the schema level; if schema management is not well organized, subscribe per table, especially in systems with many stage tables;
  2. Every table that needs logical replication must have a primary key or replica identity; check with select relname ,relreplident from pg_catalog.pg_class ; (the relreplident values mean: columns used to form "replica identity" for rows: d = default (primary key, if any), n = nothing, f = all columns, i = index with indisreplident set (same as nothing if the index used has been dropped)); see the sketch after this list;
    1. Full-row identity can be set with alter table table_name REPLICA IDENTITY FULL;
  3. Avoid very large transactions (copy/update/insert select/delete and the like) to prevent decode serialization failures (in TEXT mode, the string for a single transaction cannot exceed 1GB-1 bytes, defined by the MaxAllocSize macro);
  4. When planning capacity, reserve one CPU core for each logical replication stream (memory matters much less).
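  A hedged sketch for item 2: the query below looks for published tables whose replica identity would prevent UPDATE/DELETE from replicating; the join against pg_publication_tables and the fallback ALTER TABLE use illustrative names.

-- Tables with replica identity 'nothing', or 'default' but no primary key.
SELECT c.oid::regclass AS table_name, c.relreplident
FROM pg_class c
JOIN pg_publication_tables pt
  ON pt.schemaname = c.relnamespace::regnamespace::text
 AND pt.tablename  = c.relname
WHERE c.relreplident = 'n'
   OR (c.relreplident = 'd'
       AND NOT EXISTS (SELECT 1 FROM pg_index i
                       WHERE i.indrelid = c.oid AND i.indisprimary));

-- Last resort when no suitable key exists (old rows are logged in full, so WAL grows):
ALTER TABLE some_table REPLICA IDENTITY FULL;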

Replication slots

  Whether replication is logical or physical, its reliability depends heavily on the replication slot mechanism. Replication slots come in two kinds, persistent and temporary. The parameter wal_receiver_create_temp_slot controls whether a temporary slot is created automatically when no persistent slot was created in advance.
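  A short sketch of the difference between the two kinds of slot, using the built-in test_decoding plugin (slot names are illustrative):

-- Persistent slot: survives restarts and pins WAL until explicitly dropped.
SELECT pg_create_logical_replication_slot('durable_slot', 'test_decoding');

-- Temporary slot: the third argument marks it temporary; it is dropped
-- automatically when the creating session ends, so it cannot pin WAL forever.
SELECT pg_create_logical_replication_slot('scratch_slot', 'test_decoding', true);

SELECT slot_name, temporary, active FROM pg_replication_slots;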

Related errors

Error 1

lt_recvlogical: error: could not send replication command "START_REPLICATION SLOT "test_perf1" LOGICAL 0/0": ERROR: replication slot "test_perf1" is active for PID 160419
lt_recvlogical: disconnected; waiting 5 seconds to try again

This is because the replication slot test_perf1 is already being consumed by another client. As follows:

[zjh@hs-10-20-30-193 ~]$ ps axu | grep 160419
zjh 160419 0.5 0.4 10776896 1831272 ? Ss May01 6:07 lightdb: walsender zjh 10.20.30.193(40938) idle
zjh 177795 0.0 0.0 112820 976 pts/10 S+ 02:54 0:00 grep --color=auto 160419

zjh@postgres=# select * from pg_stat_replication ;
  pid   | usesysid | usename | application_name | client_addr  | client_hostname | client_port |         backend_start         | backend_xmin |   state   |  sent_lsn   |  write_lsn  |  flush_
lsn  | replay_lsn | write_lag | flush_lag | replay_lag | sync_priority | sync_state |          reply_time           
--------+----------+---------+------------------+--------------+-----------------+-------------+-------------------------------+--------------+-----------+-------------+-------------+--------
-----+------------+-----------+-----------+------------+---------------+------------+-------------------------------
 160419 |       10 | zjh     | lt_recvlogical   | 10.20.30.193 |                 |       40938 | 2022-05-01 06:34:48.526734+00 |              | streaming | 45/C95369C8 | 45/C95369C8 | 45/C953
69C8 |            |           |           |            |             0 | async      | 2022-05-02 03:00:27.661725+00

So the slot is occupied by 10.20.30.193:40938.

[zjh@hs-10-20-30-193 wal2sql]$ lsof -i:40938
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
lightdb 160419 zjh 11u IPv4 158818808 0t0 TCP lightdb1:25432->lightdb1:40938 (ESTABLISHED)
lt_recvlo 210812 zjh 3u IPv4 158811039 0t0 TCP lightdb1:40938->lightdb1:25432 (ESTABLISHED)
[zjh@hs-10-20-30-193 wal2sql]$ ps axu | grep 210812
zjh 179479 0.0 0.0 112816 980 pts/8 S+ 03:01 0:00 grep --color=auto 210812
zjh 210812 0.0 0.2 1221484 936236 ? S Apr30 0:18 lt_recvlogical --start --slot=test_perf1 --plugin=wal2json -d postgres -h 10.20.30.193 -p 25432 -f test_perf.log

So this lt_recvlogical was started on April 30.

Error 2

lt_recvlogical: error: unexpected termination of replication stream: ERROR: out of memory
DETAIL: Cannot enlarge string buffer containing 1073741791(1GB=1073741824) bytes by 116 more bytes.
CONTEXT: slot "test_perf4", output plugin "wal2json", in the change callback, associated LSN 45/E1101BF0
lt_recvlogical: disconnected; waiting 5 seconds to try again

Because wal2json uses a StringInfo member of the decoding context (LogicalDecodingContext) as its output buffer, and StringInfo supports at most 1GB, transaction size has to be kept under control.
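  A common workaround is to break bulk DML into bounded batches so that no single transaction's decoded output approaches the 1GB cap; a minimal sketch (big_table, created_at and the batch size are illustrative, and the block must not run inside an outer transaction so that COMMIT is allowed):

DO $$
DECLARE
  deleted integer;
BEGIN
  LOOP
    DELETE FROM big_table
    WHERE ctid IN (SELECT ctid FROM big_table
                   WHERE created_at < now() - interval '90 days'
                   LIMIT 10000);
    GET DIAGNOSTICS deleted = ROW_COUNT;
    EXIT WHEN deleted = 0;
    COMMIT;  -- each batch becomes a separate transaction for the decoder
  END LOOP;
END $$;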

Active-active

  Using origins together with bidirectional publications and subscriptions, a multi-active architecture is easy to build. When deploying one, make sure the application layer enforces routing isolation, ideally with primary-key affinity, so that arbitrary routing does not generate large numbers of conflicts.
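  A minimal two-node sketch of the publish/subscribe half of such a setup; host names and options are illustrative, and the origin = none subscription option used here to stop changes from echoing back is only available from PostgreSQL 16 onward, so on earlier versions the loop filtering has to be done in the decoding plugin via the origin mechanism described above.

-- On node A (and symmetrically on node B):
CREATE PUBLICATION pub_a FOR ALL TABLES;

-- On node B, subscribe to A; origin = none asks the publisher to send only
-- changes that originated locally on A, so B's own changes are not sent back.
CREATE SUBSCRIPTION sub_from_a
  CONNECTION 'host=node_a port=5432 dbname=postgres user=repl'
  PUBLICATION pub_a
  WITH (copy_data = false, origin = none);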

  postgresql 15 adds the ability to replicate a whole schema; before that, replication could target individual tables or the whole database. It also supports row and column filter configuration. See:
https://www.postgresql.org/about/news/postgresql-15-beta-1-released-2453/
https://www.postgresql.fastware.com/blog/logical-replication-of-all-tables-in-schema-in-postgresql-15
https://www.postgresql.fastware.com/blog/addressing-replication-conflicts-using-alter-subscription-skip

  DDL replication is planned for postgresql 16 (https://commitfest.postgresql.org/38/3595/). Until then, pglogical supports DDL and can also emit JSON and other formats, so it can be considered as a base to extend.

  https://www.2ndquadrant.com/en/blog/pglogical-logical-replication-postgresql-10/

Real-time replication from postgresql/lightdb to oracle

  You can download lightdb-x, which ships with ltdts_recvlogical and can synchronize PG changes to oracle in real time. lightdb has built-in support for automatically switching the synchronization source after an instance failover, which makes it smarter and more controllable than open-source tools such as Flink CDC and Debezium.

Distributed transactions

  Starting with pg 15, logical replication supports distributed (two-phase) transactions; see
https://www.postgresql.org/docs/current/logicaldecoding-two-phase-commits.html
https://www.postgresql.org/docs/current/protocol-replication.html
https://www.postgresql.fastware.com/blog/two-phase-commits-for-logical-replication-publications-subscriptions
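  A hedged sketch of enabling two-phase decoding; the subscription-level two_phase option is a PostgreSQL 15 feature, the slot-level flag appeared in PostgreSQL 14, and all names are illustrative.

-- Subscriber side: decode PREPARE TRANSACTION when it happens instead of
-- waiting for the final COMMIT PREPARED.
CREATE SUBSCRIPTION sub_2pc
  CONNECTION 'host=pubhost port=5432 dbname=postgres user=repl'
  PUBLICATION pub1
  WITH (two_phase = true);

-- SQL-level decoding can opt in via the slot's two_phase flag
-- (arguments: name, plugin, temporary, two_phase).
SELECT pg_create_logical_replication_slot('slot_2pc', 'test_decoding', false, true);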

Transaction filtering

  The pg_logical_emit_message function can be used to tag transactions; in wal2sql this corresponds to the action attribute with value M. It can be used to implement filtering, for example in bidirectional replication, when the origin mechanism is too coarse-grained for fine control. It corresponds to the LogicalDecodeMessageCB message_cb callback.
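  A small sketch of tagging a transaction with pg_logical_emit_message; the prefix and payload are arbitrary strings chosen for illustration, and t1 is the sample table used earlier.

BEGIN;
-- transactional = true: the message is decoded together with this transaction,
-- so a downstream plugin can see the tag and decide to skip or route it.
SELECT pg_logical_emit_message(true, 'replication-filter', 'skip-on-node-b');
insert into t1 values(101);
COMMIT;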

  https://blog.51cto.com/u_15259710/4796035

http://www.postgres.cn/docs/13/functions-admin.html 

https://www.highgo.ca/2020/04/18/the-origin-in-postgresql-logical-decoding/   using origin to filter out and avoid circular replication

https://www.postgresql.org/docs/14/replication-origins.html 

https://www.postgresql.org/docs/14/logicaldecoding.html 

https://www.postgresql.org/docs/14/logicaldecoding-example.html

https://www.postgresql.org/docs/current/logical-replication-restrictions.html restrictions of logical replication

https://www.postgresql.fastware.com/blog/logical-replication-tablesync-workers   the two-process (tablesync worker) design

https://wiki.postgresql.org/wiki/Logical_replication_and_physical_standby_failover ; Patroni implements a way to keep logical replication usable under high availability (https://www.percona.com/blog/how-patroni-addresses-the-problem-of-the-logical-replication-slot-failover-in-a-postgresql-cluster/); lightdb 22.2 implements a similar approach, keeping logical replication uninterrupted after a primary/standby switchover (opengauss 3.1.0 also added this feature)

Introductory series on logical replication

https://blog.anayrat.info/en/2017/07/29/postgresql-10-and-logical-replication-overview/

https://severalnines.com/blog/how-optimize-postgresql-logical-replication/

https://blog.anayrat.info/en/2017/08/05/postgresql-10-and-logical-replication-setup/

https://blog.anayrat.info/2017/08/27/postgresql-10-et-la-r%C3%A9plication-logique-restrictions/

https://blog.anayrat.info/en/2018/03/10/logical-replication-internals/ 

https://www.postgresql.fastware.com/blog/how-postgresql-15-improved-communication-in-logical-replication 

https://www.slideshare.net/PetrJelinek1/logical-replication-in-postgresql-flossuk-2016

https://www.slideshare.net/UmairShahid16/logical-replication-with-pglogical

https://aws.amazon.com/cn/blogs/database/postgresql-bi-directional-replication-using-pglogical/   building on pglogical for custom development is also a viable approach, but since it is limited to pg, it is somewhat less general

https://www.slideshare.net/AlexanderShulgin3/streaming-huge-databases-using-logical-decoding  the metadata carried with every row is very resource-intensive; other serialization mechanisms are worth considering

avro (roughly JSONB/BSON with a schema definition) and its use in kafka

https://docs.oracle.com/database/121/XSTRM/xstrm_pt_concepts.htm#XSTRM72454  oracle logical replication

https://zhuanlan.zhihu.com/p/163204827

Miscellaneous

https://kinsta.com/blog/postgresql-replication/

https://www.percona.com/blog/how-patroni-addresses-the-problem-of-the-logical-replication-slot-failover-in-a-postgresql-cluster/

https://www.enterprisedb.com/blog/logical-decoding-large-progress-transactions-postgresql 

https://www.postgresql.eu/events/pgconfeu2019/sessions/session/2651/slides/237/Deploy%20your%20own%20replication%20system%20with%20Wal2json.pdf

For synchronizing lightdb to oracle, see "LightDB supports online synchronization to ORACLE".

Why chain of snapshots is used in ReorderBufferCommit

