Linux RCU機制詳解


關於rcu的幾點聲明:

1:RCU使用在讀者多而寫者少的情況.RCU和讀寫鎖相似.但RCU的讀者占鎖沒有任何的系統開銷.寫者與寫寫者之間必須要保持同步,且寫者必須要等它之前的讀者全部都退出之后才能釋放之前的資源. 
2:RCU保護的是指針.這一點尤其重要.因為指針賦值是一條單指令.也就是說是一個原子操作.因它更改指針指向沒必要考慮它的同步.只需要考慮cache的影響. 
3:讀者是可以嵌套的.也就是說rcu_read_lock()可以嵌套調用. 
4:讀者在持有rcu_read_lock()的時候,不能發生進程上下文切換.否則,因為寫者需要要等待讀者完成,寫者進程也會一直被阻塞.

5:spin lock是互斥的,任何時候只有一個thread(reader or writer)進入臨界區,rw spin lock要好一些,允許多個reader並發執行,提高了性能。不過,reader和updater不能並發執行,RCU解除了這些限制,允許一個updater(不能多個updater進入臨界區,這可以通過spinlock來保證)和多個reader並發執行。

 

核心api:

對於reader,RCU的操作包括:

(1)rcu_read_lock,用來標識RCU read side臨界區的開始。

(2)rcu_dereference,該接口用來獲取RCU protected pointer。reader要訪問RCU保護的共享數據,當然要獲取RCU protected pointer,然后通過該指針進行dereference的操作。

(3)rcu_read_unlock,用來標識reader離開RCU read side臨界區

對於writer,RCU的操作包括:

(1)rcu_assign_pointer。該接口被writer用來進行removal的操作,在witer完成新版本數據分配和更新之后,調用這個接口可以讓RCU protected pointer指向RCU protected data。

(2)synchronize_rcu。writer端的操作可以是同步的,也就是說,完成更新操作之后,可以調用該接口函數等待所有在舊版本數據上的reader線程離開臨界區,一旦從該函數返回,說明舊的共享數據沒有任何引用了,可以直接進行reclaimation的操作。

(3)call_rcu。當然,某些情況下(例如在softirq context中),writer無法阻塞,這時候可以調用call_rcu接口函數,該函數僅僅是注冊了callback就直接返回了,在適當的時機會調用callback函數,完成reclaimation的操作。這樣的場景其實是分開removal和reclaimation的操作在兩個不同的線程中:updater和reclaimer。

 

Example1:

struct foo { 
int a; 
char b; 
long c; 
}; 
DEFINE_SPINLOCK(foo_mutex);

struct foo *gbl_foo; 
void foo_update_a(int new_a) 

struct foo *new_fp; 
struct foo *old_fp;

new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL); 
spin_lock(&foo_mutex); 
old_fp = gbl_foo; 
*new_fp = *old_fp; 
new_fp->a = new_a; 
rcu_assign_pointer(gbl_foo, new_fp); 
spin_unlock(&foo_mutex); 
synchronize_rcu(); 
kfree(old_fp); 
}

int foo_get_a(void) 

int retval;

rcu_read_lock(); 
retval = rcu_dereference(gbl_foo)->a; 
rcu_read_unlock(); 
return retval; 

如上代碼所示,RCU被用來保護全局指針struct foo *gbl_foo. foo_get_a()用來從RCU保護的結構中取得gbl_foo的值.而foo_update_a()用來更新被RCU保護的gbl_foo的值. 
另外,我們思考一下,為什么要在foo_update_a()中使用自旋鎖foo_mutex呢? 
假設中間沒有使用自旋鎖.那foo_update_a()的代碼如下:

void foo_update_a(int new_a) 

struct foo *new_fp; 
struct foo *old_fp;

new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);

old_fp = gbl_foo; 
1:------------------------- 
*new_fp = *old_fp; 
new_fp->a = new_a; 
rcu_assign_pointer(gbl_foo, new_fp);

synchronize_rcu(); 
kfree(old_fp); 

假設A進程在上圖----標識處被B進程搶點.B進程也執行了goo_ipdate_a().等B執行完后,再切換回A進程.此時,A進程所持的old_fd實際上已經被B進程給釋放掉了.此后A進程對old_fd的操作都是非法的.

另外,我們在上面也看到了幾個有關RCU的核心API.它們為別是: 
rcu_read_lock() 
rcu_read_unlock() 
synchronize_rcu() 
rcu_assign_pointer() 
rcu_dereference() 
其中,rcu_read_lock()和rcu_read_unlock()用來保持一個讀者的RCU臨界區.在該臨界區內不允許發生上下文切換. 
rcu_dereference():讀者調用它來獲得一個被RCU保護的指針. 
Rcu_assign_pointer():寫者使用該函數來為被RCU保護的指針分配一個新的值.這樣是為了安全從寫者到讀者更改其值.這個函數會返回一個新值

Example2:

 

 1 struct el {                          1 struct el {
 2   struct list_head list;             2   struct list_head list;
 3   long key;                          3   long key;
 4   spinlock_t mutex;                  4   spinlock_t mutex;
 5   int data;                          5   int data;
 6   /* Other data fields */            6   /* Other data fields */
 7 };                                   7 };
 8 rwlock_t listmutex;                  8 spinlock_t listmutex;
 9 struct el head;                      9 struct el head;

 1 int search(long key, int *result)    1 int search(long key, int *result)
 2 {                                    2 {
 3   struct list_head *lp;              3   struct list_head *lp;
 4   struct el *p;                      4   struct el *p;
 5                                      5
 6   read_lock(&listmutex);             6   rcu_read_lock();
 7   list_for_each_entry(p, head, lp) { 7   list_for_each_entry_rcu(p, head, lp) {
 8     if (p->key == key) {             8     if (p->key == key) {
 9       *result = p->data;             9       *result = p->data;
10       read_unlock(&listmutex);      10       rcu_read_unlock();
11       return 1;                     11       return 1;
12     }                               12     }
13   }                                 13   }
14   read_unlock(&listmutex);          14   rcu_read_unlock();
15   return 0;                         15   return 0;
16 }                                   16 }

 1 int delete(long key)                 1 int delete(long key)
 2 {                                    2 {
 3   struct el *p;                      3   struct el *p;
 4                                      4
 5   write_lock(&listmutex);            5   spin_lock(&listmutex);
 6   list_for_each_entry(p, head, lp) { 6   list_for_each_entry(p, head, lp) {
 7     if (p->key == key) {             7     if (p->key == key) {
 8       list_del(&p->list);            8       list_del_rcu(&p->list) or list_add_rcu(&p->list);
 9       write_unlock(&listmutex);      9       spin_unlock(&listmutex);
                                       10       synchronize_rcu();
10       kfree(p);                     11       kfree(p);
11       return 1;                     12       return 1;
12     }                               13     }
13   }                                 14   }
14   write_unlock(&listmutex);         15   spin_unlock(&listmutex);
15   return 0;                         16   return 0;
16 }                                   17 }


Example3:

rcu_assign_pointer()通常用於寫者的發布,rcu_dereference()通常用於讀者的訂閱。

寫者:

1 p->a = 1;
2 p->b = 2;
3 p->c = 3;
4 rcu_assign_pointer(gp, p);


讀者:

1 rcu_read_lock();
2 p = rcu_dereference(gp);
3 if (p != NULL) {
4 do_something_with(p->a, p->b, p->c);
5 }
6 rcu_read_unlock();

rcu_assign_pointer()是說,先把那塊內存寫好,再把指針指過去。這里使用的內存寫屏障是為了保證並發的讀者讀到數據一致性。在這條語句之前的讀者讀到舊的指針和舊的內存,這條語句之后的讀者讀到新的指針和新的內存。如果沒有這條語句,很有可能出現讀者讀到新的指針和舊的內存。也就是說,這里通過內存屏障刷新了p所指向的內存的值,至於gp本身的值有沒有更新還不確定。實際上,gp本身值的真正更新要等到並發的讀者來促發。
rcu_dereference() 原語用的是數據依賴屏障,smp_read_barrier_dependence,它要求后面的讀操作如果依賴前面的讀操作,則前面的讀操作需要首先完成。根據數據之間的依賴,要讀p->a, p->b, p->c, 就必須先讀p,要先讀p,就必須先讀p1,要先讀p1,就必須先讀gp。也就是說讀者所在的core在進行后續的操作之前,gp必須是同步過的當前時刻的最新值。如果沒有這個數據依賴屏障,有可能讀者所在的core很長一段時間內一直用的是舊的gp值。所以,這里使用數據依賴屏障是為了督促寫者將gp值准備好,是為了呼應寫者,這個呼應的訴求是通過數據之間的依賴關系來促發的,也就是說到了非呼應不可的地步了。

 

Example4:

  1. /*共享數據結構體*/
  2. /*其中rcu_head為雙向鏈表*/
  3. struct shared_data{
  4.     char a;
  5.     int b;
  6.     struct rcu_head rcu;
  7. }
  8.  
  9. /*讀取者,臨界區的代碼不允許睡眠*/
  10. static void reader(struct shared_data *ptr)
  11. {
  12.     struct shared_data *= NULL;
  13.     rcu_read_lock();
  14.     /*調用 rcu_dereference 在雙向鏈表中獲得ptr指針*/
  15.     p = rcu_dereference(*ptr);
  16.     if(p)
  17.         do_something_with(p);
  18.     rcu_read_unlock();
  19. }
  20.  
  21.  
  22. /*寫入者*/
  23.  
  24. /*使用回調函數,contain_of從雙向鏈表中獲取老的共享數據*/
  25. static void del_old_ptr(struct rcu_head *rh)
  26. {
  27.     struct shared_data *= contain_of(rh,struct shared_data,rcu)
  28.     kfree(p);
  29. }
  30.  
  31. static void writer(struct shared_data *ptr)
  32. {
  33.     struct shared_data *new_ptr = malloc(...);
  34.     ...
  35.     new_ptr->= 'a';
  36.     new_ptr->= 1;
  37.     /*更新指針*/
  38.     rcu_assign_pionter(new_ptr);
  39.     /*注冊回調函數*/
  40.     call_rcu(ptr->rcu,del_old_ptr);
  41. }

 

Example5:

 

	struct foo {
		int a;
		int b;
		int c;
	};
	struct foo *gp1;
	struct foo *gp2;

	void updater(void)
	{
		struct foo *p;

		p = kmalloc(...);
		if (p == NULL)
			deal_with_it();
		p->a = 42;  /* Each field in its own cache line. */
		p->b = 43;
		p->c = 44;
		rcu_assign_pointer(gp1, p);
		p->b = 143;
		p->c = 144;
		rcu_assign_pointer(gp2, p);
	}

	void reader(void)
	{
		struct foo *p;
		struct foo *q;
		int r1, r2;

		p = rcu_dereference(gp2);
		if (p == NULL)
			return;
		r1 = p->b;  /* Guaranteed to get 143. */
		q = rcu_dereference(gp1);  /* Guaranteed non-NULL. */
		if (p == q) {
			/* The compiler decides that q->c is same as p->c. */
			r2 = p->c; /* Could get 44 on weakly order system. */
		}
		do_something_with(r1, r2);
	}

You might be surprised that the outcome (r1 == 143 && r2 == 44) is possible,
but you should not be.  After all, the updater might have been invoked
a second time between the time reader() loaded into "r1" and the time
that it loaded into "r2".  The fact that this same result can occur due
to some reordering from the compiler and CPUs is beside the point.

But suppose that the reader needs a consistent view?

Then one approach is to use locking, for example, as follows:

	struct foo {
		int a;
		int b;
		int c;
		spinlock_t lock;
	};
	struct foo *gp1;
	struct foo *gp2;

	void updater(void)
	{
		struct foo *p;

		p = kmalloc(...);
		if (p == NULL)
			deal_with_it();
		spin_lock(&p->lock);
		p->a = 42;  /* Each field in its own cache line. */
		p->b = 43;
		p->c = 44;
		spin_unlock(&p->lock);
		rcu_assign_pointer(gp1, p);
		spin_lock(&p->lock);
		p->b = 143;
		p->c = 144;
		spin_unlock(&p->lock);
		rcu_assign_pointer(gp2, p);
	}

	void reader(void)
	{
		struct foo *p;
		struct foo *q;
		int r1, r2;

		p = rcu_dereference(gp2);
		if (p == NULL)
			return;
		spin_lock(&p->lock);
		r1 = p->b;  /* Guaranteed to get 143. */
		q = rcu_dereference(gp1);  /* Guaranteed non-NULL. */
		if (p == q) {
			/* The compiler decides that q->c is same as p->c. */
			r2 = p->c; /* Locking guarantees r2 == 144. */
		}
		spin_unlock(&p->lock);
		do_something_with(r1, r2);
	}

As always, use the right tool for the job!

 Example6:

如果寫者需要對鏈表條目進行修改,那么就需要首先拷貝要修改的條目,然后修改條目的拷貝,等修改完畢后,再使用條目拷貝取代要修改的條目,要修改條目將被在經歷一個grace period后安全刪除。

對於系統調用審計代碼,並沒有這種情況。這里假設有修改的情況,那么使用rwlock的修改代碼應當如下:

       static inline int audit_upd_rule(struct audit_rule *rule,
                                         struct list_head *list,
                                         __u32 newaction,
                                         __u32 newfield_count)
        {
                struct audit_entry  *e;
                struct audit_newentry *ne;
                write_lock(&auditsc_lock);
                /* Note: audit_netlink_sem held by caller. */
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                e->rule.action = newaction;
                                e->rule.file_count = newfield_count;
                                write_unlock(&auditsc_lock);
                                return 0;
                        }
                }
                write_unlock(&auditsc_lock);
                return -EFAULT;         /* No matching rule */
        }

如果使用RCU,修改代碼應當為;

      static inline int audit_upd_rule(struct audit_rule *rule,
                                         struct list_head *list,
                                         __u32 newaction,
                                         __u32 newfield_count)
        {
                struct audit_entry  *e;
                struct audit_newentry *ne;
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
                                if (ne == NULL)
                                        return -ENOMEM;
                                audit_copy_rule(&ne->rule, &e->rule);
                                ne->rule.action = newaction;
                                ne->rule.file_count = newfield_count;
                                list_replace_rcu(e, ne);
                                call_rcu(&e->rcu, audit_free_rule, e);
                                return 0;
                        }
                }
                return -EFAULT;         /* No matching rule */
        }

修改操作立即可見

前面兩種情況,讀者能夠容忍修改可以在一段時間后看到,也就說讀者在修改后某一時間段內,仍然看到的是原來的數據。在很多情況下,讀者不能容忍看到舊的數據,這種情況下,需要使用一些新措施,如System V IPC,它在每一個鏈表條目中增加了一個deleted字段,標記該字段是否刪除,如果刪除了,就設置為真,否則設置為假,當代碼在遍歷鏈表時,核對每一個條目的deleted字段,如果為真,就認為它是不存在的。

還是以系統調用審計代碼為例,如果它不能容忍舊數據,那么,讀端代碼應該修改為:

       static enum audit_state audit_filter_task(struct task_struct *tsk)
        {
                struct audit_entry *e;
                enum audit_state   state;
                rcu_read_lock();
                list_for_each_entry_rcu(e, &audit_tsklist, list) {
                        if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                                spin_lock(&e->lock);
                                if (e->deleted) {
                                        spin_unlock(&e->lock);
                                        rcu_read_unlock();
                                        return AUDIT_BUILD_CONTEXT;
                                }
                                rcu_read_unlock();
                                return state;
                        }
                }
                rcu_read_unlock();
                return AUDIT_BUILD_CONTEXT;
        }

注意,對於這種情況,每一個鏈表條目都需要一個spinlock保護,因為刪除操作將修改條目的deleted標志。此外,該函數如果搜索到條目,返回時應當保持該條目的鎖,因為只有這樣,才能看到新的修改的數據,否則,仍然可能看到就的數據。

寫端的刪除操作將變成:

       static inline int audit_del_rule(struct audit_rule *rule,
                                         struct list_head *list)
        {
                struct audit_entry  *e;
                /* Do not use the _rcu iterator here, since this is the only
                 * deletion routine. */
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                spin_lock(&e->lock);
                                list_del_rcu(&e->list);
                                e->deleted = 1;
                                spin_unlock(&e->lock);
                                call_rcu(&e->rcu, audit_free_rule, e);
                                return 0;
                        }
                }
                return -EFAULT;         /* No matching rule */
        }

刪除條目時,需要標記該條目為已刪除。這樣讀者就可以通過該標志立即得知條目是否已經刪除。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM