異步編程:輕量級線程同步基元對象


 

clip_image001

         從此圖中我們會發現 .NET C# 的每個版本發布都是有一個“主題”。即:C#1.0托管代碼→C#2.0泛型→C#3.0LINQC#4.0動態語言→C#5.0異步編程。現在我為最新版本的“異步編程”主題寫系列分享,期待你的查看及點評。

 

傳送門:異步編程系列目錄……

 

開始《異步編程:同步基元對象(下)》

示例:異步編程:輕量級線程同步基元對象.rar

         《異步編程:線程同步基元對象》中我介紹了.NET4.0之前為我們提供的各種同步基元(包括InterlockedMonitor\lockEventWaitHandleMutexSemaphore等),隨着.NET框架的進化,.NET4.0|.NET4.5又為我們帶來了更多優化的同步基元選擇。這當然不是告訴我們完全放棄.NET4.0之前所提供的同步基元,只是需要我們“因地制宜”。那我們如何判斷適合使用哪種同步基元結構呢,就需要我們對各種同步基元有個本質的理解和清楚.NET所做的優化本質是什么。

 

基元用戶模式構造、基元內核模式構造、混合構造

基元線程同步構造分為:基元用戶模式構造和基元內核模式構造。

1.         基元用戶模式構造

應盡量使用基元用戶模式構造,因為它們使用特殊的CPU指令來協調線程,這種協調發生硬件中,速度很快。但也因此Windows操作系統永遠檢測不到一個線程在一個用戶模式構造上阻塞了,這種檢測不到有利有弊:

1)         利:因為用戶模式構造上阻塞的一個線程,池線程永遠不認為已經阻塞,所以不會出現“線程池根據CPU使用情況誤判創建更多的線程以便執行其他任務,然而新創建的線程也可能因請求的共享資源而被阻塞,惡性循環,徒增線程上下文切換的次數”的問題。

2)         弊:當你想要取得一個資源但又短時間取不到時,一個線程會一直在用戶模式中運行,造成CPU資源的浪費,此時我們更希望像內核模式那樣停止一個線程的運行讓出CPU

《異步編程:線程同步基元對象》中包含的用戶模式構造有:volatile關鍵字、Interlocked靜態類、ThreadVolatileWrite()VolatileRead()方法。

2.         基元內核模式構造

Windows操作系統自身提供的。它們要求我們調用在操作系統內核中實現的函數,調用線程將從托管代碼轉換為本地用戶模式代碼,再轉換為本地內核模式代碼,然后還要朝相反的方向一路返回,會浪費大量CPU時間,同時還伴隨着線程上下文切換,因此盡量不要讓線程從用戶模式轉到內核模式

內核模式的構造具有基元用戶模式構造所不具有的一些優點:

1)         一個內核模式的構造檢測到在一個資源上的競爭時,Windows會阻塞輸掉的線程,使它不占着一個CPU“自旋”,無謂地浪費處理器資源。

2)         內核模式的構造可實現本地和托管線程相互之間的同步。

3)         內核模式的構造可同步在一台機器的不同進程中運行的線程。

4)         內核模式的構造可應用安全性設置,防止未經授權的帳戶訪問它們。

5)         一個線程可一直阻塞,直到一個集合中的所有內核模式的構造都可用,或者直到一個集合中的任何一個內核模式的構造可用。

6)         在內核模式的構造上阻塞的一個線程可以指定一個超時值;如果在指定的時間內訪問不到希望的資源,線程可以解除阻塞並執行其他任務。

《異步編程:線程同步基元對象》中包含的內核模式構造有:EventWaitHandle(以及AutoResetEventManualResetEvent)MutexSemaphore。(另外:ReaderWriterLock

3.         混合構造

對於在一個構造上等待的線程,如果擁有這個構造的線程一直不釋放它,則會出現:

1)         如果是用戶模式構造,則線程將一直占用CPU,我們稱之為“活鎖”。

2)         如果是內核模式構造,則線程將一直被阻塞,我們稱之為“死鎖”。

然后兩者之間,死鎖總是優於活鎖,因為活鎖既浪費CPU時間,又浪費內存。而死鎖只浪費內存。

混合構造正是為了解決這種場景。其通過合並用戶模式和內核模式實現:在沒有線程競爭的時候,混合構造提供了基元用戶模式構造所具有的性能優勢。多個線程同時競爭一個構造的時候,混合構造則使用基元內核模式的構造來提供不“自旋”的優勢。由於在大多數應用程序中,線程都很少同時競爭一個構造,所以在性能上的增強可以使你的應用程序表現得更出色。

混合結構優化的本質:兩階段等待操作

線程上下文切換需要花費幾千個周期(每當線程等待內核事件WaitHandle時都會發生)。我們暫且稱其為C。假如線程所等待的時間小於2C(1C用於等待自身,1C用於喚醒),則自旋等待可以降低等待所造成的系統開銷和滯后時間,從而提升算法的整體吞吐量和可伸縮性。

在多核計算機上,當預計資源不會保留很長一段時間時,如果讓等待線程以用戶模式旋轉數十或數百個周期,然后重新嘗試獲取資源,則效率會更高。如果在旋轉后資源變為可用的,則可以節省數千個周期。如果資源仍然不可用,則只花費了少量周期,並且仍然可以進行基於內核的等待。這一旋轉-等待的組合稱為“兩階段等待操作”。

         《異步編程:線程同步基元對象》中包含的有:Monitor\lock

本節將給大家介紹.NET4.0中加入的混合結構:ManualResetEventSlimSemaphoreSlimCountdownEventBarrierReaderWriterLockSlim

 

        另外:看到園友寫了篇《理解Windows內核模式與用戶模式》,講的是內核架構及用戶模式調用內核模式方式。

 

在介紹.NET4.0新同步結構前,我們需要:

1)         認識兩個協作對象:CancellationTokenSourcecancellationToken。因為它們常常被用於混合結構中。Eg:使一個線程強迫解除其構造上的等待阻塞。

2)         認識兩個自旋結構:SpinWaitSpinLock

 

協作式取消

         對於長時間運行的計算限制操作來說,支持取消是一件很“棒”的事情。.NET 4.0提供了一個標准的取消操作模式。即通過使用CancellationTokenSource創建一個或多個取消標記CancellationToken(cancellationToken可在線程池中線程或 Task 對象之間實現協作取消),然后將此取消標記傳遞給應接收取消通知的任意數量的線程或Task對象。當調用CancellationToken關聯的CancellationTokenSource對象的 Cancle()時,每個取消標記(CancellationToken)的IsCancellationRequested屬性將返回true。異步操作中可以通過檢查此屬性做出任何適當響應。

1.         CancellationTokenSource相關API

    // 通知System.Threading.CancellationToken,告知其應被取消。
    public class CancellationTokenSource : IDisposable
    {
        // 構造一個CancellationTokenSource將在指定的時間跨度后取消。
        public CancellationTokenSource(int millisecondsDelay);

        // 獲取是否已請求取消此CancellationTokenSource。
        public bool IsCancellationRequested { get; }
        
        // 獲取與此CancellationTokenSource關聯的CancellationToken。
        public CancellationToken Token { get; }

        // 傳達取消請求。參數throwOnFirstException:指定異常是否應立即傳播。
        public void Cancel();
        public void Cancel(bool throwOnFirstException);
        // 在此CancellationTokenSource上等待指定時間后“取消”操作。
        public void CancelAfter(int millisecondsDelay);

        // 創建一組CancellationToken關聯的CancellationTokenSource。
        public static CancellationTokenSource CreateLinkedTokenSource(paramsCancellationToken[] tokens);

        // 釋放由CancellationTokenSource類的當前實例占用的所有資源。
        public void Dispose();
        ……
    }

分析:

1)         CancellationTokenSource.CreateLinkedTokenSource()方法

將一組CancellationToken連接起來並創建一個新的CancellationTokenSource。任何一個CancellationToken對應的舊CancellationTokenSource被取消,這個新的CancellationTokenSource對象也會被取消。

原理:創建一個新的CancellationTokenSource實例,並將該實例的Cancel()委托分別傳遞給這組CancellationToken實例的Register()方法,然后返回新創建的CancellationTokenSource實例。

2)         CancellationTokenSource實例Cancel()方法做了什么:

a)         CancellationTokenSource實例的IsCancellationRequested屬性設置為trueCancellationToken實例的IsCancellationRequested屬性是調用CancellationTokenSource實例的IsCancellationRequested屬性。

b)         調用CancellationTokenSource實例的CreateLinkedTokenSource()注冊的Cancel()委托回調;

c)         調用CancellationToken實例的Register()注冊的回調;

d)         處理回調異常。(參數throwOnFirstException

                                       i.              若為Cancel()傳遞true參數,那么拋出了未處理異常的第一個回調方法會阻止其他回調方法的執行,異常會立即從Cancel()中拋出;

                                      ii.              若為Cancel()傳遞false(默認為false),那么登記的所有回調方法都會調用。所有未處理的異常都會封裝到一個AggregateException對象中待回調都執行完后返回,其InnerExceptions屬性包含了所有異常的詳細信息。

        e)    給CancellationToken對象的ManualResetEvent對象Set()信號。

2.         CancellationToken相關API

    // 傳播有關應取消操作的通知。
    public struct CancellationToken
    {
        public CancellationToken(bool canceled);
        public static CancellationToken None { get; }

        // 獲取此標記是否能處於已取消狀態。
        public bool CanBeCanceled { get; }
        // 獲取是否已請求取消此標記。
        public bool IsCancellationRequested { get; }

        // 獲取內部ManualResetEvent,在CancellationTokenSource執行Cancel()時收到set()通知。
        public WaitHandle WaitHandle{ get; }

        // 注冊一個將在取消此CancellationToken時調用的委托。
        // 參數:useSynchronizationContext:
        //一個布爾值,該值指示是否捕獲當前SynchronizationContext並在調用 callback 時使用它。
        public CancellationTokenRegistration Register(Action<object> callback, object state
                                , bool useSynchronizationContext);

        // 如果已請求取消此標記,則引發OperationCanceledException。
        public void ThrowIfCancellationRequested();
        ……
    }

分析:

1)         CancellationToken是結構struct,值類型。

2)         CancellationTokenSourceCancellationToken關聯是“一一對應”

a)         無論CancellationTokenSource是通過構造函數創建還是CreateLinkedTokenSource()方法創建,與之對應的CancellationToken只有一個。

b)         每個CancellationToken都會包含一個私有字段,保存唯一與之對應的CancellationTokenSource引用。

3)         CancellationToken實例的None屬性與參數不是trueCancellationToken構造函數

它們返回一個特殊的CancellationToken實例,該實例不與任何CancellationTokenSource實例關聯(即不可能調用Cancel()),其CanBeCanceled實例屬性為false

4)         CancellationTokenRegister()方法返回的CancellationTokenRegistration對象,可調用其Dispose()方法刪除一個Register()登記的回調方法。

5)         CancellationToken實例的WaitHandle屬性

會先判斷若沒有對應的CancellationTokenSource,則創建一個默認的CancellationTokenSource對象。然后再判斷若沒有內部事件等待句柄則new ManualResetEvent(false),在CancellationTokenSource執行Cancel()時收到set()通知。;

6)         CancellationToken實例的ThrowIfCancellationRequested()方法如下:

public void ThrowIfCancellationRequested()
{
    if (this.IsCancellationRequested)
    {
        throw new OperationCanceledException(
        Environment.GetResourceString("OperationCanceled"), this);
    }
}

3.         示例

示例:一個線程池線程協作取消的例子:

    public static void ThreadPool_Cancel_test()
    {
        CancellationTokenSource cts = new CancellationTokenSource();

        ThreadPool.QueueUserWorkItem(
            token =>
            {
                CancellationToken curCancelToken = (CancellationToken)token;
                while (true)
                {
                    // 耗時操作
                    Thread.Sleep(400);
                    if (curCancelToken.IsCancellationRequested)
                    {
                        break;   // 或者拋異常curCancelToken.ThrowIfCancellationRequested();
                    }
                }
                Console.WriteLine(String.Format("線程{0}上,CancellationTokenSource操作已取消,退出循環"
                    , Thread.CurrentThread.ManagedThreadId));
            }
            , cts.Token
         );

        ThreadPool.QueueUserWorkItem(
            token =>
            {
                Console.WriteLine(String.Format("線程{0}上,調用CancellationToken實例的WaitHandle.WaitOne() "
                     , Thread.CurrentThread.ManagedThreadId));
                CancellationToken curCancelToken = (CancellationToken)token;
                curCancelToken.WaitHandle.WaitOne();
                Console.WriteLine(String.Format("線程{0}上,CancellationTokenSource操作已取消,WaitHandle獲得信號"
                     , Thread.CurrentThread.ManagedThreadId));
            }
            , cts.Token
         );

        Thread.Sleep(2000);
        Console.WriteLine("執行CancellationTokenSource實例的Cancel()");
        cts.Cancel();            
    }

結果:

clip_image003

 

SpinWait結構----自旋等待

一個輕量同步類型(結構體),提供對基於自旋的等待的支持。SpinWait只有在多核處理器下才具有使用意義。在單處理器下,自旋轉會占據CPU時間,卻做不了任何事。

SpinWait並沒有設計為讓多個任務或線程並發使用。因此,如果多個任務或者線程通過SpinWait的方法進行自旋,那么每一個任務或線程都應該使用自己的SpinWait實例。

    public struct SpinWait
    {
        // 獲取已對此實例調用SpinWait.SpinOnce() 的次數。
        public int Count { get; }
        // 判斷對SpinWait.SpinOnce() 的下一次調用是否觸發上下文切換和內核轉換。
        public bool NextSpinWillYield { get; }

        // 重置自旋計數器。
        public void Reset();
        // 執行單一自旋。
        public void SpinOnce();
        // 在指定條件得到滿足(Func<bool>委托返回true)之前自旋。
        public static void SpinUntil(Func<bool> condition);
        // 在指定條件得到滿足或指定超時過期之前自旋。參數condition為在返回 true 之前重復執行的委托。
        // 返回結果:
        // 如果條件在超時時間內得到滿足,則為 true;否則為 false
        public static bool SpinUntil(Func<bool> condition, int millisecondsTimeout);
        public static bool SpinUntil(Func<bool> condition, TimeSpan timeout);
    }

分析SpinWait關鍵實現代碼:

        public bool NextSpinWillYield
        {
            get
            {
                if (this.m_count<= 10)  // 自旋轉計數
                {
                    return Environment.ProcessorCount == 1;
                }
                return true;
            }
        }
        public void SpinOnce()
        {
            if (this.NextSpinWillYield)
            {
                Int num = (this.m_count>= 10) ? (this.m_count - 10) :this.m_count;
                if ((num % 20) == 0x13)
                {
                    Thread.Sleep(1);
                }
                else if ((num % 5) == 4)
                {
                    Thread.Sleep(0);
                }
                else
                {
                    Thread.Yield();
                }
            }
            else
            {
                Thread.SpinWait(((int) 4) <<this.m_count);
            }
            this.m_count = (this.m_count == 0x7fffffff) ?10 : (this.m_count + 1);
        }

從代碼中我們可知:

1)         SpinWait自旋轉是調用Thread.SpinWait()

2)         NextSpinWillYield屬性代碼可知,若SpinWait運行在單核計算機上,它總是進行上下文切換(讓出處理器)。

3)         SpinWait不僅僅是一個空循環。它經過了精心實現,可以針對一般情況提供正確的旋轉行為以避免內核事件所需的高開銷的上下文切換和內核轉換;在旋轉時間足夠長的情況下自行啟動上下文切換,SpinWait甚至還會在多核計算機上產生線程的時間片(Thread.Yield())以防止等待線程阻塞高優先級的線程或垃圾回收器線程。

4)         SpinOnce()自旋一定次數后可能導致頻繁上下文切換。注意只有等待時間非常短時,SpinOnce()或SpinUntil()提供的智能行為才會獲得更好的效率,否則您應該SpinWait自行啟動上下文切換之前調用自己的內核等待。

通常使用SpinWait來封裝自己“兩階段等待操作”,避免內核事件所需的高開銷的上下文切換和內核轉換。

實現自己的“兩階段等待操作”:

if (!spinner.NextSpinWillYield)
{spinner.SpinOnce();}
else
{自己的事件等待句柄;}

 

SpinLock結構----自旋鎖

一個輕量同步類型,提供一個相互排斥鎖基元,在該基元中,嘗試獲取鎖的線程將在重復檢查的循環中等待,直至該鎖變為可用為止。SpinLock是結構體,如果您希望兩個副本都引用同一個鎖,則必須通過引用顯式傳遞該鎖。

    public struct SpinLock
    {
        // 初始化SpinLock結構的新實例,參數標識是否啟動線程所有權跟蹤以助於調試。
        public SpinLock(bool enableThreadOwnerTracking);

        // 獲取鎖當前是否已由任何線程占用。
        public bool IsHeld { get; }
        // 獲取是否已為此實例啟用了線程所有權跟蹤。
        public bool IsThreadOwnerTrackingEnabled { get; }
        // 若IsThreadOwnerTrackingEnabled=true,則可獲取鎖是否已由當前線程占用。
        public bool IsHeldByCurrentThread { get; }

        // 采用可靠的方式獲取鎖,這樣,即使在方法調用中發生異常的情況下,都能采用可靠的方式檢查lockTaken以確定是否已獲取鎖。
        public void Enter(ref boollockTaken);
        public void TryEnter(ref boollockTaken);
        public void TryEnter(int millisecondsTimeout, ref bool lockTaken);
        public void TryEnter(TimeSpan timeout, ref bool lockTaken);
        // Enter(ref boollockTaken)與TryEnter(ref bool lockTaken)效果一樣,TryEnter(ref boollockTaken)會跳轉更多方法降低的性能。

        // 釋放鎖。參數useMemoryBarrier:指示是否應發出內存屏障,以便將退出操作立即發布到其他線程(默認為true)。
        public void Exit();
        public void Exit(bool useMemoryBarrier);
}

使用需注意:

1)         SpinLock支持線程跟蹤模式,可以在開發階段使用此模式來幫助跟蹤在特定時間持有鎖的線程。雖然線程跟蹤模式對於調試很有用,但此模式可能會導致性能降低。(構造函數:可接受一個bool值以指示是否啟用調試模式,跟蹤線程所有權)

2)         SpinLock不可重入。在線程進入鎖之后,它必須先正確地退出鎖,然后才能再次進入鎖。通常,任何重新進入鎖的嘗試都會導致死鎖。

如果在調用 Exit 前沒有調用 EnterSpinLock的內部狀態可能被破壞。

3)         EnterTryEnter的選擇

a)         Enter(ref boollockTaken)          在獲取不到鎖時會阻止等待鎖可用,自旋等待,相當於等待時間傳入-1(即無限期等待)。

b)         TryEnter(ref boollockTaken)              在獲取不到鎖時立即返回而不行進任何自旋等待,相當於等待時間傳入0

c)         TryEnter(時間參數, ref boollockTaken)           在獲取不到鎖時,會在指定時間內自旋等待。

d)  在指定時間內,若自旋等待足夠長時間,內部會自動切換上下文進行內核等待,切換邏輯類似SpinWait結構(即,並沒有使用等待事件,只是使用Thread.Sleep(0)Thread.Sleep(1)以及Thread.Yield(),所以也可能導致頻繁上下文切換。

4)         在多核計算機上,當等待時間預計較短且極少出現爭用情況時,SpinLock的性能將高於其他類型的鎖(長時或預期有大量阻塞,由於旋轉過多,性能會下降)。但需注意的一點是,SpinLock比標准鎖更耗費資源。建議您僅在通過分析確定 Monitor方法或 Interlocked 方法顯著降低了程序的性能時使用SpinLock

5)         在保持一個自旋鎖時,應避免任何這些操作:

a)         阻塞,

b)         調用本身可能阻塞的任何內容,

c)         一個SpinLock結構上保持過多自旋鎖,

d)         進行動態調度的調用(接口和虛方法)

e)         非托管代碼的調度,或分配內存。

6)         不要將SpinLock聲明為只讀字段,因為如果這樣做的話,會導致每次調用這個字段都返回SpinLock的一個新副本,而不是同一個SpinLock。這樣所有對Enter()的調用都能成功獲得鎖,因此受保護的臨界區不會按照預期進行串行化。

 

驚奇的Monitor\lock

         說到Monitor(監視器)相信大家早已銘記於心了,此結構在.NET早期版本就已經存在。但是大家可能對他是“混合構造”這一說法感到驚奇,分析下它的幾個步驟:

1)         執行Monitor.Enter()/lock的線程會首先測試Monitor的鎖定位。如果該位為OFF(解鎖),那么線程就會在該位上設置一下(加鎖),且不需要等待便繼續。這通常只需執行1~2個機器指令。

2)         如果Monitor被鎖定,線程就會進入一個旋轉等待持有鎖。而線程在旋轉期間會反復測試鎖定位。單處理器系統會立即放棄,而在多核處理器系統上則旋轉一段時間才會放棄。在此之前,線程都在用戶模式下運行。

3)         一旦線程放棄測試鎖定位(在單處理器上立即如此),線程使用信號量在內核進入等待狀態。

4)         執行Monitor.Exit()或代碼退出了lock塊。如果存在等待線程,則使用ReleaseSemaphore()通知內核。

在第二步中,提到的旋轉等待。正是:SpinWait

 

ManualResetEventSlim

當等待時間預計非常短時,並且當事件不會跨越進程邊界時,可使用ManualResetEventSlim類以獲得更好的性能(ManualResetEvent的優化版本)。

    public class ManualResetEventSlim : IDisposable
    {
        // 初始化 ManualResetEventSlim 類的新實例。
        //   initialState:(默認為false)
        //     如果為 true,則IsSet屬性設置為true,此時為有信號狀態,不會阻止線程。
        //   spinCount:
        //     設置在回退到基於內核的等待操作之前需發生的自旋等待數量,默認為10。
        public ManualResetEventSlim(bool initialState, int spinCount);

        // 獲取是否設置了事件。Reset()將其設置為false;Set()將其設置為true
        public bool IsSet { get; }
        // 獲取在回退到基於內核的等待操作之前發生需的自旋等待數量,由構造函數設置。
        public int SpinCount { get; }
        // 獲取此ManualResetEventSlim的基礎WaitHandle(ManualResetEvent)
        public WaitHandle WaitHandle { get; }

        // 將事件狀態設置為非終止狀態,從而阻塞線程。
        public void Reset();
        // 將事件狀態設置為終止,從而允許一個或多個等待該事件的線程繼續。
        public void Set();
        // 阻止當前線程,直到Set()了當前ManualResetEventSlim為止。無限期等待。
        public void Wait();
        // 阻止當前線程,直到Set()了當前ManualResetEventSlim為止,並使用 32 位帶符號整數測量時間間隔,
        // 同時觀察.CancellationToken。在指定時間內收到信號,則返回true。
        public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken);

        // 釋放由ManualResetEventSlim類的當前實例占用的所有資源。
        public void Dispose();
        ……
    }

1.         分析

1)         首先要明確的是ManualResetEventSlimManualResetEvent的優化版本,但並不是說其混合構造就是基於自旋+ManualResetEvent完成。ManualResetEventSlim是基於自旋+Monitor完成

2)         可在ManualResetEventSlim的構造函數中指定切換為內核模式之前需發生的自旋等待數量(只讀的SpinCount屬性),默認為10

3)         訪問WaitHandle屬性會延遲創建一個ManualResetEvent(false)對象。在調用ManualResetEventSlimset()方法時通知WaitHandle.WaitOne()獲得信號。

2.         示例

        public static void Test()
        {
            ManualResetEventSlim manualSlim = new ManualResetEventSlim(false);
            Console.WriteLine("ManualResetEventSlim示例開始");
            Thread thread1 = new Thread(o =>
            { 
                Thread.Sleep(500);
                Console.WriteLine("調用ManualResetEventSlim的Set()");
                manualSlim.Set(); 
            });
            thread1.Start();
            Console.WriteLine("調用ManualResetEventSlim的Wait()");
            manualSlim.Wait();
            Console.WriteLine("調用ManualResetEventSlim的Reset()");
            manualSlim.Reset();  // 重置為非終止狀態,以便下一次Wait()等待
            CancellationTokenSource cts = new CancellationTokenSource();
            Thread thread2 = new Thread(obj =>
            {
                Thread.Sleep(500);
                CancellationTokenSource curCTS = obj as CancellationTokenSource;
                Console.WriteLine("調用CancellationTokenSource的Cancel()");
                curCTS.Cancel();
            });
            thread2.Start(cts);
            try
            {
                Console.WriteLine("調用ManualResetEventSlim的Wait()");
                manualSlim.Wait(cts.Token);
                Console.WriteLine("調用CancellationTokenSource后的輸出");
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("異常:OperationCanceledException");
            }
        }

         結果:

         clip_image004

 

SemaphoreSlim

         SemaphoreSlimSemaphore的優化版本。限制可同時訪問某一資源或資源池的線程數。

         SemaphoreSlim利用SpinWait結構+Monitor可重入的特性+引用計數實現,並且提供的異步API:返回TaskWaitAsync();重載方法。

         注意CurrentCount屬性的使用,此屬性能夠獲取進入信號量的任務或線程的數目。因為這個值總是在變化,所以當信號量在執行並發的ReleaseWait方法時,某一時刻CurrentCount等於某個值並不能說明任務或線程執行下一條指令的時候也一樣。因此,一定要通過Wait方法和Release方法進入和退出由信號量所保護的資源。

使用很簡單,請參考ManualResetEventSlim小節示例。

 

CountdownEvent

這個構造阻塞一個線程,直到它的內部計數器變成0(與信號量相反,信號量是在計數位0時阻塞線程)。CountdownEvent是對ManualResetEventSlim的一個封裝

CountdownEvent簡化了fork/join模式。盡管基於新的任務編程模型通過Task實例、延續和Parallel.Invoke可以更方便的表達fork-join並行。然而,CountdownEvent對於任務而言依然有用。使用Task.WaitAll()TaskFactory.ContinueWhenAll()方法要求有一組等待的Task實例構成的數組。CountdownEvent不要求對象的引用,而且可以用於最終隨着時間變化的動態數目的任務。

使用方式:

1)         CurrentCount屬性標識剩余信號數(和InitialCount屬性一起由構造函數初始化);

2)         Wait()阻止當前線程,直到CurrentCount計數為0(即所有的參與者都完成了);

3)         Signal()CountdownEvent注冊一個或指定數量信號,通知任務完成並且將CurrentCount的值減少一或指定數量。注意不能將事件的計數遞減為小於零

4)         允許使用AddCount()\TryAddCount()增加CurrentCount一個或指定數量信號(且只能增加)。一旦一個CountdownEventCurrentCount變成0,就不允許再更改了。

5)         Reset()CurrentCount重新設置為初始值或指定值,並且允許大於InitialCount屬性,此方法為非線程安全方法。

示例:

        public static void Test()
        {
            Console.WriteLine("初始化CountdownEvent計數為1000");
            CountdownEvent cde = new CountdownEvent(1000);            
            // CurrentCount(當前值)1200允許大於InitialCount(初始值)1000
            cde.AddCount(200);
            Console.WriteLine("增加CountdownEvent計數200至1200");
            Thread thread = new Thread(o =>
                {
                    int i = 1200;                   
                    for (int j = 1; j <= i; j++)
                    {
                        if(j==i)
                            Console.WriteLine("CurrentCount為1200,所以必須調用Signal()1200次");
                        cde.Signal();                       
                    }                   
                }
            );
            thread.Start();
            Console.WriteLine("調用CountdownEvent的Wait()方法");
            cde.Wait();
            Console.WriteLine("CountdownEvent計數為0,完成等待");
        }

結果:

         clip_image005

 

ReaderWriterLockSlim(多讀少寫鎖)

         為了保證線程同步構造介紹的完整性,我這邊提下這個對象,因為此對象相對復雜且自身沒有接觸類似對象,所以不展開講,后續單獨開貼分享。

         ReaderWriterLockSlim.NET3.5引入了,是對.NET1.0中的ReaderWriterLock構造的改進,ReaderWriterLockSlim的性能明顯優於ReaderWriterLock,建議在所有新的開發工作中使用ReaderWriterLockSlim它們目的都是用於多讀少寫的場景,都是線程關聯對象。

ReaderWriterLockSlim是通過封裝“自旋+AutoResetEvent+ManualResetEvent實現。

 

Barrier(關卡)

         Barrier適用於並行操作是分階段執行的,並且每一階段要求各任務之間進行同步。使用Barrier可以在並行操作中的所有任務都達到相應的關卡之前,阻止各個任務繼續執行。

         情景:當你需要一組任務並行地運行一連串的階段,但是每一個階段都要等待所有其他任務都完成前一階段之后才能開始。

         Barrier構造由SpinWait結構+ManualResetEventSlim實現

    public class Barrier : IDisposable
    {
        // 指定參與線程數與后期階段操作的委托來初始化 Barrier 類的新實例。 
        public Barrier(int participantCount, Action<Barrier> postPhaseAction);

        // 獲取屏障的當前階段的編號。
        public long CurrentPhaseNumber { get; internal set; }
        // 獲取屏障中參與者的總數。
        public int ParticipantCount { get; }
        // 獲取屏障中尚未在當前階段發出信號的參與者的數量。
        public int ParticipantsRemaining { get; }

        // 增加一個或指定數量參與者。返回新參與者開始參與關卡的階段編號。
        public long AddParticipant();
        public long AddParticipants(int participantCount);
        //  減少一個或指定數量參與者。
        public void RemoveParticipant();
        public void RemoveParticipants(int participantCount);

        // 發出參與者已達到關卡的信號,並等待所有其他參與者也達到關卡,
        // 使用 System.TimeSpan 對象測量時間間隔,同時觀察取消標記。
        // 返回結果:如果所有其他參與者已達到屏障,則為 true;否則為 false。
        public bool SignalAndWait(TimeSpan timeout, CancellationToken cancellationToken);

        // 釋放由 Barrier 類的當前實例占用的所有資源。
        public void Dispose();
        ……
    }

         使用方式:

1)         構造一個Barrier時,要告訴它有多少線程准備參與工作(0<=x<=32767),還可以傳遞一個Action<Barrier>委托來引用所有參與者完成一個簡短的工作后要執行的后期階段操作(此委托內部會傳入當前Barrier實例,如果后期階段委托引發異常,則在 BarrierPostPhaseException 對象中包裝它,然后將其傳播到所有參與者,需要用try-catch塊包裹SignalAndWait()方法)。

2)         可以調用AddParticipantRemoveParticipant方法在Barrier中動態添加和刪除參與線程。如果關卡當前正在執行后期階段(Action<Barrier>委托)操作,此調用將被阻止,直到后期階段操作完成且該關卡已轉至下一階段。

3)         每個線程完成它的階段性工作后,應調用SignalAndWait(),告訴Barrier線程已經完成一個階段的工作,並阻塞當前線程。待所有參與者都調用了SignalAndWait()后,由最后一個調用SignalAndWait()的線程調用Barrier構造函數指定的Action<Barrier>委托,然后解除正在等待的所有線程的阻塞,使它們開始下一個階段。

如果有一個參與者未能到達關卡,則會發生死鎖。若要避免這些死鎖,可使用SignalAndWait方法的重載來指定超時期限和取消標記。(SignalAndWait() 內部由SpinWait結構實現)

4)         每當Barrier完成一個階段時ParticipantsRemaining屬性(獲取屏障中尚未在當前階段發出信號的參與者的數量)會重置,在Barrier調用Action<Barrier>委托之前就已被重置。

5)         當執行階段后操作的委托時,屏障的CurrentPhaseNumber屬性的值會等於已經完成的階段的數值,而不是新的階段數。

示例:

        private static int m_count = 3;
        private static int m_curCount = 0;
        private static Barrier pauseBarr = new Barrier(2);
        public static void Test()
        {
            Thread.VolatileWrite(ref m_curCount, 0);
            Barrier barr = new Barrier(m_count, new Action<Barrier>(Write_PhaseNumber));
            Console.WriteLine("Barrier開始第一階段");
            AsyncSignalAndWait(barr, m_count);

            // 暫停等待 barr 第一階段執行完畢
            pauseBarr.SignalAndWait();

            Console.WriteLine("Barrier開始第二階段");
            Thread.VolatileWrite(ref m_curCount, 0);
            AsyncSignalAndWait(barr, m_count);

            // 暫停等待 barr 第二階段執行完畢
            pauseBarr.SignalAndWait();
            pauseBarr.Dispose();
            barr.Dispose();
            Console.WriteLine("Barrier兩個階段執行完畢");
        }
        // 執行 SignalAndWait 方法
        private static void AsyncSignalAndWait(Barrier barr, int count)
        {
            for (int i = 1; i <= count; i++)
            {
                ThreadPool.QueueUserWorkItem(o =>
                    {
                        Thread.Sleep(200);
                        Interlocked.Increment(ref m_curCount);
                        barr.SignalAndWait();
                    }
                );
            }
        }
        // 輸出當前Barrier的當前階段
        private static void Write_PhaseNumber(Barrier b)
        {
            Console.WriteLine(String.Format("Barrier調用完{0}次SignalAndWait()", m_curCount));
            Console.WriteLine("階段編號為:" + b.CurrentPhaseNumber);
            Console.WriteLine("ParticipantsRemaining屬性值為:" + b.ParticipantsRemaining);
            pauseBarr.SignalAndWait();
        }

結果:

image

 

 

Dispose()的好習慣

         使用完資源后釋放是個好習慣。同步基元WaitHandleManualResetEventSlimSemaphoreSlimCountdownEventBarrierReaderWriterLockSlim都實現了IDisposable接口,即我們使用完都應該進行釋放。

1)         WaitHandleDispose()方法是關閉SafeWaitHandle引用的Win32內核對象句柄。

2)         ManualResetEventSlimSemaphoreSlimCountdownEventBarrierReaderWriterLockSlim由於都提供了WaitHanle屬性,以延遲創建內核等待事件,所以調用Dispose實質上是間接的調用WaitHandleDispose()方法。

 

同步構造的最佳實踐

線程同步構造選擇可以遵循下面規則:    

1.         代碼中盡量不要阻塞任何線程。因為創建線程不僅耗費內存資源也影響性能,如果創建出來的線程因阻塞而不做任何事太浪費。

2.         對於簡單操作,盡量使用Thread類的VolatileRead()方法、VolatileWrite()方法和Interlocked靜態類方法。

3.         對於復雜操作:

1)         如果一定要阻塞線程,為了同步不在AppDomain或者進程中運行的線程,請使用內核對象構造。

2)         否則,使用混合構造Monitor鎖定一個靜態私有的引用對象方式(ManualResetEventSlimSemaphoreSlimCountdownEvent構造都是對Monitor進行封裝)。

3)         另外,還可以使用一個reader-writer鎖來代替Monitorreader-writer鎖通常比Monitor慢一些,但它允許多個線程並發的以只讀方式訪問數據,這提升了總體性能,並將阻塞線程的幾率降至最低。

4.         避免不必要地使用可變字段。大多數的時間、鎖或並發集合 (System.Collections.Concurrent.*) 更適合於在線程之間交換數據。在一些情況下,可以使用可變字段來優化並發代碼,但您應該使用性能度量來驗證所得到的利益勝過復雜性的增加。

5.         應該使用 System.Lazy<T> System.Threading.LazyInitializer 類型,而不是使用可變字段自己實現遲緩初始化模式。

6.         避免輪詢循環。通常,您可以使用 BlockingCollection<T>Monitor.Wait/Pulse、事件或異步編程,而不是輪詢循環。

7.         盡可能使用標准 .NET 並發基元,而不是自己實現等效的功能。

8.         在使用任何同步機制的時候,提供超時和取消是一件非常重要的事情。因為代碼中的錯誤或不可預知的情形都可能導致任務或線程永遠等待。

 

 

 

    本博文主要介紹用戶模式\內核模式,如何實現協作式取消,.NET4.0中新同步基元對象:ManualResetSlim\SemaphoreSlim\CountdownEvent\Barrier(關卡)\ReaderWriterLockSlim,自旋等待SpinWait和自旋鎖SpinLock……

目前,我已經為大家介紹了線程、線程池、線程同步知識,接下來我將給大家介紹.NET4.X給我們帶來的新異步編程方式Task。(想先了解Task相關知識的,可以查看關於AsyncAwaitFAQ

本節到此結束,謝謝大家的觀看,贊的還請多推薦。

 

推薦閱讀:

               《理論與實踐中的 C# 內存模型》

 

參考資料:

         CLR via C#(第三版)

         MSDN

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM