為了實現對臨界資源的有效管理,應用層的程序有原子變量,條件變量,信號量來控制並發,同樣的問題也存在與驅動開發中,比如一個驅動同時被多個應用層程序調用,此時驅動中的全局變量會同時屬於多個應用層進程的進程空間,這種情況下也要使用一些技術來實現對並發的控制。本文將討論內核中下述並發控制技術的技術特點和應用場景。
- 中斷屏蔽
- 原子操作
- 原子變量操作
- 原子位操作
- 自旋鎖
- 傳統自旋鎖
- 讀寫自旋鎖
- 順序鎖
- RCU
- 信號量
- 傳統信號量
- 讀寫信號量
- 完成量
- 互斥量
中斷屏蔽
顧名思義,就是屏蔽所有的中斷。在嵌入式系統,中斷屏蔽可以有三級,1. 硬件接口的屏蔽,2. 硬件GIC的屏蔽,3. CPU(內核)的屏蔽。如果在接口處屏蔽了,那么中斷來了就丟了,根本找不到。如果在GIC處屏蔽了,那么在屏蔽期間如果來了irq_1,irq_2,irq_3個中斷,因為只有一個pending標志位,所以最后irq_3來的時候會將pending置位,之后解除屏蔽了,CPU發現pending有置位,還是會處理,但是1,2就肯定丟了。在ARM處的屏蔽,即內核中的屏蔽,看怎么設置了,如果就是local_irq_disable,那么丟了就是丟了,和在接口處屏蔽一樣,如果是local_irq_save就和第二種一樣,追到最后一個中斷,內核也有相應的機制進行中斷計數,知道這期間來了多少個中斷,但是實際操作中,大部分情況我們都不會追着執行錯過的中斷,除非這個中斷非常重要。
我們這里討論的,就是在內核中進行中斷屏蔽。由於內核中很多重要的操作都要依賴於中斷,所以屏蔽所有的中斷是十分危險的,里面執行的代碼要盡可能的快,而且,由於內核的進程調度也是由中斷驅動的,所以中斷屏蔽中不能有可能引發休眠的代碼,否則無法被喚醒。注意,中斷屏蔽只是屏蔽了本CPU的中斷,所以並不能解決SMP引發的競泰問題,通常,中斷屏蔽要和自旋鎖聯合使用,用於防止訪問自旋鎖保護的臨界區時被中斷打斷
普通的中斷屏蔽
local_irq_disable(); //屏蔽中斷
//或
local_irq_save(flags); //屏蔽中斷並保存目前CPU中的中斷位信息
/* 臨界區 */
local_irq_enable(); //解除屏蔽
//或
local_irq_restore(flags); //解除屏蔽並恢復中斷位信息
底半部的中斷屏蔽
local_bh_disable(); //屏蔽中斷,bh版本的本質是屏蔽了這個CPU上的軟中斷
/* 臨界區 */
local_bh_enable();
原子操作
原子操作即不能被打斷的操作,和應用層的概念一樣,內核中的原子操作模板如下:
整型原子變量
//asm/atomic.h
//創建並初始化原子變量
atomic_t tv = ATOMIC_INIT(初值);
//讀原子變量
int atomic_read(atomic_t *v);
//寫原子變量
void atomic_set(atomic_t *v, int i);
/**
*atomic_dec_and_test - 嘗試將原子變量-1
*v:如果-1之后原子變量變為0,返回非0, 否則返回0
*/
int atomic_dec_and_test(volatile atomic_t *v);
int atomic_inc_and_test(volatile atomic_t *v);
int atomic_sub_and_test(int i, volatile atomic_t *v);
//操作並返回
int atomic_add_return(int i, atomic *v);
int atomic_sub_return(int i, atomic *v);
int atomic_inc_return(atomic *v);
int atomic_dev_return(atomic *v);
模板
static atomic_t tv;
static int demo_open(struct inode *inode, struct file *filp)
{
if(!atomic_dec_and_test(&tv)){
atomic_inc(&tv);
return -EBUSY;
}
/* 操作代碼 */
return 0;
}
static int demo_release(struct inode *inode, struct file *filp)
{
atomic_inc(&tv);
return 0;
}
static int __init demo_init(void)
{
// init atomic_t
atomic_set(&tv, 1);
}
位原子操作
位原子操作即原子的位操作,內核中大量使用"位"來記錄信息,比如位圖,這些操作都必須是原子性的,內核API如下:
//設置位
void set_bit(nr,void *addr);
//清除位
void clear_bit(nr,void *addr);
//改變位
void change_bit(nr,void *addr);
//測試位
test_bit(nr, void *addr);
//測試並操作位
int test_and_set_bit(nr, void *addr);
int test_and_clear_bit(nr,void *addr);
int test_and_change_bit(nr,void *addr);
自旋鎖
意即"在原地打轉",當加鎖不成功時,自旋,自旋鎖會不斷的占用CPU進行變量的測試,由於屬於原子操作,所以該CPU的占用會升為100%,所以,使用自旋鎖時,臨界區的代碼需要很短,否則會影響系統性能,此外,作為鎖機制的一種,使用自旋鎖同樣需要注意死鎖的出現,自旋鎖鎖定期間不能調用可能引起進程調度的函數,如果進程獲得自旋鎖之后再阻塞,eg,copy_from_user(),copy_to_user(),kmalloc(),msleep()等,一旦阻塞發生就可能導致內核 崩潰。
自旋鎖可以用來解決SMP競態問題。不同類型的自旋鎖有自己的處理機制,適用於不同的情況。包括傳統自旋鎖,讀寫自旋鎖,RCU機制,順序鎖等,自旋鎖是信號量,互斥體的的底層實現工具。
比較\類型 | 傳統自旋鎖 | 讀寫自旋鎖 | 順序鎖 | RCU機制 |
---|---|---|---|---|
應用場合 | 需要上鎖者獨占的資源 | 需要寫者獨占的資源 | 很少同時讀寫的資源 | 讀多寫少的資源 |
讀+讀 並發 | × | √ | √ | √ |
讀+寫 並發 | × | × | √ | √ |
寫+寫 並發 | × | × | × | √ |
和其他鎖機制的一樣,使用自旋鎖保護數據分為搶鎖-操作-解鎖,下面就是一個典型的使用鎖的流程,通過自旋鎖實現一個文件只被一個進程打開。
int cnt=0;
lock_t lock;
static int my_open()
{
lock(&lock);
if(cnt){
unlock(&lock)
}
cnt++;
unlock(&lock);
}
static int release()
{
lock(&lock);
cnt--;
unlock(&lock);
}
傳統自旋鎖
是一種比較粗暴的自旋鎖,使用這種鎖的時候,被鎖定的臨界區域不允許其他CPU訪問,需要注意的是,盡管獲得鎖之后執行的臨界區操作不會被其他CPU和本CPU內其他搶占進程的打擾,但是仍然會被中斷和底半部的影響,所以通常我們會使用下述API中的衍生版本,比如上文中提到的將自旋鎖+中斷屏蔽來防止使用自旋鎖訪問臨界資源的時候被中斷打斷,對應的宏函數就是spin_lock_irq和spin_lock_irqsave。
//定義並初始化自旋鎖
spinlock_t spinlock
void spin_lock_init(spinlock_t *);
//加鎖
//spin_lock - 加鎖函數(忙等)
void spin_lock(spinlock_t *lock);
int spin_trylock(spinlock_t *lock);
spin_lock_irq(); //=spin_lock() + local_irq_disable()
spin_lock_irqsave(); //= spin_lock() + lock_irq_save();
spin_lock_bh(); //=spin_lock() + local_bh_disable();
//解鎖
void spin_unlock(spinlock_t *lock);
spin_unlock_irq(); //=spin_unlock() + local_irq_enable()
spin_unlock_irqrestore(); //= spin_unlock() + lock_irq_restore();
spin_unlock_bh(); //=spin_unlock() + local_bh_enable();
讀寫自旋鎖
傳統的自旋鎖粗暴的將臨界資源划歸給一個CPU,但是很多資源都不會因為讀而被破壞,所以我們可以允許多個CPU同時讀臨界資源,但不允許同時寫資源,類似於應用層的文件鎖,內核的讀寫鎖機制同樣有下述互斥原則:
- 讀者 + 讀者 不互斥
- 讀者 + 寫者 互斥
- 寫者 + 寫者 互斥
//include/linux/rwlock.h
//定義並初始化自旋鎖
rwlock_t rwlock;
void rwlock_init(rwlock_t *lock);
//加讀鎖
void read_lock(rwlock_t *lock);
int read_trylock(rwlock_t *lock);
void read_lock_irqsave(rwlock_t *lock,unsigned long flags);
void read_lock_irq(rwlock_t *lock, unsigned long flags);
void read_lock_bh(rwlock_t *lock);
//解讀鎖
void read_unlock(rwlock_t *lock)
void read_unlock_irqrestrore(rwlock_t *lock,unsigned long flags);
void read_unlock_irq(rwlock_t *lock, unsigned long flags);
void read_unlock_bh(rwlock_t *lock);
//加寫鎖
void write_lock(rwlock_t *lock)
int write_trylock(rwlock_t *lock)
void write_lock_irqsave(rwlock_t *lock,unsigned long flags);
void write_lock_irq(rwlock_t *lock, unsigned long flags);
void write_lock_bh(rwlock_t *lock);
//解寫鎖
void write_unlock(rwlock_t *lock)
void write_unlock_irqrestrore(rwlock_t *lock,unsigned long flags);
void write_unlock_irq(rwlock_t *lock, unsigned long flags);
void write_unlock_bh(rwlock_t *lock);
順序鎖
順序鎖可以看作讀寫鎖的升級版,讀寫鎖不允許同時存在讀者和寫者,順序鎖對這一情況進行了改良,它允許寫者和讀者同時訪問臨界區,不必再向讀寫鎖那樣讀者要讀必須等待寫者寫完,寫者要寫必須等待讀者讀完。不過,使用順序鎖的時候,臨界區不能有指針,因為寫者可能會修改指針的指向,如果讀者去讀,就會Oops,此外,如果讀者讀過的數據被寫者改變了,讀者需要重新讀,以維持數據是最新的,雖然有這兩個約束,但是順序鎖提供了比讀寫鎖更大的靈活性。對於寫者+寫者的情況,順序鎖的機制和讀寫鎖一樣,必須等!
- 讀者 + 讀者 不互斥
- 讀者 + 寫者 不互斥 , 臨界區沒有指針+讀者需自己注意更新
- 寫者 + 寫者 互斥
//include/linux/seqlock.h
//定義順序鎖
struct seqlock_t sl;
//獲取順序鎖
void write_seqlock(seqlock_t *sl);
void write_tryseqlock(seqlock_t *sl);
void write_seqlock_irqsave(lock,flags); //=local_irq_save() + write_seqlock()
void write_seqlock_irq(seqlock_t *sl); //=local_irq_disable() + write_seqlock()
void write_seqlock_bh(seqlock_t *sl); //local_bh_disable() + write_seqlock()
//釋放順序鎖
void write_sequnlock(seqlock_t *sl);
void write_sequnlock_irqsave(lock,flags); //=local_irq_restore() + write_sequnlock()
void write_sequnlock_irq(seqlock_t *sl); //=local_irq_enable() + write_sequnlock()
void write_sequnlock_bh(seqlock_t *sl); //local_bh_enable() + write_sequnlock()
//讀開始
unsigned read_seqbegin(const seqlock_t *sl);
read_seqbegin_irqsave(lock,flags); //=local_irq_save() + read_seqbegin();
//重讀
int read_seqretry(const seqlock_t *sl,unsigned iv);
read_seqretry_irqrestore(lock,iv,flags); //=local_irq_restore() + read_seqretry();
RCU
RCU即Read-Copy Update,即讀者直接讀,寫者先拷貝再擇時更新,是另外一種讀寫鎖的升級版,這種機制在VFS層被大量使用。正如其名,讀者訪問臨界資源不需要鎖,從下面的rcu_read_lock的定義即可看出,寫者在寫之前先將臨界資源進行備份,去修改這個副本,等所有的CPU都退出對這塊臨界區的引用后,再通過回調機制,將引用這塊資源的原指針指向已經修改的備份。從中可以看出,在RCU機制下,讀者的開銷大大降低,也沒有順序鎖的指針問題,但是寫者的開銷很大,所以RCU適用於讀多寫少的臨界資源。如果寫操作很多,就有可能將讀操作節約的性能抵消掉,得不償失。
- 讀者 + 讀者 不互斥
- 讀者 + 寫者 不互斥 , 讀者自己注意更新
- 寫者 + 寫者 不互斥 ,寫者之間自己去同步
內核會為每一個CPU維護兩個數據結構-rcu_data和rcu_bh_data,他們用於保存回調函數,函數call_rcu()把回調函數注冊到rcu_data,而call_rcu_bh()則把回調函數注冊到rcu_bh_data,在每一個數據結構上,回調函數們會組成一個隊列。
使用RCU時,讀執行單元必須提供一個信號給寫執行單元以便寫執行單元能夠確定數據可以被安全地釋放或修改的時機。內核中有一個專門的垃圾收集器來探測讀執行單元的信號,一旦所有的讀執行單元都已經發送信號告訴收集器自己都沒有使用RCU的數據結構,收集器就調用回調函數完成最后的數據釋放或修改操作。
讀
讀即RCU中的R,從下面的宏定義可以看出,讀操作其實就是禁止內核的搶占調度,並沒有使用一個鎖對象。
//讀鎖定
//include/linux/rcupdate.h
rcu_read_lock(); //preempt_disbale()
rcu_read_lock_bh(); //local_bh_disable()
//讀解鎖
rcu_read_unlock() //preempt_enable()
rcu_read_unlock_bh(); //local_bh_enable()
同步
同步即是RCU寫操作的最后一個步驟-Update,下面這個接口會則色寫執行單元,直到所有的讀執行單元已經完成讀執行單元臨界區,寫執行單元才可以繼續下一步操作。如果有多個RCU寫執行單元調用該函數,他們將在一個grace period(即所有的讀執行單元已經完成對臨界區的訪問)之后全部被喚醒。
synchrosize_rcu()
掛起回調
下面這個接口也由RCU寫執行單元調用,它不會使寫執行單元阻塞,因而可以在中斷上下文或軟中斷中使用,該函數把func掛接到RCU回調函數鏈上,然后立即返回。函數sychrosize_rcu()其實也會調用call_rcu()。
void call_rcu(struct rcu_head *head,void (*func)(struct rcu_head *rcu));
下面這個接口會將軟中斷的完成也當作經歷一個quiecent state(靜默狀態),因此如果寫執行單元調用了該函數,在進程上下文的讀執行單元必須使用rcu_read_lock_bh();
void call_rcu_bh(struct rcu_head *head,void (*func)(struct rcu_head *rcu));
RCU機制被大量的運用在內核鏈表的讀寫中,下面這些就是內核中使用RCU機制保護的數據結構,函數的功能和非RCU版本一樣,具體可以參考內核文件"include/linux/list.h",只不過這里的操作都會使用RCU保護起來。
void list_add_rcu(struct list_head *new, struct list_head *head);
void list_add_tail_rcu(struct list_head *new,struct list_head *head);
void list_del_rcu(struct list_head *entry);
void list_replace_rcu(struct list_head *old,struct list_head *new);
list_for_each_rcu(pos,head);
list_for_each_safe_rcu(pos,n,head);
list_for_each_entry_rcu(pos,head,member);
void hlist_del_rcu(struct hlist_node *n);
void hlist_add_head_rcu(struct hlist_node *n, struct hlist_head *h);
list_for_each_rcu(pos,head);
hlist_for_each_entry_rcu(tpos,pos,head,member);
信號量
自旋鎖一節提過,如果一個CPU不能獲取臨界資源,就會造成"原地自旋",所以自旋鎖保護的臨界區的執行時間不能太長,但如果我們的確需要保護一段執行時間比較長的臨界區呢?答案就是信號量
,信號量的底層依托於自旋鎖來實現其原子性,並進一步將其提高到"進程"的維度,稱為一種可以運行在進程上下文的"鎖",正是這種能運行在進程上下文的能力賦予了信號量和自旋鎖的諸多不同。
使用信號量,如果試圖獲取信號量的進程獲取失敗,內核就會將其調度為睡眠狀態,執行其他進程,避免了CPU的忙等。不過,進程上下文的切換也是有成本的,所以通常,信號量在內核中都是只用於保護大塊臨界區。
此外,一個進程一旦無法獲取期待的信號量就會進入睡眠,所以信號量保護的臨界區可以有睡眠的代碼。在這方面,自旋鎖保護的區域是不能睡眠也不能執行schedule()的,因為一旦一個搶到了鎖的CPU進行了進程上下文的切換或睡眠,那么其他等待這個自旋鎖的CPU就會一直在那忙等,就永遠無法等到這個鎖,,形成死鎖,除非有其他進程將其喚醒(通常都不會有)。
也正是由於信號量操作可能引起阻塞,所以信號量不能用於中斷上下文。總結一下剛才羅嗦這一段:
項目 | 信號量 | 自旋鎖 |
---|---|---|
臨界區時間 | 進程切換時間更短 | 臨界區執行時間更短 |
進程上下文 | 臨界區可以睡眠或調度 | 臨界區不可以睡眠或調度 |
中斷上下文 | 只有down_trylock()可以 | 可以 |
傳統信號量
內核的信號量和應用層的信號量的使用方式類似,但沒有獲取信號量這一步驟,因為內核中中的信號量可以映射到所有調用這個模塊的用戶進程的內核空間。這些用戶進程也就直接共享了一個信號量,所以也就沒有獲取信號量一說,相關的內容我在"Linux IPC System V 信號量"一文中有所討論。
和應用層的信號量一樣,內核信號量也是用於對臨界資源的互斥/順序訪問,同樣,雖然在使用信號量的時候我們可以初始化為任意值,但實際操作上我們通常只初始化為1或0,下述是Linux內核提供的信號量API。
//include/linux/semaphore.h
//定義並初始化semaphore對象
struct semphore sem;
//初始化信號量
void sem_init(struct semaphore * sem,int val);
init_MUTEX(sem);
init_MUTEX_LOCKED(sem);
DECLARE_MUTEX(sem);
DECLARE_MUTEX_LOCKED(sem);
//P操作
//down()會導致睡眠,不能用於中斷上下文
void down(struct semaphore *sem);
//down_interruptible同樣會進入休眠,但能被打斷
int down_interruptible(struct semaphore *sem);
//down_trylock不能獲得鎖時會立即返回,不會睡眠,可以用在中斷上下文
int down_trylock(struct semaphore *sem);
//V操作
void up(struct semaphore *sem);
讀寫信號量
讀寫信號量與信號量的關系 和 讀寫自旋鎖與自旋鎖的關系類似,他們的互斥邏輯都是一樣的,這里不再贅述
//定義並初始化讀寫信號量
struct rw_semaphore my_rwsem;
void init_rwsem(struct rw_semaphore *sem);
//P讀信號量
void down_read(struct rw_semaphore *sem);
int down_read_trylock(struct rw_semaphore *sem);
//V讀信號量
void up_read(struct rw_semaphore *sem);
//P寫信號量
void down_write(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem);
//V寫信號量
void up_write(struct rw_semaphore *sem);
模板
struct rw_semaphore my_rwsem;
void init_rwsem(&my_rwsem);
//讀前獲取讀信號量
down_read(&my_rwsem); //若要非阻塞:down_read_trylock(&my_rwsem);
/* 讀臨界區 */
//讀完釋放讀信號量
up_read(&my_rwsem);
//寫前獲取寫信號量
down_write(&my_rwsem); //若要非阻塞:down_write_trylock(&my_rwsem);
/* 寫臨界區 */
//寫完釋放寫信號量
up_write(&my_rwsem);
完成量
完成量用於一個執行單元等待另一個執行單元執行完某事,和傳統信號量一樣,主要是用來實現隊臨界區的順序/互斥訪問。但是完成量還提供一種喚醒一個或喚醒所有等待進程的接口,有點類似與應用層的條件變量。
//定義並初始化完成量
struct completion my_completion;
init_completion(&my_completion);
//或
DECLARE_COMPLETION(my_completion)
//等待completion
void wait_for_completion(struct completion *c);
//喚醒completion
void complete(struct completion *c); //只喚醒一個等待的執行單元
void complete_all(struct completion *c); //釋放所有等待該完成量的執行單元
互斥體
除了信號量,Linux內核還提供了一種專門用於實現互斥的機制-互斥體,相關的內核API如下:
//include/linux/mutex.h
//定義並初始化mutex對象
struct mutex my_mutex;
mutex_init(&my_mutex);
//獲取mutex
void mutex_lock(struct mutex *lock);
int mutex_trylock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
//釋放mutex
void mutex_unlock(struct mutex *lock);