轉自:https://www.jianshu.com/p/f0d6e7103d9b
spinlock用在什么場景?
自旋鎖用在臨界區代碼非常少的情況。
spinlock在使用時有什么注意事項?
- 臨界區代碼應該盡可能精簡
- 不允許睡眠(會出現死鎖)
- Need to have interrupts disabled when locked by ordinary threads, if shared by an interrupt handler。(會出現死鎖)
spinlock是怎么實現的?
看一下源代碼:
typedef struct raw_spinlock { arch_spinlock_t raw_lock; #ifdef CONFIG_GENERIC_LOCKBREAK unsigned int break_lock; #endif #ifdef CONFIG_DEBUG_SPINLOCK unsigned int magic, owner_cpu; void *owner; #endif #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif } raw_spinlock_t; typedef struct spinlock { union { struct raw_spinlock rlock; #ifdef CONFIG_DEBUG_LOCK_ALLOC # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map)) struct { u8 __padding[LOCK_PADSIZE]; struct lockdep_map dep_map; }; #endif }; } spinlock_t;
如果忽略CONFIG_DEBUG_LOCK_ALLOC話,spinlock主要包含一個arch_spinlock_t的結構,從名字可以看出,這個結構是跟體系結構有關的。
加鎖流程
加鎖的相關源碼如下:
#define raw_spin_lock(lock) _raw_spin_lock(lock) static inline void spin_lock(spinlock_t *lock) { raw_spin_lock(&lock->rlock); }
_raw_spin_lock完成實際的加鎖動作。
根據CPU體系結構,spinlock分為SMP版本和UP版本,這里以SMP版本為例來分析。SMP版本中,_raw_spin_lock為聲明為:
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock) __acquires(lock);
再看_raw_spin_lock的實現,SMP版本中,看_raw_spin_lock最終調用了__raw_spin_lock,__raw_spin_lock的源代碼如下:
static inline void __raw_spin_lock(raw_spinlock_t *lock) { // 禁止搶占 preempt_disable(); // for debug spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); // real work done here LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); }
LOCK_CONTENDED是一個通用的加鎖流程。do_raw_spin_trylock和do_raw_spin_lock的實現依賴於具體的體系結構,以x86為例,do_raw_spin_trylock最終調用的是:
do_raw_spin_trylock的源代碼:
static inline int do_raw_spin_trylock(raw_spinlock_t *lock) { // 體系結構相關 return arch_spin_trylock(&(lock)->raw_lock); }
以x86為例,arch_spin_trylock最終調用__ticket_spin_trylock函數。其源代碼如下:
// 定義在arch/x86/include/asm/spinlock_types.h typedef struct arch_spinlock { union { __ticketpair_t head_tail; struct __raw_tickets { __ticket_t head, tail; // 注意,x86使用的是小端模式,存在高地址空間的是tail } tickets; }; } arch_spinlock_t; // 定義在arch/x86/include/asm中 static __always_inline int __ticket_spin_trylock(arch_spinlock_t *lock) { arch_spinlock_t old, new; // 獲取舊的ticket信息 old.tickets = ACCESS_ONCE(lock->tickets); // head和tail不一致,說明鎖正被占用,加鎖不成功 if (old.tickets.head != old.tickets.tail) return 0; new.head_tail = old.head_tail + (1 << TICKET_SHIFT); // 將tail + 1 /* cmpxchg is a full barrier, so nothing can move before it */ return cmpxchg(&lock->head_tail, old.head_tail, new.head_tail) == old.head_tail; }
從上述代碼中可知,__ticket_spin_trylock的核心功能,就是判斷自旋鎖是否被占用,如果沒被占用,嘗試原子性地更新lock中的head_tail的值,將tail+1,返回是否加鎖成功。
不考慮CONFIG_DEBUG_SPINLOCK宏的話, do_raw_spin_lock的源代碼如下:
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock) { __acquire(lock); arch_spin_lock(&lock->raw_lock); }
arch_spin_lock的源代碼:
static __always_inline void arch_spin_lock(arch_spinlock_t *lock) { __ticket_spin_lock(lock); }
__ticket_spin_lock的源代碼:
static __always_inline void __ticket_spin_lock(arch_spinlock_t *lock) { register struct __raw_tickets inc = { .tail = 1 }; // 原子性地把ticket中的tail+1,返回的inc是+1之前的原始值 inc = xadd(&lock->tickets, inc); for (;;) { // 循環直到head和tail相等 if (inc.head == inc.tail) break; cpu_relax(); // 讀取新的head值 inc.head = ACCESS_ONCE(lock->tickets.head); } barrier(); /* make sure nothing creeps before the lock is taken */ }
ticket分成兩個部分,一部分叫tail,相當於一個隊列的隊尾,一個部分叫head,相當於一個隊列的隊頭。初始化的時候,tail和head都是0,表示無人占用鎖。
__ticket_spin_lock就是原子性地把tail+1,並且把+1之前的值記錄下來,然后不斷地和head進行比較。由於是原子性的操作,所以不同的鎖競爭者拿到的tail值是不一樣的。如果tail值和head一樣了,說明這時候沒人占用鎖了,下一個拿到鎖的就是自己了。
舉例來說,假設線程A和線程B競爭同一個自旋鎖:
- 初始化tail=0, head=0,線程A將tail+1, 並返回tail的舊值0,將0和head值比較,相等,於是這時候線程A就拿到了鎖。
- 線程A這時候也來拿鎖,將tail值+1,變成2,返回tail的舊值1,將其和head值0比較,不相等,繼續循環。
- 線程A用完鎖了,將head值+1。
- 線程B讀取head值,並將其和tail值比較,發現相等,獲得鎖。
解鎖流程
對於SMP架構來說,spin_unlock最終調用的是__raw_spin_unlock,其源代碼如下:
static inline void __raw_spin_unlock(raw_spinlock_t *lock) { spin_release(&lock->dep_map, 1, _RET_IP_); // 主要的解鎖工作 do_raw_spin_unlock(lock); // 啟用搶占 preempt_enable(); } static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock) { arch_spin_unlock(&lock->raw_lock); __release(lock); }
arch_spin_unlock在x86體系結構下的實現代碼如下:
static __always_inline void arch_spin_unlock(arch_spinlock_t *lock) { __ticket_spin_unlock(lock); } static __always_inline void __ticket_spin_unlock(arch_spinlock_t *lock) { // 將tickers的head值加1 __add(&lock->tickets.head, 1, UNLOCK_LOCK_PREFIX); }
考慮中斷處理函數
如果自旋鎖可能在中斷處理處理中使用,那么在獲取自旋鎖之前,必須禁止本地中斷。則,持有鎖的內核代碼會被中斷處理程序打斷,接着試圖去爭用這個已經被持有的自旋鎖。這樣的結果是,中斷處理函數自旋,等待該鎖重新可用,但是鎖的持有者在該中斷處理程序執行完畢之前不可能運行,這就成為了雙重請求死鎖。注意,需要關閉的只是當前處理器上的中斷。因為中斷發生在不同的處理器上,即使中斷處理程序在同一鎖上自旋,也不會妨礙鎖的持有者(在不同處理器上)最終釋放。
所以要使用spin_lock_irqsave() / spin_unlock_irqrestore()這個版本的加鎖、解鎖函數。
函數spin_lock_irqsave():保存中斷的當前狀態,禁止本地中斷,然后獲取指定的鎖。
函數spin_unlock_reqrestore():對指定的鎖解鎖,讓中斷恢復到加鎖前的狀態。所以即使中斷最初是被禁止的,代碼也不會錯誤地激活它們。
spinlock的幾種變種
- rwlock_t 讀寫鎖
- seqlock_t 順序鎖
參考資料
Ticket spinklocks
Linux內核源代碼
作者:Jiafu89
鏈接:https://www.jianshu.com/p/f0d6e7103d9b
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。