Linux Synchronization Mechanisms: Mutexes


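1. Mutex

1.1 What is a mutex

A mutex implements a simple form of "mutual exclusion" synchronization, hence the name. It prevents multiple tasks from entering a protected "critical section" of code at the same time, so at any moment only one task is allowed inside the protected region.

Compared with a semaphore, a mutex has simpler and lighter semantics. In tests with heavy lock contention, a mutex runs faster and scales better than a semaphore. The original mutex design also made struct mutex smaller than struct semaphore, although on current kernels that no longer holds (see the drawback listed in 1.2).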

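1.2 Characteristics of a mutex

  • A mutex is a synchronization primitive in the Linux kernel used for mutual exclusion;
  • A mutex is a sleeping lock: under contention a task may be put to sleep and woken up later, and the resulting context switches are expensive, so it suits cases where the lock is held for a relatively long time;
  • A mutex lets only one task into the critical section at a time, which makes it similar to a binary semaphore;
  • When the lock is contended and its holder is still running, a waiter spins instead of sleeping immediately, which can improve performance considerably; this mechanism (optimistic spinning) is also applied to read-write semaphores;
  • A drawback is that the mutex object is relatively large, so it takes up more CPU cache and memory;
  • Compared with a semaphore, a mutex performs and scales better, so kernel code generally prefers a mutex;
  • To improve performance, a mutex handles acquisition through three paths: the fast path, the mid path, and the slow path;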

1.3 Using a mutex

Defining a mutex:

struct mutex my_mutex;

Initializing a mutex:

mutex_init(&my_mutex);

Or define and initialize it in one step with the macro:

DEFINE_MUTEX(my_mutex);

Acquiring a mutex:

void mutex_lock(struct mutex *lock);

This function acquires the mutex. It may sleep, so it must not be used in interrupt context.

int mutex_lock_interruptible(struct mutex *lock);

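This function behaves like mutex_lock, with one difference: a task sleeping in mutex_lock cannot be interrupted by a signal, whereas a task sleeping in mutex_lock_interruptible is put in the TASK_INTERRUPTIBLE state and can be woken by a signal.

It returns 0 once the mutex has been acquired; if the sleep is interrupted by a signal, it returns -EINTR.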

int mutex_trylock(struct mutex *lock);

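mutex_trylock attempts to acquire the mutex and never puts the task to sleep if the mutex is unavailable. It returns 1 if the mutex was acquired and 0 if it was not.

Releasing a mutex: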

void mutex_unlock(struct mutex *lock);
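The kernel calls above cannot be run outside the kernel, so the following is only a rough userspace analogy of the same lock / trylock / unlock pattern, written with POSIX pthread_mutex_t. The names counter_lock, counter, worker, run_workers, and try_enter are made up for illustration. Note one difference in convention: pthread_mutex_trylock returns 0 on success and EBUSY when the lock is held, whereas the kernel's mutex_trylock returns 1 on success and 0 on failure.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Userspace stand-in for DEFINE_MUTEX(my_mutex): static initialization. */
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;
static long counter;

/* Each worker enters the critical section 100000 times. */
static void *worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 100000; i++) {
        pthread_mutex_lock(&counter_lock);   /* may sleep, like mutex_lock() */
        counter++;                           /* critical section */
        pthread_mutex_unlock(&counter_lock); /* like mutex_unlock() */
    }
    return NULL;
}

/* Run nthreads workers and return the final counter value. */
long run_workers(int nthreads)
{
    pthread_t tid[16];

    assert(nthreads > 0 && nthreads <= 16);
    counter = 0;
    for (int i = 0; i < nthreads; i++)
        pthread_create(&tid[i], NULL, worker, NULL);
    for (int i = 0; i < nthreads; i++)
        pthread_join(tid[i], NULL);
    return counter;
}

/* Non-blocking attempt: returns 0 if acquired, EBUSY if the lock is held. */
int try_enter(void)
{
    return pthread_mutex_trylock(&counter_lock);
}
```

Without the lock around counter++ the final value would usually fall short of nthreads * 100000, which is the kind of lost update a critical section exists to prevent.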

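1.4 Mutexes and semaphores

A mutex is considerably more efficient than a semaphore:

  • The mutex was the first to implement the spin-wait mechanism;
  • A mutex tries to acquire the lock before going to sleep;
  • A mutex uses an MCS lock to avoid the CPU cache-line bouncing caused by multiple CPUs contending for the lock;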

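2. The MCS lock mechanism

2.1 MCS lock

  • As mentioned above, the mutex implementation uses optimistic spinning, and the core of that mechanism is built on the MCS lock;
  • The MCS lock was proposed by John Mellor-Crummey and Michael Scott in the paper "Algorithms for Scalable Synchronization on Shared-Memory Multiprocessors" and is named after the two of them;
  • The problem the MCS lock solves: on a multi-CPU system, every time the value of a spinlock changes, all CPUs trying to take that spinlock must reread memory and refresh their own cache line, yet only one CPU ends up with the lock and only its refresh was useful. The fiercer the contention (the more CPUs trying to take the lock), the greater this wasted overhead;
  • The core idea of the MCS lock: each CPU is given its own spin-lock node, and a requester (per CPU) spins on its local-CPU variable. The nodes are linked into a list, and each requester spins until its predecessor releases the lock;
  • osq (optimistic spinning queue) is a concrete implementation based on the MCS algorithm, refined over several iterations;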
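To make the queue idea concrete, here is a minimal userspace sketch of a textbook MCS lock using C11 atomics. It is not the kernel's osq code: the names mcs_node, mcs_lock, mcs_acquire, mcs_release, and mcs_demo are invented for this example, the nodes live on each thread's stack rather than in per-CPU storage, and sched_yield() is used in the spin loops only so that the demo stays usable when threads outnumber CPUs.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* One queue node per waiter; each waiter spins only on its own node. */
struct mcs_node {
    _Atomic(struct mcs_node *) next;
    atomic_bool locked;             /* true once the lock is handed to us */
};

/* The lock itself is just a pointer to the tail of the waiter queue. */
struct mcs_lock {
    _Atomic(struct mcs_node *) tail;
};

static void mcs_acquire(struct mcs_lock *lock, struct mcs_node *me)
{
    atomic_store(&me->next, NULL);
    atomic_store(&me->locked, false);

    /* Append ourselves to the queue; the old tail is our predecessor. */
    struct mcs_node *prev = atomic_exchange(&lock->tail, me);
    if (prev == NULL)
        return;                     /* queue was empty: lock acquired */

    atomic_store(&prev->next, me);  /* link behind the predecessor */
    while (!atomic_load(&me->locked))
        sched_yield();              /* spin on our own node only */
}

static void mcs_release(struct mcs_lock *lock, struct mcs_node *me)
{
    struct mcs_node *next = atomic_load(&me->next);

    if (next == NULL) {
        /* No known successor: try to swing the tail back to empty. */
        struct mcs_node *expected = me;
        if (atomic_compare_exchange_strong(&lock->tail, &expected, NULL))
            return;
        /* A successor is mid-enqueue; wait for it to link itself. */
        while ((next = atomic_load(&me->next)) == NULL)
            sched_yield();
    }
    atomic_store(&next->locked, true);  /* hand the lock to the successor */
}

static struct mcs_lock demo_lock;
static long demo_counter;

static void *mcs_worker(void *arg)
{
    struct mcs_node node;           /* lives on this thread's stack */

    (void)arg;
    for (int i = 0; i < 2000; i++) {
        mcs_acquire(&demo_lock, &node);
        demo_counter++;             /* critical section */
        mcs_release(&demo_lock, &node);
    }
    return NULL;
}

/* Run nthreads workers through the MCS lock; returns the final count. */
long mcs_demo(int nthreads)
{
    pthread_t tid[8];

    assert(nthreads > 0 && nthreads <= 8);
    atomic_store(&demo_lock.tail, NULL);
    demo_counter = 0;
    for (int i = 0; i < nthreads; i++)
        pthread_create(&tid[i], NULL, mcs_worker, NULL);
    for (int i = 0; i < nthreads; i++)
        pthread_join(tid[i], NULL);
    return demo_counter;
}
```

The point to notice is that a waiter only ever reads its own locked flag, so a release touches one successor's cache line instead of invalidating a shared word on every waiting CPU.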

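2.2 osq flow analysis

Optimistic spinning: how optimistic is it? When it finds the lock held, optimistic spinning trusts that the holder will release it soon, so it spins rather than sleeps, which saves the cost of a context switch.

[Figure: osq data structures]

[Figure: osq_lock flow]

osq locking falls into several cases:

  • Nobody holds the lock: the ideal case, return immediately;
  • Somebody holds the lock: the current node is added to the OSQ queue and, as long as no higher-priority task preempts it, it spins waiting for its predecessor to release the lock;
  • If a higher-priority task preempts the spinner while it waits, the node that was added to the OSQ queue must be removed again. Removal takes three steps: fix up the next pointer of the prev (predecessor) node, fix up the next pointer of the current node, and link the prev node to the next (successor) node;

Atomic operations are used throughout locking to guarantee correctness;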

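[Figure: osq_unlock flow]

Unlocking also falls into several cases:

  • Nobody is contending for the lock: it can be released directly;
  • Fetch the node that the current node's next pointer refers to; if it is not NULL, unlock that next node;
  • If the current node's next pointer is NULL, call osq_wait_next to wait for the next node, and unlock it once it has been obtained;

As the unlock cases show, the process amounts to passing the lock along, from one node to the next;

Because other operations may be modifying the osq queue during both locking and unlocking, both paths call osq_wait_next to obtain a stable next node.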

3. Mutex source implementation

3.1 mutex

The mutex structure is defined in include/linux/mutex.h:

/*
 * Simple, straightforward mutexes with strict semantics:
 *
 * - only one task can hold the mutex at a time
 * - only the owner can unlock the mutex
 * - multiple unlocks are not permitted
 * - recursive locking is not permitted
 * - a mutex object must be initialized via the API
 * - a mutex object must not be initialized via memset or copying
 * - task may not exit with mutex held
 * - memory areas where held locks reside must not be freed
 * - held mutexes must not be reinitialized
 * - mutexes may not be used in hardware or software interrupt
 *   contexts such as tasklets and timers
 *
 * These semantics are fully enforced when DEBUG_MUTEXES is
 * enabled. Furthermore, besides enforcing the above rules, the mutex
 * debugging code also implements a number of additional features
 * that make lock debugging easier and faster:
 *
 * - uses symbolic names of mutexes, whenever they are printed in debug output
 * - point-of-acquire tracking, symbolic lookup of function names
 * - list of all locks held in the system, printout of them
 * - owner tracking
 * - detects self-recursing locks and prints out all relevant info
 * - detects multi-task circular deadlocks and prints out all affected
 *   locks and tasks (and only those tasks)
 */
struct mutex {
        atomic_long_t           owner;
        spinlock_t              wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
        struct list_head        wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};

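The English comment above spells out the rules:

  • Only one task can hold the mutex at a time;
  • Only the owner can unlock the mutex;
  • Multiple unlocks are not permitted;
  • Recursive locking is not permitted;
  • A mutex object must be initialized via the API;
  • A mutex object must not be initialized via memset or copying;
  • A task must not exit while it holds a mutex;
  • Memory that contains a held lock must not be freed;
  • A held mutex must not be reinitialized;
  • Mutexes must not be used in hardware or software interrupt context, such as tasklets and timers;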

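Next, the important members of the structure:

  • owner: an atomic long that holds a pointer to the task_struct of the lock holder (with flag bits in its low bits); 0 means no task holds the lock;
  • wait_lock: a spinlock that protects operations on wait_list;
  • wait_list: a doubly linked list of the tasks that are sleeping because they could not get the mutex;

As these members suggest, the mutex implementation relies on atomic operations and a spinlock.

When multiple tasks compete for a mutex, the mutex itself is shared data, so every modification of its members must itself be mutually exclusive.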

3.2 mutex initialization

A mutex can be initialized in two ways. The first is statically, with the DEFINE_MUTEX macro:

#define __MUTEX_INITIALIZER(lockname) \
                { .owner = ATOMIC_LONG_INIT(0) \
                , .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
                , .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
                __DEBUG_MUTEX_INITIALIZER(lockname) \
                __DEP_MAP_MUTEX_INITIALIZER(lockname) }

#define DEFINE_MUTEX(mutexname) \
        struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

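This initializes the atomic owner field, the wait_lock spinlock, and the wait_list.

The second is dynamically, in kernel code, with mutex_init. The macro is defined in include/linux/mutex.h, and the __mutex_init function it calls is defined in kernel/locking/mutex.c: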

# define mutex_init(mutex) \
do {                            \
    static struct lock_class_key __key;        \
                            \
    __mutex_init((mutex), #mutex, &__key);        \
} while (0)

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
    atomic_long_set(&lock->owner, 0);   /* 0 means unlocked, no flags set */
    spin_lock_init(&lock->wait_lock);
    INIT_LIST_HEAD(&lock->wait_list);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
    osq_lock_init(&lock->osq);          /* initialize the osq (MCS) lock */
#endif

    debug_mutex_init(lock, name, key);
}

3.2 mutex_lock

[Figure: mutex_lock locking flow]

mutex_lock is defined in kernel/locking/mutex.c:

/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, it will sleep until it can get it.
 *
 * The mutex must later on be released by the same task that
 * acquired it. Recursive locking is not allowed. The task
 * may not exit without first unlocking the mutex. Also, kernel
 * memory where the mutex resides must not be freed with
 * the mutex still locked. The mutex must first be initialized
 * (or statically defined) before it can be locked. memset()-ing
 * the mutex to 0 is not allowed.
 *
 * (The CONFIG_DEBUG_MUTEXES .config option turns on debugging
 * checks that will enforce the restrictions and will also do
 * deadlock debugging)
 *
 * This function is similar to (but not equivalent to) down().
 */
void __sched mutex_lock(struct mutex *lock)
{
        might_sleep();

        if (!__mutex_trylock_fast(lock))
                __mutex_lock_slowpath(lock);
}

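To improve performance, mutex_lock is split into three paths. The fast path and the mid path are tried first; only if their conditions are not met does the code fall through to the slow path, which sleeps and reschedules and is therefore the most expensive.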

3.3 fast-path

The fast path is implemented in __mutex_trylock_fast:

/*
 * Lockdep annotations are contained to the slow paths for simplicity.
 * There is nothing that would stop spreading the lockdep annotations outwards
 * except more code.
 */

/*
 * Optimistic trylock that only works in the uncontended case. Make sure to
 * follow with a __mutex_trylock() before failing.
 */
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
        unsigned long curr = (unsigned long)current;
        unsigned long zero = 0UL;

        if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
                return true;

        return false;
}

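It simply calls the atomic operation atomic_long_try_cmpxchg_acquire:

  • If lock->owner equals 0, curr is stored into lock->owner, marking the current task as the holder, and the function returns true;
  • If lock->owner is not 0, the lock is held and the next path has to handle it;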
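The compare-and-swap logic of the fast path can be tried out in userspace with C11 atomics. This is a sketch under assumptions, not the kernel code: struct model_mutex, model_trylock_fast, and fast_path_demo are hypothetical names, and the values 0x1000 and 0x2000 simply stand in for two different `current` pointers.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Userspace model of struct mutex: only the owner word is kept. */
struct model_mutex {
    atomic_uintptr_t owner;    /* 0 means unlocked */
};

/* Model of __mutex_trylock_fast(): succeed only if owner is exactly 0. */
static bool model_trylock_fast(struct model_mutex *lock, uintptr_t curr)
{
    uintptr_t zero = 0;

    /* Atomically: if (owner == 0) owner = curr; acquire order on success. */
    return atomic_compare_exchange_strong_explicit(&lock->owner, &zero, curr,
                                                   memory_order_acquire,
                                                   memory_order_relaxed);
}

/* Task A takes the free lock, then task B tries and fails.
 * Returns a bitmask: bit 0 set if A succeeded, bit 1 set if B succeeded. */
int fast_path_demo(void)
{
    struct model_mutex m;
    uintptr_t task_a = 0x1000, task_b = 0x2000; /* stand-ins for `current` */

    atomic_init(&m.owner, 0);
    bool a_ok = model_trylock_fast(&m, task_a); /* uncontended: succeeds */
    bool b_ok = model_trylock_fast(&m, task_b); /* held by A: fails */

    assert(atomic_load(&m.owner) == task_a);    /* B did not overwrite A */
    return (a_ok ? 1 : 0) | (b_ok ? 2 : 0);
}
```

Because the check and the store happen in one atomic step, two tasks racing on a free lock cannot both see owner == 0 and both claim it.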

3.4 mid-path

The mid path and the slow path are both implemented in __mutex_lock_common:

static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
        __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}
static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
             struct lockdep_map *nest_lock, unsigned long ip)
{
        return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}

As shown, __mutex_lock_slowpath ultimately ends up in __mutex_lock_common:

/*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                    struct lockdep_map *nest_lock, unsigned long ip,
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct mutex_waiter waiter;
        bool first = false;
        struct ww_mutex *ww;
        int ret;

        might_sleep();

        ww = container_of(lock, struct ww_mutex, base);
        if (use_ww_ctx && ww_ctx) {
                if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
                        return -EALREADY;

                /*
                 * Reset the wounded flag after a kill. No other process can
                 * race and wound us here since they can't have a valid owner
                 * pointer if we don't have any locks held.
                 */
                if (ww_ctx->acquired == 0)
                        ww_ctx->wounded = 0;
        }

        preempt_disable();
        mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

        if (__mutex_trylock(lock) ||
            mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
                /* got the lock, yay! */
                lock_acquired(&lock->dep_map, ip);
                if (use_ww_ctx && ww_ctx)
                        ww_mutex_set_context_fastpath(ww, ww_ctx);
                preempt_enable();
                return 0;
        }

        spin_lock(&lock->wait_lock);
        /*
         * After waiting to acquire the wait_lock, try again.
         */
        if (__mutex_trylock(lock)) {
                if (use_ww_ctx && ww_ctx)
                        __ww_mutex_check_waiters(lock, ww_ctx);

                goto skip_wait;
        }

        debug_mutex_lock_common(lock, &waiter);

        lock_contended(&lock->dep_map, ip);

        if (!use_ww_ctx) {
                /* add waiting tasks to the end of the waitqueue (FIFO): */
                __mutex_add_waiter(lock, &waiter, &lock->wait_list);

#ifdef CONFIG_DEBUG_MUTEXES
                waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
        } else {
                /*
                 * Add in stamp order, waking up waiters that must kill
                 * themselves.
                 */
                ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
                if (ret)
                        goto err_early_kill;

                waiter.ww_ctx = ww_ctx;
        }

        waiter.task = current;

        set_current_state(state);
        for (;;) {
                /*
                 * Once we hold wait_lock, we're serialized against
                 * mutex_unlock() handing the lock off to us, do a trylock
                 * before testing the error conditions to make sure we pick up
                 * the handoff.
                 */
                if (__mutex_trylock(lock))
                        goto acquired;

                /*
                 * Check for signals and kill conditions while holding
                 * wait_lock. This ensures the lock cancellation is ordered
                 * against mutex_unlock() and wake-ups do not go missing.
                 */
                if (signal_pending_state(state, current)) {
                        ret = -EINTR;
                        goto err;
                }

                if (use_ww_ctx && ww_ctx) {
                        ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
                        if (ret)
                                goto err;
                }

                spin_unlock(&lock->wait_lock);
                schedule_preempt_disabled();

                /*
                 * ww_mutex needs to always recheck its position since its waiter
                 * list is not FIFO ordered.
                 */
                if ((use_ww_ctx && ww_ctx) || !first) {
                        first = __mutex_waiter_is_first(lock, &waiter);
                        if (first)
                                __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
                }

                set_current_state(state);
                /*
                 * Here we order against unlock; we must either see it change
                 * state back to RUNNING and fall through the next schedule(),
                 * or we must see its unlock and acquire.
                 */
                if (__mutex_trylock(lock) ||
                    (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
                        break;

                spin_lock(&lock->wait_lock);
        }
        spin_lock(&lock->wait_lock);
acquired:
        __set_current_state(TASK_RUNNING);

        if (use_ww_ctx && ww_ctx) {
                /*
                 * Wound-Wait; we stole the lock (!first_waiter), check the
                 * waiters as anyone might want to wound us.
                 */
                if (!ww_ctx->is_wait_die &&
                    !__mutex_waiter_is_first(lock, &waiter))
                        __ww_mutex_check_waiters(lock, ww_ctx);
        }

        mutex_remove_waiter(lock, &waiter, current);
        if (likely(list_empty(&lock->wait_list)))
                __mutex_clear_flag(lock, MUTEX_FLAGS);

        debug_mutex_free_waiter(&waiter);

skip_wait:
        /* got the lock - cleanup and rejoice! */
        lock_acquired(&lock->dep_map, ip);

        if (use_ww_ctx && ww_ctx)
                ww_mutex_lock_acquired(ww, ww_ctx);

        spin_unlock(&lock->wait_lock);
        preempt_enable();
        return 0;

err:
        __set_current_state(TASK_RUNNING);
        mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
        spin_unlock(&lock->wait_lock);
        debug_mutex_free_waiter(&waiter);
        mutex_release(&lock->dep_map, 1, ip);
        preempt_enable();
        return ret;
}

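That is a lot of code, so rather than reading it line by line it is easier to follow a flow chart taken from another blogger's analysis:

[Figure: mid-path (optimistic spinning) flow chart]

When the mutex holder is found to be running (on another CPU), the waiter can skip sleeping and spin instead. A holder that is currently running is likely to release the lock soon, which is the rationale for optimistic spinning;

Spinning is only worthwhile while the holder is actually running inside the critical section;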

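__mutex_trylock_or_owner tries to acquire the lock and, if that fails, returns the lock's holder. The owner field of the mutex structure is split into two parts:

1) the task_struct pointer of the lock holder (because task_struct is aligned to L1_CACHE_BYTES, its low bits are unused);

2) the MUTEX_FLAGS part, which occupies the low three bits:

  • MUTEX_FLAG_WAITERS: bit 0, indicates that the waiter list is non-empty, so unlock must issue a wakeup;
  • MUTEX_FLAG_HANDOFF: bit 1, indicates that unlock must hand the lock to the top waiter;
  • MUTEX_FLAG_PICKUP: bit 2, indicates that the handoff has been prepared and the lock is waiting to be picked up;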
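The pointer-plus-flags packing can be illustrated with a small userspace sketch. The flag values follow the bit positions listed above; everything else (struct fake_task with its 64-byte alignment, owner_pack, owner_task, owner_flags, owner_demo) is a hypothetical stand-in chosen for this example and not the kernel's own helpers.

```c
#include <assert.h>
#include <stdint.h>

#define MUTEX_FLAG_WAITERS 0x01UL   /* bit 0 */
#define MUTEX_FLAG_HANDOFF 0x02UL   /* bit 1 */
#define MUTEX_FLAG_PICKUP  0x04UL   /* bit 2 */
#define MUTEX_FLAGS        0x07UL   /* mask covering all three flag bits */

/* Stand-in for task_struct; the alignment leaves the low bits zero. */
struct fake_task {
    _Alignas(64) int pid;
};

/* Combine a task pointer and flag bits into one owner word. */
static uintptr_t owner_pack(struct fake_task *task, uintptr_t flags)
{
    assert(((uintptr_t)task & MUTEX_FLAGS) == 0);  /* low bits must be free */
    return (uintptr_t)task | flags;
}

/* Recover the task pointer by masking the flag bits off. */
static struct fake_task *owner_task(uintptr_t owner)
{
    return (struct fake_task *)(owner & ~(uintptr_t)MUTEX_FLAGS);
}

/* Recover only the flag bits. */
static uintptr_t owner_flags(uintptr_t owner)
{
    return owner & MUTEX_FLAGS;
}

/* Returns 1 if packing and unpacking round-trip correctly, else 0. */
int owner_demo(void)
{
    static struct fake_task t = { .pid = 42 };
    uintptr_t owner = owner_pack(&t, MUTEX_FLAG_WAITERS | MUTEX_FLAG_HANDOFF);

    return owner_task(owner) == &t &&
           owner_task(owner)->pid == 42 &&
           owner_flags(owner) == (MUTEX_FLAG_WAITERS | MUTEX_FLAG_HANDOFF);
}
```

Keeping the holder and the flags in a single word is what lets one cmpxchg observe and update both at once.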

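mutex_optimistic_spin performs the optimistic spin. Ideally the holder finishes and releases the lock, and the current task acquires it quickly. In practice, if the holder is scheduled out while inside the critical section (task_struct->on_cpu == 0), spinning has to stop, otherwise the waiter would spin for nothing.

  • mutex_can_spin_on_owner: checked before spinning; if the current task needs to be rescheduled, or the holder has already been scheduled out, it returns right away and skips the osq_lock/osq_unlock work, saving some overhead;
  • osq_lock ensures that only one waiter at a time spins on the mutex, preventing a crowd of waiters from all piling onto it;
  • Inside the for (;;) spin loop, __mutex_trylock_or_owner is called to try to take the lock; on success the function returns directly;
  • mutex_spin_on_owner: if it decides the conditions for spinning no longer hold, it returns and the slow path takes over;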
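The stop conditions for spinning can be modelled in a few lines of userspace C. This is only a sketch of the decision logic described above: spin_task, spin_mutex, model_spin_on_owner, and spin_demo are hypothetical names, and the budget parameter is an invented stand-in for the kernel's need_resched() check so that the example runs deterministically in a single thread.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct spin_task {
    atomic_bool on_cpu;                 /* models task_struct->on_cpu */
};

struct spin_mutex {
    _Atomic(struct spin_task *) owner;  /* NULL means unlocked */
};

/*
 * Returns true if the owner changed (released or passed on), so a trylock
 * is worth retrying; returns false if spinning should stop and the caller
 * should fall back to the slow path.
 */
static bool model_spin_on_owner(struct spin_mutex *lock,
                                struct spin_task *owner, long budget)
{
    assert(owner != NULL);
    while (atomic_load(&lock->owner) == owner) {
        if (!atomic_load(&owner->on_cpu))
            return false;               /* holder was scheduled out */
        if (budget-- <= 0)
            return false;               /* we ourselves must reschedule */
    }
    return true;
}

/* Returns a bitmask with one bit per scenario that behaved as expected. */
int spin_demo(void)
{
    struct spin_task holder;
    struct spin_mutex m;
    int result = 0;

    atomic_init(&holder.on_cpu, true);
    atomic_init(&m.owner, &holder);

    /* Holder keeps running and never releases: the budget runs out. */
    if (!model_spin_on_owner(&m, &holder, 1000))
        result |= 1;

    /* Holder has been scheduled out: stop spinning immediately. */
    atomic_store(&holder.on_cpu, false);
    if (!model_spin_on_owner(&m, &holder, 1000))
        result |= 2;

    /* Lock has been released: spinning ends and a trylock can follow. */
    atomic_store(&m.owner, NULL);
    if (model_spin_on_owner(&m, &holder, 1000))
        result |= 4;

    return result;
}
```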

3.5 slow-path

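The main code flow of the slow path:

[Figure: slow-path flow chart]

As the for (;;) loop shows, when the lock cannot be acquired the task calls schedule_preempt_disabled to switch itself out and sleep, which is exactly why this path is slow;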
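The "try, sleep, get woken, try again" shape of the slow path can be imitated in userspace with a pthread mutex and condition variable. This is a loose analogy and an assumption-heavy sketch: struct sleepy_mutex and the functions sleepy_lock, sleepy_unlock, and sleepy_demo are invented here, the inner pthread mutex plays the role of wait_lock, and the condition variable stands in for the wait list plus wakeup.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

struct sleepy_mutex {
    pthread_mutex_t wait_lock;  /* plays the role of lock->wait_lock */
    pthread_cond_t  wakeup;     /* stands in for wait_list + wakeup */
    bool            held;       /* true while some thread owns the lock */
};

static struct sleepy_mutex sm = {
    .wait_lock = PTHREAD_MUTEX_INITIALIZER,
    .wakeup    = PTHREAD_COND_INITIALIZER,
    .held      = false,
};
static long sm_counter;

static void sleepy_lock(struct sleepy_mutex *m)
{
    pthread_mutex_lock(&m->wait_lock);
    while (m->held)             /* "trylock" failed: sleep until woken */
        pthread_cond_wait(&m->wakeup, &m->wait_lock);
    m->held = true;             /* "trylock" succeeded */
    pthread_mutex_unlock(&m->wait_lock);
}

static void sleepy_unlock(struct sleepy_mutex *m)
{
    pthread_mutex_lock(&m->wait_lock);
    m->held = false;
    pthread_cond_signal(&m->wakeup);    /* wake one sleeping waiter */
    pthread_mutex_unlock(&m->wait_lock);
}

static void *sleepy_worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 10000; i++) {
        sleepy_lock(&sm);
        sm_counter++;           /* critical section */
        sleepy_unlock(&sm);
    }
    return NULL;
}

/* Run nthreads workers through the sleeping lock; returns the final count. */
long sleepy_demo(int nthreads)
{
    pthread_t tid[8];

    assert(nthreads > 0 && nthreads <= 8);
    sm_counter = 0;
    for (int i = 0; i < nthreads; i++)
        pthread_create(&tid[i], NULL, sleepy_worker, NULL);
    for (int i = 0; i < nthreads; i++)
        pthread_join(tid[i], NULL);
    return sm_counter;
}
```

As in the kernel loop, a woken waiter rechecks the lock state rather than assuming it now owns the lock, since another thread may have taken it in between.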

3.6 mutex_unlock

[Figure: mutex_unlock release flow]

mutex_unlock is defined in kernel/locking/mutex.c:

/**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock a mutex that has been locked by this task previously.
 *
 * This function must not be used in interrupt context. Unlocking
 * of a not locked mutex is not allowed.
 *
 * This function is similar to (but not equivalent to) up().
 */
void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_LOCK_ALLOC
        if (__mutex_unlock_fast(lock))
                return;
#endif
        __mutex_unlock_slowpath(lock, _RET_IP_);
}

Releasing the lock is comparatively simple, and it also splits into a fast path and a slow path;

The fast path is implemented in __mutex_unlock_fast:

static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{
        unsigned long curr = (unsigned long)current;

        if (atomic_long_cmpxchg_release(&lock->owner, curr, 0UL) == curr)
                return true;

        return false;
}

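It simply calls the atomic operation atomic_long_cmpxchg_release:

  • If lock->owner equals curr, meaning the holder is the current task and no flag bits are set, lock->owner is set to 0 and true is returned;
  • If lock->owner does not equal curr, either the holder is not the current task or flag bits such as MUTEX_FLAG_WAITERS are set in the low bits, so false is returned and the slow path runs;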
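A userspace model with C11 atomics shows why a set flag bit is enough to push unlock onto the slow path. The names model_unlock_fast, MODEL_FLAG_WAITERS, and unlock_demo are hypothetical, and 0x1000 is just a stand-in for the `current` pointer.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define MODEL_FLAG_WAITERS 0x01UL   /* mirrors MUTEX_FLAG_WAITERS (bit 0) */

/* Model of __mutex_unlock_fast(): works only if owner == curr exactly. */
static bool model_unlock_fast(atomic_uintptr_t *owner, uintptr_t curr)
{
    uintptr_t expected = curr;

    /* Atomically: if (*owner == curr) *owner = 0; release order on success. */
    return atomic_compare_exchange_strong_explicit(owner, &expected, 0,
                                                   memory_order_release,
                                                   memory_order_relaxed);
}

/* Returns a bitmask with one bit per scenario that behaved as expected. */
int unlock_demo(void)
{
    atomic_uintptr_t owner;
    uintptr_t curr = 0x1000;        /* stand-in for `current` */
    int result = 0;

    assert((curr & MODEL_FLAG_WAITERS) == 0);

    /* No waiters: the owner word equals curr, so the fast path clears it. */
    atomic_init(&owner, curr);
    if (model_unlock_fast(&owner, curr) && atomic_load(&owner) == 0)
        result |= 1;

    /* A waiter set the WAITERS bit: the compare fails, owner is untouched. */
    atomic_store(&owner, curr | MODEL_FLAG_WAITERS);
    if (!model_unlock_fast(&owner, curr) &&
        atomic_load(&owner) == (curr | MODEL_FLAG_WAITERS))
        result |= 2;

    return result;
}
```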

The slow path releases the lock, handles the three MUTEX_FLAG bits as appropriate, and finally wakes up a task waiting on the lock;

void __sched __mutex_unlock_slowpath(struct mutex *lock, ...)
{
    // Release the mutex while keeping the low 3 bits that record its state
    unsigned long old = atomic_long_cmpxchg_release(&lock->owner,
                        owner, __owner_flags(owner));
    ...
    spin_lock(&lock->wait_lock);
    if (!list_empty(&lock->wait_list)) {
        // Get the first waiter in the wait list
        struct mutex_waiter *waiter = list_first_entry
                                      (&lock->wait_list, struct mutex_waiter, list);

        // Add that waiter's task to wake_q
        struct task_struct *next = waiter->task;
        wake_q_add(&wake_q, next);
    }

    spin_unlock(&lock->wait_lock);

    // Wake up that task
    wake_up_q(&wake_q);
}


