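Keywords: mutex, MCS, OSQ.
A semaphore is a mechanism that protects a shared resource accessed by multiple processors in a concurrent environment, whereas a mutex is used for mutual exclusion.
A semaphore whose count is initialized to 1 can already behave like a mutex through down()/up(), so why implement a separate mutex mechanism?
The semantics of a mutex are simpler and lighter than those of a semaphore. In test scenarios with heavy lock contention, a mutex runs faster and scales better than a semaphore, and the mutex data structure is also smaller than the semaphore's.
1. The mutex data structure
struct mutex describes a mutex.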
    struct mutex {
        /* 1: unlocked, 0: locked, negative: locked, possible waiters */
        /* Atomic counter: 1 means nobody holds the lock; 0 means the lock is held;
         * a negative value means the lock is held and tasks may be waiting in the wait queue. */
        atomic_t count;
        /* Spinlock that protects the wait_list sleep queue. */
        spinlock_t wait_lock;
        /* Tracks every task sleeping on this mutex; tasks that fail to get the lock sleep on this list. */
        struct list_head wait_list;
    #if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        /* Points to the task_struct of the lock holder. */
        struct task_struct *owner;
    #endif
    #ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        /* Implements the MCS lock mechanism. */
        struct optimistic_spin_queue osq; /* Spinner MCS lock */
    #endif
        ...
    };
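The three meanings of count can be written down as a tiny classifier. This is only an illustrative user-space sketch: the enum and the function name `model_classify_count` are hypothetical and do not exist in the kernel.

```c
#include <assert.h>

/* Hypothetical names: none of these are kernel identifiers. */
enum model_mutex_state {
    MODEL_UNLOCKED,        /* count == 1 */
    MODEL_LOCKED,          /* count == 0 */
    MODEL_LOCKED_WAITERS,  /* count < 0: held, possibly with waiters */
    MODEL_INVALID          /* count > 1: not expected in this scheme */
};

/* Map a raw counter value to the state described in the struct comment. */
enum model_mutex_state model_classify_count(int count)
{
    if (count == 1)
        return MODEL_UNLOCKED;
    if (count == 0)
        return MODEL_LOCKED;
    if (count < 0)
        return MODEL_LOCKED_WAITERS;
    return MODEL_INVALID;
}
```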
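2. The MCS lock mechanism
The mutex implements optimistic spinning. The core idea is this: when the lock holder is currently running inside its critical section and no higher-priority task needs to be scheduled (need_resched), the current task assumes that the holder will leave the critical section and release the lock soon. Rather than going to sleep, it spins optimistically, which saves the cost of a sleep and a wake-up.
Drawback of queued spinlocks:
On multiprocessor and NUMA systems, queued spinlocks still have a fairly serious problem.
Suppose a system with heavy lock contention in which every thread waiting for the lock spins on the same shared variable, and both acquiring and releasing the lock modify that same variable. Cache coherence then invalidates the cache line on every spinning CPU, so the cache lines on multiple CPUs are invalidated over and over, which severely degrades overall system performance.
The MCS algorithm addresses this spinning problem and significantly reduces CPU cache-line bouncing.
The core idea of MCS is that each lock requester spins only on a variable local to its own CPU, not on a global variable.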
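To illustrate "spin on a local variable", here is a minimal sketch of a classic MCS lock written with C11 atomics. It is a user-space model, not the kernel's OSQ code: the names `mcs_node`, `mcs_lock`, `mcs_acquire` and `mcs_release` are assumptions made for this example, and it uses the default sequentially consistent ordering for simplicity.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* One node per waiter; each waiter spins only on its own 'locked' flag. */
struct mcs_node {
    _Atomic(struct mcs_node *) next;
    atomic_bool locked;
};

/* The only shared word: a pointer to the last node in the queue. */
struct mcs_lock {
    _Atomic(struct mcs_node *) tail;
};

void mcs_acquire(struct mcs_lock *lock, struct mcs_node *node)
{
    atomic_store(&node->next, (struct mcs_node *)NULL);
    atomic_store(&node->locked, false);

    /* Swap ourselves in as the new tail; the old tail is our predecessor. */
    struct mcs_node *prev = atomic_exchange(&lock->tail, node);
    if (prev == NULL)
        return;                      /* queue was empty: lock acquired */

    atomic_store(&prev->next, node); /* link behind the predecessor */
    while (!atomic_load(&node->locked))
        ;                            /* spin on our own node only */
}

void mcs_release(struct mcs_lock *lock, struct mcs_node *node)
{
    struct mcs_node *next = atomic_load(&node->next);

    if (next == NULL) {
        /* No visible successor: try to reset the tail to "empty". */
        struct mcs_node *expected = node;
        if (atomic_compare_exchange_strong(&lock->tail, &expected,
                                           (struct mcs_node *)NULL))
            return;
        /* A successor swapped the tail but has not linked itself yet. */
        while ((next = atomic_load(&node->next)) == NULL)
            ;
    }
    assert(next != NULL);
    atomic_store(&next->locked, true); /* hand the lock to the successor */
}
```

The release path writes only to the successor's node, so a handoff touches one waiter's cache line instead of a variable that every waiter is polling.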
2.1 MCS lock data structures
struct optimistic_spin_node represents the node on the local CPU. Nodes can be organized into a doubly linked list through their next and prev pointers.
    /*
     * An MCS like lock especially tailored for optimistic spinning for sleeping
     * lock implementations (mutex, rwsem, etc).
     */
    struct optimistic_spin_node {
        /* The next and prev pointers form a doubly linked list. */
        struct optimistic_spin_node *next, *prev;
        /* Lock state of this node. */
        int locked; /* 1 if lock acquired */
        /* Encoded CPU number, identifying which CPU this node belongs to. */
        int cpu; /* encoded CPU # + 1 value */
    };

    struct optimistic_spin_queue {
        /*
         * Stores an encoded value of the CPU # of the tail node in the queue.
         * If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.
         */
        atomic_t tail;
    };
struct optimistic_spin_node is defined as a per-CPU variable, so every CPU has exactly one node.
    static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
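Because tail stores "CPU number + 1" and 0 is reserved for the empty queue, the encoded value maps back to a per-CPU node by subtracting 1. The following user-space sketch models that mapping with a plain array in place of per-CPU storage; `MODEL_NR_CPUS`, `model_nodes`, `model_encode_cpu` and `model_decode_cpu` are hypothetical names for this illustration only.

```c
#include <assert.h>

#define MODEL_NR_CPUS 4          /* assumed CPU count for the model */
#define MODEL_OSQ_UNLOCKED_VAL 0 /* 0 is reserved for "queue empty" */

struct model_node {
    int cpu; /* encoded CPU # + 1 value */
};

/* Stand-in for the per-CPU osq_node variable: one slot per CPU. */
struct model_node model_nodes[MODEL_NR_CPUS];

/* CPU0 becomes 1, CPU1 becomes 2, ... so no CPU ever encodes to 0. */
int model_encode_cpu(int cpu_nr)
{
    assert(cpu_nr >= 0 && cpu_nr < MODEL_NR_CPUS);
    return cpu_nr + 1;
}

/* Map an encoded value back to the node that belongs to that CPU. */
struct model_node *model_decode_cpu(int encoded)
{
    assert(encoded >= 1 && encoded <= MODEL_NR_CPUS);
    return &model_nodes[encoded - 1];
}
```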
2.2 MCS lock initialization
The MCS lock is initialized by osq_lock_init().
    #define OSQ_UNLOCKED_VAL (0)

    /* Init macro and function. */
    #define OSQ_LOCK_UNLOCKED { ATOMIC_INIT(OSQ_UNLOCKED_VAL) }

    static inline void osq_lock_init(struct optimistic_spin_queue *lock)
    {
        atomic_set(&lock->tail, OSQ_UNLOCKED_VAL);
    }
2.3 osq_lock()/osq_unlock()
osq_lock() and osq_unlock() acquire and release the MCS lock.
    bool osq_lock(struct optimistic_spin_queue *lock)
    {
        /* node points to the current CPU's struct optimistic_spin_node. */
        struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
        struct optimistic_spin_node *prev, *next;
        /* Encoded number of the current CPU: 0 means no CPU, 1 means CPU0, and so on. */
        int curr = encode_cpu(smp_processor_id());
        int old;

        node->locked = 0;
        node->next = NULL;
        node->cpu = curr;

        /*
         * Atomically swap the global lock->tail with the current CPU number.
         * If the old lock->tail equals the initial value OSQ_UNLOCKED_VAL,
         * nobody held the lock, and lock->tail now carries the current CPU
         * number, which means the lock has been acquired.
         */
        old = atomic_xchg(&lock->tail, curr);
        if (old == OSQ_UNLOCKED_VAL)
            return true;

        /*
         * The lock was not acquired: prev is the struct optimistic_spin_node
         * of the CPU that old refers to.
         */
        prev = decode_cpu(old);
        node->prev = prev;
        ACCESS_ONCE(prev->next) = node;

        /*
         * Keep polling node->locked until it becomes 1. When the predecessor
         * prev releases the lock, it sets locked in its next node to 1, and
         * only then is its release complete.
         */
        while (!ACCESS_ONCE(node->locked)) {
            /*
             * If a higher-priority task preempts us or the scheduler asks us
             * to reschedule while spinning, give up spinning, leave the MCS
             * list, and jump to the unqueue label, which removes this node
             * from the MCS list.
             */
            if (need_resched())
                goto unqueue;

            cpu_relax_lowlatency();
        }
        return true;

    unqueue:
        /*
         * Step - A -- stabilize @prev
         *
         * Undo our @prev->next assignment; this will make @prev's
         * unlock()/unqueue() wait for a next pointer since @lock points to us
         * (or later).
         */
        for (;;) {
            if (prev->next == node &&
                cmpxchg(&prev->next, node, NULL) == node)
                break;

            /*
             * We can only fail the cmpxchg() racing against an unlock(),
             * in which case we should observe @node->locked becomming
             * true.
             */
            if (smp_load_acquire(&node->locked))
                return true;

            cpu_relax_lowlatency();

            /*
             * Or we race against a concurrent unqueue()'s step-B, in which
             * case its step-C will write us a new @node->prev pointer.
             */
            prev = ACCESS_ONCE(node->prev);
        }

        /*
         * Step - B -- stabilize @next
         *
         * Similar to unlock(), wait for @node->next or move @lock from @node
         * back to @prev.
         */
        next = osq_wait_next(lock, node, prev);
        if (!next)
            return false;

        /*
         * Step - C -- unlink
         *
         * @prev is stable because its still waiting for a new @prev->next
         * pointer, @next is stable because our @node->next pointer is NULL and
         * it will wait in Step-A.
         */
        ACCESS_ONCE(next->prev) = prev;
        ACCESS_ONCE(prev->next) = next;

        return false;
    }

    #define smp_load_acquire(p) \
    ({ \
        typeof(*p) ___p1 = ACCESS_ONCE(*p); \
        compiletime_assert_atomic_type(*p); \
        smp_mb(); \
        ___p1; \
    })

    /*
     * Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
     * Can return NULL in case we were the last queued and we updated @lock instead.
     */
    static inline struct optimistic_spin_node *
    osq_wait_next(struct optimistic_spin_queue *lock,
                  struct optimistic_spin_node *node,
                  struct optimistic_spin_node *prev)
    {
        struct optimistic_spin_node *next = NULL;
        int curr = encode_cpu(smp_processor_id());
        int old;

        /*
         * If there is a prev node in queue, then the 'old' value will be
         * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
         * we're currently last in queue, then the queue will then become empty.
         */
        old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;

        for (;;) {
            if (atomic_read(&lock->tail) == curr &&
                atomic_cmpxchg(&lock->tail, curr, old) == curr) {
                /*
                 * We were the last queued, we moved @lock back. @prev
                 * will now observe @lock and will complete its
                 * unlock()/unqueue().
                 */
                break;
            }

            /*
             * We must xchg() the @node->next value, because if we were to
             * leave it in, a concurrent unlock()/unqueue() from
             * @node->next might complete Step-A and think its @prev is
             * still valid.
             *
             * If the concurrent unlock()/unqueue() wins the race, we'll
             * wait for either @lock to point to us, through its Step-B, or
             * wait for a new @node->next from its Step-C.
             */
            if (node->next) {
                next = xchg(&node->next, NULL);
                if (next)
                    break;
            }

            cpu_relax_lowlatency();
        }

        return next;
    }
    void osq_unlock(struct optimistic_spin_queue *lock)
    {
        struct optimistic_spin_node *node, *next;
        int curr = encode_cpu(smp_processor_id());

        /*
         * Fast path for the uncontended case.
         */
        if (likely(atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
            return;

        /*
         * Second most likely case.
         */
        node = this_cpu_ptr(&osq_node);
        next = xchg(&node->next, NULL);
        if (next) {
            ACCESS_ONCE(next->locked) = 1;
            return;
        }

        next = osq_wait_next(lock, node, NULL);
        if (next)
            ACCESS_ONCE(next->locked) = 1;
    }
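The interplay between the xchg on tail in osq_lock() and the cmpxchg fast path in osq_unlock() can be traced with a small user-space model that keeps only the tail word. This is a simplified sketch built on C11 atomics; `model_osq_enqueue` and `model_osq_unlock_fast` are hypothetical helpers and deliberately leave out the node list, the spinning and the unqueue steps.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define MODEL_UNLOCKED_VAL 0

/*
 * Model of the first step of osq_lock(): swap our encoded CPU number into
 * tail. Returns true if the queue was empty, i.e. the lock was taken
 * immediately; otherwise *old_tail names the predecessor to queue behind.
 */
bool model_osq_enqueue(atomic_int *tail, int cpu_nr, int *old_tail)
{
    int curr = cpu_nr + 1; /* encoded CPU number */

    assert(cpu_nr >= 0);
    *old_tail = atomic_exchange(tail, curr);
    return *old_tail == MODEL_UNLOCKED_VAL;
}

/*
 * Model of the osq_unlock() fast path: it only succeeds when tail still
 * names us, meaning nobody queued behind us in the meantime.
 */
bool model_osq_unlock_fast(atomic_int *tail, int cpu_nr)
{
    int expected = cpu_nr + 1;

    return atomic_compare_exchange_strong(tail, &expected,
                                          MODEL_UNLOCKED_VAL);
}
```

In this model, once CPU1 has enqueued behind CPU0, the fast-path unlock on CPU0 fails because tail now holds CPU1's encoded value; that is the situation in which the real osq_unlock() has to hand the lock over through next->locked.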
3. Implementation of the mutex lock
3.1 mutex data structures
    struct mutex {
        /* 1: unlocked, 0: locked, negative: locked, possible waiters */
        atomic_t count;
        spinlock_t wait_lock;
        struct list_head wait_list;
    #if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        struct task_struct *owner;
    #endif
    #ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq; /* Spinner MCS lock */
    #endif
    #ifdef CONFIG_DEBUG_MUTEXES
        void *magic;
    #endif
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map dep_map;
    #endif
    };

    struct mutex_waiter {
        struct list_head list;
        struct task_struct *task;
    #ifdef CONFIG_DEBUG_MUTEXES
        void *magic;
    #endif
    };
3.2 mutex initialization
A mutex can be initialized in two ways: statically with the DEFINE_MUTEX macro, or dynamically in kernel code with mutex_init().
    #define __MUTEX_INITIALIZER(lockname) \
            { .count = ATOMIC_INIT(1) \
            , .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
            , .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
            __DEBUG_MUTEX_INITIALIZER(lockname) \
            __DEP_MAP_MUTEX_INITIALIZER(lockname) }

    #define DEFINE_MUTEX(mutexname) \
            struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)
    # define mutex_init(mutex) \
    do { \
        static struct lock_class_key __key; \
        \
        __mutex_init((mutex), #mutex, &__key); \
    } while (0)

    void
    __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
    {
        atomic_set(&lock->count, 1);
        spin_lock_init(&lock->wait_lock);
        INIT_LIST_HEAD(&lock->wait_list);
        mutex_clear_owner(lock);
    #ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        /* Initialize the MCS lock. */
        osq_lock_init(&lock->osq);
    #endif

        debug_mutex_init(lock, name, key);
    }
3.3 mutex_lock()/mutex_unlock()
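The fast path of mutex_lock() succeeds when atomically decrementing count yields 0. If the result is negative, the lock is already held by someone else and the slow path __mutex_lock_slowpath() is taken.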
    void __sched mutex_lock(struct mutex *lock)
    {
        might_sleep();
        /*
         * The locking fastpath is the 1->0 transition from
         * 'unlocked' into 'locked' state.
         */
        __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
        /* After the lock is acquired, point lock->owner at the current task's task_struct. */
        mutex_set_owner(lock);
    }

    static inline void
    __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
    {
        if (unlikely(atomic_dec_return(count) < 0))
            fail_fn(count);
    }

    __visible void __sched
    __mutex_lock_slowpath(atomic_t *lock_count)
    {
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                            NULL, _RET_IP_, NULL, 0);
    }
    static inline void mutex_set_owner(struct mutex *lock)
    {
        lock->owner = current;
    }
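The fast-path test "decrement, then check for a negative result" can be tried in user space. The sketch below uses C11 `atomic_fetch_sub` as a stand-in for the kernel's `atomic_dec_return()`; `model_fastpath_lock` is a hypothetical name, and the model only reports whether the slow path would be needed rather than calling one.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Model of the lock fast path: decrement count and look at the new value.
 * Returns true when the 1 -> 0 transition happened (lock taken);
 * returns false when the result is negative (slow path would be needed).
 */
bool model_fastpath_lock(atomic_int *count)
{
    /* atomic_fetch_sub returns the old value, so subtract 1 to get the new one. */
    int newval = atomic_fetch_sub(count, 1) - 1;

    return newval >= 0;
}
```

Note that a failed attempt still leaves count decremented, which is how a contended lock ends up with a negative count.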
    static __always_inline int __sched
    __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                        struct lockdep_map *nest_lock, unsigned long ip,
                        struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
    {
        struct task_struct *task = current;
        struct mutex_waiter waiter;
        unsigned long flags;
        int ret;

        /* Disable kernel preemption. */
        preempt_disable();
        mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

        if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
            /* got the lock, yay! */
            /* Re-enable preemption. */
            preempt_enable();
            return 0;
        }

        spin_lock_mutex(&lock->wait_lock, flags);

        /*
         * Once more, try to acquire the lock. Only try-lock the mutex if
         * it is unlocked to reduce unnecessary xchg() operations.
         */
        /* Try once more to take the lock. */
        if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, 0) == 1))
            goto skip_wait;

        debug_mutex_lock_common(lock, &waiter);
        debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));

        /* add waiting tasks to the end of the waitqueue (FIFO): */
        /* Append waiter to the mutex wait queue wait_list; this is a FIFO queue. */
        list_add_tail(&waiter.list, &lock->wait_list);
        waiter.task = task;

        lock_contended(&lock->dep_map, ip);

        for (;;) {
            /*
             * Each iteration first tries to take the lock and sets
             * lock->count to -1; the code below checks later whether
             * any waiters are left in the wait queue.
             */
            if (atomic_read(&lock->count) >= 0 &&
                (atomic_xchg(&lock->count, -1) == 1))
                break;

            if (unlikely(signal_pending_state(state, task))) {
                ret = -EINTR;
                /* A signal arrived: leave the loop. */
                goto err;
            }

            if (use_ww_ctx && ww_ctx->acquired > 0) {
                ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
                if (ret)
                    goto err;
            }

            __set_task_state(task, state);

            /* didn't get the lock, go to sleep: */
            spin_unlock_mutex(&lock->wait_lock, flags);
            /* The attempt failed: call schedule_preempt_disabled() to give up the CPU, and the current task goes to sleep. */
            schedule_preempt_disabled();
            spin_lock_mutex(&lock->wait_lock, flags);
        }
        /* The loop exited because the lock was acquired, so mark the current task runnable (TASK_RUNNING). */
        __set_task_state(task, TASK_RUNNING);

        /* Remove the waiter from lock->wait_list. */
        mutex_remove_waiter(lock, &waiter, current_thread_info());
        /* set it to 0 if there are no waiters left: */
        /* If nobody is sleeping in the wait queue, set count to 0. */
        if (likely(list_empty(&lock->wait_list)))
            atomic_set(&lock->count, 0);
        debug_mutex_free_waiter(&waiter);

    skip_wait:
        /* got the lock - cleanup and rejoice! */
        lock_acquired(&lock->dep_map, ip);
        mutex_set_owner(lock);

        if (use_ww_ctx) {
            struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
            ww_mutex_set_context_slowpath(ww, ww_ctx);
        }

        spin_unlock_mutex(&lock->wait_lock, flags);
        preempt_enable();
        /* Lock acquired: owner points to the current task, preemption is re-enabled, return 0. */
        return 0;

    err:
        mutex_remove_waiter(lock, &waiter, task_thread_info(task));
        spin_unlock_mutex(&lock->wait_lock, flags);
        debug_mutex_free_waiter(&waiter);
        mutex_release(&lock->dep_map, 1, ip);
        preempt_enable();
        return ret;
    }

    static bool mutex_optimistic_spin(struct mutex *lock,
                                      struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
    {
        struct task_struct *task = current;

        /*
         * mutex_can_spin_on_owner() returning 0 means the lock holder is not
         * running, so the condition for optimistic spinning is not met.
         * Spinning is only worthwhile while the holder is executing in its
         * critical section.
         */
        if (!mutex_can_spin_on_owner(lock))
            goto done;

        /*
         * Take the OSQ lock for protection. We are about to spin waiting for
         * the mutex to be released soon and do not want others to spin along
         * with us, because many spinners cause severe CPU cache thrashing.
         * Every task waiting for the mutex is placed in the OSQ queue, and
         * only the first waiter in the queue may spin on the mutex.
         */
        if (!osq_lock(&lock->osq))
            goto done;

        /* Keep spinning and checking whether the holder has released the lock. */
        while (true) {
            struct task_struct *owner;

            if (use_ww_ctx && ww_ctx->acquired > 0) {
                struct ww_mutex *ww;

                ww = container_of(lock, struct ww_mutex, base);
                if (ACCESS_ONCE(ww->ctx))
                    break;
            }

            owner = ACCESS_ONCE(lock->owner);
            /* Spin until the holder releases the lock; a true return value means it was released. */
            if (owner && !mutex_spin_on_owner(lock, owner))
                break;

            /* Try to acquire the mutex if it is unlocked. */
            /* Only after the holder has released the lock does the current task try to take it. */
            if (mutex_try_to_acquire(lock)) {
                lock_acquired(&lock->dep_map, ip);

                if (use_ww_ctx) {
                    struct ww_mutex *ww;
                    ww = container_of(lock, struct ww_mutex, base);

                    ww_mutex_set_context_fastpath(ww, ww_ctx);
                }

                /* Point lock->owner at the current task's task_struct. */
                mutex_set_owner(lock);
                osq_unlock(&lock->osq);
                return true;
            }

            /*
             * owner may be NULL because the holder was preempted between
             * acquiring the lock and setting owner. In that case, stop
             * spinning if the current task needs to reschedule or is a
             * real-time task.
             */
            if (!owner && (need_resched() || rt_task(task)))
                break;

            cpu_relax_lowlatency();
        }

        osq_unlock(&lock->osq);
    done:
        if (need_resched()) {
            __set_current_state(TASK_RUNNING);
            schedule_preempt_disabled();
        }

        return false;
    }

    static inline int mutex_can_spin_on_owner(struct mutex *lock)
    {
        struct task_struct *owner;
        int retval = 1;

        /* If the current task needs to be rescheduled, return 0. */
        if (need_resched())
            return 0;

        /* The RCU read-side critical section guarantees that the task_struct owner points to is not freed inside it. */
        rcu_read_lock();
        owner = ACCESS_ONCE(lock->owner);
        /*
         * owner points to the holder's task_struct. task_struct->on_cpu == 1
         * means the holder is running, that is, executing in its critical
         * section, because lock->owner becomes NULL once the holder releases
         * the lock.
         */
        if (owner)
            retval = owner->on_cpu;
        rcu_read_unlock();
        /*
         * if lock->owner is not set, the mutex owner may have just acquired
         * it and not set the owner yet or the mutex has been released.
         */
        return retval;
    }

    static noinline
    int mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
    {
        rcu_read_lock();
        /*
         * owner_running() checks whether lock->owner still equals owner. If
         * not, it returns 0; otherwise it returns owner->on_cpu. When it
         * returns 0 there is no point in staying in this loop to watch the
         * holder.
         */
        while (owner_running(lock, owner)) {
            /* If the scheduler needs to run another task, the current task has to stop spinning. */
            if (need_resched())
                break;

            cpu_relax_lowlatency();
        }
        rcu_read_unlock();

        /* lock->owner == NULL means the holder released the lock: return true. */
        return lock->owner == NULL;
    }
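Two situations end the spinning:
First, the holder released the lock, so lock->owner no longer points to the holder, or the holder has changed.
Second, the holder has not released the lock but was scheduled out while executing in its critical section, that is, it went to sleep and on_cpu is 0.
In both cases the current task should stop spinning of its own accord.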
    static inline bool owner_running(struct mutex *lock, struct task_struct *owner)
    {
        if (lock->owner != owner)
            return false;
        barrier();
        return owner->on_cpu;
    }
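The two exit conditions can be made explicit in a small user-space model that returns which reason applies. This is a sketch under assumptions: `model_task`, `model_mutex`, `model_spin_check` and the verdict enum are hypothetical names, and C11 atomics replace the kernel's ACCESS_ONCE()/barrier() handling.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-ins for task_struct and struct mutex. */
struct model_task {
    atomic_int on_cpu;                 /* 1 while the task is running */
};

struct model_mutex {
    _Atomic(struct model_task *) owner;
};

enum model_spin_verdict {
    MODEL_KEEP_SPINNING,
    MODEL_STOP_OWNER_CHANGED,          /* case 1: released or new holder */
    MODEL_STOP_OWNER_OFF_CPU           /* case 2: holder was scheduled out */
};

/* One polling step of the spin loop, seen from the spinner's side. */
enum model_spin_verdict model_spin_check(struct model_mutex *lock,
                                         struct model_task *owner)
{
    assert(owner != NULL);

    if (atomic_load(&lock->owner) != owner)
        return MODEL_STOP_OWNER_CHANGED;
    if (!atomic_load(&owner->on_cpu))
        return MODEL_STOP_OWNER_OFF_CPU;
    return MODEL_KEEP_SPINNING;
}
```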
    static inline bool mutex_try_to_acquire(struct mutex *lock)
    {
        /*
         * First read the atomic variable lock->count and check whether it is 1.
         * If so, use atomic_cmpxchg() to set count to 0, which acquires the lock.
         */
        return !mutex_is_locked(lock) &&
            (atomic_cmpxchg(&lock->count, 1, 0) == 1);
    }

    static inline int mutex_is_locked(struct mutex *lock)
    {
        return atomic_read(&lock->count) != 1;
    }
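The "read first, then compare-and-swap 1 to 0" pattern can be reproduced in user space with C11 atomics. `model_try_acquire` below is a hypothetical name for this sketch; the plain load in front of the compare-exchange mirrors the cheap mutex_is_locked() check that avoids an unnecessary atomic read-modify-write when the lock is visibly taken.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Try to move count from 1 (unlocked) to 0 (locked) without blocking. */
bool model_try_acquire(atomic_int *count)
{
    int expected = 1;

    /* Cheap read first: skip the compare-exchange if the lock is held. */
    if (atomic_load(count) != 1)
        return false;

    /* Succeeds only if count is still 1 at the moment of the exchange. */
    return atomic_compare_exchange_strong(count, &expected, 0);
}
```

Unlike the decrement-based fast path, a failed attempt here leaves count untouched.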
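mutex_unlock() releases the mutex and, like locking, has a fast path and a slow path.
On the unlock fast path, if count is greater than 0 after the atomic increment, nobody is in the wait queue and the unlock is complete. Otherwise the slow path __mutex_unlock_slowpath() is taken.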
    void __sched mutex_unlock(struct mutex *lock)
    {
    #ifndef CONFIG_DEBUG_MUTEXES
        /* Clear lock->owner. */
        mutex_clear_owner(lock);
    #endif
        __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
    }

    static inline void mutex_clear_owner(struct mutex *lock)
    {
        lock->owner = NULL;
    }

    static inline void
    __mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
    {
        /*
         * If count is greater than 0 after the increment, nobody is in the
         * wait queue and the unlock is done. If it is less than or equal to 0,
         * there are still waiters, so take the unlock slow path.
         */
        if (unlikely(atomic_inc_return(count) <= 0))
            fail_fn(count);
    }

    __visible void
    __mutex_unlock_slowpath(atomic_t *lock_count)
    {
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_unlock_common_slowpath(lock, 1);
    }
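The unlock fast-path test is the mirror image of the lock side: increment, then check whether the result is still at or below zero. The following user-space sketch uses C11 `atomic_fetch_add` in place of `atomic_inc_return()`; `model_fastpath_unlock` is a hypothetical name and the model only reports whether the slow path would be needed.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Model of the unlock fast path: increment count and inspect the new value.
 * Returns true when the new value is positive (0 -> 1, no waiters);
 * returns false when it is <= 0 (waiters possible, slow path needed).
 */
bool model_fastpath_unlock(atomic_int *count)
{
    /* atomic_fetch_add returns the old value, so add 1 to get the new one. */
    int newval = atomic_fetch_add(count, 1) + 1;

    return newval > 0;
}
```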
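__mutex_unlock_common_slowpath() is the slow-path release function. It first sets lock->count to 1, which releases the lock, and then checks whether the wait queue is empty; if it is not, it wakes up the first waiter.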
    static inline void
    __mutex_unlock_common_slowpath(struct mutex *lock, int nested)
    {
        unsigned long flags;

        /*
         * For performance reasons, release the lock first and only then wake
         * the waiters in the wait queue, so that other tasks get a chance to
         * grab the lock.
         */
        if (__mutex_slowpath_needs_to_unlock())
            atomic_set(&lock->count, 1);

        spin_lock_mutex(&lock->wait_lock, flags);
        mutex_release(&lock->dep_map, nested, _RET_IP_);
        debug_mutex_unlock(lock);

        if (!list_empty(&lock->wait_list)) {
            /* get the first entry from the wait-list: */
            /* Wake only the waiter at the head of the wait queue. */
            struct mutex_waiter *waiter =
                    list_entry(lock->wait_list.next,
                               struct mutex_waiter, list);

            debug_mutex_wake_waiter(lock, waiter);

            wake_up_process(waiter->task);
        }

        spin_unlock_mutex(&lock->wait_lock, flags);
    }
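The order of operations on the slow path (mark the lock free, then pick the head of the FIFO queue) can be shown with a single-threaded model. Everything here is an assumption made for illustration: `model_waiter`, `model_mutex`, `model_add_waiter` and `model_unlock_slowpath` are hypothetical, integer task IDs stand in for task_struct pointers, and "waking" is modeled by returning the ID.

```c
#include <assert.h>
#include <stddef.h>

struct model_waiter {
    int task_id;                  /* stand-in for waiter->task */
    struct model_waiter *next;
};

struct model_mutex {
    int count;                    /* 1 unlocked, 0 locked, <0 waiters */
    struct model_waiter *head;    /* first waiter (oldest) */
    struct model_waiter *tail;    /* last waiter (newest) */
};

/* Append at the tail, so the queue is first-in, first-out. */
void model_add_waiter(struct model_mutex *lock, struct model_waiter *w)
{
    w->next = NULL;
    if (lock->tail)
        lock->tail->next = w;
    else
        lock->head = w;
    lock->tail = w;
    lock->count = -1;             /* a waiter marks the lock as contended */
}

/*
 * Release first, then choose who to wake. Returns the task ID of the first
 * waiter, or -1 if the queue is empty. The waiter stays queued: in the
 * listing above it removes itself after it has actually taken the lock.
 */
int model_unlock_slowpath(struct model_mutex *lock)
{
    lock->count = 1;
    if (lock->head == NULL)
        return -1;
    return lock->head->task_id;
}
```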
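4. Summary
4.1 Advantages of mutex and usage notes
A mutex is considerably more efficient than a semaphore:
- mutex was the first to implement optimistic spinning.
- mutex tries to take the lock before going to sleep.
- mutex implements an MCS lock to avoid the CPU cache thrashing caused by multiple CPUs contending for the lock.
Because mutex is simple and efficient, its usage rules are stricter than those of a semaphore:
- Only one thread can hold the mutex at any given time.
- Only the lock holder may unlock. A mutex must not be acquired in one task and released in another.
- Recursive locking and unlocking are not allowed.
- A task must not exit while it holds a mutex.
- A mutex must be initialized through the official API.
- A mutex can sleep, so it must not be used in interrupt handlers or bottom halves such as tasklets and timers.
4.2 How to choose between spinlock, semaphore, and mutex?
- In interrupt context, use a spinlock without hesitation.
- If the critical section may sleep or trigger scheduling, avoid a spinlock.
- Between mutex and semaphore, prefer mutex unless one of the restrictions listed above rules it out.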
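The three rules can be condensed into a small decision helper. This is purely an illustrative sketch: `model_choose_lock` and its enum are hypothetical, and it covers only the cases the list above mentions. Short, non-sleeping critical sections in process context are not modeled here.

```c
#include <assert.h>
#include <stdbool.h>

enum model_lock_choice {
    MODEL_USE_SPINLOCK,
    MODEL_USE_MUTEX,
    MODEL_USE_SEMAPHORE
};

/*
 * in_interrupt_context: the code runs in an interrupt handler or bottom half.
 * mutex_rules_fit:      none of the mutex restrictions from section 4.1
 *                       is violated by the intended usage.
 */
enum model_lock_choice model_choose_lock(bool in_interrupt_context,
                                         bool mutex_rules_fit)
{
    /* Sleeping locks are not allowed in interrupt context. */
    if (in_interrupt_context)
        return MODEL_USE_SPINLOCK;

    /* The critical section may sleep: prefer mutex over semaphore. */
    return mutex_rules_fit ? MODEL_USE_MUTEX : MODEL_USE_SEMAPHORE;
}
```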
