C#中的多線程 - 高級多線程


1非阻塞同步Permalink

 

之前,我們描述了即使是很簡單的賦值或更新一個字段也需要同步。盡管總能滿足這個需求,一個存在競爭的鎖意味着肯定有線程會被阻塞,就會導致由上下文切換和調度的延遲帶來的開銷,在高並發以及對性能要求很高的場景,這不符合需要。.NET Framework 的 非阻塞(nonblocking)同步構造能夠在沒有阻塞、暫停或等待的情況下完成簡單的操作。

 

正確編寫無阻塞或無鎖的多線程代碼是棘手的!特別是內存屏障容易用錯(volatile 關鍵字更容易用錯)。在放棄使用傳統鎖之前,請仔細思考是否真的需要非阻塞同步帶來的性能優化。切記獲得和釋放一個無競爭的鎖在一個 2010 時代的計算機上僅僅需要 20ns 而已。

無阻塞的方式也可以跨進程工作。一個例子就是它可以被用來讀寫進程間共享內存。

1.1內存屏障和易失性Permalink

考慮下邊這個例子:

class Foo { int _answer; bool _complete; void A() { _answer = 123; _complete = true; } void B() { if (_complete) Console.WriteLine (_answer); } } 

如果方法AB在不同的線程上並發運行,B可能會打印 “ 0 “ 嗎?答案是會的,原因如下:

  • 編譯器、CLR 或 CPU 可能會重新排序(reorder)程序指令以提高效率。
  • 編譯器、CLR 或 CPU 可能會進行緩存優化,導致其它線程不能馬上看到變量的賦值。

C# 和運行時會非常小心的保證這些優化不會破壞普通的單線程代碼,和正確使用鎖的多線程代碼。除這些情況外,你必須通過顯式的創建內存屏障(memory barrier,也稱作內存柵欄 (memory fence))來對抗這些優化,限制指令重排和讀寫緩存產生的影響。

全柵欄Permalink

最簡單的內存屏障是完全內存屏障(full memory barrier,或全柵欄(full fence)),它可以阻止所有跨越柵欄的指令重排和緩存。調用Thread.MemoryBarrier生成一個全柵欄。我們可以使用 4 個全柵欄來修正之前的例子:

class Foo { int _answer; bool _complete; void A() { _answer = 123; Thread.MemoryBarrier(); // 屏障 1 _complete = true; Thread.MemoryBarrier(); // 屏障 2 } void B() { Thread.MemoryBarrier(); // 屏障 3 if (_complete) { Thread.MemoryBarrier(); // 屏障 4 Console.WriteLine (_answer); } } } 

屏障 1 和 4 可以使這個例子不會打印 “ 0 “。屏障 2 和 3 提供了一個“最新(freshness)”保證:它們確保如果BA后運行,讀取_complete的值會是true

在 2010 時代的桌面電腦上,一個全柵欄的開銷大約是 10 納秒。

下列方式都會隱式的使用全柵欄:

因為最后一條的關系,下邊的代碼是線程安全的:

int x = 0; Task t = Task.Factory.StartNew (() => x++); t.Wait(); Console.WriteLine (x); // 1 

不需要對每一個讀或寫都使用全柵欄。如果有 3 個 answer 字段,我們的例子仍然只需要 4 個柵欄:

class Foo { int _answer1, _answer2, _answer3; bool _complete; void A() { _answer1 = 1; _answer2 = 2; _answer3 = 3; Thread.MemoryBarrier(); _complete = true; Thread.MemoryBarrier(); } void B() { Thread.MemoryBarrier(); if (_complete) { Thread.MemoryBarrier(); Console.WriteLine (_answer1 + _answer2 + _answer3); } } } 

好的方式是:首先在每一個讀寫共享字段的指令前后都加上內存屏障,然后再剔除那些不需要的。如果你無法確認是否需要,那就保留它們。或者,更好的方式是:換回使用鎖!

真的需要鎖和內存屏障嗎?Permalink

如果在用共享可寫字段(shared writable fields)時不加鎖或柵欄是自找麻煩。關於這個話題有很多誤導信息,包括 MSDN 文檔中描述只有在弱內存排序的多處理器系統上MemoryBarrier才是必需的,例如,使用多個 Intel Itanium 處理器的系統。我們可以通過下邊這個簡短的程序證明:在普通的 Intel Core-2 和 Pentium 處理器上,內存屏障也是非常重要的。在開啟優化以及非調試模式下運行下邊的程序(在 Visual Studio 中,解決方案的配置管理里選擇 Release 模式,然后非調試模式下啟動 )

static void Main() { bool complete = false; var t = new Thread (() => { bool toggle = false; while (!complete) toggle = !toggle; }); t.Start(); Thread.Sleep (1000); complete = true; t.Join(); // 無限阻塞 } 

這個程序 不會終止,因為變量complete被緩存在 CPU 寄存器中。在while循環中加入一個Thread.MemoryBarrier的調用(或在讀取complete的地方加)可以修正這個錯誤。

volatile 關鍵字Permalink

另一個(更高級的)解決這個問題的方法是對_complete字段使用volatile關鍵字。

volatile bool _complete; 

volatile關鍵字通知編譯器在每個讀這個字段的地方使用一個讀柵欄(acquire-fence),並且在每個寫這個字段的地方使用一個寫柵欄(release-fence)。讀柵欄防止其它讀/寫被移到柵欄之前,寫柵欄防止其它讀/寫被移到柵欄之后。這種“半柵欄(half-fences)”比全柵欄更快,因為它給了運行時和硬件更大的優化空間。

巧的是,Intel 的 X86 和 X64 處理器總是在讀時使用讀柵欄,寫時使用寫柵欄,無論是否使用volatile關鍵字。所以在使用這些處理器的情況下,這個關鍵字對硬件來說是無效的。然而,volatile關鍵字對編譯器和 CLR 進行的優化是有作用的,以及在 64 位 AMD 和 Itanium 處理器上也是有作用的。這意味着不能因為你的客戶端運行在特定類型的 CPU 上而放松警惕。

(並且即使你使用了volatile,也仍然應當保持一種健康的擔憂,我們稍后會看到原因!)

關於對字段使用volatile關鍵字的效果,總結如下:

第一條指令 第二條指令 是否會被交換
讀        讀        不會
不會
不會(CLR 確保寫-寫操作永遠不會被交換,就算是沒有volatile關鍵字)
會!

注意:使用volatile不能阻止寫-讀被交換,這可能是一個難題。Joe Duffy 使用下面的例子很好的說明了這個問題:如果Test1Test2同時運行在不同的線程上,可能ab最后的值都是 0 (盡管在xy上都使用了volatile):

class IfYouThinkYouUnderstandVolatile { volatile int x, y; void Test1() // 運行在一個線程上  { x = 1; // Volatile 寫 (寫柵欄) int a = y; // Volatile 讀 (讀柵欄) // ... } void Test2() // 運行在另一線程上  { y = 1; // Volatile 寫 (寫柵欄) int b = x; // Volatile 讀 (讀柵欄) // ... } } 

MSDN 文檔描述:使用volatile關鍵字可以確保該字段在任何時間呈現的都是最新的值。這是錯誤的,就像我們剛才看到的,寫-讀操作可能被重新排序。(譯者注:其實不能說 MSDN 的說法錯誤,使用volatilexy的值確實是最新的,只是因為指令重排,對它們的讀可能在另一個線程上的寫之前進行)

這給出了避免使用volatile關鍵字的理由:就算你能很好的理解這個例子,可是其它一起工作的開發者也理解么?在Test1Test2的兩次賦值之間使用全柵欄(或)可以解決這個問題。

volatile關鍵字不支持引用類型的參數和捕獲的局部變量:這些情況下你必須使用VolatileReadVolatileWrite方法。

VolatileRead 和 VolatileWritePermalink

使用Thread類上的靜態方法VolatileReadVolatileWrite讀/寫變量時,相當於volatile關鍵字產生的作用(技術上說,作用是其超集)。它們的實現相對低效,可是這是因為它們實際上使用了全柵欄。這是它們對於整型的實現:

public static void VolatileWrite (ref int address, int value) { MemoryBarrier(); address = value; } public static int VolatileRead (ref int address) { int num = address; MemoryBarrier(); return num; } 

可以看出來,如果調用VolatileWrite后緊接着調用VolatileRead,在它們中間是沒有屏障的:這會產生和我們之前看到的同樣的難題。

內存屏障和鎖Permalink

像前所述,Monitor.EnterMonitor.Exit都使用了全柵欄。因此,如果我們忽略鎖的互斥作用,可以這樣說:

lock (someField) { ... } 

相當於:

Thread.MemoryBarrier(); { ... } Thread.MemoryBarrier(); 

1.2InterlockedPermalink

無鎖代碼下,在讀寫字段時使用內存屏障往往是不夠的。在 64 位字段上進行加、減操作需要使用Interlocked工具類這樣更加重型的方式。Interlocked也提供了ExchangeCompareExchange方法,后者能夠進行無鎖的讀-改-寫(read-modify-write)操作,只需要額外增加一點代碼。

如果一條語句在底層處理器上被當作一個獨立不可分割的指令,那么它本質上是原子的(atomic)。嚴格的原子性可以阻止任何搶占的可能。對於 32 位(或更低)的字段的簡單讀寫總是原子的。而操作 64 位字段僅在 64 位運行時環境下是原子的,並且結合了多個讀寫操作的語句必然不是原子的:

class Atomicity { static int _x, _y; static long _z; static void Test() { long myLocal; _x = 3; // 原子的 _z = 3; // 32位環境下不是原子的(_z 是64位的) myLocal = _z; // 32位環境下不是原子的(_z 是64位的) _y += _x; // 不是原子的 (結合了讀和寫操作) _x++; // 不是原子的 (結合了讀和寫操作) } } 

在 32 位環境下讀寫 64 位字段不是原子的,因為它需要兩條獨立的指令:每條用於對應的 32 位內存地址。所以,如果線程 X 在讀一個 64 位的值,同時線程 Y 更新它,那么線程 X 最終可能得到新舊兩個值按位組合后的結果(一個撕裂讀(torn read))。

編譯器實現x++這種一元運算,是通過先讀一個變量,然后計算,最后寫回去的方式。考慮如下類:

class ThreadUnsafe { static int _x = 1000; static void Go() { for (int i = 0; i < 100; i++) _x--; } } 

拋開內存屏障的事情,你可能會認為如果 10 個線程並發運行Go,最終_x會為0。然而,這並不一定,因為可能存在競態條件(race condition),在一個線程完成讀取x的當前值,減少值,把值寫回這個過程之間,被另一個線程搶占(導致一個過期的值被寫回)。

當然,可以通過用lock語句封裝非原子的操作來解決這些問題。實際上,鎖如果一致的使用,可以模擬原子性。然而,Interlocked類為這樣簡單的操作提供了一個更方便更快的方案:

class Program { static long _sum; static void Main() { // _sum // 簡單的自增/自減操作: Interlocked.Increment (ref _sum); // 1 Interlocked.Decrement (ref _sum); // 0 // 加/減一個值: Interlocked.Add (ref _sum, 3); // 3 // 讀取64位字段: Console.WriteLine (Interlocked.Read (ref _sum)); // 3 // 讀取當前值並且寫64位字段 // (打印 "3",並且將 _sum 更新為 10 ) Console.WriteLine (Interlocked.Exchange (ref _sum, 10)); // 10 // 僅當字段的當前值匹配特定的值(10)時才更新它: Console.WriteLine (Interlocked.CompareExchange (ref _sum, 123, 10); // 123 } } 

Interlocked上的所有方法都使用全柵欄。因此,通過Interlocked訪問字段不需要額外的柵欄,除非它們在程序其它地方沒有通過Interlockedlock來訪問。

Interlocked的數學運算操作僅限於IncrementDecrement以及Add。如果你希望進行乘法或其它計算,在無鎖方式下可以使用CompareExchange方法(通常與自旋等待一起使用)。我們會在並行編程中提供一個例子。

Interlocked類通過將原子性的需求傳達給操作系統和虛擬機來進行實現其功能。

Interlocked類的方法通常產生 10ns 的開銷,是無競爭鎖的一半。此外,因為它們不會導致阻塞,所以不會帶來上下文切換的開銷。然而,如果在循環中多次迭代使用Interlocked,就可能比在循環外使用一個鎖的效率低(不過Interlocked可以實現更高的並發度)。

2使用 Wait 和 Pulse 進行信號同步Permalink

(譯者注:Pulse翻譯為脈沖,它和Wait都是作用在一個變量上:Wait等待一個變量上的脈沖,Pulse對一個變量發送脈沖。脈沖也是一種信號形式,相對於事件等待句柄那種鎖存信號,脈沖顧名思義是一種非鎖存或者說易失的信號)

之前我們討論了事件等待句柄,這是一種簡單的信號同步機制:一個線程阻塞直到收到另一個線程發來的通知。

還有個更強大的信號構造,由Monitor類通過兩個靜態方法WaitPulse(以及PulseAll)提供。原理是使用自定義的標識和字段(封裝在lock語句中)自行實現信號同步邏輯,然后引入WaitPulse控制防止自旋。僅僅使用這些方法和lock,你就可以實現AutoResetEventManualResetEvent以及Semaphore,還有WaitHandle的靜態方法WaitAllWaitAny的功能。此外,WaitPulse也可以用於所有等待句柄都不適用的情況。

但是,使用WaitPulse進行信號同步,對比事件等待句柄有以下缺點:

  • Wait / Pulse不能跨越應用程序域和進程使用。
  • 必須切記通過鎖保護所有信號同步邏輯涉及的變量。
  • 使用Wait / Pulse的程序可能會導致依賴微軟文檔的開發者困惑。

微軟文檔的問題的是就算你已經攻讀了解了WaitPulse是如何工作的,也還是無法明白它們該如何使用。WaitPulse會讓淺嘗輒止的人感到特別惡心:它們會尋找你理解中的漏洞然后折磨你取樂!幸運的是,有一種簡單的使用模式可以馴服WaitPulse

性能方面,在 2010 時代的桌面電腦上,調用Pulse花費大概 100ns 左右, 約是在等待句柄上調用Set三分之一的時間。等待無競爭信號的開銷完全取決於你,因為是你使用普通的字段和變量自行實現的邏輯。在實踐中上,這非常簡單,並且基本上相當於使用鎖的代價。

2.1如何使用 Wait 和 PulsePermalink

下面是如何使用WaitPulse:

1. 定義一個字段,作為同步對象,例如:

readonly object _locker = new object(); 

2. 定義一個或多個字段,作為自定義的阻塞條件,例如:

bool _go; /* 或 */ int _semaphoreCount; 

3. 當你希望阻塞的時候,使用下邊的代碼:

lock (_locker) while (/* <blocking-condition> */) Monitor.Wait (_locker); 

4. 當改變(或隱式改變)一個阻塞條件的時候,使用下邊的代碼:

lock (_locker) { // 修改會影響阻塞條件的字段或數據 // ... Monitor.Pulse(_locker); // 或: Monitor.PulseAll (_locker); } 

(如果想改變阻塞條件並等待,可以在一個lock內合並第 3 步和第 4 步)

這個模式允許任意線程在任意時間使用任意條件等待。下邊這個簡單的例子,一個線程等待直到_go字段被設置為true

class SimpleWaitPulse { static readonly object _locker = new object(); static bool _go; static void Main() { // 新線程會阻塞 new Thread (Work).Start(); // 因為 _go==false Console.ReadLine(); // 等待用戶敲回車 lock (_locker) // 現在喚醒線程 { // 通過設置 _go=true 然后 Pulse _go = true; Monitor.Pulse (_locker); } } static void Work() { lock (_locker) while (!_go) Monitor.Wait (_locker); // 當等待時鎖會被釋放 Console.WriteLine ("Woken!!!"); } } 

輸出結果:

Woken!!!   (按下回車鍵之后)

為了線程安全,我們確保所有共享字段的訪問都在鎖內。因此,在讀取和更新_go標識的地方都加上了lock語句。這很必要(除非你希望使用非阻塞同步的方式)。

Work方法會一直阻塞,等待_go標識變為trueMonitor.Wait方法按順序做了如下的操作:

  1. 釋放_locker上的鎖。
  2. 阻塞,直到收到_locker上的脈沖。
  3. 重新獲取_locker上的鎖。如果鎖已被占用,那么線程阻塞,直到鎖變為可用為止。

這意味着當Monitor.Wait在等待脈沖時,同步對象上的鎖沒有被持有。這並不是像代碼看上去那樣。

lock (_locker) { while (!_go) Monitor.Wait (_locker); // 鎖被釋放 // 鎖重新獲得 // ... } 

然后繼續執行下一條語句。Monitor.Wait被設計為在lock語句內使用,否則調用它會拋出一個異常。Monitor.Pulse也是一樣。

Main方法中,我們通過設置_go標識(在鎖內)和調用Pulse來給工作線程發信號。我們一釋放鎖,工作線程就可以繼續執行,繼續它的while循環。

PulsePulseAll方法可以釋放通過調用Wait阻塞的線程。Pulse最多釋放一個線程,而PulseAll釋放全部。在我們的例子中,只有一個線程被阻塞,所以它們在這個例子中效果是一樣的。如果有多個線程在等待,以我們建議的這個模式來說,調用PulseAll通常最安全。

為了Wait能夠和PulsePulseAll進行通信,必須使用同一個同步對象(我們的例子中的_locker)。

在我們的模式中,脈沖表示有些東西可能已經改變,等待線程應該重新檢查它們的阻塞條件。在Work方法內,檢查是通過while循環實現的。由等待方來決定是否要繼續運行,而不是通知方。如果把脈沖直接當作通知繼續的指令,那么Wait的構造就沒有任何價值了,這樣使用就相當於一個殘疾版的AutoResetEvent

如果我們拋棄該模式,移除while循環、_go標識以及ReadLine,就獲得了一個最基礎的Wait / Pulse的例子:

static void Main() { new Thread (Work).Start(); lock (_locker) Monitor.Pulse (_locker); } static void Work() { lock (_locker) Monitor.Wait (_locker); Console.WriteLine ("Woken!!!"); } 

這可能不會有輸出,因為它有不確定性!在主線程和工作線程之間存在競爭,如果Wait先執行,信號可以正常工作。如果Pulse先執行,它就會丟失,工作線程就永遠卡在那里等待。這與AutoResetEvent的行為不同,它的Set方法有一種記憶效果,或者說鎖存(latching)效果,所以即使它在WaitOne之前調用,仍然有效。

但是Pulse沒有鎖存效果,它需要你自行實現,就像我們之前使用的 “ go “ 標識。這就是為什么WaitPulse是萬能的原因:使用一個布爾標識,我們可以實現類似AutoResetEvent的功能;使用一個整型字段,可以實現 CountdownEventSemaphore。通過更復雜的數據結構,可以進一步實現類似生產者 / 消費者隊列這樣的構造。

2.2生產者 / 消費者隊列Permalink

之前,我們描述了生產者 / 消費者隊列的概念,以及如何通過AutoResetEvent來實現它。現在,我們通過WaitPulse來實現一個更強大的版本。

這次,我們將允許多個消費者,各自擁有它們自己的線程。使用一個數組來存放這些線程:

Thread[] _workers; 

這樣可以讓我們在關閉該隊列的時候Join這些線程。

每個工作線程會執行一個名為Consume的方法。我們可以在一個循環中創建和啟動線程,例如:

public PCQueue (int workerCount) { _workers = new Thread [workerCount]; // 為每個worker創建和啟動一個獨立的線程 for (int i = 0; i < workerCount; i++) (_workers [i] = new Thread (Consume)).Start(); } 

之前我們只是使用一個字符串來代表任務,這次使用一種更靈活的方式,即一個委托。我們使用 .NET Framework 中的System.Action委托,它定義如下:

public delegate void Action(); 

這個委托可以匹配任意無參方法,很像ThreadStart委托。當然我們也可以描述需要參數的任務,通過把調用封裝在匿名委托或 lambda 表達式中。

Action myFirstTask = delegate { Console.WriteLine ("foo"); }; Action mySecondTask = () => Console.WriteLine ("foo"); 

如之前一樣,使用Queue<T>來表示任務的隊列:

Queue<Action> _itemQ = new Queue<Action>(); 

在討論EnqueueItemConsume方法之前,先來看一下完整的代碼:

using System; using System.Threading; using System.Collections.Generic; public class PCQueue { readonly object _locker = new object(); Thread[] _workers; Queue<Action> _itemQ = new Queue<Action>(); public PCQueue (int workerCount) { _workers = new Thread [workerCount]; // 為每個worker創建和啟動一個獨立的線程 for (int i = 0; i < workerCount; i++) (_workers [i] = new Thread (Consume)).Start(); } public void Shutdown (bool waitForWorkers) { // 為每個線程加入一個 null 任務,使它們退出 foreach (Thread worker in _workers) EnqueueItem (null); // 等待工作線程完成 if (waitForWorkers) foreach (Thread worker in _workers) worker.Join(); } public void EnqueueItem (Action item) { lock (_locker) { _itemQ.Enqueue (item); // 因為改變了阻塞條件 Monitor.Pulse (_locker); // 所以發送脈沖通知 } } void Consume() { while (true) // 繼續消費直到 { // 收到通知 Action item; lock (_locker) { while (_itemQ.Count == 0) Monitor.Wait (_locker); item = _itemQ.Dequeue(); } if (item == null) return; // 通知我們退出 item(); // 執行任務 } } } 

我們的退出策略是:向隊列中加入一個 null 項目來通知消費者在完成任何未完成的項目后退出。(如果我們希望更快退出,可以使用一個獨立的 “ cancel “ 標識)。因為支持多個消費者,所以我們必須為每個消費者在隊列中加入一個 null 項目來讓隊列完全關閉。

這里的Main方法,用來啟動一個生產者 / 消費者隊列。它指定了兩個並發的消費者線程,然后向隊列中加入 10 個委托,它們將在兩個消費者之間共享。

static void Main() { PCQueue q = new PCQueue (2); Console.WriteLine ("Enqueuing 10 items..."); for (int i = 0; i < 10; i++) { int itemNumber = i; // 為了避免被捕獲的變量陷阱 q.EnqueueItem (() => { Thread.Sleep (1000); // 模擬耗時任務 Console.Write (" Task" + itemNumber); }); } q.Shutdown (true); Console.WriteLine(); Console.WriteLine ("Workers complete!"); } 

輸出結果:

Enqueuing 10 items...
 Task1 Task0 (pause...) Task2 Task3 (pause...) Task4 Task5 (pause...)
 Task6 Task7 (pause...) Task8 Task9 (pause...)
Workers complete!

現在我們來看一下EnqueueItem方法:

public void EnqueueItem (Action item) { lock (_locker) { _itemQ.Enqueue (item); // 因為改變了阻塞條件 Monitor.Pulse (_locker); // 所以發送脈沖通知 } } 

因為這個隊列是在多線程環境下使用,所以必須對所有的讀寫操作加鎖。並且因為改變了阻塞條件(由於向隊列中加入了一個任務,一個消費者可以開始動作),所以需要調用Pulse來送脈沖信號。

出於對效率的考慮,當加入一個項目時,我們調用Pulse而不是PulseAll。這是因為每個項目只需要喚醒(至多)一個消費者。如果你只有一個冰激凌,你不會把一個班 30 個正在睡覺的孩子都叫起來排隊獲取它。同樣地,有 30 個消費者,把它們全部喚醒沒有任何好處,那樣僅僅是在它們重新回去睡覺前進行了一共 29 次毫無用處的while迭代。總之,在這里使用Pulse而不是PulseAll不會破壞任何功能。

現在,來看一下Comsume方法,一個工作線程從隊列中取出並執行一個項目。我們希望工作線程沒什么事情做的時候,或者說當隊列中沒有任何項目時,它們應該被阻塞。因此,我們的阻塞條件是_itemQ.Count == 0

      Action item; lock (_locker) { while (_itemQ.Count == 0) Monitor.Wait (_locker); item = _itemQ.Dequeue(); } if (item == null) return; // 通知我們退出 item(); 

_itemQ.Count非 0 時,while循環退出,意味着(至少)有一個項目尚未完成。我們必須在釋放鎖之前取出這個項目,否則,當我們取它時,它可能已經不在隊列里了(存在其它線程的情況下,事情可能在你眨眼的瞬間發生變化!)。特別的,如果我們沒有持有鎖,其它那些剛完成一個之前工作的消費者可以偷偷進來取走我們的項目,例如如果使用下邊的代碼:

      Action item; lock (_locker) { while (_itemQ.Count == 0) Monitor.Wait (_locker); } lock (_locker) // 錯! { item = _itemQ.Dequeue(); // 項目也許已經不在了! } ... 

在項目被取出后,我們立即釋放鎖。如果我們在執行任務期間一直持有鎖,就可能會造成其它消費者和生產者沒有意義的阻塞。我們沒有在取出項目之后發送脈沖,是因為此時如果隊列中還有項目的話,其它線程都不可能解除阻塞。(譯者注:此時如果隊列已經為空,那么沒有必要再喚醒其它線程;而如果隊列不為空,意味着其它線程要么在執行任務,要么正在等待鎖被釋放,都沒有必要再通知其它線程。)

當我們使用WaitPulse時(還有在一般情況下),鎖定短暫些比較好,它避免了其它線程不必要的阻塞。可以鎖定很多行代碼,只要保證它們都執行的很快。記住,使用Monitor.Wait來等待脈沖時,它幫你釋放了鎖!

2.3等待超時Permalink

調用Wait方法時,你可以設定一個超時時間,可以是毫秒或TimeSpan的形式。如果因為超時而放棄了等待,那么Wait方法就會返回false。超時時間僅適用於等待階段(waiting phase),帶有超時的Wait方法按下列步驟進行:

  1. 釋放鎖
  2. 阻塞,直到收到脈沖或超時
  3. 重新獲取鎖

設置超時時間就像是讓 CLR 在超時時間到達時給你一個“虛擬的脈沖(virtual pulse)”。超時的Wait仍會執行第 3 步,重新獲取鎖,就像收到脈沖后一樣。

如果Wait在進行第 3 步(重新獲取鎖)的時候被阻塞,超時時間會被忽略。然而這是個很少見的情況,因為在有良好設計的Wait / Pulse應用中,其它線程僅會鎖定很短的時間,所以重新獲取鎖應該是一個幾乎立即完成的操作。

Wait的超時時間有一個有用的地方。有時,當阻塞條件改變時,可能無法使用Pulse。一個例子是阻塞條件涉及調用一個方法,它通過定期查詢數據庫來獲取信息。如果延遲不是問題,解決方案就很簡單:可以在調用Wait時設置一個超時時間。

lock (_locker) while (/* <blocking-condition> */) Monitor.Wait (_locker, /* <timeout> */); 

這樣可以強迫以指定的超時時間為間隔重新檢查阻塞條件,和收到脈沖一樣。阻塞條件越簡單,超時時間可以越短,避免影響效率。這種情況下,我們可以不關心Wait是接收到了脈沖還是超時了,因此我們可以忽略它的返回值。

如果由於程序 bug 造成脈沖丟失,超時時間同樣可以很好的工作。在同步很復雜的程序中可以給所有Wait指定超時時間,作為可能的脈沖丟失錯誤的備份。如果程序之后被其他人修改,但沒能正確使用Pulse,這樣也可以在一定程度上免疫 bug!

Monitor.Wait返回一個布爾值表示是否獲得了一個“真實的”脈沖。如果返回false,代表超時了。如果超時並不是期待的行為,可以記錄日志或拋出異常。

等待隊列Permalink

當多個線程使用Wait等待同一對象時,就會形成了一個“等待隊列(waiting queue)”(和用於等待獲得鎖的“就緒隊列(ready queue)”不同)。每次調用Pulse時,會釋放排在等待隊列最前面的那個線程,它會進入就緒隊列,然后重新獲取鎖。可以把它想象成一個自動停車場,你首先在收費處前排隊(等待隊列)驗票,然后再在閘門前排隊(就緒隊列)出去。

Wait 和 Pulse

這個隊列結構天生保證順序,但是對於Wait / Pulse的應用來說通常不重要,在這種場景下,把它想象成一個等待線程的“池(pool)”更好理解,每次調用Pulse都會從池中釋放一個等待線程。

PulseAll釋放整個等待隊列或者說等待池。接收到脈沖的線程不會在此時同時開始執行,而是順序的執行,因為每個Wait語句都要試圖重新獲取同一個鎖。效果就是,PulseAll將線程從等待隊列移到就緒隊列中,讓它們可以按順序繼續執行。

2.4雙向信號和競爭Permalink

Monitor.Pulse的一個重要特性是異步執行,這意味着它不會阻塞或暫停當前線程。如果另一個線程在等待時收到脈沖,它會解除阻塞,否則,脈沖就沒有任何效果或者說被忽略掉了。

Pulse提供了一種單向通信機制:發送脈沖的線程向等待的線程發送信號。它沒有內在的確認機制:Pulse不會返回一個值來表示信號是否被收到。此外,當發送了信號並釋放鎖后,並不能保證一個符合要求的等待線程能立即開始工作。因為線程調度器的調度可能存在小小的延遲,在這期間並沒有任何線程擁有鎖。這意味着脈沖的發送方很難知道等待的線程是否或何時能繼續運行,除非你的代碼做了特別的處理(例如使用WaitPulse在另一個標識上做信號同步來通知)。

使用PulseWait時,如果沒有自定義的確認機制,卻想依賴等待線程能夠及時響應的話,可以認為是胡來。你會搞砸的!

為舉例說明,假設我們希望連續向一個線程發 5 次信號:

class Race { static readonly object _locker = new object(); static bool _go; static void Main() { new Thread (SaySomething).Start(); for (int i = 0; i < 5; i++) lock (_locker) { _go = true; Monitor.PulseAll (_locker); } } static void SaySomething() { for (int i = 0; i < 5; i++) lock (_locker) { while (!_go) Monitor.Wait (_locker); _go = false; Console.WriteLine ("Wassup?"); } } } 

期待的結果:

Wassup?
Wassup?
Wassup?
Wassup?
Wassup?

實際的結果:

Wassup? (沒啦)

這個程序是有缺陷的,它展示了一個競爭狀態(race condition ,競態條件):主線程中的for循環可能在工作線程沒有持有鎖的時間內,就立即空轉完成了 5 次迭代,甚至可能是在工作線程啟動之前!生產者 / 消費者隊列的例子中不會面臨這個問題,因為如果主線程跑在了工作線程前面,請求都會加入隊列。但在這里的例子中,如果工作線程仍然在處理之前的任務,就需要主線程在每次迭代時阻塞。

可以通過為這個類增加一個由工作線程控制的_ready標識來解決這個問題。主線程在設置_go標識之前等待工作線程就緒。

這類似於一個之前的例子,那是使用AutoResetEvent來實現相同的事情,只不過這個的擴展性更好。

代碼如下:

class Solved { static readonly object _locker = new object(); static bool _ready, _go; static void Main() { new Thread (SaySomething).Start(); for (int i = 0; i < 5; i++) lock (_locker) { while (!_ready) Monitor.Wait (_locker); _ready = false; _go = true; Monitor.PulseAll (_locker); } } static void SaySomething() { for (int i = 0; i < 5; i++) lock (_locker) { _ready = true; Monitor.PulseAll (_locker); // 記得調用 while (!_go) Monitor.Wait (_locker); // Monitor.Wait 會釋放 go = false; // 並重新獲取鎖 Console.WriteLine ("Wassup?"); } } } 

結果:

Wassup? (重復 5 次)

Main方法中,我們清除_ready標識,設置_go標識,然后發送脈沖,這些都在同一個lock語句中。好處是:如果我們之后引入第三個線程,這樣做就提高了健壯性。想象一下另一個線程試圖同時給工作線程發信號。在這種情況下,我們的邏輯也滴水不漏,在效果上,我們清除_ready和設置_go的操作是原子的。

2.5模擬等待句柄Permalink

你可能注意到了之前例子中的一個模式:兩個等待循環都有如下的結構:

lock (_locker) { while (!_flag) Monitor.Wait (_locker); _flag = false; // ... } 

_flag會在另一線程中被設置為true。這段代碼在效果上看,它模仿了一個AutoResetEvent。如果去掉_flag=false,就得到了ManualResetEvent的基礎功能。

讓我們補完使用WaitPulse來實現ManualResetEvent的完整代碼:

readonly object _locker = new object(); bool _signal; void WaitOne() { lock (_locker) { while (!_signal) Monitor.Wait (_locker); } } void Set() { lock (_locker) { _signal = true; Monitor.PulseAll (_locker); } } void Reset() { lock (_locker) _signal = false; } 

使用PulseAll,是因為可能存在多個被阻塞的等待線程。

實現AutoResetEvent非常簡單,只需要將WaitOne方法內改為:

lock (_locker) { while (!_signal) Monitor.Wait (_locker); _signal = false; } 

然后將Set方法內的PulseAll替換成Pulse

 lock (_locker) { _signal = true; Monitor.Pulse (_locker); } 

使用PulseAll會放棄等待隊列的公平性,因為每次調用PulseAll都會導致隊列被破壞並重建。

_signal替換為一個整型字段可以得到Semaphore的基礎功能。

在簡單的場景中,模擬跨多個等待句柄工作的靜態方法也很容易。調用WaitAll只不過相當於使用一阻塞條件,它結合了所有等待句柄使用的標識:

lock (_locker) while (!_flag1 && !_flag2 && !_flag3...) Monitor.Wait (_locker); 

這特別適用於WaitAll因為舊有的 COM 問題不可用的場景。模擬WaitAny也很容易,只要把 &&操作符替換成||就可以了。

如果有很多標識,這個方式就會變得低效。這是因為,為了保證信號同步的原子性,它們必須全部共享一個同步對象。在這個方面,等待句柄更有優勢。

2.6實現 CountdownEventPermalink

使用WaitPulse,我們也可以實現CountdownEvent的基本功能:

public class Countdown { object _locker = new object (); int _value; public Countdown() { } public Countdown (int initialCount) { _value = initialCount; } public void Signal() { AddCount (-1); } public void AddCount (int amount) { lock (_locker) { _value += amount; if (_value <= 0) Monitor.PulseAll (_locker); } } public void Wait() { lock (_locker) while (_value > 0) Monitor.Wait (_locker); } } 

這個模式和之前見到的很像,不同之處就是阻塞條件基於一個整型字段。

2.7線程會合Permalink

可以使用剛剛寫的Countdown類,來實現兩個線程的會合。和之前的 WaitHandle.SignalAndWait可以用於會合一對線程一樣:

class Rendezvous { static object _locker = new object(); // 在 Framework 4.0 中, 我們可以改為使用內置的 CountdownEvent 類。 static Countdown _countdown = new Countdown(2); public static void Main() { // 每個線程都睡眠一段隨機時間 Random r = new Random(); new Thread (Mate).Start (r.Next (10000)); Thread.Sleep (r.Next (10000)); _countdown.Signal(); _countdown.Wait(); Console.Write ("Mate! "); } static void Mate (object delay) { Thread.Sleep ((int) delay); _countdown.Signal(); _countdown.Wait(); Console.Write ("Mate! "); } } 

在這個例子中,每個線程睡眠一段隨機時間,然后等待其它線程,結果是在(幾乎)相同的時間它們都打印了 “ Meta “。這被稱為線程執行屏障(thread execution barrier),並且可以擴展到任意多個線程(通過調整計數器的初始值)。

當你想讓多個線程執行一個系列任務,希望它們步調一致時,可以用到線程執行屏障。然而,我們現在的解決方案有一定限制:我們不能重用同一個Countdown對象來第二次會合線程,至少在沒有額外信號構造的情況下不能。為解決這個問題,Framework 4.0 提供了一個新的類Barrier

3Barrier 類Permalink

Barrier類是 Framework 4.0 加入的一個信號構造。它實現了線程執行屏障(thread execution barrier),允許多個線程在一個時間點會合。這個類非常快速和高效,它是建立在Wait / Pulse和自旋鎖基礎上的。

使用這個類的步驟是:

  1. 實例化它,指定有多少個線程參與會合(你可以在之后調用AddParticipants / RemoveParticipants來進行更改)。
  2. 當希望會合時,在每個線程上都調用SignalAndWait

實例化Barrier,參數為 3 ,意思是調用SignalAndWait會被阻塞直到該方法被調用 3 次。但與CountdownEvent不同,它會自動復位:再調用SignalAndWait仍會阻塞直到被調用 3 次。這允許你保持多個線程“步調一致”,讓它們執行一個系列任務。

Barrier

下邊的例子中,三個線程每個都與其它線程步調一致地打印數字 0 到 4:

static Barrier _barrier = new Barrier (3); static void Main() { new Thread (Speak).Start(); new Thread (Speak).Start(); new Thread (Speak).Start(); } static void Speak() { for (int i = 0; i < 5; i++) { Console.Write (i + " "); _barrier.SignalAndWait(); } } 

結果:

0 0 0 1 1 1 2 2 2 3 3 3 4 4 4

Barrier類的一個非常有用的功能是,構造它時你可以指定一個后期動作(post-phase action)。這是一個委托,在SignalAndWait被調用 n 次后,且在線程解除阻塞之前運行。在我們的例子中,如果像這樣實例化Barrier

static Barrier _barrier = new Barrier (3, barrier => Console.WriteLine()); 

結果會是:

0 0 0
1 1 1
2 2 2
3 3 3
4 4 4

后期動作可以用來合並來自每個工作線程的數據。不用擔心搶占,因為當它被執行時,所有的工作線程都是被阻塞的。

4讀 / 寫鎖Permalink

通常,一個類型的實例對於並發讀操作是線程安全的,但對並發的更新操作卻不是(並發讀然后更新也不是)。這對於像文件這種資源也是同樣。盡管可以簡單的對所有訪問都使用排它鎖來確保這種類型的實例是線程安全的,但對於有很多讀操作而只有少量更新操作的情況,它就會過度限制並發能力。舉個栗子,在一個商業應用服務中,為了實現快速檢索,數據通常會緩存在靜態字段中。在這種情況下,ReaderWriterLockSlim類被設計用來提供高可用性的鎖。

ReaderWriterLockSlim是 Framework 3.5 加入的,替代以前ReaderWriterLock這個“胖(fat)”版本。后者擁有類似的功能,但運行速度慢幾倍,並且處理鎖升級的機制也存在設計缺陷。

然而如果與普通的lockMonitor.Enter / Exit)對比,ReaderWriterLockSlim要比它慢一倍。

這兩個類都有兩種基本類型的鎖,讀鎖和寫鎖:

  • 寫鎖完全的排它。
  • 讀鎖可以與其它的讀鎖相容。

所以,一個線程持有寫鎖會阻塞其它想要獲取讀鎖或寫鎖的線程(反之亦然)。而如果沒有線程持有寫鎖,任意數量的線程可以同時獲取讀鎖。

ReaderWriterLockSlim定義了如下的方法來獲取和釋放讀 / 寫鎖:

public void EnterReadLock(); public void ExitReadLock(); public void EnterWriteLock(); public void ExitWriteLock(); 

另外,對應所有EnterXXX的方法,都有相應的 “ Try “ 版本,可以接受一個超時參數,方式與Monitor.TryEnter類似(如果資源上存在大量競爭,超時會很容易產生)。ReaderWriterLock提供了類似的方法,名為AcquireXXX以及ReleaseXXX。如果出現超時的情況,它們會拋出ApplicationException異常,而不是返回false

下邊的程序演示了ReaderWriterLockSlim的用法。三個線程不停枚舉一個列表,同時另外兩個線程每秒向列表中加入一個隨機數。讀鎖保護對列表的讀,寫鎖保護對列表的寫。

class SlimDemo { static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static List<int> _items = new List<int>(); static Random _rand = new Random(); static void Main() { new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Write).Start ("A"); new Thread (Write).Start ("B"); } static void Read() { while (true) { _rw.EnterReadLock(); foreach (int i in _items) Thread.Sleep (10); _rw.ExitReadLock(); } } static void Write (object threadID) { while (true) { int newNumber = GetRandNum (100); _rw.EnterWriteLock(); _items.Add (newNumber); _rw.ExitWriteLock(); Console.WriteLine ("Thread " + threadID + " added " + newNumber); Thread.Sleep (100); } } static int GetRandNum (int max) { lock (_rand) return _rand.Next(max); } } 

在用於生產環境的代碼中,通常需要添加try / finally塊來確保拋出異常時鎖能夠被釋放。

結果:

Thread B added 61
Thread A added 83
Thread B added 55
Thread A added 33
...

相比簡單的lockReaderWriterLockSlim允許更多的並發讀操作。為說明這一點,可以在Write方法中while循環開始的位置插入如下代碼:

Console.WriteLine (_rw.CurrentReadCount + " concurrent readers"); 

幾乎總會打印 “ 3 concurrent readers “(Read方法花費絕大多數時間在foreach循環中)。像CurrentReadCount一樣,ReaderWriterLockSlim提供了如下屬性用來監視鎖:

public bool IsReadLockHeld { get; } public bool IsUpgradeableReadLockHeld { get; } public bool IsWriteLockHeld { get; } public int WaitingReadCount { get; } public int WaitingUpgradeCount { get; } public int WaitingWriteCount { get; } public int RecursiveReadCount { get; } public int RecursiveUpgradeCount { get; } public int RecursiveWriteCount { get; } 

4.1可升級鎖和遞歸Permalink

有時,需要在一個原子操作中將一個讀鎖升級為寫鎖。例如,假設你向列表中添加一個條目,希望當它不在列表中時才進行添加。理想狀態下,你希望持有寫(排它)鎖的時間應盡可能短,所以可能進行下列步驟:

  1. 獲取一個讀鎖。
  2. 檢查條目是否已經在列表中存在,如果存在則釋放鎖並返回。
  3. 釋放讀鎖。
  4. 獲取一個寫鎖。
  5. 添加條目。

問題是在第 3 步和第 4 步中間,另一個線程可以偷偷進入並修改了列表(例如,添加了一個相同的條目)。ReaderWriterLockSlim提供了第三種鎖來解決這個問題,叫做可升級鎖(upgradeable lock)。可升級鎖很像讀鎖,除了它可以在一個原子操作中被升級為寫鎖。下面是用法:

  1. 調用EnterUpgradeableReadLock.
  2. 進行讀操作(例如檢查條目在列表中是否已經存在)。
  3. 調用EnterWriteLock(將可升級鎖轉換為寫鎖)。
  4. 進行寫操作(例如向列表中添加條目)。
  5. 調用ExitWriteLock(將寫鎖轉換回可升級鎖)。
  6. 執行其它的讀操作。
  7. 調用ExitUpgradeableReadLock

從調用方角度看,它很像一個嵌套或遞歸的鎖。從功能上講,在第 3 步,ReaderWriterLockSlim釋放讀鎖,然后獲取寫鎖的操作是原子的。

可升級鎖和讀鎖的另一個重要的區別是:可升級鎖可以與任意數量的讀鎖共存,但同時只能有一個可升級鎖被獲取。這是通過“串行化”鎖的轉換來防止轉換時發生死鎖,就像 SQL Server 中的更新鎖一樣。(譯者注:可升級鎖轉換為寫鎖的時候,由於寫鎖排它,需要保證不存在其它的鎖,或者說需要等待其它鎖釋放。如果可以同時存在兩個可升級鎖,其中一個想要轉換為寫鎖,就必須等待另一個釋放,而如果這時另一個可升級鎖也想要轉換為寫鎖,就造成了互相等待,死鎖就產生了)

SQL Server ReaderWriterLockSlim
共享鎖(Share lock) 讀鎖(Read lock)
排它鎖(Exclusive lock) 寫鎖(Write lock)
更新鎖(Update lock) 可升級鎖(Upgradeable lock)

我們下面演示更新鎖的用法,修改之前例子中的Write方法,僅當數字不在列表中的時候才添加它:

while (true) { int newNumber = GetRandNum (100); _rw.EnterUpgradeableReadLock(); if (!_items.Contains (newNumber)) { _rw.EnterWriteLock(); _items.Add (newNumber); _rw.ExitWriteLock(); Console.WriteLine ("Thread " + threadID + " added " + newNumber); } _rw.ExitUpgradeableReadLock(); Thread.Sleep (100); } 

ReaderWriterLock也可以進行鎖的轉換,但它不可靠,因為它不支持可升級鎖這個概念。這就是為什么ReaderWriterLockSlim的設計者不得不使用一個新類來重新實現。

鎖的遞歸Permalink

通常,ReaderWriterLockSlim禁止嵌套或遞歸鎖定。因此,如下的代碼會拋出異常:

var rw = new ReaderWriterLockSlim(); rw.EnterReadLock(); rw.EnterReadLock(); rw.ExitReadLock(); rw.ExitReadLock(); 

然而,如果你使用下邊的代碼來構造ReaderWriterLockSlim,就可以正常運行:

var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion); 

這確保了只有在你計划使用時,遞歸鎖定才可以使用。遞歸鎖定會產生額外的復雜度,因為可能會同時獲取不只一種類型的鎖:

rw.EnterWriteLock(); rw.EnterReadLock(); Console.WriteLine (rw.IsReadLockHeld); // True Console.WriteLine (rw.IsWriteLockHeld); // True rw.ExitReadLock(); rw.ExitWriteLock(); 

基本原則是,一旦你已經獲取了一個鎖,之后的遞歸鎖定的級別可以更小但不能更大,級別順序如下:讀鎖,可升級鎖,寫鎖

然而,把可升級鎖提升為寫鎖總是合法的。

5掛起和恢復Permalink

通過已過時的方法Thread.SuspendThread.Resume,線程可以顯式的掛起和恢復。這個機制與阻塞是完全獨立的。這兩套機制可以獨立且並行的作用。

一個線程可以掛起它自己或另一個線程,調用Suspend使線程暫時進入SuspendRequested狀態,然后在到達垃圾回收的一個安全點時,會進入Suspended狀態。從這開始,它只能通過另一個線程調用Resume方法來恢復。Resume只對掛起的線程有用,而不是阻塞的線程。

從 .NET 2.0 開始,SuspendResume就已經被反對使用了,因為任意掛起另一個線程是危險的。如果一個持有重要資源上的鎖的線程被掛起,整個程序(或計算機)都可能會產生死鎖。這遠比調用Abort更危險,Abort還可以通過finally塊中的代碼來釋放鎖(至少理論上可以)。

然而,在當前線程上調用Suspend是安全的,並且這樣做你可以實現一個簡單的同步機制:工作線程在一個循環中執行完任務,然后調用Suspend,等待主線程在有新的任務時將其恢復(“喚醒”)。但是難點是判斷工作線程是否被掛起了,考慮如下代碼:

worker.NextTask = "MowTheLawn"; if ((worker.ThreadState & ThreadState.Suspended) > 0) worker.Resume; else // 當線程正在運行時,我們不能調用Resume, // 而是用一個標識來通知工作線程: worker.AnotherTaskAwaits = true; 

這完全不是線程安全的:在工作線程執行期間以及改變狀態的時候,這 5 行代碼的任意一點都可能被搶占。盡管也有修復的辦法,但是比其它方案,比如使用AutoResetEventWait 和 Pulse這樣的同步構造更為復雜。這使得SuspendResume在所有情況下都毫無用處。

不建議使用的SuspendResume有兩個特點:危險和無用!

6中止線程Permalink

可以通過調用Abort方法來強制結束一個線程:

class Abort { static void Main() { Thread t = new Thread (delegate() { while(true); } ); // 永久自旋 t.Start(); Thread.Sleep (1000); // 讓它運行一秒... t.Abort(); // 然后中止它 } } 

線程被中止時會立即進入AbortRequested狀態。如果它如預期一樣中止了,就會進入Stopped狀態。調用方可以調用Join來等待這個過程完成:

class Abort { static void Main() { Thread t = new Thread (delegate() { while (true); } ); Console.WriteLine (t.ThreadState); // Unstarted 狀態 t.Start(); Thread.Sleep (1000); Console.WriteLine (t.ThreadState); // Running 狀態 t.Abort(); Console.WriteLine (t.ThreadState); // AbortRequested 狀態 t.Join(); Console.WriteLine (t.ThreadState); // Stopped 狀態 } } 

調用Abort會在目標線程上拋出ThreadAbortException異常,大多數情況下都會發生在線程正在執行的點。線程被中止時可以選擇處理異常,但異常會在catch塊的最后自動被重新拋出(用來確保線程能夠如願結束)。但是,可以通過在catch塊中調用Thread.ResetAbort來阻止異常被自動重新拋出。之后,線程重新進入Running狀態(從這開始,它可能被再次中止)。在下邊的例子中,每當Abort試圖中止的時候,工作線程都會起死回生:

class Terminator { static void Main() { Thread t = new Thread (Work); t.Start(); Thread.Sleep (1000); t.Abort(); Thread.Sleep (1000); t.Abort(); Thread.Sleep (1000); t.Abort(); } static void Work() { while (true) { try { while (true); } catch (ThreadAbortException) { Thread.ResetAbort(); } Console.WriteLine ("I will not die!"); } } } 

ThreadAbortException被運行時特殊對待,使它在沒有被處理情況下,不會導致整個程序結束,這與其它類型的異常不同。

Abort幾乎對處於任何狀態的線程都有效:RunningBlockedSuspended以及Stopped。然而,當掛起的線程被中止時,ThreadStateException異常會被拋出。這次是在調用方線程中,中止會直到線程之后恢復時才會起作用。下邊是如何中止一個掛起的線程:

try { suspendedThread.Abort(); } catch (ThreadStateException) { suspendedThread.Resume(); } // 現在 suspendedThread 才會中止 

6.1Thread.Abort 的復雜問題Permalink

假設一個被中止的線程沒有調用ResetAbort,你也許會覺得它能快速結束。但如果碰巧這個線程有個“好律師”,它可能會繼續在死亡線上逗留相當長一段時間!以下是可能使它保持AbortRequested狀態的一些因素:

  • 靜態構造方法在執行中途永遠不會被中止(以免破壞該類的狀態,它在應用程序域之后的生命周期中還存在)
  • 所有的catch / finally塊都很重要,不會在中途中止。
  • 如果線程正執行非托管的代碼時被中止,會繼續直到執行到托管代碼時。

最后這個因素特別麻煩,.NET Framework 本身就經常調用非托管代碼,有時還會持續很長一段時間。例如,使用網絡或數據庫類的時候,如果網絡資源或數據庫無法連接或響應很慢,就有可能使執行始終停留在非托管代碼中,也許幾分鍾,這依賴於類的實現。在這些情況下,當然不能用Join來等待中止線程,至少在沒有指定超時時間的情況下不能!

中止純 .NET 代碼沒多大問題,只要使用try / finally塊或using語句在ThreadAbortException被拋出時進行適當地清理。然而即使這樣,還是可能碰到“驚喜”。例如,考慮下邊的代碼:

using (StreamWriter w = File.CreateText ("myfile.txt")) w.Write ("Abort-Safe?"); 

C# 的using語句是一個語法糖,它可以擴展為如下代碼:

StreamWriter w; w = File.CreateText ("myfile.txt"); try { w.Write ("Abort-Safe"); } finally { w.Dispose(); } 

Abort有可能發生在StreamWriter創建之后但是在try塊之前。實際上,通過分析 IL,可以看出它也有可能發生在StreamWriter被創建和賦值給w之間:

IL_0001: ldstr "myfile.txt" IL_0006: call class [mscorlib]System.IO.StreamWriter [mscorlib]System.IO.File::CreateText(string) IL_000b: stloc.0 .try { // ... 

無論是哪種,finally塊中的Dispose調用都會被繞開,導致打開文件的句柄被丟棄,在這之后就無法創建myfile.txt文件,直到進程結束。

在現實中,這個例子的情況可能更糟,因為Abort最可能發生在File.CreateText的實現中。這里是不透明的代碼,我們沒有它的源碼。幸運的是,.NET 的代碼不是真正不透明的:我們可以借助 ILDASM 或者更好用的 Reflector(譯者注:還有微軟的reference source),來了解File.CreateText是如何調用StreamWriter的構造方法的,邏輯如下:

public StreamWriter (string path, bool append, ...) { ... ... Stream stream1 = StreamWriter.CreateFile (path, append); this.Init (stream1, ...); } 

在這個構造方法中,沒有try / catch語句,意味着如果Abort發生在(復雜的)Init方法內,新創建的流將被丟棄,且無法關閉底層文件句柄。

這產生了如何編寫“中止友好(abort-friendly)”方法的問題。最常用的辦法就是根本不要中止其它線程,而是如前所述實現一個協作取消模式。

6.2結束應用程序域Permalink

另一個實現友好中止工作線程的方式是:讓工作線程在自己的應用程序域中運行。調用 Abort后,卸載並重建應用程序域。這樣就無所謂因為部分或不正確的初始化造成的狀態損壞(然而不幸的是,這樣也無法應對上邊描述的最壞情況,中止StreamWriter的構造方法還是可能導致非托管句柄泄露)。

嚴格來講,上面第一步的中止線程是不必要的,因為當應用程序域卸載時,域內的所有線程都會被自動中止。不過,依賴這個行為的缺點是:如果被中止的線程沒有及時結束(可能由於finally中的代碼,或之前討論的其它原因),應用程序域不會完成卸載,並且會在調用方拋出CannotUnloadAppDomainException異常。由於這個原因,在卸載應用程序域之前,最好還是顯式中止線程,調用Join並指定一個超時時間(可以由你控制)。

對於線程活動的世界來說,創建和銷毀一個應用程序域是相對耗時的操作(大約幾毫秒),因此最好不要頻繁使用。並且,因為應用程序域引入的隔離又產生了新的問題,它可能是好處也可能是壞處,這取決於該多線程程序是用來做什么。例如,在單元測試的場景下,將線程運行在單獨的應用程序域中就是好處。

6.3結束進程Permalink

另一個可以使線程結束的方式是結束其所在進程。舉個栗子,當工作線程的IsBackground屬性被設置為true,並且在工作線程還在運行的時候主線程結束了。這時后台線程不能保持應用程序存活,所以進程結束,后台線程也一起結束。

當線程由於其所在進程結束而結束時,finally塊中的代碼都不會被執行。

如果用戶通過 Windows 任務管理器或以編程的方式通過Process.Kill來結束進程,也是相同的情況


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM