OverView
同步基元分為用戶模式和內核模式
用戶模式:Iterlocked.Exchange(互鎖)、SpinLocked(自旋鎖)、易變構造(volatile關鍵字、
volatile
類、Thread.VolatitleRead|Thread.VolatitleWrite
)、MemoryBarrier。
通過對SpinLock鎖的內部代碼分析,徹底了解SpinLock的工作原理。
SpinLock內部有一個共享變量 owner 表示鎖的所有者是誰。當該鎖有所有者時,owner不在為0。當owner為0時,表示該鎖沒有擁有者。任何線程都可以參與競爭該鎖。
獲取鎖的采用的是位邏輯運算,這也是常用的權限運算方式。
鎖住其他線程采用的是死循模式,只有滿足一定條件才能跳出死循。當第一個線程獲取鎖的時候。后續進入的線程都會被困在死循環里面,做spinner.SpinOnce()自旋,這是很消耗cpu的,因此SplinLock 鎖只能 用於短時間的運算。
鎖的內部 沒有使用到 Win32 內核對象,所以只能進行線程之間的同步,不能進行跨進程同步。如果要完成跨進程的同步,需要使用 Monitor
、Mutex
這樣的方案。
通過源代碼分析我們可以總結出SpinLock鎖的特點: 互斥 、自旋、非重入、只能用於極短暫的運算,進程內使用。
SpinLock鎖雖然是值類型,但是內部狀態會改變,所以不要把他聲明為Readonly字段。
SpinLock鎖 的內部構造分析
變量
private volatile int _owner; //多線程共享變量 所以volatile關鍵字 private const int SLEEP_ONE_FREQUENCY = 40;//自旋多少次以后,執行sleep(1), private const int TIMEOUT_CHECK_FREQUENCY = 10; // After how many yields, check the timeout //禁用ID 跟蹤 性能模式:當高位為1時,鎖可用性由低位表示。當低位為1時——鎖被持有;0——鎖可用。 private const int LOCK_ID_DISABLE_MASK = unchecked((int)0x80000000); // 1000 0000 0000 0000 0000 0000 0000 0000 private const int ID_DISABLED_AND_ANONYMOUS_OWNED = unchecked((int)0x80000001); // 1000 0000 0000 0000 0000 0000 0000 0001 //除非在構造函數時,傳入false。否則默認啟用線程id跟蹤 //啟用ID跟蹤 啟用所有權跟蹤模式:高位為0,剩余位為存儲當前所有者的托管線程ID。當31位低是0,鎖是可用的。 private const int WAITERS_MASK = ~(LOCK_ID_DISABLE_MASK | 1); // 0111 1111 1111 1111 1111 1111 1111 1110 private const int LOCK_ANONYMOUS_OWNED = 0x1; // 0000 0000 0000 0000 0000 0000 0000 0001
構造函數
//除非在初始化時候給構造函數傳入false。用默認構造函數初始化或者傳入true 都是啟用線程id跟蹤 public SpinLock(bool enableThreadOwnerTracking) { _owner = LOCK_UNOWNED; // 0000 0000 0000 0000 0000 0000 0000 0000 if (!enableThreadOwnerTracking) { _owner |= LOCK_ID_DISABLE_MASK; // 1000 0000 0000 0000 0000 0000 0000 0000 Debug.Assert(!IsThreadOwnerTrackingEnabled, "property should be false by now"); } }
Enter(bool)方法
public void Enter(ref bool lockTaken) { // Try to keep the code and branching in this method as small as possible in order to inline the method int observedOwner = _owner; if (lockTaken || // invalid parameter 剛開始鎖都是未啟用的,所以該值都是false ////除非在構造函數時,傳入false。否則默認啟用線程id跟蹤 // 構造函數傳入true或者用默認構造函數時候啟用線程id跟蹤 // observedOwner & ID_DISABLED_AND_ANONYMOUS_OWNED= 0000 0000 0000 0000 0000 0000 0000 0000& 1000 0000 0000 0000 0000 0000 0000 0001 // 當構造函數傳入false。 // observedOwner & ID_DISABLED_AND_ANONYMOUS_OWNED= 1000 0000 0000 0000 0000 0000 0000 0000&1000 0000 0000 0000 0000 0000 0000 0001 (observedOwner & ID_DISABLED_AND_ANONYMOUS_OWNED) != LOCK_ID_DISABLE_MASK || //一般情況下是false,構造函數傳入false情況下它是ture 。 // 構造函數傳入true或者用默認構造函數時候啟用線程id跟蹤 //observedOwner | LOCK_ANONYMOUS_OWNED=0000 0000 0000 0000 0000 0000 0000 0000| 0000 0000 0000 0000 0000 0000 0000 0001 // 當構造函數傳入false。 //observedOwner | LOCK_ANONYMOUS_OWNED=1000 0000 0000 0000 0000 0000 0000 0000| 0000 0000 0000 0000 0000 0000 0000 0001 //用到cas機制,這就是為什么說spinlock是樂觀鎖 CompareExchange(ref _owner, observedOwner | LOCK_ANONYMOUS_OWNED, observedOwner, ref lockTaken) != observedOwner) //結果為true時候,獲取鎖失敗。 ContinueTryEnter(Timeout.Infinite, ref lockTaken); // Timeout.Infinite=-1 一個用於指定無限長等待時間的常數 如果獲取鎖失敗,就進入自旋等待 }
ContinueTryEnter 方法
//其他代碼 //跟蹤鎖的持有者 (_owner & LOCK_ID_DISABLE_MASK) == 0; 除非構造函數傳入false ,否則都走這個分支 if (IsThreadOwnerTrackingEnabled) { // Slow path for enabled thread tracking mode ContinueTryEnterWithThreadTracking(millisecondsTimeout, startTime, ref lockTaken); return; } //其他代碼
ContinueTryEnterWithThreadTracking 方法
核心函數
private void ContinueTryEnterWithThreadTracking(int millisecondsTimeout, uint startTime, ref bool lockTaken) { Debug.Assert(IsThreadOwnerTrackingEnabled); const int LockUnowned = 0; int newOwner = Environment.CurrentManagedThreadId; if (_owner == newOwner) { //防止鎖重入 throw new LockRecursionException(SR.SpinLock_TryEnter_LockRecursionException); } SpinWait spinner = default; // Loop until the lock has been successfully acquired or, if specified, the timeout expires. while (true) { // We failed to get the lock, either from the fast route or the last iteration // and the timeout hasn't expired; spin once and try again. spinner.SpinOnce(); // Test before trying to CAS, to avoid acquiring the line exclusively unnecessarily. //判斷鎖釋放釋放了 if (_owner == LockUnowned) { //如果釋放了就立即獲取鎖。 if (CompareExchange(ref _owner, newOwner, LockUnowned, ref lockTaken) == LockUnowned) { return;//獲取成功 退出自旋式的等待 } } // Check the timeout. We only RDTSC if the next spin will yield, to amortize the cost. if (millisecondsTimeout == 0 || (millisecondsTimeout != Timeout.Infinite && spinner.NextSpinWillYield && TimeoutHelper.UpdateTimeOut(startTime, millisecondsTimeout) <= 0)) { return; } } }
EXIT()
public void Exit() { // This is the fast path for the thread tracking is disabled, otherwise go to the slow path if ((_owner & LOCK_ID_DISABLE_MASK) == 0)//默認的構造函數初始化的spinlock 走這一步分支 ExitSlowPath(true); else Interlocked.Decrement(ref _owner);//SpinLock(false)的構造函數初始化的spinlock 走這一步分支 } /// </exception> public void Exit(bool useMemoryBarrier) { int tmpOwner = _owner; if ((tmpOwner & LOCK_ID_DISABLE_MASK) != 0 & !useMemoryBarrier) { //退出對鎖所有權 _owner = tmpOwner & (~LOCK_ANONYMOUS_OWNED); } else { //用原子操作的方式 退出鎖。因為只有一個線程獲取到鎖,所以這一般不用這種方式退出,比較耗時。 ExitSlowPath(useMemoryBarrier); } }
通過以上代碼我們可以總結出SpinLock鎖的特點: 互斥 、自旋、非重入、只能用於極短暫的運算。
假如開啟4個線程 數數,從0數到1千萬,這個程序在4核cpu上運行,其中用了interlock鎖 那么運行情況如下圖:
此時線程1獲得鎖,其他線程未獲得鎖都在自旋中(死循環),占着core不放。所以要確保interLock鎖任何線程持有鎖的時間不會超過一個非常短的時間段。要不就造成資源巨大浪費。
SpinLock內部使用spinWait、InterLocked實現原子操作。
原理:
鎖定內部式SpinWait.SpinOnce。在自旋次數超過10之后,每次進行自旋便會觸發上下文切換的操作,在這之后每自旋5次會進行一次sleep(0)操作,每20次會進行一次sleep(1)操作。
Sleep(0) 只允許那些優先級相等或更高的線程使用當前的CPU,其它線程只能等着挨餓了。如果沒有合適的線程,那當前線程會重新使用 CPU 時間片。
使用要點:
1、每次使用都要初始化為false 確保未被獲取,如果已獲取鎖,則為 true,否則為 false。
2、SpinLock 是非重入鎖,這意味着,如果線程持有鎖,則不允許再次進入該鎖。
3、SpinLock結構是一個低級別的互斥同步基元,它在等待獲取鎖時進行旋轉。
4、用 SpinLock 時,請確保任何線程持有鎖的時間不會超過一個非常短的時間段,並確保任何線程在持有鎖時不會阻塞。
5、 即使 SpinLock 未獲取鎖,它也會產生線程的時間片。此時的未獲取鎖的線程就是占着cpu的其他core 等着,已經占用鎖的線程釋放鎖。
6、 在多核計算機上,當等待時間預計較短且極少出現爭用情況時,SpinLock 的性能將高於其他類型的鎖。
7、由於 SpinLock 是一個值類型,因此,如果您希望兩個副本都引用同一個鎖,則必須通過引用顯式傳遞該鎖。
8、如果調用時 Exit 沒有首先調用的 Enter 內部狀態,則 SpinLock 可能會損壞。
9、如果啟用了線程所有權跟蹤 (通過) 是否可以使用它 IsThreadOwnerTrackingEnabled ,則當某個線程嘗試重新進入它已經持有的鎖時,將引發異常。 但是,如果禁用了線程所有權跟蹤,嘗試輸入已持有的鎖將導致死鎖。
10、SpinLock每次請求同步鎖的效率非常高,但如果請求不到的話,會一直請求而浪費CPU時間,所以它適合那種並發程度不高、競爭性不強的場景。
11、在某些情況下,SpinLock 會停止旋轉,以防出現邏輯處理器資源不足或超線程系統上優先級反轉的情況。
使用場合:
1、只能在進程內的線程使用。
因為他是輕量級鎖。輕量級線程同步方案因為沒有使用到 Win32 內核對象,而是在 .NET 內部完成,所以只能進行線程之間的同步,不能進行跨進程同步。如果要完成跨進程的同步,需要使用 Monitor
、Mutex
這樣的方案。
2、適合在非常輕量的計算中使用。
它與普通 lock 的區別在於普通 lock 使用 Win32 內核態對象來實現等待
屬性 描述
IsHeld 獲取鎖當前是否已由任何線程占用。
IsHeldByCurrentThread 獲取鎖是否已由當前線程占用。
IsThreadOwnerTrackingEnabled 獲取是否已為此實例啟用了線程所有權跟蹤。
方法 描述
Enter(Boolean) 采用可靠的方式獲取鎖,這樣,即使在方法調用中發生異常的情況下,都能采用可靠的方式檢查 lockTaken 以確定是否已獲取鎖。
Exit() 釋放鎖。
Exit(Boolean) 釋放鎖。
TryEnter(Boolean) 嘗試采用可靠的方式獲取鎖,這樣,即使在方法調用中發生異常的情況下,都能采用可靠的方式檢查 lockTaken 以確定是否已獲取鎖
TryEnter(Int32, Boolean) 嘗試采用可靠的方式獲取鎖,這樣,即使在方法調用中發生異常的情況下,都能采用可靠的方式檢查 lockTaken 以確定是否已獲取鎖。
TryEnter(TimeSpan, Boolean) 嘗試采用可靠的方式獲取鎖,這樣,即使在方法調用中發生異常的情況下,都能采用可靠的方式檢查 lockTaken 以確定是否已獲取鎖。
案例:
開4個線程 從0數到1千萬
using System.Diagnostics; class Program { static long counter = 1; //如果聲明為只讀字段,會導致每次調用都會返回一個SpinLock新副本, //在多線程下,每個方法都會成功獲得鎖,而受到保護的臨界區不會按照預期進行串行化。 static SpinLock sl = new();//一個類申請一把鎖給多線程用,不能聲明成只讀的。 // 開4個線程 從0數到1千萬 static void Main(string[] args) { Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); Parallel.Invoke(f1, f1, f1, f1); Console.WriteLine(stopwatch.ElapsedMilliseconds); Console.WriteLine(counter); } static void f1() { for (int i = 1; i <= 25_000_00; i++) { // static SpinLock sl = new();錯誤聲明方式,這樣每個線程都會獲得一把鎖,導致失去同步的效果 bool dfdf = false;//每次使用都要初始化為false,每一次循環都是開始爭搶鎖。 sl.Enter(ref dfdf); try { counter++; } finally { sl.Exit(); } } } }
注意:多線程數數 的效率比單線程還慢。原因是搶鎖浪費時間和Volatile變量 浪費時間。單線程數據就在寄存器中,運算速度不受到資源,以最快速度計算。