RCU基本概念
-
讀側臨界區 (read-side critical sections): RCU讀者執行的區域,每一個臨界區開始於
rcu_read_lock()
,結束於rcu_read_unlock()
,可能包含rcu_dereference()
等訪問RCU保護的數據結構的函數。這些指針函數實現了依賴順序加載的概念,稱為memory_order_consume
加載。 -
靜默態(quiescent state): 當一個線程沒有運行在讀側臨界區時,其就處在靜默狀態。持續相當長一段時間的靜默狀態稱之為延長的靜默態(extended quiescent state)。
-
寬限期(Grace period): 每個線程都至少一次進入靜默態的時間。寬限期前所有在讀側臨界區的讀者在寬限區后都會結束。不同的寬限期可能有部分或全部重疊。
讀者在讀臨界區遍歷RCU數據。如果寫者從此數據中移除一個元素,需要等待一個寬限期后才能執行回收內存操作。
上述操作的示意圖如下圖所示,其中,標有read
的框框為一個讀臨界區。
上圖中,每一個讀者、更新者表示一個獨立的線程,總共4個讀線程,一個寫線程。
RCU更新操作分為兩個階段:移除階段和回收階段。兩個階段通過寬限期隔開。更新者在移除元素后,通過synchronize_rcu()
原語,初始化一個寬限期,並等待寬限期結束后,回收移除的元素。
- 移除階段:RCU更新通過
rcu_assign_pointer()
等函數移除或插入元素。現代CPU的指針操作都是原子的,
rcu_assign_pointer()
原語在大多數系統上編譯為一個簡單的指針賦值操作。移除的元素僅可被移除階段(以灰色顯示)前的讀者訪問。 - 回收階段:一個寬限期后, 寬限期開始前的原有讀者都完成讀操作,因此,此階段可安全釋放由刪除階段刪除的元素。
一個寬限期可以用於多個刪除階段,即可由多個更新程序執行更新。
此外,跟蹤RCU寬限期的開銷可能會均攤到現有流程調度上,因此開銷較小。對於某些常見的工作負載,寬限期跟蹤開銷可以被多個RCU更新操作均攤,從而使每個RCU更新的平均開銷接近零。
QSBR 模式
針對不同的應用場景,urcu提供了以下5種不同的flavors:1. urcu,2. QSBR(quiescent-state-based RCU), 3. Memory-barrier-base RCU,4. "Bullet-proof" RCU,5. Signal-based RCU, 其中,顯示靜默模式QSBR性能最好。
QSBR模式下,rcu_read_lock()
和rcu_read_unlock()
為空操作,對於讀者來說負擔為零,這一點是另外4種flavor不具備的。
每個讀者線程必須周期性的調用rcu_quiescent_state()
來聲明自己進入靜默期。
注意:
- 並非每次讀操作完成后都需要做此聲明,考慮到讀操作的性能和應用讀寫操作次數的不平衡性,通常的做法是每進行一定次數(如1024)的讀操作之后聲明進入一次靜默期。
- 每個進入讀側臨界區的線程都需要事先通過
rcu_register_thread()
接口進行注冊,退出時調用rcu_unregister_thread()
接口取消注冊。
示例1:
下圖展示了Linux中的鏈式列表的實現中,兩個讀者並行讀,一個更新者並行更新列表元素的場景。
圖中,第一行與第二行分別展示了寬限期開始前的讀者與之后的讀者看到的數據結構。第三行是更新者的視角看到的數據結構。
- 第一列中,展示了一個由A B C組成的單向列表,任何在寬限期之前初始化的讀者都有可能引用任何一個元素;
- 第二列中,
list_del_rcu()
將B從列表中解除,但保留了B和C之間的連接,允許已經引用了B的讀者可以引用到C。 - 第三列,從第二列到第三列的轉換,顯示元素B從讀者線程的角度消失了。 在此過渡期間,元素B從全局可見(其中任何讀者都可以獲取新的引用)移動到局部可見,只有已經擁有引用的讀者才能看到元素B。
- 第四列,
synchronize_rcu()
原語等待寬限期期間,所有先前存在的讀側臨界區都將完成, 然后可以安全地調用free()
刪除,回收元素B的內存。
示例2:
示例2中,根據用戶輸入創建若干個writer和reader,writer不斷申請釋放內存資源,並用全局指針test_rcu_pointer
記錄資源,reader不斷讀取test_rcu_pointer
指向資源的值,並且每1024次聲明靜默期,最后統計reader和writer的次數。
寫者:
void *thr_writer(void *_count)
{
unsigned long long *count = _count;
int *new, *old;
for (;;) {
new = malloc(sizeof(int));
assert(new);
*new = 8;
old = rcu_xchg_pointer(&test_rcu_pointer, new);
synchronize_rcu();
if (old)
*old = 0;
free(old);
URCU_TLS(nr_writes)++;
}
printf_verbose("thread_end %s, tid %lu\n",
"writer", urcu_get_thread_id());
*count = URCU_TLS(nr_writes);
return ((void*)2);
}
讀者:
void *thr_reader(void *_count)
{
unsigned long long *count = _count;
int *local_ptr;
rcu_register_thread();
rcu_thread_offline();
rcu_thread_online();
for (;;) {
rcu_read_lock();
local_ptr = rcu_dereference(test_rcu_pointer);
if (local_ptr)
assert(*local_ptr == 8);
rcu_read_unlock();
URCU_TLS(nr_reads)++;
/* 每讀1024次,進入1次靜默期 */
if (caa_unlikely((URCU_TLS(nr_reads) & ((1 << 10) - 1)) == 0))
rcu_quiescent_state();
}
rcu_unregister_thread();
*count = URCU_TLS(nr_reads);
printf_verbose("thread_end %s, tid %lu\n",
"reader", urcu_get_thread_id());
return ((void*)1);
}
不同urcu模式下性能對比:
[](_v_images/20191123163832376_23665.png =578x)
可見,qsbr 模式性能最好,在讀者1024讀后進入靜默期的情況下,讀寫操作比為6000:1。
QSBR 源碼分析
讀者-注冊/上線
注冊:
qsbr rcu的實現中,reader線程必須進行顯式注冊, 將自己掛接在全局鏈表registry
上,通俗地說就是將自己置於全局管理之下,這樣當writer在進行同步(synchronize)時,才能知道哪些線程需要同步(只有注冊過的線程才需要)**。
上線:
線程的上線狀態分為在線(online)和離線(offline)。
其中處於offline的線程雖然在registry鏈表上,但在synchronized時,writer會忽略這些線程。線程注冊會默認置於online狀態。
void rcu_register_thread(void)
{
URCU_TLS(rcu_reader).tid = pthread_self(); // rcu_reader讀者的線程id
mutex_lock(&rcu_registry_lock);
URCU_TLS(rcu_reader).registered = 1; // 已注冊標記
cds_list_add(&URCU_TLS(rcu_reader).node, ®istry); // 將rcu_reader加入全局鏈表registry
_rcu_thread_online(); // 上線,設置rcu_reader.ctr = rcu_gp.ctr
}
線程上線(online)的本質,就是將rcu_gp.ctr的值存儲到本線程的ctr中
static inline void _rcu_thread_online(void)
{
_CMM_STORE_SHARED(URCU_TLS(rcu_reader).ctr, CMM_LOAD_SHARED(rcu_gp.ctr));
}
線程下線(offline),則是將本線程的ctr清零:
static inline void _rcu_thread_offline(void)
{
CMM_STORE_SHARED(URCU_TLS(rcu_reader).ctr, 0);
wake_up_gp();
}
URCU_TLS(name): 訪問線程的本地變量。其產生一個C語言的本地變量,可以加載與存儲。
寫者-同步(synchronize)
rcu機制的一個典型場景: 全局指針gp_ptr指向內存區域A,writer在申請了一份新的內存區域B后,使全局指針gp_ptr指向B。
在多核系統中,writer在更新后並不知道有沒有reader正在引用區域A的數據,所以它需要阻塞等待所有的reader線程更新本地ctr(即reader.ctr = gp.ctr),這個操作便是同步(synchronize),其簡化版實現代碼片段如下
void synchronize_rcu(void)
{
// 定義一個struct cds_list_head qsreaders變量
CDS_LIST_HEAD(qsreaders);
// 定義struct urcu_wait_node wait變量
DEFINE_URCU_WAIT_NODE(wait, URCU_WAIT_WAITING);
// 將wait將入gp_waitersduil,表示writer置於wait狀態
urcu_wait_add(&gp_waiters, &wait)
......
/* 遍歷所有reader的ctr,直到其更新到最新的gp.ctr */
wait_for_readers(registry, &cur_snap_readers, &qsreaders);
.....
}
static void wait_for_readers(struct cds_list_head *input_readers,
struct cds_list_head *cur_snap_readers,
struct cds_list_head *qsreaders)
{
unsigned int wait_loops = 0;
struct rcu_reader *index, *tmp;
/*
* Wait for each thread URCU_TLS(rcu_reader).ctr to either
* indicate quiescence (offline), or for them to observe the
* current rcu_gp.ctr value.
*/
/* 直到所有reader.ctr已經到最新才跳出循環 */
for (;;) {
uatomic_set(&rcu_gp.futex, -1);
cds_list_for_each_entry(index, input_readers, node) {
_CMM_STORE_SHARED(index->waiting, 1);
/* 遍歷所有輸入的reader */
cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
switch(rcu_reader_state(&index->ctr)) {
case RCU_READER_ACTIVE_CURRENT: /* reader.ctr已經最新 */
case RCU_READER_INACTIVE: /* reader處於offline狀態 */
cds_list_move(&index->node, qsreaders); /* 從遍歷列表中移除 */
break;
case RCU_READER_ACTIVE_OLD: /* reader.ctr不是最新 */
break;
}
}
if (cds_list_empty(input_readers)) {
uatomic_set(&rcu_gp.futex, 0); /* 列表空了,表示所有reader已更新 跳出循環 */
break;
}
}
}
計數器
全局計數器:
RCU機制是用於多核系統中,保持每個核上的線程所看到的全局數據一致性的一種機制,所以需要一種手段可以判斷當writer線程進行數據更新后,reader線程看到的數據是否已經最新。
為此urcu維護了一個全局的計數器rcu_gp.ctr
,每次writer進行同步操作(synchronize),都會使計數器加1,表示數據已經更新了,等待reader更新。
struct rcu_gp rcu_gp;
struct rcu_gp {
unsigned long ctr;
...
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
讀線程計數器:
每個reader線程也持有一個線程內部的計數器ctr,如果這個ctr
與rcu_gp.ctr
一致,就表明本reader線程的數據已經最新(ACTIVE_CURRENT),反之則不是最新(ACTIVE_OLD),
struct rcu_reader {
unsigned long ctr;
...
};
DECLARE_URCU_TLS(struct rcu_reader, rcu_reader)
讀者-靜默
讀者進入靜默期表示本次讀操作完成。
從上面writer synchronize
的過程可知,要使writer結束阻塞狀態,reader必須將其ctr更新到最新(除非它處於offline狀態),更新到最新是通過reader調用rcu_quiescent_state()
接口聲明靜默期完成的.
static inline void _rcu_quiescent_state(void)
{
unsigned long gp_ctr;
if ((gp_ctr = CMM_LOAD_SHARED(rcu_gp.ctr)) == URCU_TLS(rcu_reader).ctr)
return;
_rcu_quiescent_state_update_and_wakeup(gp_ctr);
}
static inline void _rcu_quiescent_state_update_and_wakeup(unsigned long gp_ctr)
{
/* 將本線程ctr更新為gp_ctr */
_CMM_STORE_SHARED(URCU_TLS(rcu_reader).ctr, gp_ctr);
/* 喚醒writer */
wake_up_gp();
}
如果讀者不周期性聲明靜默期,若寫者更新了,則其一直讀到的是舊數據。
寫者-異步(call_rcu)
前面writer的例子中,當writer進行數據更新后需要釋放舊資源,而這要在synchronize_rcu()
結束阻塞后才能進行(否則reader還在使用),但還有的時候,我們希望提高writer的效率,‘釋放’過程不要阻塞,再reader進行了更新后,再進行資源釋放,urcu提供了call_rcu()
接口來完成這一功能。
call_rcu()原型:
struct rcu_head {
struct cds_wfcq_node next;
void (*func)(struct rcu_head *head);
};
void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *head);
call_rcu()使用方法:
一般的,將要延遲釋放的數據結構內嵌一個rcu_head結構,在需要延遲釋放時調用:
struct global_foo {
struct rcu_head rcu_head;
......
};
struct global_foo g_foo;
在writer更新后,需要釋放舊的資源時時,調用call_rcu(),之后當所有reader都更新完成后,設置的回調函數free_func被自動調用
call_rcu(&g_foo.rcu_head, free_func);
那么urcu是如何實現這個功能的呢?
既然不能阻塞將writer阻塞在synchronize_rcu(),那總得有一個線程阻塞在synchronize_rcu()等待所有reader更新,於是urcu內部創建一個線程,稱為call_rcu_thread
,這個線程專門用於writer call_rcu()
(這個線程只會在第一次call_rcu()
被創建,之后的call_rcu()
均使用這個線程),以下是call_rcu_thread創建時的代碼片段。
/* 第一次call_rcu()會調用到 call_rcu_data_init() */
static void call_rcu_data_init(struct call_rcu_data **crdpp,unsigned long flags,int cpu_affinity)
{
struct call_rcu_data *crdp;
int ret;
crdp = malloc(sizeof(*crdp));
if (crdp == NULL)
urcu_die(errno);
memset(crdp, '\0', sizeof(*crdp));
cds_wfcq_init(&crdp->cbs_head, &crdp->cbs_tail);
......
/* 創建call_rcu_thread */
ret = pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp);
if (ret)
urcu_die(ret);
}
static void *call_rcu_thread(void *arg)
{
struct call_rcu_data *crdp = (struct call_rcu_data *) arg;
rcu_register_thread();
URCU_TLS(thread_call_rcu_data) = crdp;
for (;;) {
......
synchronize_rcu(); /* 在這里完成同步 */
rhp->func(rhp); /* 執行回調 */
......
}
rcu_unregister_thread();
return NULL;
}
參考
附錄
urcu接口
userspace rcu的API相關文檔在doc/
中,按API前綴可分為:
1. rcu_: Read-Copy Update (see doc/rcu-api.md)
2. cmm_: Concurrent Memory Model
3. caa_: Concurrent Architecture Abstraction
4. cds_: Concurrent Data Structures (see doc/cds-api.md)
5. uatomic_: Userspace Atomic (see doc/uatomic-api.md)
-
void rcu_init(void);
其必須在以下任何函數被調用之前被調用。 -
void ruc_read_lock(void);
RCU讀側臨界區開始前被調用,這些臨界區可以被嵌套。 -
void rcu_read_unlock(void);
RCU讀側臨界區結束時調用。 -
void rcu_register_thread(void);
每個線程在調用rcu_read_lock()
之前,必須調用這個函數。若不會調用rcu_read_lock()
函數,則不需要調用此函數。此外,rcu bp(“bullet proof”rcu)
不需要任何線程來調用rcu register_thread()
。 -
void rcu_unregister_thread(void);
每個調用rcu_register_thread()
函數的線程在調用pthread_exit()
或返回底層函數之前,必須調用rcu_unregister_thread()
。 -
void synchronize_rcu(void);
等待,直到每個原來存在的RCU讀側臨界區完成。注意,此原語不需要等待在此寬限期開始之后的讀區臨界區完成。 -
void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *head));
注冊由“head”指示的回調。這意味着func將在未來的RCU寬限期結束后被調用。head引用的rcu_head結構通常是受rcu保護的大型結構中的一個字段。func的典型實現如下:
void func(struct rcu_hed *head)
{
struct foo *p = container_of(head, struct foo, rcu); // head 在foo中的名稱為rcu,通過head獲得foo
free(p);
}
在給定指向封閉結構的指針p時,可以按如下方式注冊此RCU回調函數:
call_rcu(&p->rcu, func);
call_rcu()
應該從注冊的rcu讀取側線程調用。對於QSBR風格,調用者應該在線。
void rcu_barrier(void);
在調用rcu_barrier()
之前,確保所有的啟動的所有call_rcu()
工作在rcu barrier()
返回之前完成。決不能從調用rcu()線程調用rcu barrier()。例如,此函數可用於確保在允許此共享對象的dlclose()完成之前,涉及共享對象的所有內存回收都已完成。