Interlocked類中的每個方法都執行一次原子性的讀取以及寫入操作,其中public static Int32 Increment(ref Int32 location)方法是最常用到的方法,后面我在自定一個混合結構鎖的時候就會用到。
public static class Interlocked { // return (++location) public static Int32 Increment(ref Int32 location); // return (--location) public static Int32 Decrement(ref Int32 location); // return (location1 += value) public static Int32 Add(ref Int32 location1, Int32 value); // Int32 old = location1; location1 = value; return old; public static Int32 Exchange(ref Int32 location1, Int32 value); // Int32 old = location1; // if (location1 == comparand) location1 = value; // return old; public static Int32 CompareExchange(ref Int32 location1, Int32 value, Int32 comparand); ... }
假如有多個線程調用了Enter方法,那么只有一個線程能滿足條件進入while的內部,其他線程都因為不滿足條件而在不斷的判斷while條件。
exchange方法確保第一個調用的線程將m_ResourceInUse變為1,並且原始值為0.而其他線程將會使得m_ResourceInUse從1變為1,也就是原始值為1,不滿足條件。
class SimpleSpinLock { private Int32 m_ResourceInUse; // 0=false (default), 1=true public void Enter() { // Set the resource to in-use and if this thread while (Interlocked.Exchange(ref m_ResourceInUse, 1) != 0) { } } public void Leave() { Thread.VolatileWrite(ref m_ResourceInUse, 0); } }
public sealed class SomeResource { private SimpleSpinLock m_sl = new SimpleSpinLock(); public void AccessResource() { m_sl.Enter(); // Only one thread at a time can get in here to access the resource... m_sl.Leave(); } }
exchange是原子判斷true和false的一個常用辦法。
int a = 0; a++;
當編譯器把這行C#語句編譯成匯編代碼的時候,將會包含多條指令,如:
INC EAX
MOV [a], EAX
int a = 0; int tmp = a; tmp++; a = tmp;
注意,縱向是時間線,#n表示當前時候a的值。
我們的原本想法應該是這樣執行:
但是由於搶占式操作系統線程的推進是不可預測的,真正執行的時候可能是這樣
在上面的執行流程中,t1首先更新為1,然后t2更新為2.此時,從系統中其他線程的角度來看,似乎一切都正常。
然后,此時t3被喚醒繼續執行,它將覆蓋t1和t2的執行結果,重新將a的值設置為1.
這是一個典型的數據競爭問題,之所以稱為“競爭”,是因為代碼執行的正確性完全依賴於多個線程之間的競爭結果。每個線程都試圖最先執行完代碼,並且根據哪個線程最先執行完成的不同,會導致不同的結果。也就是相同的源代碼,不同的執行結果。
class SimpleHybridLock : IDisposable { private Int32 m_waiters = 0;
// The AutoResetEvent is the primitive kernel-mode construct private AutoResetEvent m_waiterLock = new AutoResetEvent(false); public void Enter() { if (Interlocked.Increment(ref m_waiters) == 1) //what will happen if we use m_waiters++ in this place? return; //return means we enter critical region// Another thread is waiting. There is contention, block this thread m_waiterLock.WaitOne(); // Bad performance hit here // When WaitOne returns, this thread now has the lock } public void Leave() { // This thread is releasing the lock if (Interlocked.Decrement(ref m_waiters) == 0) return; // No other threads are blocked, just return // Other threads are blocked, wake 1 of them m_waiterLock.Set(); // Bad performance hit here } public void Dispose() { m_waiterLock.Dispose(); } }
我們用一個int私有字段來計數,確保只有一個線程調用該方法的時候不會調用到非常影響性能的內核對象。只有多個線程並發的訪問這個方法的時候,才會初始化內核對象,阻塞線程。
我們可以給這個鎖加入更多的功能,這時我們需要保存更多的信息,也就需要更多的字段,比如說保存哪個線程擁有這個鎖,以及它擁有了多少次。在多個線程並發訪問的時候,我們也可以推遲一段時間再創建內核對象,可以加入spin lock先自旋一段時間。
internal sealed class AnotherHybridLock : IDisposable { // The Int32 is used by the primitive user-mode constructs (Interlocked methods) private Int32 m_waiters = 0; // The AutoResetEvent is the primitive kernel-mode construct private AutoResetEvent m_waiterLock = new AutoResetEvent(false); // This field controls spinning in an effort to improve performance private Int32 m_spincount = 4000; // Arbitrarily chosen count // These fields indicate which thread owns the lock and how many times it owns it private Int32 m_owningThreadId = 0, m_recursion = 0; public void Enter() { // If calling thread already owns the lock, increment recursion count and return Int32 threadId = Thread.CurrentThread.ManagedThreadId; if (threadId == m_owningThreadId) { m_recursion++; return; } // The calling thread doesn't own the lock, try to get it SpinWait spinwait = new SpinWait(); for (Int32 spinCount = 0; spinCount < m_spincount; spinCount++) { // If the lock was free, this thread got it; set some state and return if (Interlocked.CompareExchange(ref m_waiters, 1, 0) == 0) goto GotLock; // Black magic: give other threads a chance to run // in hopes that the lock will be released spinwait.SpinOnce(); } // Spinning is over and the lock was still not obtained, try one more time if (Interlocked.Increment(ref m_waiters) > 1) { // Other threads are blocked and this thread must block too m_waiterLock.WaitOne(); // Wait for the lock; performance hit // When this thread wakes, it owns the lock; set some state and return } GotLock: // When a thread gets the lock, we record its ID and // indicate that the thread owns the lock once m_owningThreadId = threadId; m_recursion = 1; } public void Leave() { // If the calling thread doesn't own the lock, there is a bug Int32 threadId = Thread.CurrentThread.ManagedThreadId; if (threadId != m_owningThreadId) throw new SynchronizationLockException("Lock not owned by calling thread"); // Decrement the recursion count. If this thread still owns the lock, just return if (--m_recursion > 0) return; m_owningThreadId = 0; // No thread owns the lock now // If no other threads are blocked, just return if (Interlocked.Decrement(ref m_waiters) == 0) return; // Other threads are blocked, wake 1 of them m_waiterLock.Set(); // Bad performance hit here } public void Dispose() { m_waiterLock.Dispose(); } }
當然鎖變復雜了,性能也會有相應的降低。有所得有所失去。
Sync block
堆上的每個對象都可以關聯一個叫做Sync block(同步塊)的數據結構。同步塊包含字段,這些字段和上面我們實現的鎖中的字段的作用是差不多的。具體地說,它為一個內核對象、擁有線程的ID、遞歸計數器、等待線程的計數提供了保存的地方。
照例配圖一張,要不光看我文字描述不太容易懂:

因此同步塊干啥子的?用來保存數據的唄……
當然,同步塊也不是一開始就上的,上面這張圖隱藏了點信息。就是其實那個指向同步塊的指針有2個指針大小的內存,還保存着hashcode的值還有一些其他 東西。如果塊內存不足以保存這些信息,那么才會為這個對象分配一個共享內存池中的同步塊。這就是Object Header Inflation現象。
懂得相同之處了,再來理解為什么鎖type類型危險的,究其原因就是type能被很多地方訪問,甚至能跨appdomain,這就很有可能你莫名其妙就和另一 個appdomain中的鎖用到同一個同步塊了。同樣情況的類型還有於AppDomain無關的反射類型,比如說啥子MemberInfo之類的。
class Program { static void Main(string[] args) { var syncTest = new SyncTest(); Thread t1 = new Thread(syncTest.LongSyncMethod); // critical region 1 t1.Start(); Thread t2 = new Thread(syncTest.NoSyncMethod); t2.Start(); Thread t3 = new Thread(syncTest.LongSyncMethod);// critical region 1 t3.Start(); Thread t4 = new Thread(syncTest.NoSyncMethod); t4.Start(); Thread t5 = new Thread(syncTest.NoSyncMethod); t5.Start(); Thread t6 = new Thread(syncTest.SyncMethodUsingPrivateObject);// critical region 2 t6.Start(); Thread t7 = new Thread(syncTest.SyncMethodUsingPrivateObject);// critical region 2 t7.Start(); } } class SyncTest { private object _lock = new object(); [MethodImplAttribute(MethodImplOptions.Synchronized)] public void LongSyncMethod() { Console.WriteLine("being asleep"); Thread.Sleep(10000); } public void NoSyncMethod() { Console.WriteLine("do sth"); } public void SyncMethodUsingPrivateObject() { lock (_lock) { Console.WriteLine("another critical section"); Thread.Sleep(5000); } } }
bool acquired = false; object tmp = listLock; try { Monitor.Enter(tmp, ref acquired); list.Add("item"); } finally { if (acquired) { Monitor.Release(tmp); } }
public class BlockingQueue<T> { private Queue<T> m_queue = new Queue<T>(); private int m_waitingConsumers = 0; public int Count { get { lock (m_queue) return m_queue.Count; } } public void Clear() { lock (m_queue) m_queue.Clear(); } public bool Contains(T item) { lock (m_queue) return m_queue.Contains(item); } public void Enqueue(T item) { lock (m_queue) { m_queue.Enqueue(item); // Wake consumers waiting for a new element. if (m_waitingConsumers > 0) Monitor.Pulse(m_queue); } } public T Dequeue() { lock (m_queue) { while (m_queue.Count == 0) { //Queue is empty, wait until en element arrives. 644 Chapter 12: Parallel Containers m_waitingConsumers++; try { Monitor.Wait(m_queue); } finally { m_waitingConsumers--; } } return m_queue.Dequeue(); } } public T Peek() { lock (m_queue) return m_queue.Peek(); } }
4.多線程之旅之四——淺談內存模型和用戶態同步機制
最后,如果你覺得文章還不錯,請點擊右下角的推薦,謝謝!