多線程之旅之四——淺談內存模型和用戶態同步機制


 用戶態下有兩種同步結構的
volatile construct: 在簡單數據類型上原子性的讀或者寫操作
 
interlocked construct:在簡單數據類型上原子性的讀寫操作
(在這里還是要再啰嗦一句,記住只有操作系統才有辦法阻止一個線程執行,通過無論是I/O中斷還是線程阻塞等方式。)
 
為了達到原子性操作,上面兩種結構都需要內存地址正確對齊,簡單來說就是對變量有要求,需要變量所在內存地址分別是1、2和4的倍數。正常情況下CLR中的變量類型都是字段對齊的,所以這里不展開來說。
我想還是從非常重要的interlocked類 開始說起。
 
System.Threading.Interlocked

Interlocked類中的每個方法都執行一次原子性的讀取以及寫入操作,其中public static Int32 Increment(ref Int32 location)方法是最常用到的方法,后面我在自定一個混合結構鎖的時候就會用到。
下面是這個類的方法的簽名,注釋部分說明的是相對應的同步方法。
 
public static class Interlocked {  
   // return (++location)  
   public static Int32 Increment(ref Int32 location);  
  
   // return (--location)  
   public static Int32 Decrement(ref Int32 location);  
  
   // return (location1 += value)  
   public static Int32 Add(ref Int32 location1, Int32 value);  
  
   // Int32 old = location1; location1 = value; return old;  
   public static Int32 Exchange(ref Int32 location1, Int32 value);  
  
   // Int32 old = location1;  
   // if (location1 == comparand) location1 = value; 
   // return old;  
   public static Int32 CompareExchange(ref Int32 location1,   
      Int32 value, Int32 comparand);  
   ...  
}

 

自己實現簡單的Spin Lock,不阻塞線程,但是同時又只有一個線程可以進入臨界域操作。那么其他的線程干什么了呢?肯定沒有阻塞,因為我們沒有使用到內核對象,但是為了不讓他們干擾到我們工作,只能讓它們在“原地打轉”了。

假如有多個線程調用了Enter方法,那么只有一個線程能滿足條件進入while的內部,其他線程都因為不滿足條件而在不斷的判斷while條件。

exchange方法確保第一個調用的線程將m_ResourceInUse變為1,並且原始值為0.而其他線程將會使得m_ResourceInUse從1變為1,也就是原始值為1,不滿足條件。

class SimpleSpinLock { 
   private Int32 m_ResourceInUse; // 0=false (default), 1=true 
 
   public void Enter() { 
      // Set the resource to in-use and if this thread  
      while (Interlocked.Exchange(ref m_ResourceInUse, 1) != 0) { 

      } 
   } 
 
   public void Leave() { 
      Thread.VolatileWrite(ref m_ResourceInUse, 0); 
   } 
}

如何使用這個類呢?很簡單
public sealed class SomeResource { 
   private SimpleSpinLock m_sl = new SimpleSpinLock(); 
 
   public void AccessResource() { 
      m_sl.Enter(); 
      // Only one thread at a time can get in here to access the resource... 
      m_sl.Leave(); 
   } 
}

exchange是原子判斷true和false的一個常用辦法。

 
原子性
寫到這里我覺得還可能有人對原子性不清楚,舉個例子來說吧,非常經典以及常用的++操作符號
int a = 0;
a++;

當編譯器把這行C#語句編譯成匯編代碼的時候,將會包含多條指令,如:

MOV  EAX,    [a]
INC   EAX
MOV  [a],  EAX
第一條指令獲得變量a的地址,第二條指令把以這個地址開頭的接下來的4個字節復制到寄存器EAX中。接下來的匯編指令將遞增EAX中的值,最后將遞增后的值從EAX復制回a指向的地址。
 
遺憾的是,我們從源代碼中根本無法看到++運算符中所包含的這些步驟。如果使用多個變量,那么就可以更清楚的看到這些步驟。事實上,這些步驟類似於將代碼寫成下面這樣:
int  a  = 0; 
int  tmp  =   a;
tmp++;
a  = tmp;
雖然加載寄存器和保存寄存器等指令本身都是原子的,但將加載、遞增以及保存這三條指令放在一起組成的操作組合卻就不再是原子的了
任何需要多條匯編指令的運算都是非原子的,因此++和--等操作符都是非原子的。這意味着我們需要采取額外的步驟來保證並發的安全性,下面我們來具體說下:
 
假設有三個線程t1、t2、t3同時編譯后生成下面的匯編代碼:

 

注意,縱向是時間線,#n表示當前時候a的值。

我們的原本想法應該是這樣執行:

 

但是由於搶占式操作系統線程的推進是不可預測的,真正執行的時候可能是這樣

在上面的執行流程中,t1首先更新為1,然后t2更新為2.此時,從系統中其他線程的角度來看,似乎一切都正常。

然后,此時t3被喚醒繼續執行,它將覆蓋t1和t2的執行結果,重新將a的值設置為1.

這是一個典型的數據競爭問題,之所以稱為“競爭”,是因為代碼執行的正確性完全依賴於多個線程之間的競爭結果。每個線程都試圖最先執行完代碼,並且根據哪個線程最先執行完成的不同,會導致不同的結果。也就是相同的源代碼,不同的執行結果。

 
Interlock的Increment幫我們解決了這個問題,它能保證原子的遞增。
 
下面我們用它來實現簡單的Hybird Lock
class SimpleHybridLock : IDisposable { 
   private Int32 m_waiters = 0; 
// The AutoResetEvent is the primitive kernel-mode construct private AutoResetEvent m_waiterLock = new AutoResetEvent(false); public void Enter() { if (Interlocked.Increment(ref m_waiters) == 1) //what will happen if we use m_waiters++ in this place? return; //return means we enter critical region// Another thread is waiting. There is contention, block this thread m_waiterLock.WaitOne(); // Bad performance hit here // When WaitOne returns, this thread now has the lock } public void Leave() { // This thread is releasing the lock if (Interlocked.Decrement(ref m_waiters) == 0) return; // No other threads are blocked, just return // Other threads are blocked, wake 1 of them m_waiterLock.Set(); // Bad performance hit here } public void Dispose() { m_waiterLock.Dispose(); } }

 我們用一個int私有字段來計數,確保只有一個線程調用該方法的時候不會調用到非常影響性能的內核對象。只有多個線程並發的訪問這個方法的時候,才會初始化內核對象,阻塞線程。

 

我們可以給這個鎖加入更多的功能,這時我們需要保存更多的信息,也就需要更多的字段,比如說保存哪個線程擁有這個鎖,以及它擁有了多少次。在多個線程並發訪問的時候,我們也可以推遲一段時間再創建內核對象,可以加入spin lock先自旋一段時間。

 

internal sealed class AnotherHybridLock : IDisposable { 
   // The Int32 is used by the primitive user-mode constructs (Interlocked methods) 
   private Int32 m_waiters = 0; 
 
   // The AutoResetEvent is the primitive kernel-mode construct 
   private AutoResetEvent m_waiterLock = new AutoResetEvent(false); 
 
   // This field controls spinning in an effort to improve performance 
   private Int32 m_spincount = 4000;   // Arbitrarily chosen count 
 
   // These fields indicate which thread owns the lock and how many times it owns it 
   private Int32 m_owningThreadId = 0, m_recursion = 0; 
 
   public void Enter() { 
      // If calling thread already owns the lock, increment recursion count and return 
      Int32 threadId = Thread.CurrentThread.ManagedThreadId; 
      if (threadId == m_owningThreadId) { m_recursion++; return; } 
 
      // The calling thread doesn't own the lock, try to get it 
      SpinWait spinwait = new SpinWait(); 
      for (Int32 spinCount = 0; spinCount < m_spincount; spinCount++) { 
         // If the lock was free, this thread got it; set some state and return 
         if (Interlocked.CompareExchange(ref m_waiters, 1, 0) == 0) goto GotLock; 
 
         // Black magic: give other threads a chance to run  
         // in hopes that the lock will be released 
         spinwait.SpinOnce(); 
      } 
 
      // Spinning is over and the lock was still not obtained, try one more time 
      if (Interlocked.Increment(ref m_waiters) > 1) { 
         // Other threads are blocked and this thread must block too 
         m_waiterLock.WaitOne(); // Wait for the lock; performance hit 
         // When this thread wakes, it owns the lock; set some state and return 
      } 
 
   GotLock: 
      // When a thread gets the lock, we record its ID and  
      // indicate that the thread owns the lock once 
      m_owningThreadId = threadId; m_recursion = 1; 
   } 
 
   public void Leave() { 
      // If the calling thread doesn't own the lock, there is a bug 
      Int32 threadId = Thread.CurrentThread.ManagedThreadId; 
      if (threadId != m_owningThreadId) 
         throw new SynchronizationLockException("Lock not owned by calling thread"); 
 
      // Decrement the recursion count. If this thread still owns the lock, just return 
      if (--m_recursion > 0) return; 
 
      m_owningThreadId = 0;   // No thread owns the lock now 
 
      // If no other threads are blocked, just return 
      if (Interlocked.Decrement(ref m_waiters) == 0)  
         return; 
 
      // Other threads are blocked, wake 1 of them 
      m_waiterLock.Set();     // Bad performance hit here 
   } 
 
   public void Dispose() { m_waiterLock.Dispose(); } 
}

 

當然鎖變復雜了,性能也會有相應的降低。有所得有所失去。

 

Sync block

堆上的每個對象都可以關聯一個叫做Sync block(同步塊)的數據結構。同步塊包含字段,這些字段和上面我們實現的鎖中的字段的作用是差不多的。具體地說,它為一個內核對象、擁有線程的ID、遞歸計數器、等待線程的計數提供了保存的地方。

Type類型對象 和普通對象一樣都在托管堆上,都有指向同步塊的指針。鎖住任一個普通對象和鎖住type對象是沒有什么區別的,反正用到的只是同步塊。用了不同的同步塊會創建不同的臨界域,不同的臨界域當然就沒有什么互斥的概念了。所以lock typeof(object)其實也只是說“兄弟,我要用到你所指向的同步塊來保存我同步時所必須的數據了”。

照例配圖一張,要不光看我文字描述不太容易懂:


因此同步塊干啥子的?用來保存數據的唄……
如同上面我們自己實現的混合結構鎖一樣,monitor、mutex和event就保存了0,1還有一點其他數據,比如說什么線程ID的,用來實現允許遞歸;Semaphore就保存了1,2,3,4,5……等數據。
當然,同步塊也不是一開始就上的,上面這張圖隱藏了點信息。就是其實那個指向同步塊的指針有2個指針大小的內存,還保存着hashcode的值還有一些其他 東西。如果塊內存不足以保存這些信息,那么才會為這個對象分配一個共享內存池中的同步塊。這就是Object Header Inflation現象。

懂得相同之處了,再來理解為什么鎖type類型危險的,究其原因就是type能被很多地方訪問,甚至能跨appdomain,這就很有可能你莫名其妙就和另一 個appdomain中的鎖用到同一個同步塊了。同樣情況的類型還有於AppDomain無關的反射類型,比如說啥子MemberInfo之類的。
 
為了說明臨界域互斥的問題,我寫了一段代碼,創建了2個不同的臨界域。
其中[MethodImplAttribute(MethodImplOptions.Synchronized)] 編譯后就相當於lock(this)
class Program
    {
        static void Main(string[] args)
        {
            var syncTest = new SyncTest();
            Thread t1 = new Thread(syncTest.LongSyncMethod); // critical region 1
            t1.Start();

            Thread t2 = new Thread(syncTest.NoSyncMethod);
            t2.Start();

            Thread t3 = new Thread(syncTest.LongSyncMethod);// critical region 1 
            t3.Start();

            Thread t4 = new Thread(syncTest.NoSyncMethod);
            t4.Start();

            Thread t5 = new Thread(syncTest.NoSyncMethod);
            t5.Start();

            Thread t6 = new Thread(syncTest.SyncMethodUsingPrivateObject);// critical region 2
            t6.Start();

            Thread t7 = new Thread(syncTest.SyncMethodUsingPrivateObject);// critical region 2
            t7.Start();
        }
    }




    class SyncTest
    {
        private object _lock = new object();

        [MethodImplAttribute(MethodImplOptions.Synchronized)]
        public void LongSyncMethod()
        {
            Console.WriteLine("being asleep");
            Thread.Sleep(10000);
        }


        public void NoSyncMethod()
        {
            Console.WriteLine("do sth");

        }
        
        public void SyncMethodUsingPrivateObject()
        {
            lock (_lock)
            {
                Console.WriteLine("another critical section");
                Thread.Sleep(5000);
            }

        }
    }

 

 很多對概念不清楚的人都以為lock(this)后會把整個對象都鎖住,什么方法都用不了。好一點的會認為同步方法用不了。懂得原因以后,就會明白lock(this)並沒有什么特別的,只是通過this對象創建了一個臨界域,我們同樣可以lock其他對象創建不同的臨界域,不同的臨界域並不互斥
 
 
用Monitor來實現阻塞列隊:

Monitor也是一種結合了自旋和內核對象的混合構造鎖。我們通常會用Lock關鍵字去使用它,lock關鍵字保證了我們能按照正確的模式去使用Monitor類。
1.通過臨時變量保證了進入和釋放的都是同一個對象,就算你在Lock里面修改了所對象也一樣。
2.保證鎖只要獲取了就能釋放。
 
下面是.NET4以后Lock語法糖編譯后的等價代碼
bool acquired = false;
object tmp = listLock;
try
{
   Monitor.Enter(tmp, ref acquired);
   list.Add("item");
} 
finally {
   if (acquired)
   {
        Monitor.Release(tmp);
    } 
}

 

 
 
在《多線程之旅之三》中我們用兩個內核對象實現了 有界阻塞列隊,主要的開銷就在於每次入隊的時候兩個內核對象之間發生的切換,下面我們嘗試用混合鎖Monitor來實現相應的數據結構。享受混合鎖給我們帶來的好處。
    public class BlockingQueue<T>
    {
        private Queue<T> m_queue = new Queue<T>();
        private int m_waitingConsumers = 0;
        public int Count
        {
            get
            {
                lock (m_queue)
                    return m_queue.Count;
            }
        }
        public void Clear()
        {
            lock (m_queue)
                m_queue.Clear();
        }

        public bool Contains(T item)
        {
            lock (m_queue)
                return m_queue.Contains(item);
        }
        public void Enqueue(T item)
        {
            lock (m_queue)
            {
                m_queue.Enqueue(item);
                // Wake   consumers  waiting  for  a  new  element. 
                if (m_waitingConsumers > 0)

                    Monitor.Pulse(m_queue);
            }
        }

        public T Dequeue()
        {
            lock (m_queue)
            {
                while (m_queue.Count == 0)
                {
                    //Queue  is  empty,  wait  until  en  element  arrives. 644  Chapter 12:  Parallel Containers 
                    m_waitingConsumers++;
                    try
                    {
                        Monitor.Wait(m_queue);
                    }
                    finally
                    {
                        m_waitingConsumers--;
                    }
                }
                return m_queue.Dequeue();

            }
        }

        public T Peek()
        {
            lock (m_queue)
                return m_queue.Peek();
        }
    }

 

 

1.多線程之旅——從概念開始

2.多線程之旅二——線程

3.多線程之旅之三——Windows內核對象同步機制

4.多線程之旅之四——淺談內存模型和用戶態同步機制



最后,如果你覺得文章還不錯,請點擊右下角的推薦,謝謝!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM