1. Mutex
1.1 What is a mutex?
A mutex implements a simple form of "mutual exclusion" synchronization, hence the name. A mutex prevents multiple processes from entering a protected "critical section" of code at the same time, so at any moment only one process is allowed inside such a protected region.
Compared with a semaphore, a mutex has simpler and lighter semantics. Under heavy lock contention a mutex executes faster and scales better than a semaphore, and the mutex data structure is also smaller than that of a semaphore.
1.2 Properties of the mutex
- The mutex is a synchronization primitive used in the Linux kernel for mutual exclusion;
- A mutex is a sleeping lock: under contention a process may be put to sleep and woken up later, and the resulting context switches are expensive, so mutexes suit cases where the lock is held for a relatively long time;
- A mutex admits only one process into the critical section at a time, somewhat like a binary semaphore;
- When a mutex is contended and the lock is currently held, the waiter may spin-wait instead of going to sleep right away, which can greatly improve performance; this mechanism (optimistic spinning) has also been applied to read-write semaphores;
- The drawback of a mutex is that its lock object is relatively large (larger than a spinlock, for instance), so it occupies more CPU cache and memory;
- Compared with a semaphore, a mutex offers better performance and scalability, so the kernel prefers a mutex whenever possible;
- To improve performance, mutex locking is handled along three paths: the fast path, the mid path and the slow path;
1.3 Using a mutex
Define a mutex:
struct mutex my_mutex;
Initialize the mutex:
mutex_init(&my_mutex);
Or define and initialize it statically with a macro:
DEFINE_MUTEX(my_mutex);
Acquire the mutex:
void mutex_lock(struct mutex *lock);
This function acquires the mutex. It may sleep, so it must not be used in interrupt context.
int mutex_lock_interruptible(struct mutex *lock);
This function works like mutex_lock(), except that a task sleeping inside mutex_lock() cannot be interrupted by a signal, whereas mutex_lock_interruptible() puts the task to sleep in the TASK_INTERRUPTIBLE state, so the sleep can be interrupted by a signal.
It returns 0 if the mutex was acquired, and -EINTR if the wait was interrupted by a signal.
int mutex_trylock(struct mutex *lock);
mutex_trylock() tries to acquire the mutex; if the lock cannot be taken it returns immediately without putting the caller to sleep.
Release the mutex:
void mutex_unlock(struct mutex *lock);
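Putting these calls together, a typical driver-style usage pattern looks like the sketch below. It is only an illustration: my_mutex, my_buffer and my_dev_write are made-up names, not taken from any real driver.

#include <linux/fs.h>
#include <linux/mutex.h>
#include <linux/uaccess.h>

static DEFINE_MUTEX(my_mutex);          /* statically defined and initialized */
static char my_buffer[64];              /* data protected by my_mutex */

static ssize_t my_dev_write(struct file *filp, const char __user *buf,
			    size_t count, loff_t *ppos)
{
	/* Sleep until the mutex is ours, but allow signals to interrupt the wait. */
	if (mutex_lock_interruptible(&my_mutex))
		return -ERESTARTSYS;    /* interrupted by a signal */

	if (count > sizeof(my_buffer))
		count = sizeof(my_buffer);

	if (copy_from_user(my_buffer, buf, count)) {
		mutex_unlock(&my_mutex);
		return -EFAULT;
	}

	mutex_unlock(&my_mutex);        /* release on every exit path */
	return count;
}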
1.4 Mutex vs. semaphore
Compared with a semaphore, a mutex is much more efficient:
- the mutex was the first to implement the spin-wait (optimistic spinning) mechanism;
- a mutex tries to grab the lock before going to sleep;
- a mutex uses an MCS lock to avoid the cache-line bouncing caused by multiple CPUs contending for the lock;
2. The MCS lock mechanism
2.1 The MCS lock
- As mentioned above, the mutex implementation uses the optimistic spinning mechanism, whose core is built on the MCS lock;
- The MCS lock was proposed by John Mellor-Crummey and Michael Scott in the paper "Algorithms for Scalable Synchronization on Shared-Memory Multiprocessors" and is named after the two authors;
- The problem the MCS lock solves: on a multi-CPU system, every time the value of a spinlock changes, all CPUs trying to acquire that spinlock have to fetch it from memory and refresh their own cache line, yet only one CPU can actually win the lock, so only that CPU's refresh is useful. The fiercer the contention (the more CPUs trying to take the lock), the larger this wasted overhead becomes;
- The core idea of MCS: each CPU gets its own spin structure and spins on a local (per-CPU) variable; these structures are linked into a queue, and each waiter spins until its predecessor node releases the lock (see the userspace sketch after this list);
- osq (optimistic spinning queue) is a concrete, iteratively optimized implementation based on the MCS algorithm;
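To make the idea concrete, here is a minimal userspace MCS queue lock written with C11 atomics. The names mcs_lock, mcs_node and the two functions are made up for this sketch; it illustrates the classic MCS algorithm, not the kernel's osq code.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct mcs_node {
	_Atomic(struct mcs_node *) next;
	atomic_bool locked;               /* each waiter spins on its OWN flag */
};

struct mcs_lock {
	_Atomic(struct mcs_node *) tail;  /* last node in the queue, NULL if free */
};

static void mcs_lock_acquire(struct mcs_lock *lock, struct mcs_node *self)
{
	atomic_store_explicit(&self->next, NULL, memory_order_relaxed);
	atomic_store_explicit(&self->locked, true, memory_order_relaxed);

	/* Join the queue by becoming the new tail. */
	struct mcs_node *prev =
		atomic_exchange_explicit(&lock->tail, self, memory_order_acq_rel);
	if (!prev)
		return;                   /* queue was empty: lock acquired */

	/* Publish ourselves to the predecessor, then spin on our local flag. */
	atomic_store_explicit(&prev->next, self, memory_order_release);
	while (atomic_load_explicit(&self->locked, memory_order_acquire))
		;                         /* local spinning, no cache-line bouncing */
}

static void mcs_lock_release(struct mcs_lock *lock, struct mcs_node *self)
{
	struct mcs_node *next =
		atomic_load_explicit(&self->next, memory_order_acquire);

	if (!next) {
		/* No known successor: reset tail if we are still the last node. */
		struct mcs_node *expected = self;
		if (atomic_compare_exchange_strong_explicit(&lock->tail, &expected,
							    NULL,
							    memory_order_acq_rel,
							    memory_order_acquire))
			return;
		/* A successor is joining; wait until it links itself in. */
		while (!(next = atomic_load_explicit(&self->next,
						     memory_order_acquire)))
			;
	}
	/* Hand the lock to the successor, which then stops spinning. */
	atomic_store_explicit(&next->locked, false, memory_order_release);
}

The key property is that every waiter spins only on its own node's locked flag, so a release touches just one other CPU's cache line instead of invalidating the cache line of every contender.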
2.2 osq flow analysis
Optimistic spinning: just how optimistic is it? When it finds the lock held, optimistic spinning assumes the holder will release it soon, so it chooses to spin rather than sleep, saving the overhead of a context switch.
Let's take a look at the data structures first:
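In recent kernels the per-CPU spin node and the OSQ queue head are defined roughly as follows (see include/linux/osq_lock.h; comments abridged):

struct optimistic_spin_node {
	struct optimistic_spin_node *next, *prev;
	int locked;	/* 1 if lock acquired */
	int cpu;	/* encoded CPU # + 1 value */
};

struct optimistic_spin_queue {
	/*
	 * Stores an encoded value of the CPU # of the tail node in the queue.
	 * If the queue is empty, it is set to OSQ_UNLOCKED_VAL.
	 */
	atomic_t tail;
};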
The locking side, osq_lock, handles several cases:
- If nobody holds the lock, that is the ideal case: take it and return immediately;
- If somebody holds the lock, add the current node to the OSQ queue and, as long as no higher-priority task preempts us, spin waiting for the predecessor node to release the lock;
- If a higher-priority task preempts us while we are spinning, the current node has to be removed from the OSQ queue again. The removal is done in three steps: fix up the prev node's next pointer, fix up the current node's next pointer, and finally link the prev node to the next (successor) node;
Atomic operations are used throughout the locking path to guarantee correctness.
The unlock side, osq_unlock, also distinguishes several cases:
- If nobody is contending for the lock, it can simply be released;
- Otherwise, fetch the node following the current one; if that next node is not NULL, unlock it;
- If the current node's next pointer is NULL, call osq_wait_next() to wait for the next node to appear and, once it is found, unlock it;
As the unlock cases show, the process effectively passes the lock from one node to the next.
Because other CPUs may be modifying the OSQ queue concurrently, both the lock and unlock paths call osq_wait_next() to obtain a stable next node.
3. Mutex source code implementation
3.1 The mutex structure
The mutex structure is defined in include/linux/mutex.h:
/*
 * Simple, straightforward mutexes with strict semantics:
 *
 * - only one task can hold the mutex at a time
 * - only the owner can unlock the mutex
 * - multiple unlocks are not permitted
 * - recursive locking is not permitted
 * - a mutex object must be initialized via the API
 * - a mutex object must not be initialized via memset or copying
 * - task may not exit with mutex held
 * - memory areas where held locks reside must not be freed
 * - held mutexes must not be reinitialized
 * - mutexes may not be used in hardware or software interrupt
 *   contexts such as tasklets and timers
 *
 * These semantics are fully enforced when DEBUG_MUTEXES is
 * enabled. Furthermore, besides enforcing the above rules, the mutex
 * debugging code also implements a number of additional features
 * that make lock debugging easier and faster:
 *
 * - uses symbolic names of mutexes, whenever they are printed in debug output
 * - point-of-acquire tracking, symbolic lookup of function names
 * - list of all locks held in the system, printout of them
 * - owner tracking
 * - detects self-recursing locks and prints out all relevant info
 * - detects multi-task circular deadlocks and prints out all affected
 *   locks and tasks (and only those tasks)
 */
struct mutex {
	atomic_long_t		owner;
	spinlock_t		wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
	struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
	struct list_head	wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
	void			*magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	struct lockdep_map	dep_map;
#endif
};
The comment block above spells out the rules:
- only one task can hold the mutex at a time;
- only the lock holder may unlock it;
- multiple unlocks are not permitted;
- recursive locking is not permitted;
- a mutex object must be initialized via the API;
- a mutex object must not be initialized via memset or by copying;
- a task may not exit while still holding a mutex;
- the memory area in which a held lock resides must not be freed;
- a mutex that is currently held must not be reinitialized;
- mutexes may not be used in hardware or software interrupt contexts such as tasklets and timers;
The important members of the structure:
- owner: an atomic field that stores the task_struct pointer of the lock holder; a value of 0 means the lock is not held by any task;
- wait_lock: a spinlock that protects the wait_list;
- wait_list: a doubly linked list holding the processes that went to sleep because they could not obtain the mutex;
As these members suggest, the mutex implementation is built on atomic operations plus a spinlock.
When multiple processes compete for a mutex, the mutex itself is shared data, so every modification of its members must be performed atomically.
3.2 Mutex initialization
A mutex can be initialized in two ways. The static way uses the DEFINE_MUTEX macro:
#define __MUTEX_INITIALIZER(lockname) \
		{ .owner = ATOMIC_LONG_INIT(0) \
		, .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
		, .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
		__DEBUG_MUTEX_INITIALIZER(lockname) \
		__DEP_MAP_MUTEX_INITIALIZER(lockname) }

#define DEFINE_MUTEX(mutexname) \
	struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)
This initializes the atomic owner field, the wait_lock spinlock and the wait_list wait list.
The other way is dynamic: call mutex_init() at run time. Its core, __mutex_init(), lives in kernel/locking/mutex.c:
# define mutex_init(mutex) \
do {							\
	static struct lock_class_key __key;		\
							\
	__mutex_init((mutex), #mutex, &__key);		\
} while (0)

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
	atomic_long_set(&lock->owner, 0);
	spin_lock_init(&lock->wait_lock);
	INIT_LIST_HEAD(&lock->wait_list);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
	osq_lock_init(&lock->osq);	/* initialize the MCS/osq lock */
#endif

	debug_mutex_init(lock, name, key);
}
3.3 mutex_lock
The locking flow of mutex_lock() (fast path, then mid path, then slow path) is summarized below.
mutex_lock() is defined in kernel/locking/mutex.c:
/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, it will sleep until it can get it.
 *
 * The mutex must later on be released by the same task that
 * acquired it. Recursive locking is not allowed. The task
 * may not exit without first unlocking the mutex. Also, kernel
 * memory where the mutex resides must not be freed with
 * the mutex still locked. The mutex must first be initialized
 * (or statically defined) before it can be locked. memset()-ing
 * the mutex to 0 is not allowed.
 *
 * (The CONFIG_DEBUG_MUTEXES .config option turns on debugging
 * checks that will enforce the restrictions and will also do
 * deadlock debugging)
 *
 * This function is similar to (but not equivalent to) down().
 */
void __sched mutex_lock(struct mutex *lock)
{
	might_sleep();

	if (!__mutex_trylock_fast(lock))
		__mutex_lock_slowpath(lock);
}
To improve performance, mutex_lock() is split into three paths. The fast path and mid path are tried first; only when their conditions are not met does the code fall back to the slow path, which may sleep and reschedule and is therefore the most expensive.
3.4 fast path
The fast path is implemented in __mutex_trylock_fast():
/*
 * Lockdep annotations are contained to the slow paths for simplicity.
 * There is nothing that would stop spreading the lockdep annotations outwards
 * except more code.
 */

/*
 * Optimistic trylock that only works in the uncontended case. Make sure to
 * follow with a __mutex_trylock() before failing.
 */
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
	unsigned long curr = (unsigned long)current;
	unsigned long zero = 0UL;

	if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
		return true;

	return false;
}
It boils down to a single atomic call, atomic_long_try_cmpxchg_acquire() (a userspace analogue is sketched after the two cases below):
- if lock->owner is 0, curr is written into lock->owner, marking the current task as the lock holder, and the function returns true right away;
- if lock->owner is not 0, the lock is already held, and we must fall through to the next path;
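As a rough userspace analogue of what this cmpxchg does, the sketch below uses C11 atomics; struct toy_mutex and toy_trylock_fast are made-up illustrative names, not kernel code.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

struct toy_mutex {
	_Atomic uintptr_t owner;    /* 0 = unlocked, otherwise an "owner id" */
};

static bool toy_trylock_fast(struct toy_mutex *lock, uintptr_t self)
{
	uintptr_t zero = 0;

	/*
	 * Same spirit as atomic_long_try_cmpxchg_acquire(): succeed only if
	 * owner was 0, publishing "self" as the new owner with acquire
	 * ordering so the critical section cannot float above the lock.
	 */
	return atomic_compare_exchange_strong_explicit(&lock->owner, &zero, self,
						       memory_order_acquire,
						       memory_order_relaxed);
}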
3.5 mid path
Both the mid path and the slow path are implemented in __mutex_lock_common():
static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
	__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}

static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
	     struct lockdep_map *nest_lock, unsigned long ip)
{
	return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
As the wrappers show, __mutex_lock_slowpath() ultimately lands in __mutex_lock_common():

/*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
		    struct lockdep_map *nest_lock, unsigned long ip,
		    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
	struct mutex_waiter waiter;
	bool first = false;
	struct ww_mutex *ww;
	int ret;

	might_sleep();

	ww = container_of(lock, struct ww_mutex, base);
	if (use_ww_ctx && ww_ctx) {
		if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
			return -EALREADY;

		/*
		 * Reset the wounded flag after a kill. No other process can
		 * race and wound us here since they can't have a valid owner
		 * pointer if we don't have any locks held.
		 */
		if (ww_ctx->acquired == 0)
			ww_ctx->wounded = 0;
	}

	preempt_disable();
	mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

	if (__mutex_trylock(lock) ||
	    mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
		/* got the lock, yay! */
		lock_acquired(&lock->dep_map, ip);
		if (use_ww_ctx && ww_ctx)
			ww_mutex_set_context_fastpath(ww, ww_ctx);
		preempt_enable();
		return 0;
	}

	spin_lock(&lock->wait_lock);
	/*
	 * After waiting to acquire the wait_lock, try again.
	 */
	if (__mutex_trylock(lock)) {
		if (use_ww_ctx && ww_ctx)
			__ww_mutex_check_waiters(lock, ww_ctx);

		goto skip_wait;
	}

	debug_mutex_lock_common(lock, &waiter);

	lock_contended(&lock->dep_map, ip);

	if (!use_ww_ctx) {
		/* add waiting tasks to the end of the waitqueue (FIFO): */
		__mutex_add_waiter(lock, &waiter, &lock->wait_list);

#ifdef CONFIG_DEBUG_MUTEXES
		waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
	} else {
		/*
		 * Add in stamp order, waking up waiters that must kill
		 * themselves.
		 */
		ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
		if (ret)
			goto err_early_kill;

		waiter.ww_ctx = ww_ctx;
	}

	waiter.task = current;

	set_current_state(state);
	for (;;) {
		/*
		 * Once we hold wait_lock, we're serialized against
		 * mutex_unlock() handing the lock off to us, do a trylock
		 * before testing the error conditions to make sure we pick up
		 * the handoff.
		 */
		if (__mutex_trylock(lock))
			goto acquired;

		/*
		 * Check for signals and kill conditions while holding
		 * wait_lock. This ensures the lock cancellation is ordered
		 * against mutex_unlock() and wake-ups do not go missing.
		 */
		if (signal_pending_state(state, current)) {
			ret = -EINTR;
			goto err;
		}

		if (use_ww_ctx && ww_ctx) {
			ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
			if (ret)
				goto err;
		}

		spin_unlock(&lock->wait_lock);
		schedule_preempt_disabled();

		/*
		 * ww_mutex needs to always recheck its position since its waiter
		 * list is not FIFO ordered.
		 */
		if ((use_ww_ctx && ww_ctx) || !first) {
			first = __mutex_waiter_is_first(lock, &waiter);
			if (first)
				__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
		}

		set_current_state(state);
		/*
		 * Here we order against unlock; we must either see it change
		 * state back to RUNNING and fall through the next schedule(),
		 * or we must see its unlock and acquire.
		 */
		if (__mutex_trylock(lock) ||
		    (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
			break;

		spin_lock(&lock->wait_lock);
	}
	spin_lock(&lock->wait_lock);
acquired:
	__set_current_state(TASK_RUNNING);

	if (use_ww_ctx && ww_ctx) {
		/*
		 * Wound-Wait; we stole the lock (!first_waiter), check the
		 * waiters as anyone might want to wound us.
		 */
		if (!ww_ctx->is_wait_die &&
		    !__mutex_waiter_is_first(lock, &waiter))
			__ww_mutex_check_waiters(lock, ww_ctx);
	}

	mutex_remove_waiter(lock, &waiter, current);
	if (likely(list_empty(&lock->wait_list)))
		__mutex_clear_flag(lock, MUTEX_FLAGS);

	debug_mutex_free_waiter(&waiter);

skip_wait:
	/* got the lock - cleanup and rejoice! */
	lock_acquired(&lock->dep_map, ip);

	if (use_ww_ctx && ww_ctx)
		ww_mutex_lock_acquired(ww, ww_ctx);

	spin_unlock(&lock->wait_lock);
	preempt_enable();
	return 0;

err:
	__set_current_state(TASK_RUNNING);
	mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
	spin_unlock(&lock->wait_lock);
	debug_mutex_free_waiter(&waiter);
	mutex_release(&lock->dep_map, 1, ip);
	preempt_enable();
	return ret;
}
That is a lot of code; rather than walking through it line by line, the flow charts in other write-ups and the notes below capture the important parts.
When the holder of the mutex is found to be running (on another CPU), the waiter can skip the sleep and reschedule and spin-wait instead, because a running holder is likely to release the lock very soon. That is the rationale behind optimistic spinning.
Spin-waiting only pays off while the lock holder is actually running inside its critical section.
__mutex_trylock_or_owner() tries to take the lock and, on failure, returns the current lock holder. The owner field of the mutex is split into two parts:
1) the task_struct pointer of the lock holder (thanks to L1_CACHE_BYTES alignment, the low bits of the pointer are unused);
2) the MUTEX_FLAGS part, i.e. the low three bits (the helpers that split the two parts apart are sketched after this list):
- MUTEX_FLAG_WAITERS: bit 0, set when the waiter list is non-empty, so unlock must perform a wakeup;
- MUTEX_FLAG_HANDOFF: bit 1, tells unlock to hand the lock over to the first waiter;
- MUTEX_FLAG_PICKUP: bit 2, indicates that the hand-off has been prepared and the lock is waiting to be picked up;
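In recent kernels the split between the task pointer and the flag bits is implemented roughly as follows (kernel/locking/mutex.c):

#define MUTEX_FLAG_WAITERS	0x01
#define MUTEX_FLAG_HANDOFF	0x02
#define MUTEX_FLAG_PICKUP	0x04

#define MUTEX_FLAGS		0x07

/* owner with the low flag bits masked off is the holder's task_struct */
static inline struct task_struct *__owner_task(unsigned long owner)
{
	return (struct task_struct *)(owner & ~MUTEX_FLAGS);
}

/* the low three bits are the state flags listed above */
static inline unsigned long __owner_flags(unsigned long owner)
{
	return owner & MUTEX_FLAGS;
}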
mutex_optimistic_spin() performs the optimistic spinning. Ideally the holder leaves its critical section and releases the lock, and the current task picks it up quickly. In practice we also have to handle the holder being scheduled out inside the critical section (task_struct->on_cpu == 0); in that case the spin wait must be abandoned, otherwise we would just be spinning pointlessly.
- mutex_can_spin_on_owner(): a check before spinning; if the current task needs to reschedule, or the lock holder has already been scheduled out, return immediately and skip the following osq_lock()/osq_unlock() work, saving the extra overhead;
- osq_lock() ensures that only one waiter takes part in the optimistic spinning, preventing a crowd of waiters from swarming in to grab the mutex;
- the for(;;) loop calls __mutex_trylock_or_owner() to try to take the lock; if it succeeds, all is well, just return;
- mutex_spin_on_owner(): once the conditions for spin-waiting no longer hold, it returns and we fall into the slow path, since there is no point in insisting (a simplified sketch of this spin loop follows);
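A much-simplified sketch of that spin loop is shown below. It is not the real mutex_spin_on_owner() code; it assumes the in-kernel helpers __mutex_owner(), need_resched() and cpu_relax(), and only captures the two exit conditions: the holder was scheduled out, or we ourselves need to reschedule.

/* Simplified sketch only, not the kernel's mutex_spin_on_owner(). */
static bool spin_on_owner_sketch(struct mutex *lock, struct task_struct *owner)
{
	bool keep_waiting = true;

	rcu_read_lock();	/* keep owner's task_struct from disappearing */
	while (__mutex_owner(lock) == owner) {
		if (!owner->on_cpu || need_resched()) {
			/* holder was scheduled out, or we must yield: give up */
			keep_waiting = false;
			break;
		}
		cpu_relax();	/* ease pressure on the memory bus while spinning */
	}
	rcu_read_unlock();

	return keep_waiting;	/* true: owner changed, worth retrying the lock */
}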
3.6 slow path
The main code flow of the slow path is the for(;;) loop in __mutex_lock_common() above.
As that loop shows, when the lock cannot be obtained, schedule_preempt_disabled() switches the current task out so that it sleeps while waiting, which is exactly what makes this path slow.
3.7 mutex_unlock
The unlock flow of mutex_unlock():
mutex_unlock() is defined in kernel/locking/mutex.c:
/**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock a mutex that has been locked by this task previously.
 *
 * This function must not be used in interrupt context. Unlocking
 * of a not locked mutex is not allowed.
 *
 * This function is similar to (but not equivalent to) up().
 */
void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_LOCK_ALLOC
	if (__mutex_unlock_fast(lock))
		return;
#endif
	__mutex_unlock_slowpath(lock, _RET_IP_);
}
Releasing the lock is comparatively simple; it again splits into a fast path and a slow path.
The fast path is implemented in __mutex_unlock_fast():
static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{
	unsigned long curr = (unsigned long)current;

	if (atomic_long_cmpxchg_release(&lock->owner, curr, 0UL) == curr)
		return true;

	return false;
}
It is a single atomic call, atomic_long_cmpxchg_release() (a userspace analogue follows the two cases below):
- if lock->owner equals curr, i.e. the current task is the lock holder, lock->owner is set to 0 and the function returns true;
- if lock->owner does not equal curr, the current task is not the lock holder, and the function returns false;
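Continuing the earlier userspace sketch (struct toy_mutex and the includes from the fast-path example above), the release side can be mimicked like this; again purely illustrative:

/* Userspace analogue of the unlock fast path (continues the toy_mutex sketch). */
static bool toy_unlock_fast(struct toy_mutex *lock, uintptr_t self)
{
	uintptr_t expected = self;

	/*
	 * Mirrors atomic_long_cmpxchg_release(): only the current owner may
	 * reset owner to 0, and the store uses release ordering so everything
	 * done inside the critical section is visible to the next acquirer.
	 */
	return atomic_compare_exchange_strong_explicit(&lock->owner, &expected, 0,
						       memory_order_release,
						       memory_order_relaxed);
}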
The slow path examines the three MUTEX_FLAG bits and finally wakes up one of the tasks waiting on the lock:
void __sched __mutex_unlock_slowpath(struct mutex *lock, ...)
{
	/* release the mutex while keeping the low 3 flag bits */
	unsigned long old = atomic_long_cmpxchg_release(&lock->owner, owner,
							__owner_flags(owner));
	...

	spin_lock(&lock->wait_lock);
	if (!list_empty(&lock->wait_list)) {
		/* take the first waiter on the wait list */
		struct mutex_waiter *waiter =
			list_first_entry(&lock->wait_list,
					 struct mutex_waiter, list);

		/* add that task to the wake queue */
		struct task_struct *next = waiter->task;
		wake_q_add(&wake_q, next);
	}
	spin_unlock(&lock->wait_lock);

	/* wake it up */
	wake_up_q(&wake_q);
}