2.1 簡介
競爭條件:多個線程同時使用共享對象。需要同步這些線程使得共享對象的操作能夠以正確的順序執行
線程同步問題:多線程的執行並沒有正確的同步,當一個線程執行遞增和遞減操作時,其他線程需要依次等待
線程同步解決方案:
無須共享對象:大部分時候可以通過重新設計來移除共享對象,去掉復雜的同步構造,避免多線程使用單一對象
必須共享對象:只使用原子操作,一個操作只占用一個量子的時間,無須實現其他線程等待當前操作完成
內核模式:將等待的線程置於阻塞狀態,消耗少量的CPU資源,但會引入至少一次上下文切換,適用於線程等待較長時間
用戶模式:只是簡單的等待,線程等待會浪費CPU時間但是可以節省上下文切換消耗的CPU時間,適用於線程等待較短時間
混合模式:先嘗試用戶模式,如果等待時間較長,則會切換到內核模式
2.2 執行基本的原子操作
using System; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { Console.WriteLine("Incorrect counter"); //沒有限定,會遇到競爭條件,得出的結果大部分不是正確的 var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine($"Total count:{c.Count}"); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); //使用Interlocked類提供的原子操作方法,無需鎖定任何對象可得出正確結果 var c1 = new CounterNoLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine($"Total count:{c1.Count}"); Console.ReadLine(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { private int _count; public int Count { get { return _count; } } public override void Decrement() { _count--; } public override void Increment() { _count++; } } class CounterNoLock : CounterBase { private int _count; public int Count { get { return _count; } } public override void Decrement() { //Interlocked提供了Increment()、Decrement()和Add等基本數學操作的原子方法 Interlocked.Decrement(ref _count); } public override void Increment() { Interlocked.Increment(ref _count); } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } } }
注釋:Interlocked提供了Increment()、Decrement()和Add等基本數學操作的原子方法,不用鎖也可以得出正確結果
2.3 使用Mutex類
using System; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { const string MutexName = "CSharpThreadingCookbook"; //Mutex是一種原始的同步方式,只對一個線程授予對共享資源的獨占訪問 //定義一個指定名稱的互斥量,設置initialOwner標志為false using (var m = new Mutex(false, MutexName)) { //如果互斥量已經被創建,獲取互斥量,否則就執行else語句 if (!m.WaitOne(TimeSpan.FromSeconds(5), false)) { Console.WriteLine("Second instance is running!"); } else { Console.WriteLine("Running!"); Console.ReadLine(); m.ReleaseMutex(); } } //如果再運行同樣的程序,則會在5秒內嘗試獲取互斥量,如果第一個程序按下了任何鍵,第二個程序開始執行。 //如果等待5秒鍾,第二個程序將無法獲取該互斥量 } } }
注釋:互斥量是全局操作對象,必須正確關閉,最好用using
2.4 使用SemaphoreSlim類
using System; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { //啟動6個線程,啟動的順序不一樣 for (int i = 0; i <= 6; i++) { string threadName = "Thread " + i; int secondsToWait = 2 + 2 * i; var t = new Thread(() => AccessDatabase(threadName, secondsToWait)); t.Start(); } Console.ReadLine(); } //SemaphoreSlim的構造函數參數為允許的並發線程數目 static SemaphoreSlim semaphore = new SemaphoreSlim(4); static void AccessDatabase(string name, int seconds) { Console.WriteLine($"{name} waits to access a database"); semaphore.Wait(); Console.WriteLine($"{name} was granted an access to a database"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{name} is Completed"); //調用Release方法說明線程已經完成,可以開啟一個新的線程了 semaphore.Release(); } } }
注釋:這里使用了混合模式,允許我們在等待時間很短的情況下無需上下文切換。SemaphoreSlim並不使用Windows內核信號量,而且也不支持進程間同步
2.5 使用AutoResetEvent類
using System; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { Thread t = new Thread(() => Process(10)); t.Start(); Console.WriteLine("Waiting for another thread to complete work"); //開啟一個線程后workEvent等待,直到收到Set信號 workEvent.WaitOne(); Console.WriteLine("First operation is complete"); Console.WriteLine("Performing an operation on a main thread"); Thread.Sleep(TimeSpan.FromSeconds(5)); mainEvent.Set(); Console.WriteLine("Now running the second operation on a second thread"); workEvent.WaitOne(); Console.WriteLine("Second operation is complete"); Console.ReadLine(); } //初始狀態為unsignaled,子線程向主線程發信號 private static AutoResetEvent workEvent = new AutoResetEvent(false); //初始狀態為unsignaled,主線程向子線程發信號 private static AutoResetEvent mainEvent = new AutoResetEvent(false); static void Process(int seconds) { Console.WriteLine("Starting a long running work..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); workEvent.Set();//將事件設為終止狀態允許一個或多個線程繼續 Console.WriteLine("Waiting for a main thread to complete its work"); mainEvent.WaitOne();//阻止當前線程,直到mainEvent收到信號 Console.WriteLine("Starting second operation..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done"); workEvent.Set(); } } }
注釋:AutoResetEvent采用的是內核模式,所以等待時間不能太長
2.6 使用ManualResetEventSlim類
using System; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { Thread t1 = new Thread(() => TravelThroughGates("Thread 1", 5)); Thread t2 = new Thread(() => TravelThroughGates("Thread 2", 6)); Thread t3 = new Thread(() => TravelThroughGates("Thread 3", 12)); t1.Start(); t2.Start(); t3.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); Console.WriteLine("The gates are now open"); mainEvent.Set();//將時間設置為有信號,從而讓一個或多個等待該事件的線程繼續 Thread.Sleep(TimeSpan.FromSeconds(2)); mainEvent.Reset();//將事件設置為非終止,從而導致線程受阻 Console.WriteLine("The gates have been closed!"); Thread.Sleep(TimeSpan.FromSeconds(10)); Console.WriteLine("The gates are now open for the second time"); mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("The gates have been closed!"); mainEvent.Reset(); Console.ReadLine(); } static ManualResetEventSlim mainEvent = new ManualResetEventSlim(false); static void TravelThroughGates(string threadName, int seconds) { Console.WriteLine($"{threadName} falls to sleep"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{threadName} waits for the gates to open!"); mainEvent.Wait();//阻止當前線程 Console.WriteLine($"{threadName} enter the gates!"); } } }
注釋:ManualResetEventSlim工作方式像人群通過的大門,一直保持大門敞開直到調用reset,set相當於打開大門,reset相當於關閉大門
2.7 使用CountDownEvent類
using System; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { Console.WriteLine("Starting two operations"); Thread t1 = new Thread(() => PerformOperation("Operation 1 is completed", 4)); Thread t2 = new Thread(() => PerformOperation("Operation 2 is completed", 8)); t1.Start(); t2.Start(); //開啟了兩個線程,調用Wait方法阻止當前線程,知道所有線程都完成 countdown.Wait(); Console.WriteLine("Both operations have been completed"); countdown.Dispose(); Console.ReadLine(); } //計數器初始化CountdownEvent實例,計數器表示:當計數器個數完成操作發出信號 static CountdownEvent countdown = new CountdownEvent(2); static void PerformOperation(string message, int seconds) { Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine(message); //向CountdownEvent注冊信息,並減少當前計數器數值 countdown.Signal(); } } }
注釋:如果Signal方法沒有達到指定的次數,那么countdown.wait()會一直等待,所以請確保所有線程完成后都要調用Signal方法
2.8 使用Barrier類
using System; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { Thread t1 = new Thread(() => PlayMusic("the gutarist", "play an amazing solo", 5)); Thread t2 = new Thread(() => PlayMusic("the signer", "sing his song", 2)); t1.Start(); t2.Start(); Console.ReadLine(); } //后面的Lamda表達式是回調函數。執行完SignalAndWait后執行 static Barrier barrier = new Barrier(2, b=>Console.WriteLine($"End of phase {b.CurrentPhaseNumber + 1}")); static void PlayMusic(string name, string message, int seconds) { for (int i = 0; i < 3; i++) { Console.WriteLine("==========================="); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{name} starts to {message}"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine($"{name} finishes to {message}"); //等所有調用線程都結束 barrier.SignalAndWait(); } } } }
注釋:
2.9 使用ReaderWriterlockSlim類
using System; using System.Collections.Generic; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { new Thread(Read) { IsBackground = true }.Start(); new Thread(Read) { IsBackground = true }.Start(); new Thread(Read) { IsBackground = true }.Start(); new Thread(() => Write("Thread 1")) { IsBackground = true }.Start(); new Thread(() => Write("Thread 2")) { IsBackground = true }.Start(); Thread.Sleep(TimeSpan.FromSeconds(30)); Console.ReadLine(); } //實現線程安全 static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static Dictionary<int, int> items = new Dictionary<int, int>(); static void Read() { Console.WriteLine("Reading contents of a dictionary"); while (true) { try { //讀鎖 _rw.EnterReadLock(); foreach (var key in items.Keys) { Thread.Sleep(TimeSpan.FromSeconds(0.1)); } } finally { //計數為0時退出讀取模式 _rw.ExitReadLock(); } } } static void Write(string threadName) { while (true) { try { int newKey = new Random().Next(250); _rw.EnterUpgradeableReadLock(); if (!items.ContainsKey(newKey)) { try { //寫鎖 _rw.EnterWriteLock(); items[newKey] = 1; Console.WriteLine($"New key {newKey} is added to a dictionary by a {threadName}"); } finally { //計數為0時退出寫入模式 _rw.ExitWriteLock(); } } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } finally { //計數為0時退出可升級模式 _rw.ExitUpgradeableReadLock(); } } } } }
注釋:從集合讀取數據時,根據當前數據決定是否獲取一個寫鎖並修改該集合。獲取寫鎖后集合會處於阻塞狀態。
2.10 使用SpinWait類
using System; using System.Collections.Generic; using System.Threading; namespace MulityThreadNote { class Program { static void Main(string[] args) { Thread t1 = new Thread(UserModeWait); Thread t2 = new Thread(HybridSpinWait); Console.WriteLine("Running user mode waiting"); t1.Start(); Thread.Sleep(20); _isComplete = true; Thread.Sleep(TimeSpan.FromSeconds(1)); _isComplete = false; Console.WriteLine("Running hybrid SpinWait construct waiting"); t2.Start(); Thread.Sleep(5); _isComplete = true; Console.ReadLine(); } //volatile 一個字段可能會被多個線程同時修改,不會被編譯器和處理器優化為只能被單個線程訪問 static volatile bool _isComplete = false; static void UserModeWait() { while (!_isComplete) { Console.WriteLine("."); } Console.WriteLine(); Console.WriteLine("Waiting is complete"); } static void HybridSpinWait() { var w = new SpinWait(); while (!_isComplete) { //執行單一自旋 w.SpinOnce(); //NextSpinWillYield:獲取對SpinOnce的下一次調用是否將產生處理,同時觸發強制上下文切換 //顯示線程是否切換為阻塞狀態 Console.WriteLine(w.NextSpinWillYield); } Console.WriteLine("Waiting is complete"); } } }
注釋: