Linux設備驅動中必須解決的一個問題是多個進程對共享資源的並發訪問,並發的訪問會導致競態,即使是經驗豐富的驅動工程師也常常設計出包含並發問題的bug驅動程序。
Linux提供了多種解決競態問題的方式,這些方式適合不同的應用場景。
並發(concurrency):指的是多個執行單元同時、並行被執行,而並發的執行單元對共享資源(硬件資源和軟件資源上的全局變量、靜態變量等)的訪問則很容易導致競態(race condition);
競態(race condition):簡單來講,競態就是指多個執行序列同時訪問同一個共享資源的狀況;
臨界區(critical sections):訪問共性資源的代碼區域稱為臨界區,臨界區需要被以某種互斥機制加以保護;
臨界資源:是指一段時間內只允許一個進程訪問的資源
在宏觀上並行或者真正意義上的並行(這里為什么是宏觀意義的並行呢?我們應該知道“時間片”這個概念,微觀上還是串行的,所以這里稱為宏觀上的並行),可能會導致競爭; 類似兩條十字交叉的道路上運行的車。當他們同一時刻要經過共同的資源(交叉點)的時候,如果沒有交通信號燈,就可能出現混亂。
在Linux內核中,主要的競態發生於如下幾種情況:
1)對稱多處理器(SMP)的多個 CPU
SMP是一種緊密耦合、共享存儲的系統模型,它的特點是多個CPU使用共同的系統總線,因此可訪問共同的外設和存儲器。
2)單 CPU 內進程與搶占它的進程
一個進程在內核執行的時候可能被另一高優先級進程打斷。
3)中斷(硬中斷、軟中斷、Tasklet、底半部)與進程之間
中斷可以打斷正在執行的進程,如果中斷處理程序訪問進程正在訪問的資源,則競態就會發生。此外,中斷也有可能被更高優先級的中斷打斷,因此,多個中斷之間本身也可能引起並發而導致競態。
Linux提供的競態問題解決方式
解決競態問題的途徑是保證對共享資源的互斥訪問,所謂互斥訪問時指一個執行單元在訪問共享資源的時候,其他的執行單元被禁止訪問。中斷屏蔽、原子操作、自旋鎖和信號量等是Linux設備驅動中可采用的互斥途徑。
在單CPU范圍內避免競態的一種簡單省事的方法是在進入臨界區之前屏蔽系統的中斷,這項功能可以保證正在執行的內核執行路徑不被中斷處理程序所搶占,防止某些競態條件的發生。具體而言:
1)中斷屏蔽將使得中斷與進程之間的並發不再發生
2)由於Linux內核的進程調度等操作都依賴中斷來實現,內核搶占與進程之間的並發就得意避免了。
中斷屏蔽的使用方法為:
local_irq_disable() /*屏蔽中斷*/ ... critical section /*臨界區*/ ... local_irq_enable() /*打開中斷*/
local_irq_disable() local_irq_enable() /* 只能禁止和使能本地CPU的中斷,所以不能解決多CPU引發的競態 */ local_irq_save(flags) local_irq_restore(flags) /* 除了能禁止和使能中斷外,還保存和還原目前的CPU中斷位信息 */ local_bh_disable() local_bh_disable() /* 如果只是想禁止中斷的底半部,這是個不錯的選擇 */
需要注意:
1)由於Linux的異步I/O、進程調度等很多重要操作依賴於中斷,中斷對於內核的執行非常重要,在屏蔽中斷期間說有的中斷都無法得到處理,因此產時間屏蔽中斷是很危險的,有可能造成數據丟失乃至系統崩潰等后果,因此在屏蔽了中斷之后,當前的內核執行路徑應當盡快的執行完臨界區的代碼;
2)單獨使用中斷屏蔽不是一種值得推薦的避免競態的方式,它宜與自旋鎖聯合使用。
原子操作(整型原子操作和位原子操作)是在執行過程不會被別的代碼路徑所中斷的操作,它在任何情況下操作都是院子的,內核代碼可以安全的調用它們而不被打斷。
整型原子操作
1、設置原子變量的值
#define atomic_set(v,i) ((v)->counter = (i))
void atomic_set(atomic_t *v, int i); /* 設置原子變量的值為i */
#define ATOMIC_INIT(i) ( (atomic_t) { (i) } )
atomic_t v = ATOMIC_INIT(0); /* 定義原子變量 v 並初始化為 0 (該宏只支持初始為 0)*/
2、獲取原子變量的值
#define atomic_read(v) ((v)->counter + 0)
atomic_read(atomic_t *v); /* 返回原子變量的值 */
3、原子變量加/減
void atomic_add(int i, atomic_t *v); /* 原子變量加 i */ void atomic_sub(int i, atomic_t *v); /* 原子變量減 i */
4、原子變量自增/自減
#define atomic_inc(v) atomic_add(1, v);
void atomic_inc(atomic_t *v); /* 原子變量自增 1 */
#define atomic_dec(v) atomic_sub(1, v);
void atomic_dec(atomic_t *v); /* 原子變量自減 1 */
5、操作並測試
#define atomic_inc_and_test(v) (atomic_add_return(1, (v)) == 0)static inline int atomic_inc_and_test(atomic_t *v); /* 原子變量自增 1 並判斷結果是否為 0 */
int atomic_dec_and_test(atomic_t *v); /* 原子變量自減 1 並判斷結果是否為 0 */ int atomic_sub_and_teset(int i, atomic_t *v); /* 原子變量減 i 並判斷結果是否為 0 */ /* 上述測試結果為 0 返回 true 否者返回 false */
6、操作並返回
int atomic_add_and_return(int i, atomic_t *v); /* 原子變量加 i 並返回新值 */ int atomic_sub_and_return(int i, atomic_t *v); /* 原子變量減 i 並返回新值 */ int atomic_inc_and_return(atomic_t *v); /* 原子變量自增 1 並返回新值 */ int atomic_dec_and_return(atomic_t *v); /* 原子變量自減 1 並返回新值 */
原子操作的優點編寫簡單;缺點是功能太簡單,只能做計數操作,保護的東西太少。下面看一個實例
static atomic_t v=ATOMIC_INIT(1); static int hello_open (struct inode *inode, struct file *filep) { if(!atomic_dec_and_test(&v)) { atomic_inc(&v); return -EBUSY; } return 0; } static int hello_release (struct inode *inode, struct file *filep) { atomic_inc(&v); return 0; }
四、自旋鎖 (http://blog.csdn.net/vividonly/article/details/6594195)
一)自旋鎖的使用
自旋鎖(spin lock)是一個互斥設備,它只有兩個值:“鎖定”和“解鎖”。它通常實現為某個整數值中的某個位。希望獲得某個特定鎖,需要代碼測試相關的位。如果鎖可用,則“鎖定”被設置,而代碼繼續進入臨界區;相反,如果鎖被其他人獲得,則代碼進入忙循環(而不是休眠,這也是自旋鎖和一般鎖的區別)並重復檢查這個鎖,直到該鎖可用為止,這就是自旋的過程。“測試並設置位”的操作必須是原子的,這樣,即使多個線程在給定時間自旋,也只有一個線程可獲得該鎖。
Linux 中與直選說相關的操作主要由以下4種。
1、定義自旋鎖
spinlock_t lock;
2、初始化自旋鎖
#define spin_lock_init(_lock) /* 該宏用於動態初始化自旋鎖 lock */void spin_lock_init(spinlock_t *);
3、獲取自旋鎖
void spin_lock(spinlock_t *lock); /* 該宏用於獲得自旋鎖 lock,如果 lock 未被加鎖,它就會稱為 持有者並立即返回,否者它將自旋在那里,知道該自旋鎖的持有者釋放 */ int spin_trylock(spinlock_t *lock); /* 該宏用於獲得自旋鎖 lock,如果 lock 未被加鎖,它就會稱為 持有者並返回真,否者立即返回假 */
4、釋放自旋鎖
void spin_unlock(spinlock_t *lock); /* 用於釋放自旋鎖,與 spin_lock 或 spin_trylock 配對使用*/
自旋鎖一般這樣被使用:
/* 定義一個自旋鎖 */ spinlock_t lock; spin_lock_init(&lock); spin_lock(&lock); /* 獲取自旋鎖,保護臨界區 */ ... critical section /*臨界區*/ ... spin_unlock(&lock); /* 解鎖*/
下面是一個實例:
static spinlock_t lock; static int flag = 1; static int hello_open (struct inode *inode, struct file *filep) { spin_lock(&lock); if(flag !=1) { spin_unlock(&lock); return -EBUSY; } flag = 0; spin_unlock(&lock); return 0;
} static int hello_release (struct inode *inode, struct file *filep) { flag = 1; return 0; }
自旋鎖最初是為了在多處理器系統(SMP)使用而設計的,但是只要考慮到並發問題,單處理器在運行可搶占內核時其行為就類似於SMP。因此,自旋鎖對於SMP和單處理器可搶占內核都適用。可以想象,當一個處理器處於自旋狀態時,它做不了任何有用的工作,因此自旋鎖對於單處理器不可搶占內核沒有意義,實際上,非搶占式的單處理器系統上自旋鎖被實現為空操作,不做任何事情。
注意:
1)自旋鎖實際上是忙等鎖,因此只有在占用鎖的時間極短的情況下,使用自旋鎖才是合理的;
2)自旋鎖可能導致系統死鎖。引發這個問題的常見情況是遞歸使用一個自旋鎖;
自旋鎖導致死鎖的實例】
a) a進程擁有自旋鎖,在內核態阻塞的,內核調度進程b,b也要或得自旋鎖,b只能自旋,而此時搶占已經關閉了,a進程就不會調度到了,b進程永遠自旋。
b) 進程a擁有自旋鎖,中斷來了,cpu執行中斷,中斷處理函數也要獲得鎖訪問共享資源,此時也獲得不到鎖,只能死鎖。
3)自旋鎖鎖定期間不能調用任何可能引起進程調度的函數。
自旋鎖有幾個重要的特性:
1、被自旋鎖保護的臨界區代碼執行時不能進入休眠;
2、被自旋鎖保護的臨界區代碼執行時是不能被被其他中斷中斷;
3、被自旋鎖保護的臨界區代碼執行時,內核不能被搶占。
從這幾個特性可以歸納出一個共性:被自旋鎖保護的臨界區代碼執行時,它不能因為任何原因放棄處理器。
二) 讀寫自旋鎖
自旋鎖不關心鎖定的臨界區究竟進行怎樣的操作,不管是讀還是寫,它都一視同仁。即多個執行單元同時讀取臨界資源也會被鎖住。讀寫自旋鎖(rwlock)是自旋鎖的衍生出來的、它允許讀的並發操作。
讀寫自旋鎖是一種比自旋鎖粒度(保護范圍)更小的鎖機制,它保留了“自旋”的概念。在讀寫操作時,允許多個讀執行單元;寫操作最多有一個寫進程,且讀和寫不能同時進行
自旋鎖與讀寫自旋鎖的對比:
| 操作 | 自旋鎖(spin lock) | 讀寫自旋鎖(rwlock) |
| 讀 | 互斥 | 不互斥 |
| 寫 | 互斥 | 互斥 |
| 讀+寫 | 互斥 | 互斥 |
讀寫自旋鎖涉及的操作
1、定義和初始化讀寫自旋鎖:
rwlock_t my_rwlock = RW_LOCK_UNLOCKED; /* 靜態初始化 */ rwlock_t my_rwlock; void rwlock_init(rwlock_t *lock); /* 動態初始化 */
2、讀鎖定
void read_lock(rwlock_t *lock) int read_trylock(rwlock_t *lock)
3、讀解鎖
void read_unlock(rwlock_t *lock)
在對共享資源進行讀取之前,應該先調用讀解鎖定函數,完成之后應調用讀解鎖函數。
4、寫鎖定
void write_lock(rwlock_t *lock) int write_trylock(rwlock_t *lock)
5、寫解鎖
void write_unlock(rwlock_t *lock)
在對共享資源進行寫之前,應該先調用寫解鎖定函數,完成之后應調用寫解鎖函數。
信號量的使用
信號量(semaphore)是用於保護臨界區的一種常用方法,它的使用方式和自旋鎖類似,只有得到信號量的進程才能執行臨界區代碼。但也與自旋鎖有不同之處,對於獲取不到信號量的執行序列將會進入休眠狀態而不是原地打轉。
信號量相關操作:
1、定義信號量
struct semaphore { spinlock_t lock; /* 用來對count變量起保護作用 */ unsigned int count; /* 大於0,資源空閑;等於0,資源忙,但沒有進程等待這個保護的資源;小於0,資源不可用,並至少有一個進程等待資源 */ struct list_head wait_list; /* 存放等待隊列鏈表的地址,當前等待資源的所有睡眠進程都會放在這個鏈表中 */ }; struct semaphore sem; /* 定義一個名為 sem 的信號量 */
2、初始化信號量
void sema_init(struct semaphore *sem, int val); /* 初始化信號量,並將信號量的值設置為 val */
3、獲得信號量
void down(struct semaphore *sem); /* P操作(減),當P操作操作的信號量為0,則休眠等(不允許被信號打斷) 不能在中斷上下文使用 */ void down_interruptible(struct semaphore *sem); /* 允許在睡眠狀態被信號打斷 */ void down_trylock(struct semaphore *sem); /* 嘗試獲取信號量,如果能立刻獲得,獲得該信號量並返回 0,否者返回非 0 值,不會導致睡眠,可在中斷上下文使用 */
4、釋放信號量
void up(struct semaphore *sem); /* 釋放信號量 sem,喚醒等待者 */
信號量一般這樣使用
struct semaphore sem; /* 定義信號量 */
down(&sem); /* 獲取信號量,保護臨界區 */ ... critical section /*臨界區*/ ... up(&sem); /* 釋放信號量 */
使用實例:
static struct semaphore sem; /* 定義信號量 */ sema_init(&sem,1); /* 初始化為 1 */ static int hello_open (struct inode *inode, struct file *filep) { if(down_interruptible(&sem)) /* p操作,獲得信號量,保護臨界區 */ { return -ERESTART; /* 已經被占用 */ } return 0; } static int hello_release (struct inode *inode, struct file *filep) { up(&sem); /* v操作,釋放信號量 */ return 0; }
互斥體是一種鎖機制,相對於自旋鎖它實現了休眠。
互斥體的相關操作
1、定義互斥體
類型:struct mutex
2、初始化互斥體
void mutex_init(struct mutex *); /* 初始化互斥體 */
3、銷毀互斥體
void mutex_destroy(struct mutex *lock); /* 釋放互斥體 */
4、加鎖
void mutex_lock(struct mutex *lock); /* 獲取互斥體,若已經被獲取則休眠等(不允許被中斷和信號打斷) */ int mutex_lock_interruptible(struct mutex *lock); /* 允許在睡眠狀態被中斷打斷 */ int mutex_lock_killable(struct mutex *lock); /* 允許在休眠過程狀態信號打斷 */ int mutex_trylock(struct mutex *lock); /* 嘗試獲取互斥體,如果能立刻獲得,獲得該互斥體並返回 0,否者返回非 0 值,不會導致睡眠 */
5、解鎖
void mutex_unlock(struct mutex *lock); /* 釋放 mutex */
mntex的使用方法和信號量用於互斥的場合完全一樣:
struct mutex my_mutex; /* 定義 mutex */ mutex_init(&my_mutex); /* 初始化 mutex */ mutex_lock(&my_mutex); /* 獲取 mutex */ ... critical section /*臨界區*/ ... mutex_unlock(&my_mutex); /* 是否 mutex */
| 信號量 | 自旋鎖 | |
| 1、開銷成本 | 進程上下文切換時間 | 忙等待獲得自旋鎖時間 |
| 2、特性 | a -- 導致阻塞,產生睡眠 b -- 進程級的(內核是代表進程來爭奪資源的) |
a -- 忙等待,內核搶占關閉 b -- 主要是用於CPU同步的 |
| 3、應用場合 | 只能運行於進程上下文 | 還可以出現中斷上下文 |
| 4、其他 | 還可以出現在用戶進程中 | 只能在內核線程中使用 |
從以上的區別以及本身的定義可以推導出兩都分別適應的場合。只考慮內核態
后記:除了上述幾種廣泛使用的的並發控制機制外,還有中斷屏蔽、順序鎖(seqlock)、RCU(Read-Copy-Update)等等,做個簡單總結如下圖:

