C#多線程編程實戰(二):線程同步


2.1 簡介

競爭條件:多個線程同時使用共享對象。需要同步這些線程使得共享對象的操作能夠以正確的順序執行

線程同步問題:多線程的執行並沒有正確的同步,當一個線程執行遞增和遞減操作時,其他線程需要依次等待

線程同步解決方案:

無須共享對象:大部分時候可以通過重新設計來移除共享對象,去掉復雜的同步構造,避免多線程使用單一對象

必須共享對象:只使用原子操作,一個操作只占用一個量子的時間,無須實現其他線程等待當前操作完成

內核模式:將等待的線程置於阻塞狀態,消耗少量的CPU資源,但會引入至少一次上下文切換,適用於線程等待較長時間

用戶模式:只是簡單的等待,線程等待會浪費CPU時間但是可以節省上下文切換消耗的CPU時間,適用於線程等待較短時間

混合模式:先嘗試用戶模式,如果等待時間較長,則會切換到內核模式

2.2 執行基本的原子操作

using System;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Incorrect counter");
            //沒有限定,會遇到競爭條件,得出的結果大部分不是正確的
            var c = new Counter();
            var t1 = new Thread(() => TestCounter(c));
            var t2 = new Thread(() => TestCounter(c));
            var t3 = new Thread(() => TestCounter(c));
            t1.Start();
            t2.Start();
            t3.Start();
            t1.Join();
            t2.Join();
            t3.Join();
            Console.WriteLine($"Total count:{c.Count}");
            Console.WriteLine("--------------------------");
            Console.WriteLine("Correct counter");
            //使用Interlocked類提供的原子操作方法,無需鎖定任何對象可得出正確結果
            var c1 = new CounterNoLock();
            t1 = new Thread(() => TestCounter(c1));
            t2 = new Thread(() => TestCounter(c1));
            t3 = new Thread(() => TestCounter(c1));
            t1.Start();
            t2.Start();
            t3.Start();
            t1.Join();
            t2.Join();
            t3.Join();
            Console.WriteLine($"Total count:{c1.Count}");
            Console.ReadLine();
        }
        static void TestCounter(CounterBase c)
        {
            for (int i = 0; i < 100000; i++)
            {
                c.Increment();
                c.Decrement();
            }
        }
        class Counter : CounterBase
        {
            private int _count;
            public int Count { get { return _count; } }

            public override void Decrement()
            {
                _count--;
            }

            public override void Increment()
            {
                _count++;
            }
        }

        class CounterNoLock : CounterBase
        {
            private int _count;
            public int Count { get { return _count; } }

            public override void Decrement()
            {
                //Interlocked提供了Increment()、Decrement()和Add等基本數學操作的原子方法
                Interlocked.Decrement(ref _count);
            }

            public override void Increment()
            {
                Interlocked.Increment(ref _count);
            }
        }

        abstract class CounterBase
        {
            public abstract void Increment();
            public abstract void Decrement();
        }
    }
}

注釋:Interlocked提供了Increment()、Decrement()和Add等基本數學操作的原子方法,不用鎖也可以得出正確結果

2.3 使用Mutex類

using System;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
        {
            const string MutexName = "CSharpThreadingCookbook";
            //Mutex是一種原始的同步方式,只對一個線程授予對共享資源的獨占訪問
            //定義一個指定名稱的互斥量,設置initialOwner標志為false
            using (var m = new Mutex(false, MutexName))
            {
                //如果互斥量已經被創建,獲取互斥量,否則就執行else語句
                if (!m.WaitOne(TimeSpan.FromSeconds(5), false))
                {
                    Console.WriteLine("Second instance is running!");
                }
                else
                {
                    Console.WriteLine("Running!");
                    Console.ReadLine();
                    m.ReleaseMutex();
                }
            }
            //如果再運行同樣的程序,則會在5秒內嘗試獲取互斥量,如果第一個程序按下了任何鍵,第二個程序開始執行。
            //如果等待5秒鍾,第二個程序將無法獲取該互斥量
        }
    }
}

注釋:互斥量是全局操作對象,必須正確關閉,最好用using

2.4 使用SemaphoreSlim類

using System;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
         {
            //啟動6個線程,啟動的順序不一樣
            for (int i = 0; i <= 6; i++)
            {
                string threadName = "Thread " + i;
                int secondsToWait = 2 + 2 * i;
                var t = new Thread(() => AccessDatabase(threadName, secondsToWait));
                t.Start();
            }

            Console.ReadLine();
        }
        //SemaphoreSlim的構造函數參數為允許的並發線程數目
        static SemaphoreSlim semaphore = new SemaphoreSlim(4);

        static void AccessDatabase(string name, int seconds)
        {
            Console.WriteLine($"{name} waits to access a database");
            semaphore.Wait();
            Console.WriteLine($"{name} was granted an access to a database");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine($"{name} is Completed");
            //調用Release方法說明線程已經完成,可以開啟一個新的線程了
            semaphore.Release();
        }
    }
}

注釋:這里使用了混合模式,允許我們在等待時間很短的情況下無需上下文切換。SemaphoreSlim並不使用Windows內核信號量,而且也不支持進程間同步

2.5 使用AutoResetEvent類

using System;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
         {
            Thread t = new Thread(() => Process(10));
            t.Start();
            Console.WriteLine("Waiting for another thread to complete work");
            //開啟一個線程后workEvent等待,直到收到Set信號
            workEvent.WaitOne();
            Console.WriteLine("First operation is complete");
            Console.WriteLine("Performing an operation on a main thread");
            Thread.Sleep(TimeSpan.FromSeconds(5));
            mainEvent.Set();
            Console.WriteLine("Now running the second operation on a second thread");
            workEvent.WaitOne();
            Console.WriteLine("Second operation is complete");
            Console.ReadLine();
        }
        //初始狀態為unsignaled,子線程向主線程發信號
        private static AutoResetEvent workEvent = new AutoResetEvent(false);
        //初始狀態為unsignaled,主線程向子線程發信號
        private static AutoResetEvent mainEvent = new AutoResetEvent(false);

        static void Process(int seconds)
        {
            Console.WriteLine("Starting a long running work...");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine("Work is done!");
            workEvent.Set();//將事件設為終止狀態允許一個或多個線程繼續
            Console.WriteLine("Waiting for a main thread to complete its work");
            
            mainEvent.WaitOne();//阻止當前線程,直到mainEvent收到信號
            Console.WriteLine("Starting second operation...");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine("Work is done");
            workEvent.Set();
        }
    }
}

注釋:AutoResetEvent采用的是內核模式,所以等待時間不能太長

2.6 使用ManualResetEventSlim類

using System;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
         {
            Thread t1 = new Thread(() => TravelThroughGates("Thread 1", 5));
            Thread t2 = new Thread(() => TravelThroughGates("Thread 2", 6));
            Thread t3 = new Thread(() => TravelThroughGates("Thread 3", 12));
            t1.Start();
            t2.Start();
            t3.Start();
            Thread.Sleep(TimeSpan.FromSeconds(6));
            Console.WriteLine("The gates are now open");
            mainEvent.Set();//將時間設置為有信號,從而讓一個或多個等待該事件的線程繼續
            Thread.Sleep(TimeSpan.FromSeconds(2));
            mainEvent.Reset();//將事件設置為非終止,從而導致線程受阻
            Console.WriteLine("The gates have been closed!");
            Thread.Sleep(TimeSpan.FromSeconds(10));
            Console.WriteLine("The gates are now open for the second time");
            mainEvent.Set();
            Thread.Sleep(TimeSpan.FromSeconds(2));
            Console.WriteLine("The gates have been closed!");
            mainEvent.Reset();
            Console.ReadLine();
        }
        static ManualResetEventSlim mainEvent = new ManualResetEventSlim(false);

        static void TravelThroughGates(string threadName, int seconds)
        {
            Console.WriteLine($"{threadName} falls to sleep");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine($"{threadName} waits for the gates to open!");
            mainEvent.Wait();//阻止當前線程
            Console.WriteLine($"{threadName} enter the gates!");
        }
    }
}

注釋:ManualResetEventSlim工作方式像人群通過的大門,一直保持大門敞開直到調用reset,set相當於打開大門,reset相當於關閉大門

2.7 使用CountDownEvent類

using System;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
         {
            Console.WriteLine("Starting two operations");
            Thread t1 = new Thread(() => PerformOperation("Operation 1 is completed", 4));
            Thread t2 = new Thread(() => PerformOperation("Operation 2 is completed", 8));
            t1.Start();
            t2.Start();
            //開啟了兩個線程,調用Wait方法阻止當前線程,知道所有線程都完成
            countdown.Wait();
            Console.WriteLine("Both operations have been completed");
            countdown.Dispose();
            Console.ReadLine();
        }
        //計數器初始化CountdownEvent實例,計數器表示:當計數器個數完成操作發出信號
        static CountdownEvent countdown = new CountdownEvent(2);

        static void PerformOperation(string message, int seconds)
        {
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            Console.WriteLine(message);
            //向CountdownEvent注冊信息,並減少當前計數器數值
            countdown.Signal();
        }
    }
}

注釋:如果Signal方法沒有達到指定的次數,那么countdown.wait()會一直等待,所以請確保所有線程完成后都要調用Signal方法

2.8 使用Barrier類

using System;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
         {
            Thread t1 = new Thread(() => PlayMusic("the gutarist", "play an amazing solo", 5));
            Thread t2 = new Thread(() => PlayMusic("the signer", "sing his song", 2));
            t1.Start();
            t2.Start();
            Console.ReadLine();
        }
        //后面的Lamda表達式是回調函數。執行完SignalAndWait后執行
        static Barrier barrier = new Barrier(2, b=>Console.WriteLine($"End of phase {b.CurrentPhaseNumber + 1}"));

        static void PlayMusic(string name, string message, int seconds)
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine("===========================");
                Thread.Sleep(TimeSpan.FromSeconds(seconds));
                Console.WriteLine($"{name} starts to {message}");
                Thread.Sleep(TimeSpan.FromSeconds(seconds));
                Console.WriteLine($"{name} finishes to {message}");
                //等所有調用線程都結束
                barrier.SignalAndWait();
            }
        }
    }
}

注釋:

2.9 使用ReaderWriterlockSlim類

using System;
using System.Collections.Generic;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
         {
            new Thread(Read) { IsBackground = true }.Start();
            new Thread(Read) { IsBackground = true }.Start();
            new Thread(Read) { IsBackground = true }.Start();
            new Thread(() => Write("Thread 1")) { IsBackground = true }.Start();
            new Thread(() => Write("Thread 2")) { IsBackground = true }.Start();
            Thread.Sleep(TimeSpan.FromSeconds(30));
            Console.ReadLine();
        }
        //實現線程安全
        static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
        static Dictionary<int, int> items = new Dictionary<int, int>();

        static void Read()
        {
            Console.WriteLine("Reading contents of a dictionary");
            while (true)
            {
                try
                {
                    //讀鎖
                    _rw.EnterReadLock();
                    foreach (var key in items.Keys)
                    {
                        Thread.Sleep(TimeSpan.FromSeconds(0.1));
                    }
                }
                finally
                {
                    //計數為0時退出讀取模式
                    _rw.ExitReadLock();
                }
            }
        }

        static void Write(string threadName)
        {
            while (true)
            {
                try
                {
                    int newKey = new Random().Next(250);
                    _rw.EnterUpgradeableReadLock();
                    if (!items.ContainsKey(newKey))
                    {
                        try
                        {
                            //寫鎖
                            _rw.EnterWriteLock();
                            items[newKey] = 1;
                            Console.WriteLine($"New key {newKey} is added to a dictionary by a {threadName}");
                        }
                        finally
                        {
                            //計數為0時退出寫入模式
                            _rw.ExitWriteLock();
                        }
                    }
                    Thread.Sleep(TimeSpan.FromSeconds(0.1));
                }
                finally
                {
                    //計數為0時退出可升級模式
                    _rw.ExitUpgradeableReadLock();
                }
            }
        }
    }
}

注釋:從集合讀取數據時,根據當前數據決定是否獲取一個寫鎖並修改該集合。獲取寫鎖后集合會處於阻塞狀態。

2.10 使用SpinWait類

 

using System;
using System.Collections.Generic;
using System.Threading;

namespace MulityThreadNote
{
    class Program
    {
        static void Main(string[] args)
         {
            Thread t1 = new Thread(UserModeWait);
            Thread t2 = new Thread(HybridSpinWait);
            Console.WriteLine("Running user mode waiting");
            t1.Start();
            Thread.Sleep(20);
            _isComplete = true;
            Thread.Sleep(TimeSpan.FromSeconds(1));
            _isComplete = false;
            Console.WriteLine("Running hybrid SpinWait construct waiting");
            t2.Start();
            Thread.Sleep(5);
            _isComplete = true;
            Console.ReadLine();
        }
        //volatile 一個字段可能會被多個線程同時修改,不會被編譯器和處理器優化為只能被單個線程訪問
        static volatile bool _isComplete = false;

        static void UserModeWait()
        {
            while (!_isComplete)
            {
                Console.WriteLine(".");
            }
            Console.WriteLine();
            Console.WriteLine("Waiting is complete");
        }

        static void HybridSpinWait()
        {
            var w = new SpinWait();
            while (!_isComplete)
            {
                //執行單一自旋
                w.SpinOnce();
                //NextSpinWillYield:獲取對SpinOnce的下一次調用是否將產生處理,同時觸發強制上下文切換
                //顯示線程是否切換為阻塞狀態
                Console.WriteLine(w.NextSpinWillYield);
            }
            Console.WriteLine("Waiting is complete");
        }
    }
}

注釋:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM