聊聊Postgres中的IPC之SI Message Queue


在 PostgreSQL中,每一個進程都有屬於自己的共享緩存(shared cache)。例如,同一個系統表在不同的進程中都有對應的Cache來緩存它的元組(對於RelCache來說緩存的是一個RelationData結構)。同一個系統表的元組可能同時被多個進程的Cache所緩存,當其中某個Cache中的一個元組被刪除或更新時 ,需要通知其他進程對其Cache進行同步。在 PostgreSQL的實現中,會記錄下已被刪除的無效元組 ,並通過SI Message方式(即共享消息隊列方式)在進程之間傳遞這一消息。收到無效消息的進程將同步地把無效元組(或RelationData結構)從自己的Cache中刪除。


1.無效消息(Invalid Message)概述

當前系統支持傳遞6種無效消息:
第一種是使給定的catcache中的一個元組無效;
第二種是使給定的系統表的所有catcache結構全部失效;
第三種是使給定的邏輯表的Relcache中RelationData結構無效;
第四種是使給定的物理表的SMGR無效(表物理位置發生變化時,需要通知SMGR關閉表文件);
第五種是使給定的數據庫的mapped-relation失效;
第六種是使一個已保存的快照失效。

可以看出這六種消息對應的影響范圍越來越大。

PostgreSQL使用以下所示的結構體來存儲無效消息。

typedef union
{
	int8		id;				/* type field --- must be first */
	SharedInvalCatcacheMsg cc;
	SharedInvalCatalogMsg cat;
	SharedInvalRelcacheMsg rc;
	SharedInvalSmgrMsg sm;
	SharedInvalRelmapMsg rm;
	SharedInvalSnapshotMsg sn;
} SharedInvalidationMessage;

其中,id為:

  • 0或正數表示一個CatCache元組;
  • -1表示整個CatCahe緩存;
  • -2表示RelCache;
  • -3表示SMGR;
  • -4表示mapped-relation mapping;
  • -5表示Snapshot

當id為0或正數時 ,它同時也表示產生該Invalid Message的CatCache的編號。

具體我們可以看注釋:

src/include/storage/sinval.h
 *	* invalidate a specific tuple in a specific catcache
 *	* invalidate all catcache entries from a given system catalog
 *	* invalidate a relcache entry for a specific logical relation
 *	* invalidate an smgr cache entry for a specific physical relation
 *	* invalidate the mapped-relation mapping for a given database
 *	* invalidate any saved snapshot that might be used to scan a given relation

進程通過調用函數CachelnvalidateHeapTuple()對Invalid Message進行注冊,主要包括以下幾步:

    1. 注冊SysCache無效消息。
    1. 如果是對pg_class系統表元組進行的更新/刪除操作,其 relfilenode或 reltablespace可能發生變化,即該表物理位置發生變化,需要通知其他進程關閉相應的SMGR。這時首先設置relationid和databaseid,然后注冊SMGR無效消息;否則轉而執行步驟3。
    1. 如果是對pg_attribute或者pg_index系統表元組進行的更新/刪除操作,則設置relationid和 dalabaseid,否則返回。
    1. 注冊RelCache無效消息(如果有的話)。
    1. 事務結束時注冊mapped-relation mapping和snapshot無效消息(如果有的話)。

當一個元組被刪除或者更新時,在同一個SQL命令的后續執行步驟中我們依然認為該元組是有效的,直到下一個命令開始或者亊務提交時改動才生效。在命令的邊界,舊元組變為失效,同時新元組置為有效。因此當執行heap_delete或者heap_update時,不能簡單地刷新Cache。而且,即使刷新了,也可能由於同一個命令中的請求把該元組再次加載到Cache中。

因此正確的方法是保持一個無效鏈表用於記錄元組的delete/update操作。事務完成后,根據前述的無效鏈表中的信息廣播該事務過程中產生的Invalid Message,其他進程通過SI Message隊列讀取Invalid Message對各自的Cache進行刷新。當子事務提交時,只需要將該事務產生的Invalid Message提交到父事務,最后由最上層的事務廣播Invalid Message。

需要注意的是,若涉及對系統表結構的改變,還需要重新加載pg_internal.init文件,因為該文件記錄了所有系統表的結構。


2.SI Message全景

以下是相關的函數,寫在前面,先混個臉熟:

CreateSharedInvalidationState()  /* Create and initialize the SI message buffer

SharedInvalBackendInit()  /* 每個backend初始化時要初始化在 SI message buffer 中的Per-backend invalidation state,procState[MaxBackends]

CleanupInvalidationState() /*每個backend shutdown時在調用on_shmem_exit()函數清空對應的procState[i]

SICleanupQueue()  /* Remove messages that have been consumed by all active backends
				 * Possible side effects of this routine include marking one or more
				* backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
				* to some backend that seems to be getting too far behind.  We signal at
				* most one backend at a time, for reasons explained at the top of the file.
				
 SendSharedInvalidMessages() /* Add shared-cache-invalidation message(s) to the global SI message queue.

那么整個SI Message隊列工作的流程大致如下:

  1. SI message 隊列的初始化。這個是由postmaster在啟動服務器時做的,作為共享內存的一部分,由postmaster初始化。此時,SI message為空,因為此時還沒有Invalid Message產生。
  2. 每個backend初始化(我們知道這些Invalid Message是由於我執行了SQL文對數據庫進行了修改才產生的,那么很顯然我們執行SQL文的途徑是前端發送SQL文,后端啟動一個backend進程去處理)時,需要初始化自己的共享內存並且向SI message注冊自己。注冊的目的有兩個,一個是聲明自己作為Invalid Message的生產者的身份,另一個表示自己也需要接受其他backend的Invalid Message。
  3. 每個backend執行SQL文,產生Invalid Message,其他backend接收該Invalid Message,當然,這個過程復雜點,會在后面細說。那么每個backend接收和發送Invalid Message的時機是什么呢?

當然啦,你每次執行SQL的時候,是一個好時機,在執行SQL文的開頭和結尾,backend都會去check SI message隊列中的無效消息。以下是調用棧:

exec_simple_query
	->start_xact_command
		->StartTransactionCommand         /* 事務開始
			->StartTransaction
				->AtStart_Cache
					->AcceptInvalidationMessages
						->ReceiveSharedInvalidMessages /* consume SI message
							->SIGetDataEntries
						
	-> do query
	
	->finish_xact_command
		->CommitTransactionCommand         /* 事務結束
			->CommitTransaction
				->AtEOXact_Inval
					->SendSharedInvalidMessages       /*  send SI message
						->SIInsertDataEntries   
							->SICleanupQueue
						
						

那么,難道我不執行SQL文,我的backend就不刷新無效消息么?

我們看一段注釋:

/*
 * Because backends sitting idle will not be reading sinval events, we
 * need a way to give an idle backend a swift kick in the rear and make
 * it catch up before the sinval queue overflows and forces it to go
 * through a cache reset exercise.  This is done by sending
 * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
 *
 * The signal handler will set an interrupt pending flag and will set the
 * processes latch. Whenever starting to read from the client, or when
 * interrupted while doing so, ProcessClientReadInterrupt() will call
 * ProcessCatchupEvent().
 */

沒有錯,要是某個backend長時間不讀取SI Message或者backend落后太多,超過了SI Message隊列可以接受的最大長度,那么就向該backend發送SIGUSR1,喚醒該backend讓其做適當的操作。


3.實現細節

為了實現SI Message的這一功能,PostgreSQL在共享內存中開辟了shmInvalBuffer記錄系統中所發出的所有Invalid Message以及所有進程處理無消息的進度。shmInvalBuffer是一個全局變量,其數據結構如下:

typedef struct SISeg
{
	/*
	 * General state information
	 */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */
	int			lastBackend;	/* index of last active procState entry, +1 */
	int			maxBackends;	/* size of procState array */

	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend invalidation state info (has MaxBackends entries).
	 */
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;

在shmInvalBuffer中,Invalid Message存儲在由Buffer字段指定的定長數組中(其長度MAXNUMMESSAGES預定義為4096),該數組中每一個元素存儲一個Invalid Message,也可以稱該數組為無效消息隊列。無效消息隊列實際是一個環狀結構,最初數組為空時,新來的無效消息從前向后依次存放在數組中,當數組被放滿之后,新的無效消息將回到Buffer數組的頭部開始插人。minMsgNum字段記錄Buffer中還未被所有進程處理的無效消息編號中的最小值,maxMsgNum字段記錄下一個可以用於存放新無效消息的數組元素下標。實際上,minMsgNum指出了Buffer中還沒有被所有進程處理的無效消息的下界,而maxMsgNum則指出了上界,即編號比minMsgNmn小的無效消息是已經被所有進程處理完的,而編號大於等於maxMsgNum的無效消息是還沒有產生的,而兩者之間的無效消息則是至少還有一個進程沒有對其進行處理。因此在無效消息隊列構成的環中,除了 minMsgNum和maxMsgNum之間的位置之外,其他位置都可以用來存放新增加的無效消息。

PostgreSQL在shmInvalBuffer中用一個ProcState數組(procState字段)來存儲正在讀取無效消息的進程的讀取進度,該數組的大小與系統允許的最大進程數MaxBackends有關,在默認情況下這個
數組的大小為100 (系統的默認最大進程數為100,可在postgresql.conf中修改)。ProcState的結構如數據結構如下所示。

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	PGPROC	   *proc;			/* PGPROC of backend */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */
	bool		resetState;		/* backend needs to reset its state */
	bool		signaled;		/* backend has been sent catchup signal */
	bool		hasMessages;	/* backend has unread messages */

	/*
	 * Backend only sends invalidations, never receives them. This only makes
	 * sense for Startup process during recovery because it doesn't maintain a
	 * relcache, yet it fires inval messages to allow query backends to see
	 * schema changes.
	 */
	bool		sendOnly;		/* backend only sends, never receives */

	/*
	 * Next LocalTransactionId to use for each idle backend slot.  We keep
	 * this here because it is indexed by BackendId and it is convenient to
	 * copy the value to and from local memory when MyBackendId is set. It's
	 * meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;

在ProcSlate結構中記錄了PID為procPid的進程讀取無效消息的狀態,其中nextMsgNum的值介於 shmlnvalBuffer 的 minMsgNum 值和 maxMsgNum 值之間。

如下圖所示,minMsgmun和MaxMsgmim就像兩個指針,它們區分出了哪些無效消息已經被所有的進程讀取以及哪些消息還在等待某些進程讀取。在minMsgnum之前的消息已經被所有進程讀完;maxMsgnum之后的區域尚未使用;兩者之間的消息是還沒有被所有進程讀完的。當有進程調用函數SendSharedlnvalidMessage將其產生的無效消息添加到shmInvalBuffer中時,maxMsgnum就開始向后移動。SendSharedlnvalidMessage中將調用SIInsertDataEntries來完成無效消息的插人。

在向SI Message隊列中插入無效消息時,可能出現可用空間不夠的情況(此時隊列中全是沒有完全被讀取完畢的無效消息),需要清空一部分未處理無效消息,這個操作稱為清理無效消息隊列,只有當當前消息數與將要插人消息數之和超過shmInvalBuffer中nextThreshold時才會進行清理操作。這時,那些還沒有處理完SI Message隊列中無效消息的進程將收到清理通知,然后這些進程將拋棄其Cache中的所有元組(相當於重新載人Cache的內容)。

顯然,讓所有進程重載Cache會導致較高的I/O次數。為了減少重載Cache的次數,PostgreSQL會在無效消息隊列中設置兩個界限值lowbound和minsig,其計算方式如下:

• lowbound=maxMsgNum-MAXNUMMESSAGES+minFree,其中 minFree 為需要釋放的隊列空間的最小值(minFree指出了需要在無效消息隊列中清理出多少個空位用於容納新的無效消息)。

• minsig = maxMsgNum-MAXNUMMESSAGES/2,這里給出的是minsig的初始值,在進程重載過程中minsig會進行調整。
SICleanupQueue

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
	 * furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

可以看到,lowbound實際上給出了此次清理過程中必須要釋放的空間的位置,這是一個強制性的限制,nextMsgNum值低於lowbound的進程都將其resetState字段置為真,這些進程將會自動進行重載Cache的工作。對於那些nextMsgNum值介於lowbound和minaig之間的進程,雖然它們並不影響本次淸理,但是為了盡量避免經常進行清理操作,會要求這些進程加快處理無效消息的進度(CatchUp)。淸理操作會找出這些進程中進度最慢的一個,向它發送SIGUSR1信號。該進程接收到SIGUSR1后會一次性處理完所有的無效消息,然后繼續向下一個進度最慢的進程發送SIGUSR1讓它也加快處理進度。

清理無效消息隊列的工作由函數SICleanupQueue實現,該函數的minFree參數給出了這一次淸理操作至少需要釋放出的空間大小。該函數的流程如下:

SICleanupQueue
	->SendProcSignal

1)計算 lowbound 和 minsig 的值。

  1. 對每一個進程的ProcState結構進行檢査,將nextMsgNum低於lowbound的進程resetState字段設置為true,並在nextMsgNum介於lowboumi和minsig之間的進程中找出進度最慢的一個。

  2. 重新計算nextThreshoW參數。

  3. 向步驟2中找到的進度最慢的進程發送SIGUSR1信號。

Postgres進程通過函數ProcessCatchupInterrupt來處理SIGUSR1信號,該函數最終將調用ReceiveSharedlnvalidMessages來處理所有未處理的無效消息,最后調用SICleanupQueue (minFree參數為0)向下一個進度最慢的進程發送SIGUSR1信號(調用棧如下)。

ProcessCatchupInterrupt
	->AcceptInvalidationMessages
		->ReceiveSharedInvalidMessages
			->SICleanupQueue

每個進程在需要刷新其Cache時也會調用ReceiveSharedInvalidMessages函數用於讀取並處理無效消息,函數參數為兩個函數指針:

  1. invalFunction:用於處理一條無效消息。

  2. resetFunction:將該后台進程的Cache元組全部拋棄。

對於resetState設置為真的進程,函數ReceiveSharedInvalidMessages會調用resetFunction拋棄其所有的Cache元組。否則,ReceiveSharedInvalidMessages將從消息隊列中讀取每條無效消息並調用invalFunction對消息進行處理。如果該進程是根據SIGUSR1信號調用該函數,那么還將調用SICleanupQueue函數將這個信號傳給比它進度慢的進程。


4.其他

在PMsignal.c中,包含后台進程向Postmaster發送信號的相關函數。在實現中,后台進程是這樣通知Postmaster的:

  1. 首先在共享內存中開辟一個數組PMSignalFlags(PMsignal.c),數組中的每一位對應一個信號。

  2. 然后如果后台進程希望向Postmaster發送一個信號,那么后台首先將信號在數組PMSignalFlags中相應的元素置1 (邏輯真),然后調用kill函數向Postmaster發送SIGUSR1信號。

  3. 當Postmaster收到SIGUSR1后首先檢測共享存儲中PMSignalFlags,確認具體的信號是什么。同時將信號在數組PMSignalFlags中相應的元素置0 (邏輯假)然后作出相應反應。

每一個后台進程都有一個結構PGPROC存儲在共享內存中。Procarray.c在共享內存中分配ProcArrayStruct類型的數組procArray,統一管理這些PGPROC結構。PGPROC結構中包含很多的信息,Procarray.c中的函數主要處理 PGPROC中的 pid、databaseld、roleld、xmin、xid、subxids 等字段。這些函數的功能或是統計事務的信息,或是通過databaseId統計有多少個pid (也就是多少個后台進程)與指定數據庫相連接等統計信息。

IPC負責的清除工作有兩個方面:一個是與共享內存相關的清除,另一個是與各個后台進程相關的清除工作。與共享內存相關的淸除並不是將共享內存丟棄,而是重新設置共享內存。清除工作的流程可以描述如下:首先在申請資源的時候,系統會同時為該資源注冊一個清除函數,當要求做清除操作時,系統將會調用對應的淸除函數。


IPC的內容還有不少,本次只是大致說了下關於SI Message共享隊列的處理,其它的以后有時間再去寫寫吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM