1非阻塞同步Permalink
之前,我們描述了即使是很簡單的賦值或更新一個字段也需要同步。盡管鎖總能滿足這個需求,一個存在競爭的鎖意味着肯定有線程會被阻塞,就會導致由上下文切換和調度的延遲帶來的開銷,在高並發以及對性能要求很高的場景,這不符合需要。.NET Framework 的 非阻塞(nonblocking)同步構造能夠在沒有阻塞、暫停或等待的情況下完成簡單的操作。
正確編寫無阻塞或無鎖的多線程代碼是棘手的!特別是內存屏障容易用錯(volatile
關鍵字更容易用錯)。在放棄使用傳統鎖之前,請仔細思考是否真的需要非阻塞同步帶來的性能優化。切記獲得和釋放一個無競爭的鎖在一個 2010 時代的計算機上僅僅需要 20ns 而已。
無阻塞的方式也可以跨進程工作。一個例子就是它可以被用來讀寫進程間共享內存。
1.1內存屏障和易失性Permalink
考慮下邊這個例子:
class Foo { int _answer; bool _complete; void A() { _answer = 123; _complete = true; } void B() { if (_complete) Console.WriteLine (_answer); } }
如果方法A
和B
在不同的線程上並發運行,B
可能會打印 “ 0 “ 嗎?答案是會的,原因如下:
- 編譯器、CLR 或 CPU 可能會重新排序(reorder)程序指令以提高效率。
- 編譯器、CLR 或 CPU 可能會進行緩存優化,導致其它線程不能馬上看到變量的賦值。
C# 和運行時會非常小心的保證這些優化不會破壞普通的單線程代碼,和正確使用鎖的多線程代碼。除這些情況外,你必須通過顯式的創建內存屏障(memory barrier,也稱作內存柵欄 (memory fence))來對抗這些優化,限制指令重排和讀寫緩存產生的影響。
全柵欄Permalink
最簡單的內存屏障是完全內存屏障(full memory barrier,或全柵欄(full fence)),它可以阻止所有跨越柵欄的指令重排和緩存。調用Thread.MemoryBarrier
生成一個全柵欄。我們可以使用 4 個全柵欄來修正之前的例子:
class Foo { int _answer; bool _complete; void A() { _answer = 123; Thread.MemoryBarrier(); // 屏障 1 _complete = true; Thread.MemoryBarrier(); // 屏障 2 } void B() { Thread.MemoryBarrier(); // 屏障 3 if (_complete) { Thread.MemoryBarrier(); // 屏障 4 Console.WriteLine (_answer); } } }
屏障 1 和 4 可以使這個例子不會打印 “ 0 “。屏障 2 和 3 提供了一個“最新(freshness)”保證:它們確保如果B
在A
后運行,讀取_complete
的值會是true
。
在 2010 時代的桌面電腦上,一個全柵欄的開銷大約是 10 納秒。
下列方式都會隱式的使用全柵欄:
- C# 的
lock
語句(Monitor.Enter / Monitor.Exit
) Interlocked
類中的所有方法(馬上會講到)- 使用線程池的異步回調,包括異步委托、APM 回調,以及任務延續(task continuations)
- 在信號構造上等待或對其設置(譯者注:發信號、復位等等)
- 任何依賴於信號同步的情況,比如啟動或等待
Task
因為最后一條的關系,下邊的代碼是線程安全的:
int x = 0; Task t = Task.Factory.StartNew (() => x++); t.Wait(); Console.WriteLine (x); // 1
不需要對每一個讀或寫都使用全柵欄。如果有 3 個 answer 字段,我們的例子仍然只需要 4 個柵欄:
class Foo { int _answer1, _answer2, _answer3; bool _complete; void A() { _answer1 = 1; _answer2 = 2; _answer3 = 3; Thread.MemoryBarrier(); _complete = true; Thread.MemoryBarrier(); } void B() { Thread.MemoryBarrier(); if (_complete) { Thread.MemoryBarrier(); Console.WriteLine (_answer1 + _answer2 + _answer3); } } }
好的方式是:首先在每一個讀寫共享字段的指令前后都加上內存屏障,然后再剔除那些不需要的。如果你無法確認是否需要,那就保留它們。或者,更好的方式是:換回使用鎖!
真的需要鎖和內存屏障嗎?Permalink
如果在用共享可寫字段(shared writable fields)時不加鎖或柵欄是自找麻煩。關於這個話題有很多誤導信息,包括 MSDN 文檔中描述只有在弱內存排序的多處理器系統上MemoryBarrier
才是必需的,例如,使用多個 Intel Itanium 處理器的系統。我們可以通過下邊這個簡短的程序證明:在普通的 Intel Core-2 和 Pentium 處理器上,內存屏障也是非常重要的。在開啟優化以及非調試模式下運行下邊的程序(在 Visual Studio 中,解決方案的配置管理里選擇 Release 模式,然后非調試模式下啟動 )
static void Main() { bool complete = false; var t = new Thread (() => { bool toggle = false; while (!complete) toggle = !toggle; }); t.Start(); Thread.Sleep (1000); complete = true; t.Join(); // 無限阻塞 }
這個程序 不會終止,因為變量complete
被緩存在 CPU 寄存器中。在while
循環中加入一個Thread.MemoryBarrier
的調用(或在讀取complete
的地方加鎖)可以修正這個錯誤。
volatile 關鍵字Permalink
另一個(更高級的)解決這個問題的方法是對_complete
字段使用volatile
關鍵字。
volatile bool _complete;
volatile
關鍵字通知編譯器在每個讀這個字段的地方使用一個讀柵欄(acquire-fence),並且在每個寫這個字段的地方使用一個寫柵欄(release-fence)。讀柵欄防止其它讀/寫被移到柵欄之前,寫柵欄防止其它讀/寫被移到柵欄之后。這種“半柵欄(half-fences)”比全柵欄更快,因為它給了運行時和硬件更大的優化空間。
巧的是,Intel 的 X86 和 X64 處理器總是在讀時使用讀柵欄,寫時使用寫柵欄,無論是否使用volatile
關鍵字。所以在使用這些處理器的情況下,這個關鍵字對硬件來說是無效的。然而,volatile
關鍵字對編譯器和 CLR 進行的優化是有作用的,以及在 64 位 AMD 和 Itanium 處理器上也是有作用的。這意味着不能因為你的客戶端運行在特定類型的 CPU 上而放松警惕。
(並且即使你使用了volatile
,也仍然應當保持一種健康的擔憂,我們稍后會看到原因!)
關於對字段使用volatile
關鍵字的效果,總結如下:
第一條指令 | 第二條指令 | 是否會被交換 |
---|---|---|
讀 | 讀 | 不會 |
讀 | 寫 | 不會 |
寫 | 寫 | 不會(CLR 確保寫-寫操作永遠不會被交換,就算是沒有volatile 關鍵字) |
寫 | 讀 | 會! |
注意:使用volatile
不能阻止寫-讀被交換,這可能是一個難題。Joe Duffy 使用下面的例子很好的說明了這個問題:如果Test1
和Test2
同時運行在不同的線程上,可能a
和b
最后的值都是 0 (盡管在x
和y
上都使用了volatile
):
class IfYouThinkYouUnderstandVolatile { volatile int x, y; void Test1() // 運行在一個線程上 { x = 1; // Volatile 寫 (寫柵欄) int a = y; // Volatile 讀 (讀柵欄) // ... } void Test2() // 運行在另一線程上 { y = 1; // Volatile 寫 (寫柵欄) int b = x; // Volatile 讀 (讀柵欄) // ... } }
MSDN 文檔描述:使用volatile
關鍵字可以確保該字段在任何時間呈現的都是最新的值。這是錯誤的,就像我們剛才看到的,寫-讀操作可能被重新排序。(譯者注:其實不能說 MSDN 的說法錯誤,使用volatile
后x
和y
的值確實是最新的,只是因為指令重排,對它們的讀可能在另一個線程上的寫之前進行)
這給出了避免使用volatile
關鍵字的理由:就算你能很好的理解這個例子,可是其它一起工作的開發者也理解么?在Test1
和Test2
的兩次賦值之間使用全柵欄(或鎖)可以解決這個問題。
volatile
關鍵字不支持引用類型的參數和捕獲的局部變量:這些情況下你必須使用VolatileRead
和VolatileWrite
方法。
VolatileRead 和 VolatileWritePermalink
使用Thread
類上的靜態方法VolatileRead
和VolatileWrite
讀/寫變量時,相當於volatile
關鍵字產生的作用(技術上說,作用是其超集)。它們的實現相對低效,可是這是因為它們實際上使用了全柵欄。這是它們對於整型的實現:
public static void VolatileWrite (ref int address, int value) { MemoryBarrier(); address = value; } public static int VolatileRead (ref int address) { int num = address; MemoryBarrier(); return num; }
可以看出來,如果調用VolatileWrite
后緊接着調用VolatileRead
,在它們中間是沒有屏障的:這會產生和我們之前看到的同樣的難題。
內存屏障和鎖Permalink
像前所述,Monitor.Enter
和Monitor.Exit
都使用了全柵欄。因此,如果我們忽略鎖的互斥作用,可以這樣說:
lock (someField) { ... }
相當於:
Thread.MemoryBarrier(); { ... } Thread.MemoryBarrier();
1.2InterlockedPermalink
無鎖代碼下,在讀寫字段時使用內存屏障往往是不夠的。在 64 位字段上進行加、減操作需要使用Interlocked
工具類這樣更加重型的方式。Interlocked
也提供了Exchange
和CompareExchange
方法,后者能夠進行無鎖的讀-改-寫(read-modify-write)操作,只需要額外增加一點代碼。
如果一條語句在底層處理器上被當作一個獨立不可分割的指令,那么它本質上是原子的(atomic)。嚴格的原子性可以阻止任何搶占的可能。對於 32 位(或更低)的字段的簡單讀寫總是原子的。而操作 64 位字段僅在 64 位運行時環境下是原子的,並且結合了多個讀寫操作的語句必然不是原子的:
class Atomicity { static int _x, _y; static long _z; static void Test() { long myLocal; _x = 3; // 原子的 _z = 3; // 32位環境下不是原子的(_z 是64位的) myLocal = _z; // 32位環境下不是原子的(_z 是64位的) _y += _x; // 不是原子的 (結合了讀和寫操作) _x++; // 不是原子的 (結合了讀和寫操作) } }
在 32 位環境下讀寫 64 位字段不是原子的,因為它需要兩條獨立的指令:每條用於對應的 32 位內存地址。所以,如果線程 X 在讀一個 64 位的值,同時線程 Y 更新它,那么線程 X 最終可能得到新舊兩個值按位組合后的結果(一個撕裂讀(torn read))。
編譯器實現x++
這種一元運算,是通過先讀一個變量,然后計算,最后寫回去的方式。考慮如下類:
class ThreadUnsafe { static int _x = 1000; static void Go() { for (int i = 0; i < 100; i++) _x--; } }
拋開內存屏障的事情,你可能會認為如果 10 個線程並發運行Go
,最終_x
會為0
。然而,這並不一定,因為可能存在競態條件(race condition),在一個線程完成讀取x
的當前值,減少值,把值寫回這個過程之間,被另一個線程搶占(導致一個過期的值被寫回)。
當然,可以通過用lock
語句封裝非原子的操作來解決這些問題。實際上,鎖如果一致的使用,可以模擬原子性。然而,Interlocked
類為這樣簡單的操作提供了一個更方便更快的方案:
class Program { static long _sum; static void Main() { // _sum // 簡單的自增/自減操作: Interlocked.Increment (ref _sum); // 1 Interlocked.Decrement (ref _sum); // 0 // 加/減一個值: Interlocked.Add (ref _sum, 3); // 3 // 讀取64位字段: Console.WriteLine (Interlocked.Read (ref _sum)); // 3 // 讀取當前值並且寫64位字段 // (打印 "3",並且將 _sum 更新為 10 ) Console.WriteLine (Interlocked.Exchange (ref _sum, 10)); // 10 // 僅當字段的當前值匹配特定的值(10)時才更新它: Console.WriteLine (Interlocked.CompareExchange (ref _sum, 123, 10); // 123 } }
Interlocked
上的所有方法都使用全柵欄。因此,通過Interlocked
訪問字段不需要額外的柵欄,除非它們在程序其它地方沒有通過Interlocked
或lock
來訪問。
Interlocked
的數學運算操作僅限於Increment
、Decrement
以及Add
。如果你希望進行乘法或其它計算,在無鎖方式下可以使用CompareExchange
方法(通常與自旋等待一起使用)。我們會在並行編程中提供一個例子。
Interlocked
類通過將原子性的需求傳達給操作系統和虛擬機來進行實現其功能。
Interlocked
類的方法通常產生 10ns 的開銷,是無競爭鎖的一半。此外,因為它們不會導致阻塞,所以不會帶來上下文切換的開銷。然而,如果在循環中多次迭代使用Interlocked
,就可能比在循環外使用一個鎖的效率低(不過Interlocked
可以實現更高的並發度)。
2使用 Wait 和 Pulse 進行信號同步Permalink
(譯者注:Pulse
翻譯為脈沖,它和Wait
都是作用在一個變量上:Wait
等待一個變量上的脈沖,Pulse
對一個變量發送脈沖。脈沖也是一種信號形式,相對於事件等待句柄那種鎖存信號,脈沖顧名思義是一種非鎖存或者說易失的信號)
之前我們討論了事件等待句柄,這是一種簡單的信號同步機制:一個線程阻塞直到收到另一個線程發來的通知。
還有個更強大的信號構造,由Monitor
類通過兩個靜態方法Wait
和Pulse
(以及PulseAll
)提供。原理是使用自定義的標識和字段(封裝在lock
語句中)自行實現信號同步邏輯,然后引入Wait
和Pulse
控制防止自旋。僅僅使用這些方法和lock
,你就可以實現AutoResetEvent
、ManualResetEvent
以及Semaphore
,還有WaitHandle
的靜態方法WaitAll
和WaitAny
的功能。此外,Wait
和Pulse
也可以用於所有等待句柄都不適用的情況。
但是,使用Wait
和Pulse
進行信號同步,對比事件等待句柄有以下缺點:
Wait / Pulse
不能跨越應用程序域和進程使用。- 必須切記通過鎖保護所有信號同步邏輯涉及的變量。
- 使用
Wait / Pulse
的程序可能會導致依賴微軟文檔的開發者困惑。
微軟文檔的問題的是就算你已經攻讀了解了Wait
和Pulse
是如何工作的,也還是無法明白它們該如何使用。Wait
和Pulse
會讓淺嘗輒止的人感到特別惡心:它們會尋找你理解中的漏洞然后折磨你取樂!幸運的是,有一種簡單的使用模式可以馴服Wait
和Pulse
。
性能方面,在 2010 時代的桌面電腦上,調用Pulse
花費大概 100ns 左右, 約是在等待句柄上調用Set
三分之一的時間。等待無競爭信號的開銷完全取決於你,因為是你使用普通的字段和變量自行實現的邏輯。在實踐中上,這非常簡單,並且基本上相當於使用鎖的代價。
2.1如何使用 Wait 和 PulsePermalink
下面是如何使用Wait
和Pulse
:
1. 定義一個字段,作為同步對象,例如:
readonly object _locker = new object();
2. 定義一個或多個字段,作為自定義的阻塞條件,例如:
bool _go; /* 或 */ int _semaphoreCount;
3. 當你希望阻塞的時候,使用下邊的代碼:
lock (_locker) while (/* <blocking-condition> */) Monitor.Wait (_locker);
4. 當改變(或隱式改變)一個阻塞條件的時候,使用下邊的代碼:
lock (_locker) { // 修改會影響阻塞條件的字段或數據 // ... Monitor.Pulse(_locker); // 或: Monitor.PulseAll (_locker); }
(如果想改變阻塞條件並等待,可以在一個lock
內合並第 3 步和第 4 步)
這個模式允許任意線程在任意時間使用任意條件等待。下邊這個簡單的例子,一個線程等待直到_go
字段被設置為true
:
class SimpleWaitPulse { static readonly object _locker = new object(); static bool _go; static void Main() { // 新線程會阻塞 new Thread (Work).Start(); // 因為 _go==false Console.ReadLine(); // 等待用戶敲回車 lock (_locker) // 現在喚醒線程 { // 通過設置 _go=true 然后 Pulse _go = true; Monitor.Pulse (_locker); } } static void Work() { lock (_locker) while (!_go) Monitor.Wait (_locker); // 當等待時鎖會被釋放 Console.WriteLine ("Woken!!!"); } }
輸出結果:
Woken!!! (按下回車鍵之后)
為了線程安全,我們確保所有共享字段的訪問都在鎖內。因此,在讀取和更新_go
標識的地方都加上了lock
語句。這很必要(除非你希望使用非阻塞同步的方式)。
Work
方法會一直阻塞,等待_go
標識變為true
。Monitor.Wait
方法按順序做了如下的操作:
- 釋放
_locker
上的鎖。 - 阻塞,直到收到
_locker
上的脈沖。 - 重新獲取
_locker
上的鎖。如果鎖已被占用,那么線程阻塞,直到鎖變為可用為止。
這意味着當Monitor.Wait
在等待脈沖時,同步對象上的鎖沒有被持有。這並不是像代碼看上去那樣。
lock (_locker) { while (!_go) Monitor.Wait (_locker); // 鎖被釋放 // 鎖重新獲得 // ... }
然后繼續執行下一條語句。Monitor.Wait
被設計為在lock
語句內使用,否則調用它會拋出一個異常。Monitor.Pulse
也是一樣。
在Main
方法中,我們通過設置_go
標識(在鎖內)和調用Pulse
來給工作線程發信號。我們一釋放鎖,工作線程就可以繼續執行,繼續它的while
循環。
Pulse
和PulseAll
方法可以釋放通過調用Wait
阻塞的線程。Pulse
最多釋放一個線程,而PulseAll
釋放全部。在我們的例子中,只有一個線程被阻塞,所以它們在這個例子中效果是一樣的。如果有多個線程在等待,以我們建議的這個模式來說,調用PulseAll
通常最安全。
為了Wait
能夠和Pulse
或PulseAll
進行通信,必須使用同一個同步對象(我們的例子中的_locker
)。
在我們的模式中,脈沖表示有些東西可能已經改變,等待線程應該重新檢查它們的阻塞條件。在Work
方法內,檢查是通過while
循環實現的。由等待方來決定是否要繼續運行,而不是通知方。如果把脈沖直接當作通知繼續的指令,那么Wait
的構造就沒有任何價值了,這樣使用就相當於一個殘疾版的AutoResetEvent
。
如果我們拋棄該模式,移除while
循環、_go
標識以及ReadLine
,就獲得了一個最基礎的Wait / Pulse
的例子:
static void Main() { new Thread (Work).Start(); lock (_locker) Monitor.Pulse (_locker); } static void Work() { lock (_locker) Monitor.Wait (_locker); Console.WriteLine ("Woken!!!"); }
這可能不會有輸出,因為它有不確定性!在主線程和工作線程之間存在競爭,如果Wait
先執行,信號可以正常工作。如果Pulse
先執行,它就會丟失,工作線程就永遠卡在那里等待。這與AutoResetEvent
的行為不同,它的Set
方法有一種記憶效果,或者說鎖存(latching)效果,所以即使它在WaitOne
之前調用,仍然有效。
但是Pulse
沒有鎖存效果,它需要你自行實現,就像我們之前使用的 “ go “ 標識。這就是為什么Wait
和Pulse
是萬能的原因:使用一個布爾標識,我們可以實現類似AutoResetEvent
的功能;使用一個整型字段,可以實現 CountdownEvent
或Semaphore
。通過更復雜的數據結構,可以進一步實現類似生產者 / 消費者隊列這樣的構造。
2.2生產者 / 消費者隊列Permalink
之前,我們描述了生產者 / 消費者隊列的概念,以及如何通過AutoResetEvent
來實現它。現在,我們通過Wait
和Pulse
來實現一個更強大的版本。
這次,我們將允許多個消費者,各自擁有它們自己的線程。使用一個數組來存放這些線程:
Thread[] _workers;
這樣可以讓我們在關閉該隊列的時候Join
這些線程。
每個工作線程會執行一個名為Consume
的方法。我們可以在一個循環中創建和啟動線程,例如:
public PCQueue (int workerCount) { _workers = new Thread [workerCount]; // 為每個worker創建和啟動一個獨立的線程 for (int i = 0; i < workerCount; i++) (_workers [i] = new Thread (Consume)).Start(); }
之前我們只是使用一個字符串來代表任務,這次使用一種更靈活的方式,即一個委托。我們使用 .NET Framework 中的System.Action
委托,它定義如下:
public delegate void Action();
這個委托可以匹配任意無參方法,很像ThreadStart
委托。當然我們也可以描述需要參數的任務,通過把調用封裝在匿名委托或 lambda 表達式中。
Action myFirstTask = delegate { Console.WriteLine ("foo"); }; Action mySecondTask = () => Console.WriteLine ("foo");
如之前一樣,使用Queue<T>
來表示任務的隊列:
Queue<Action> _itemQ = new Queue<Action>();
在討論EnqueueItem
和Consume
方法之前,先來看一下完整的代碼:
using System; using System.Threading; using System.Collections.Generic; public class PCQueue { readonly object _locker = new object(); Thread[] _workers; Queue<Action> _itemQ = new Queue<Action>(); public PCQueue (int workerCount) { _workers = new Thread [workerCount]; // 為每個worker創建和啟動一個獨立的線程 for (int i = 0; i < workerCount; i++) (_workers [i] = new Thread (Consume)).Start(); } public void Shutdown (bool waitForWorkers) { // 為每個線程加入一個 null 任務,使它們退出 foreach (Thread worker in _workers) EnqueueItem (null); // 等待工作線程完成 if (waitForWorkers) foreach (Thread worker in _workers) worker.Join(); } public void EnqueueItem (Action item) { lock (_locker) { _itemQ.Enqueue (item); // 因為改變了阻塞條件 Monitor.Pulse (_locker); // 所以發送脈沖通知 } } void Consume() { while (true) // 繼續消費直到 { // 收到通知 Action item; lock (