1.1 簡介
本章介紹在C#中實現線程同步的幾種方法。因為多個線程同時訪問共享數據時,可能會造成共享數據的損壞,從而導致與預期的結果不相符。為了解決這個問題,所以需要用到線程同步,也被俗稱為“加鎖”。但是加鎖絕對不對提高性能,最多也就是不增不減,要實現性能不增不減還得靠高質量的同步源語(Synchronization Primitive)。但是因為正確永遠比速度更重要,所以線程同步在某些場景下是必須的。
線程同步有兩種源語(Primitive)構造:用戶模式(user - mode)和內核模式(kernel - mode),當資源可用時間短的情況下,用戶模式要優於內核模式,但是如果長時間不能獲得資源,或者說長時間處於“自旋”,那么內核模式是相對來說好的選擇。
但是我們希望兼具用戶模式和內核模式的優點,我們把它稱為混合構造(hybrid construct),它兼具了兩種模式的優點。
在C#中有多種線程同步的機制,通常可以按照以下順序進行選擇。
- 如果代碼能通過優化可以不進行同步,那么就不要做同步。
- 使用原子性的
Interlocked
方法。- 使用
lock/Monitor
類。- 使用異步鎖,如
SemaphoreSlim.WaitAsync()
。- 使用其它加鎖機制,如
ReaderWriterLockSlim、Mutex、Semaphore
等。- 如果系統提供了
*Slim
版本的異步對象,那么請選用它,因為*Slim
版本全部都是混合鎖,在進入內核模式前實現了某種形式的自旋。
在同步中,一定要注意避免死鎖的發生,死鎖的發生必須滿足以下4個基本條件,所以只需要破壞任意一個條件,就可避免發生死鎖。
- 排他或互斥(Mutual exclusion):一個線程(ThreadA)獨占一個資源,沒有其它線程(ThreadB)能獲取相同的資源。
- 占有並等待(Hold and wait):互斥的一個線程(ThreadA)請求獲取另一個線程(ThreadB)占有的資源.
- 不可搶先(No preemption):一個線程(ThreadA)占有資源不能被強制拿走(只能等待ThreadA主動釋放它的資源)。
- 循環等待條件(Circular wait condition):兩個或多個線程構成一個循環等待鏈,它們鎖定兩個或多個相同的資源,每個線程都在等待鏈中的下一個線程占有的資源。
1.2 執行基本原子操作
CLR保證了對這些數據類型的讀寫是原子性的:Boolean、Char、(S)Byte、(U)Int16、(U)Int32、(U)IntPtr和Single
。但是如果讀寫Int64
可能會發生讀取撕裂(torn read)的問題,因為在32位操作系統中,它需要執行兩次Mov
操作,無法在一個時間內執行完成。
那么在本節中,就會着重的介紹System.Threading.Interlocked
類提供的方法,Interlocked
類中的每個方法都是執行一次的讀取以及寫入操作。更多與Interlocked
類相關的資料請參考鏈接,戳一戳本文不在贅述。
演示代碼如下所示,分別使用了三種方式進行計數:錯誤計數方式、lock
鎖方式和Interlocked
原子方式。
private static void Main(string[] args)
{
Console.WriteLine("錯誤的計數");
var c = new Counter();
Execute(c);
Console.WriteLine("--------------------------");
Console.WriteLine("正確的計數 - 有鎖");
var c2 = new CounterWithLock();
Execute(c2);
Console.WriteLine("--------------------------");
Console.WriteLine("正確的計數 - 無鎖");
var c3 = new CounterNoLock();
Execute(c3);
Console.ReadLine();
}
static void Execute(CounterBase c)
{
// 統計耗時
var sw = new Stopwatch();
sw.Start();
var t1 = new Thread(() => TestCounter(c));
var t2 = new Thread(() => TestCounter(c));
var t3 = new Thread(() => TestCounter(c));
t1.Start();
t2.Start();
t3.Start();
t1.Join();
t2.Join();
t3.Join();
sw.Stop();
Console.WriteLine($"Total count: {c.Count} Time:{sw.ElapsedMilliseconds} ms");
}
static void TestCounter(CounterBase c)
{
for (int i = 0; i < 100000; i++)
{
c.Increment();
c.Decrement();
}
}
class Counter : CounterBase
{
public override void Increment()
{
_count++;
}
public override void Decrement()
{
_count--;
}
}
class CounterNoLock : CounterBase
{
public override void Increment()
{
// 使用Interlocked執行原子操作
Interlocked.Increment(ref _count);
}
public override void Decrement()
{
Interlocked.Decrement(ref _count);
}
}
class CounterWithLock : CounterBase
{
private readonly object _syncRoot = new Object();
public override void Increment()
{
// 使用Lock關鍵字 鎖定私有變量
lock (_syncRoot)
{
// 同步塊
Count++;
}
}
public override void Decrement()
{
lock (_syncRoot)
{
Count--;
}
}
}
abstract class CounterBase
{
protected int _count;
public int Count
{
get
{
return _count;
}
set
{
_count = value;
}
}
public abstract void Increment();
public abstract void Decrement();
}
運行結果如下所示,與預期結果基本相符。
1.3 使用Mutex類
System.Threading.Mutex
在概念上和System.Threading.Monitor
幾乎一樣,但是Mutex
同步對文件或者其他跨進程的資源進行訪問,也就是說Mutex
是可跨進程的。因為其特性,它的一個用途是限制應用程序不能同時運行多個實例。
Mutex
對象支持遞歸,也就是說同一個線程可多次獲取同一個鎖,這在后面演示代碼中可觀察到。由於Mutex
的基類System.Theading.WaitHandle
實現了IDisposable
接口,所以當不需要在使用它時要注意進行資源的釋放。更多資料:戳一戳
演示代碼如下所示,簡單的演示了如何創建單實例的應用程序和Mutex
遞歸獲取鎖的實現。
const string MutexName = "CSharpThreadingCookbook";
static void Main(string[] args)
{
// 使用using 及時釋放資源
using (var m = new Mutex(false, MutexName))
{
if (!m.WaitOne(TimeSpan.FromSeconds(5), false))
{
Console.WriteLine("已經有實例正在運行!");
}
else
{
Console.WriteLine("運行中...");
// 演示遞歸獲取鎖
Recursion();
Console.ReadLine();
m.ReleaseMutex();
}
}
Console.ReadLine();
}
static void Recursion()
{
using (var m = new Mutex(false, MutexName))
{
if (!m.WaitOne(TimeSpan.FromSeconds(2), false))
{
// 因為Mutex支持遞歸獲取鎖 所以永遠不會執行到這里
Console.WriteLine("遞歸獲取鎖失敗!");
}
else
{
Console.WriteLine("遞歸獲取鎖成功!");
}
}
}
運行結果如下圖所示,打開了兩個應用程序,因為使用Mutex
實現了單實例,所以第二個應用程序無法獲取鎖,就會顯示已有實例正在運行。
1.4 使用SemaphoreSlim類
SemaphoreSlim
類與之前提到的同步類有鎖不同,之前提到的同步類都是互斥的,也就是說只允許一個線程進行訪問資源,而SemaphoreSlim
是可以允許多個訪問。
在之前的部分有提到,以*Slim
結尾的線程同步類,都是工作在混合模式下的,也就是說開始它們都是在用戶模式下"自旋",等發生第一次競爭時,才切換到內核模式。但是SemaphoreSlim
不同於Semaphore
類,它不支持系統信號量,所以它不能用於進程之間的同步。
該類使用比較簡單,演示代碼演示了6個線程競爭訪問只允許4個線程同時訪問的數據庫,如下所示。
static void Main(string[] args)
{
// 創建6個線程 競爭訪問AccessDatabase
for (int i = 1; i <= 6; i++)
{
string threadName = "線程 " + i;
// 越后面的線程,訪問時間越久 方便查看效果
int secondsToWait = 2 + 2 * i;
var t = new Thread(() => AccessDatabase(threadName, secondsToWait));
t.Start();
}
Console.ReadLine();
}
// 同時允許4個線程訪問
static SemaphoreSlim _semaphore = new SemaphoreSlim(4);
static void AccessDatabase(string name, int seconds)
{
Console.WriteLine($"{name} 等待訪問數據庫.... {DateTime.Now.ToString("HH:mm:ss.ffff")}");
// 等待獲取鎖 進入臨界區
_semaphore.Wait();
Console.WriteLine($"{name} 已獲取對數據庫的訪問權限 {DateTime.Now.ToString("HH:mm:ss.ffff")}");
// Do something
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine($"{name} 訪問完成... {DateTime.Now.ToString("HH:mm:ss.ffff")}");
// 釋放鎖
_semaphore.Release();
}
運行結果如下所示,可見前4個線程馬上就獲取到了鎖,進入了臨界區,而另外兩個線程在等待;等有鎖被釋放時,才能進入臨界區。
1.5 使用AutoResetEvent類
AutoResetEvent
叫自動重置事件,雖然名稱中有事件一詞,但是重置事件和C#中的委托沒有任何關系,這里的事件只是由內核維護的Boolean
變量,當事件為false
,那么在事件上等待的線程就阻塞;事件變為true
,那么阻塞解除。
在.Net中有兩種此類事件,即AutoResetEvent(自動重置事件)
和ManualResetEvent(手動重置事件)
。這兩者均是采用內核模式,它的區別在於當重置事件為true
時,自動重置事件它只喚醒一個阻塞的線程,會自動將事件重置回false,造成其它線程繼續阻塞。而手動重置事件不會自動重置,必須通過代碼手動重置回false。
因為以上的原因,所以在很多文章和書籍中不推薦使用AutoResetEvent(自動重置事件)
,因為它很容易在編寫生產者線程時發生失誤,造成它的迭代次數多余消費者線程。
演示代碼如下所示,該代碼演示了通過AutoResetEvent
實現兩個線程的互相同步。
static void Main(string[] args)
{
var t = new Thread(() => Process(10));
t.Start();
Console.WriteLine("等待另一個線程完成工作!");
// 等待工作線程通知 主線程阻塞
_workerEvent.WaitOne();
Console.WriteLine("第一個操作已經完成!");
Console.WriteLine("在主線程上執行操作");
Thread.Sleep(TimeSpan.FromSeconds(5));
// 發送通知 工作線程繼續運行
_mainEvent.Set();
Console.WriteLine("現在在第二個線程上運行第二個操作");
// 等待工作線程通知 主線程阻塞
_workerEvent.WaitOne();
Console.WriteLine("第二次操作完成!");
Console.ReadLine();
}
// 工作線程Event
private static AutoResetEvent _workerEvent = new AutoResetEvent(false);
// 主線程Event
private static AutoResetEvent _mainEvent = new AutoResetEvent(false);
static void Process(int seconds)
{
Console.WriteLine("開始長時間的工作...");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine("工作完成!");
// 發送通知 主線程繼續運行
_workerEvent.Set();
Console.WriteLine("等待主線程完成其它工作");
// 等待主線程通知 工作線程阻塞
_mainEvent.WaitOne();
Console.WriteLine("啟動第二次操作...");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine("工作完成!");
// 發送通知 主線程繼續運行
_workerEvent.Set();
}
運行結果如下圖所示,與預期結果符合。
1.6 使用ManualResetEventSlim類
ManualResetEventSlim
使用和ManualResetEvent
類基本一致,只是ManualResetEventSlim
工作在混合模式下,而它與AutoResetEventSlim
不同的地方就是需要手動重置事件,也就是調用Reset()
才能將事件重置為false
。
演示代碼如下,形象的將ManualResetEventSlim
比喻成大門,當事件為true
時大門打開,線程解除阻塞;而事件為false
時大門關閉,線程阻塞。
static void Main(string[] args)
{
var t1 = new Thread(() => TravelThroughGates("Thread 1", 5));
var t2 = new Thread(() => TravelThroughGates("Thread 2", 6));
var t3 = new Thread(() => TravelThroughGates("Thread 3", 12));
t1.Start();
t2.Start();
t3.Start();
// 休眠6秒鍾 只有Thread 1小於 6秒鍾,所以事件重置時 Thread 1 肯定能進入大門 而 Thread 2 可能可以進入大門
Thread.Sleep(TimeSpan.FromSeconds(6));
Console.WriteLine($"大門現在打開了! 時間:{DateTime.Now.ToString("mm:ss.ffff")}");
_mainEvent.Set();
// 休眠2秒鍾 此時 Thread 2 肯定可以進入大門
Thread.Sleep(TimeSpan.FromSeconds(2));
_mainEvent.Reset();
Console.WriteLine($"大門現在關閉了! 時間:{DateTime.Now.ToString("mm: ss.ffff")}");
// 休眠10秒鍾 Thread 3 可以進入大門
Thread.Sleep(TimeSpan.FromSeconds(10));
Console.WriteLine($"大門現在第二次打開! 時間:{DateTime.Now.ToString("mm: ss.ffff")}");
_mainEvent.Set();
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine($"大門現在關閉了! 時間:{DateTime.Now.ToString("mm: ss.ffff")}");
_mainEvent.Reset();
Console.ReadLine();
}
static void TravelThroughGates(string threadName, int seconds)
{
Console.WriteLine($"{threadName} 進入睡眠 時間:{DateTime.Now.ToString("mm:ss.ffff")}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine($"{threadName} 等待大門打開! 時間:{DateTime.Now.ToString("mm:ss.ffff")}");
_mainEvent.Wait();
Console.WriteLine($"{threadName} 進入大門! 時間:{DateTime.Now.ToString("mm:ss.ffff")}");
}
static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false);
運行結果如下,與預期結果相符。
1.7 使用CountDownEvent類
CountDownEvent
類內部構造使用了一個ManualResetEventSlim
對象。這個構造阻塞一個線程,直到它內部計數器(CurrentCount)
變為0
時,才解除阻塞。也就是說它並不是阻止對已經枯竭的資源池的訪問,而是只有當計數為0
時才允許訪問。
這里需要注意的是,當CurrentCount
變為0
時,那么它就不能被更改了。為0
以后,Wait()
方法的阻塞被解除。
演示代碼如下所示,只有當Signal()
方法被調用2次以后,Wait()
方法的阻塞才被解除。
static void Main(string[] args)
{
Console.WriteLine($"開始兩個操作 {DateTime.Now.ToString("mm:ss.ffff")}");
var t1 = new Thread(() => PerformOperation("操作 1 完成!", 4));
var t2 = new Thread(() => PerformOperation("操作 2 完成!", 8));
t1.Start();
t2.Start();
// 等待操作完成
_countdown.Wait();
Console.WriteLine($"所有操作都完成 {DateTime.Now.ToString("mm: ss.ffff")}");
_countdown.Dispose();
Console.ReadLine();
}
// 構造函數的參數為2 表示只有調用了兩次 Signal方法 CurrentCount 為 0時 Wait的阻塞才解除
static CountdownEvent _countdown = new CountdownEvent(2);
static void PerformOperation(string message, int seconds)
{
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine($"{message} {DateTime.Now.ToString("mm:ss.ffff")}");
// CurrentCount 遞減 1
_countdown.Signal();
}
運行結果如下圖所示,可見只有當操作1和操作2都完成以后,才執行輸出所有操作都完成。
1.8 使用Barrier類
Barrier
類用於解決一個非常稀有的問題,平時一般用不上。Barrier
類控制一系列線程進行階段性的並行工作。
假設現在並行工作分為2個階段,每個線程在完成它自己那部分階段1的工作后,必須停下來等待其它線程完成階段1的工作;等所有線程均完成階段1工作后,每個線程又開始運行,完成階段2工作,等待其它線程全部完成階段2工作后,整個流程才結束。
演示代碼如下所示,該代碼演示了兩個線程分階段的完成工作。
static void Main(string[] args)
{
var t1 = new Thread(() => PlayMusic("鋼琴家", "演奏一首令人驚嘆的獨奏曲", 5));
var t2 = new Thread(() => PlayMusic("歌手", "唱着他的歌", 2));
t1.Start();
t2.Start();
Console.ReadLine();
}
static Barrier _barrier = new Barrier(2,
Console.WriteLine($"第 {b.CurrentPhaseNumber + 1} 階段結束"));
static void PlayMusic(string name, string message, int seconds)
{
for (int i = 1; i < 3; i++)
{
Console.WriteLine("----------------------------------------------");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine($"{name} 開始 {message}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine($"{name} 結束 {message}");
_barrier.SignalAndWait();
}
}
運行結果如下所示,當“歌手”線程完成后,並沒有馬上結束,而是等待“鋼琴家”線程結束,當"鋼琴家"線程結束后,才開始第2階段的工作。
1.9 使用ReaderWriterLockSlim類
ReaderWriterLockSlim
類主要是解決在某些場景下,讀操作多於寫操作而使用某些互斥鎖當多個線程同時訪問資源時,只有一個線程能訪問,導致性能急劇下降。
如果所有線程都希望以只讀的方式訪問數據,就根本沒有必要阻塞它們;如果一個線程希望修改數據,那么這個線程才需要獨占訪問,這就是ReaderWriterLockSlim
的典型應用場景。這個類就像下面這樣來控制線程。
- 一個線程向數據寫入是,請求訪問的其他所有線程都被阻塞。
- 一個線程讀取數據時,請求讀取的線程允許讀取,而請求寫入的線程被阻塞。
- 寫入線程結束后,要么解除一個寫入線程的阻塞,使寫入線程能向數據接入,要么解除所有讀取線程的阻塞,使它們能並發讀取數據。如果線程沒有被阻塞,鎖就可以進入自由使用的狀態,可供下一個讀線程或寫線程獲取。
- 從數據讀取的所有線程結束后,一個寫線程被解除阻塞,使它能向數據寫入。如果線程沒有被阻塞,鎖就可以進入自由使用的狀態,可供下一個讀線程或寫線程獲取。
ReaderWriterLockSlim
還支持從讀線程升級為寫線程的操作,詳情請戳一戳。文本不作介紹。ReaderWriterLock
類已經過時,而且存在許多問題,沒有必要去使用。
示例代碼如下所示,創建了3個讀線程,2個寫線程,讀線程和寫線程競爭獲取鎖。
static void Main(string[] args)
{
// 創建3個 讀線程
new Thread(() => Read("Reader 1")) { IsBackground = true }.Start();
new Thread(() => Read("Reader 2")) { IsBackground = true }.Start();
new Thread(() => Read("Reader 3")) { IsBackground = true }.Start();
// 創建兩個寫線程
new Thread(() => Write("Writer 1")) { IsBackground = true }.Start();
new Thread(() => Write("Writer 2")) { IsBackground = true }.Start();
// 使程序運行30S
Thread.Sleep(TimeSpan.FromSeconds(30));
Console.ReadLine();
}
static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
static Dictionary<int, int> _items = new Dictionary<int, int>();
static void Read(string threadName)
{
while (true)
{
try
{
// 獲取讀鎖定
_rw.EnterReadLock();
Console.WriteLine($"{threadName} 從字典中讀取內容 {DateTime.Now.ToString("mm:ss.ffff")}");
foreach (var key in _items.Keys)
{
Thread.Sleep(TimeSpan.FromSeconds(0.1));
}
}
finally
{
// 釋放讀鎖定
_rw.ExitReadLock();
}
}
}
static void Write(string threadName)
{
while (true)
{
try
{
int newKey = new Random().Next(250);
// 嘗試進入可升級鎖模式狀態
_rw.EnterUpgradeableReadLock();
if (!_items.ContainsKey(newKey))
{
try
{
// 獲取寫鎖定
_rw.EnterWriteLock();
_items[newKey] = 1;
Console.WriteLine($"{threadName} 將新的鍵 {newKey} 添加進入字典中 {DateTime.Now.ToString("mm:ss.ffff")}");
}
finally
{
// 釋放寫鎖定
_rw.ExitWriteLock();
}
}
Thread.Sleep(TimeSpan.FromSeconds(0.1));
}
finally
{
// 減少可升級模式遞歸計數,並在計數為0時 推出可升級模式
_rw.ExitUpgradeableReadLock();
}
}
}
運行結果如下所示,與預期結果相符。
1.10 使用SpinWait類
SpinWait
是一個常用的混合模式的類,它被設計成使用用戶模式等待一段時間,人后切換至內核模式以節省CPU時間。
它的使用非常簡單,演示代碼如下所示。
static void Main(string[] args)
{
var t1 = new Thread(UserModeWait);
var t2 = new Thread(HybridSpinWait);
Console.WriteLine("運行在用戶模式下");
t1.Start();
Thread.Sleep(20);
_isCompleted = true;
Thread.Sleep(TimeSpan.FromSeconds(1));
_isCompleted = false;
Console.WriteLine("運行在混合模式下");
t2.Start();
Thread.Sleep(5);
_isCompleted = true;
Console.ReadLine();
}
static volatile bool _isCompleted = false;
static void UserModeWait()
{
while (!_isCompleted)
{
Console.Write(".");
}
Console.WriteLine();
Console.WriteLine("等待結束");
}
static void HybridSpinWait()
{
var w = new SpinWait();
while (!_isCompleted)
{
w.SpinOnce();
Console.WriteLine(w.NextSpinWillYield);
}
Console.WriteLine("等待結束");
}
運行結果如下兩圖所示,首先程序運行在模擬的用戶模式下,使CPU有一個短暫的峰值。然后使用SpinWait
工作在混合模式下,首先標志變量為False
處於用戶模式自旋中,等待以后進入內核模式。
參考書籍
本文主要參考了以下幾本書,在此對這些作者表示由衷的感謝你們提供了這么好的資料。
- 《CLR via C#》
- 《C# in Depth Third Edition》
- 《Essential C# 6.0》
- 《Multithreading with C# Cookbook Second Edition》
源碼下載點擊鏈接 示例源碼下載