一、非阻止同步
.NET framework 非阻止同步結構完成一些簡單操作而不 用阻止,暫停或等待。它涉及到如何使用 嚴格地原子操作,告訴編譯器用 "volatile" 讀和寫的語法,有時候這種方式要比用鎖還 要簡單。
原子和互鎖
如果一個語句執行一個單獨不可分割的指令,那么它是原子的。 嚴格的原子操作排除了任何搶占的可能性。在C#中,一個簡單 的讀操作或給一個少於等與32位的字段賦值 是原子操作(假設為32位CPU)。更大的字段以及大於一個 的讀/寫操作的組合的操作都是非原子的:
class Atomicity { static int x, y; static long z; static void Test() { long myLocal; x = 3; // 原子的 z = 3; // 非原子的 (z 是 64 位) myLocal = z; // 非原子的 (z 是 64 位) y += x; // 非原子的 (讀和寫的操作) x++; // 非原子的 (讀和寫的操作) } }
在32位的計算機上讀和寫64位字段是非原子的是因為2個不同的32位的存儲單元是息息相關的。如果線程A讀一個64位的值, 而另一個線程B正在更新它,線程A會最后得到一個按位組合的老值和新值的結合體。
像x++這樣的一元運算符需要首先讀變量,然后處理它,再寫回值給它。考慮下面的類:
class ThreadUnsafe { static int x = 1000; static void Go() { for (int i = 0; i < 100; i++) x--; } }
你可能會期待如果10個線程並行運行Go,然后x最后得到0。 但這並得不到保證,因為一個線程搶占了另一個正在檢索x的 值,或減少它,或寫回它(導致一個過期的值被寫入)。
解決這個問題的一個方式就是在語句周圍套上lock 語句。鎖定,實際上是模擬原子操作。 Interlocked類提供了一個簡單快 速的簡單原子鎖的方案:
class Program { static long sum; static void Main() { // sum // 簡單地增/減操作: Interlocked.Increment(ref sum); // 1 Interlocked.Decrement(ref sum); // 0 // 加減一個值: Interlocked.Add(ref sum, 3); // 3 // 讀一個64位字段: Console.WriteLine(Interlocked.Read(ref sum)); // 3 // 當正在讀之前的值時同時寫一個64位的值: // (當正在更新sum為10的時候,這里打印的是3) Console.WriteLine(Interlocked.Exchange(ref sum, 10)); // 10 // 更新一個字段僅當它符合一個特定的值時(10): Interlocked.CompareExchange(ref sum, 123, 10); // 123 } }
使用 Interlocked比用lock更有效,因為它從不阻止也沒有臨時操作線程帶來的系統開銷。
Interlocked也對跨多個進程有效,這與只能在當前進程中跨線程的lock形成鮮明的對比。一個例子就是這對讀和寫共享內存 是非常有用的。
內存屏障和易變(Volatility)
考慮這個類:
class Unsafe { static bool endIsNigh, repented; static void Main() { new Thread(Wait).Start(); // Start up the spinning waiter Thread.Sleep(1000); // Give it a second to warm up! repented = true; endIsNigh = true; Console.WriteLine("Going..."); } static void Wait() { while (!endIsNigh) ; // Spin until endIsNigh Console.WriteLine("Gone, " + repented); } }
這兒有個問題:是否能有效地將"Going..." 和 "Gone"剝離開,換言之,是否有可能endIsNigh被設置 為true后,Wait方法仍 然在執行while循環?此外,是否有可能Wait 方法輸出"Gone, false"?
這2個問題的答案,理論上是肯定的:在一個多處理器的計算機,如果線程協調程序將這2個線程分配給不同 的CPU,repented 和endIsNigh字段可以被緩存到CPU寄存器中來提升性能,在它們的更新值被寫回內存之前有可能延 遲,當CPU寄存器被寫回 到內存時,它沒必要按原來的順序進行更新。
這個緩存過程可以用靜態方法Thread.VolatileRead 和 Thread.VolatileWrite 來包圍住來讀和寫這些字 段。VolatileRead意味着“讀最近的值”,VolatileWrite意味着 “立即寫入內存”。相同的功能可以用volatile修飾符更優雅 的實現:
class ThreadSafe { // Always use volatile read/write semantics: volatile static bool endIsNigh, repented; ... }
相同的效果可以通過用lock語句包圍住 repented 和 endIsNigh來實現。鎖定的副作用(有意的)是引起了內存屏障——一 個保證被用於lock中的易變字段不超出lock語句的范圍。換言之,字段在進入鎖之前被刷新(volatile read),在離開鎖時被寫 入內存(volatile write)。
在我們需要以原子方式進入字段repented 和 endIsNigh時,lock是必要的,比如運行像這樣的事情:
lock (locker) { if (endIsNigh) repented = true; }
當一個字段在一個循環中被使用多次,lock可能更是可取的方案。盡管一個volatile read/write在性能上擊敗了一個鎖, 但不代 表數千個volatile read/write操作能擊敗一把鎖。
易變方式僅適合於基本的整型(和不安全的指針),其它的類型不緩存在CPU寄存器上,也不能用 volatile關鍵字 聲明。當字段通過Interlocked類訪問的時候將自動地使用易變的讀和寫的語法。
二、Wait 和 Pulse
早些時候我們討論了事件等待句柄 ——一個線程被阻止 直到它收到另一個發來的信號的簡單信號機制。
一個更強大的信號機制被Monitor類通過兩個靜態方法Wait 和 Pulse 提供。原理是你自己寫一個使用自定義的標志和字段的 信號邏輯(與lock語句協作),然后傳入Wait 和 Pulse命令減輕CPU的輪詢。
Wait 和 Pulse 的定義
Wait 和 Pulse的目標是提供一種簡單的信號模式: Wait阻止直到收到其它線程的通知;Pulse提供了這個通知。 為了信號系統正常工作,Wait必須在Pulse之前執行。如果Pulse先執行了,它的pulse就會丟失,之后的wait必須等待一個 新的pulse,否則它將永遠被阻止。這和AutoResetEvent不同,AutoResetEvent的Set方法有一種“鎖存”效果,當它先 於WaitOne調用時也同樣有效。
在調用Wait 或 Pulse 的時候,你必須定義個同步對象,兩個線程使用相同的對象,它們才能彼此發信號。在調用Wait 或 Pulse之前同步對象必須被lock。例如:如果x如此被聲明: class Test { // 任何引用類型對象都可以作為同步對象 object x = new object(); } 然后在進入Monitor.Wait前的代碼: lock (x) Monitor.Wait (x); 下面的代碼釋放了被阻止的線程(稍后在另一個線程上執行): lock (x) Monitor.Pulse (x);
切換鎖
為了完成工作,在等待的時候Monitor.Wait臨時的釋放或切換當前的鎖,所以另一個線程(比如執行Pulse的這個)可以獲 得它。Wait方法可以被想象擴充為下面的偽代碼。 Monitor.Exit (x); // 釋放鎖 等待到x發的信號后 Monitor.Enter (x); // 收回鎖
因此一個Wait阻止兩次:一次是等待信號,另一次是重新獲取排它鎖。這也意味着 Pulse本身不同完全解鎖:只有當 用Pulse發信號的線程退出它的鎖語句的時候,等待的線程實際上才能繼續運行。 Wait的鎖切換對嵌套鎖也是有效的, 如果Wait在兩個嵌套的lock語句中被調用: lock (x) lock (x) Monitor.Wait (x); 那么Wait邏輯上展開如下: Monitor.Exit (x); Monitor.Exit (x); // Exit兩次來釋放鎖 wait for a pulse on x Monitor.Enter (x); Monitor.Enter (x); //還原之前的排它鎖 與普通鎖結構一致,只有在第一次調用Monitor.Enter時提供了阻止的時機。
為什么要阻止?
為什么Wait 和 Pulse 被設計成只有在鎖內才能工作呢?最主要的理由是Wait能夠被有條件的調用——而不損害線程安全。
來個例子說明,設想我們要只有在bool字段available為false時調用Wait,下面的代碼是線程安全的:
lock (x) { if (!available) Monitor.Wait (x); available = false; }
幾個線程並行運行這段代碼,沒有哪個可以在檢查available字段和調用Monitor.Wait之間搶占了另一個。這兩個語句是有效的原子操作,一個相應的通告程序也是同樣地線程安全的:
lock (x) if (!available) { available = true; Monitor.Pulse (x); }
定義超時
在調用Wait時可以定義一個超時參數,可以是毫秒或TimeSpan值。如果超時發生了,Wait 將返回false。超時僅用於“等待”階段(等待信號pulse):超時的Wait 仍然繼續執行以便重新得到鎖,而不管花費了多長時間。例如:
lock (x) { if (!Monitor.Wait (x, TimeSpan.FromSeconds (10))) Console.WriteLine ("Couldn't wait!"); Console.WriteLine ("But hey, I still have the lock on x!"); }
這性能的理論基礎是有一個良好設計的Wait/Pulse的程序, 調用 Wait 和 Pulse的對象只是暫時地被鎖定,所以重新獲得鎖應當是一個極短時間的操作。
脈沖和確認 (Pulsing and acknowledgement)
Monitor.Pulse的一個重要特性是它以異步方式執行,意味着它本身不以任何方式暫定或阻止。如果另一個線程在等待脈沖 對象,在它被通知的時候,脈沖本身沒有效果而被悄悄地忽略。 Pulse 提供了單向通信:一個脈沖的線程給等待線程發信號,沒有內部的確認機制。Pulse不返回值來指明它的信號是否被收 到了。此外,當一個提示脈沖並釋放了它的鎖,不保證一個符合要求的等待線程能馬上進入它的生命周期。在線程調度程序的 判斷上,可能存在任意的延遲,在兩個線程都沒有鎖的期間。這就難以知道等待線程時候已確切的重新開始了,除非等待線程 會明確確認,比如通過一個自定義的標志位。
從一個沒有自定義的確認機制的工作線程中,依靠即時的動作會“弄亂”Pulse 和 Wait。
等待隊列和PulseAll
當多於一個線程同時Wait相同的對象——也就是在同步對象上形成了“等待隊列”(這和有權訪問某個鎖的“就緒隊列”明顯不 同)。每個Pulse然后釋放在等待隊列頭上的單個線程,所以它可以進入就緒隊列並重新得到鎖。可以把這個過程想象成一個 停車場:你排在收費處的第一個來確認你的票(等待隊列);你再一次排隊在格擋門前來被放掉(就緒隊列)。
隊列結構有它固有的順序,但對於Wait/Pulse程序來說通常是不重要的; 在這些場合中它容易被想成一個等待線程“池”,每 次pulse都從池中釋放了一個等待線程。 Monitor也提供了PulseAll方法在一剎那之間通過這等待線程釋放整個隊列,或池。已pulse的線程不會在同一時刻同時開始 執行,而是在一個順序隊列中,每次Wait語句試圖重新回去那個相同的鎖。實際上,PulseAll將線程從等待隊列移到就緒隊 列中,所以它們可以以順序的方式繼續執行。
如何使用 Pulse 和 Wait
這展示我們如何開始,設想有兩條規則:
- 同步結構僅在也被稱為Monitor.Enter 和 Monitor.Exit的lock語句中。
- 在CPU輪詢上沒有限制!
有了它們在腦子里,讓我們做一個簡單的例子:一個工作線程暫停直到收到主線程的發的信號:
class SimpleWaitPulse { bool go; object locker = new object(); void Work() { Console.Write("Waiting... "); lock (locker) { // 開始輪詢! while (!go) { // 釋放鎖,以讓其它線程可以更改go標志 Monitor.Exit(locker); // 收回鎖,以便我們可以在循環中重測go的值 Monitor.Enter(locker); } } Console.WriteLine("Notified!"); } void Notify()// 從另一個線程調用 { lock (locker) { Console.Write("Notifying... "); go = true; } } }
讓事情可以運轉的主方法:
static void Main() { SimpleWaitPulse test = new SimpleWaitPulse(); // 在單獨的線程中運行Work方法 new Thread(test.Work).Start(); // "Waiting..." // 暫停一秒,然后通過我們的主線程通知工作線程: Thread.Sleep(1000); test.Notify(); // "Notifying... Notified!" }
我門所輪詢的Work方法——使用循環揮霍着CPU的時間直到go標志變為true! 在這個循環中我們必須切換鎖——釋放和重新得到它通過Monitor的Exit 和 Enter 方法——以便另一個運行Notify方法的線程可以修改go標志。共享的go字段必須總是可以在一個鎖內訪問,來避免易變問題。(要記得所有的同步結構,比如volatile關鍵字,在這個階段的設計超出范圍了!)
下一步是去運行它並測試它是否可以工作,下面是是測試Main方法的輸出結果:
Waiting... (pause) Notifying... Notified!
現在我們來介紹Wait 和 Pulse,我們由:
用Monitor.Wait替換切換鎖(Monitor.Exit和Monitor.Enter)
在阻止條件改變后,插入調用Monitor.Pulse(比如go字段被修改)
下面是更新后的類,Console語句被省略了:
class SimpleWaitPulse { bool go; object locker = new object(); void Work() { lock (locker) while (!go) Monitor.Wait(locker); } void Notify() { lock (locker) { go = true; Monitor.Pulse(locker); } } }
這個類與之前的表現一致,但沒有CPU的輪詢,Wait命令立即執行我們移除的代碼——Monitor.Exit 和之后的Monitor.Enter,但中間由個擴充步驟:當鎖被釋放,它等待另一個線程調用Pulse。提示方法完成這個功能,在設置go為true后,工作就做完了。
Pulse 和 Wait 的歸納
我們現在來擴充這個模式。在之前的例子中,我們的阻止條件以一個bool字段——go標志來實現。我們可以換種設定,需要一
個額外的標志來表明等待線程它是就緒或完成了。如果根據我們的推斷,將有很多字段來實現很多的阻止條件,程序可以被歸
納為下面的偽代碼(以輪詢模式):
就像之前做的一樣,我們將之應用到 Pulse 和 Wait 上:
在等待循環中,用Monitor.Wait替換鎖切換
無論何時阻止條件被改變了,在釋放鎖之前調用Pulse
這是更新后的偽代碼:
這提供了一個在使用Wait and Pulse時的健壯模式。這有些對這個模式的關鍵特征:
- 使用自定義字段來實現阻止條件(也可以不用Wait 和 Pulse,雖然會輪詢)
- Wait總是在while循環內調用來檢查條件(它本身又在lock中)
- 一個單一的同步對象(在上面的例子里是locker),被用於所有的Wait 和 Pulse, 並且來保護訪問所有實現阻止條件的對象。
- 鎖的掌控只是暫時地
這種模式最重要的是pulse不強迫等待線程繼續執行,代替為它通知等待線程有些東西改變了,建議它重新檢查它的阻止條件,等待線程然后決定是否需要它該繼續進行(通過另一個while循環),而不是脈沖發生器。這個方式的好處是它允許復雜的阻止條件,而沒有復雜的同步邏輯。這個模式的另一個好處是對丟失的脈沖具有抗擾性。當Pulse在Wait之前被調用的時候,脈沖發生丟失——可能歸咎於提示線程和等待線程的競爭。當時因為在這個模式里一個脈沖意味着“重新檢查你的阻止條件”(而不是“繼續運行”),早的脈沖可以被
安全的忽略,因為阻止條件總是在調用Wait之前檢查,這要感謝while語句。依托這種設計,你可以定義多個阻止字段,讓它們參與到多個阻止條件之中,並且在這期間只需要用一個單一的同步對象(在我們例子里是locker)。這經常優於在lock, Pulse 和 Wait上有各自的同步對象,因為這樣有效的避免了死鎖。此外使用了同步鎖定對象,所有阻止字段被以單元模式讀和寫,就避免了微秒的原子錯誤。這是一個好主意,但是,不要試圖必要的區域之外用同步對象(這可以用private聲明同步對象來實現,對阻止字段來說也是一樣)。
生產者/消費者隊列
一個普通的Wait/Pulse程序是一個生產消費隊列——我們之前用AutoResetEvent來寫的一種結構。生產者入隊任務(通 常在主線程中),同時一個或多個消費者運行工作線程來一個接一個地摘掉和執行任務。在這個例子中我們將用字符串來表示任務,我們的任務隊列看起來會像這樣: Queue<string> taskQ = new Queue<string>(); 因為隊列用於多線程,我們必須用lock來包住所有讀寫隊列的語句。這是如何入隊任務:
lock (locker) { taskQ.Enqueue ("my task"); Monitor.PulseAll (locker); // 我們改變阻止條件 }
因為我們潛在修改了阻止條件,我們必須脈沖。我們調用PulseAll代替Pulse,因為我們將允許多個消費者。多於一個線程可能正在等待。我們要讓工作線程阻止當它沒有什么可做的時候,換句話說就是隊列里沒有條目了。
因此我們的阻止條件是taskQ.Count==0。
這是實現了一個等待語句:
lock (locker) while (taskQ.Count == 0) Monitor.Wait (locker);
下一步是工作線程出列任務並執行它:
lock (locker) while (taskQ.Count == 0) Monitor.Wait (locker); string task; lock (locker) task = taskQ.Dequeue();
但是這個邏輯是非線程安全的:我們以一個在舊的信息上的出列為判定基礎——從之前的鎖結構獲得的。考慮當我們並行地打開2個消費者線程,對一個已在隊列上的單一條目,可能沒有線程會進入while循環來阻止——當然他們在隊列中都看到這個單一的條目的時候。它們都試圖出列相同的條目,在第二個實例中將拋出異常!為了修復這個問題,我們簡單地將lock擴大一點直到我們完成與隊列的結合:
string task; lock (locker) { while (taskQ.Count == 0) Monitor.Wait (locker); task = taskQ.Dequeue(); }
(我們不需要在出列之后調用Pulse,因為沒有消費者在隊列有較少的的條目時可以永遠處於非阻止狀態。) 一旦任務出列后,沒必要在保持鎖了,這時就釋放它以允許消費者去執行一個可能耗時的任務,而沒必要去阻止其它線程。
這里是完整的程序。與AutoResetEvent 版本一樣,我們入列一個null任務來通知消費者退出(在完成所有任務之后)。因 為我們支持多個消費者,我們必須為每個消費者入列一個null任務來關閉隊列:
using System; using System.Threading; using System.Collections.Generic; public class TaskQueue : IDisposable { object locker = new object(); Thread[] workers; Queue<string> taskQ = new Queue<string>(); public TaskQueue(int workerCount) { workers = new Thread[workerCount]; // Create and start a separate thread for each worker for (int i = 0; i < workerCount; i++) (workers[i] = new Thread(Consume)).Start(); } public void Dispose() { // Enqueue one null task per worker to make each exit. foreach (Thread worker in workers) EnqueueTask(null); foreach (Thread worker in workers) worker.Join(); } public void EnqueueTask(string task) { lock (locker) { taskQ.Enqueue(task); Monitor.PulseAll(locker); } } void Consume() { while (true) { string task; lock (locker) { while (taskQ.Count == 0) Monitor.Wait(locker); task = taskQ.Dequeue(); } if (task == null) return; // This signals our exit Console.Write(task); Thread.Sleep(1000); // Simulate time-consuming task } } }
這是一個開始任務隊列的主方法,定義了兩個並發的消費者線程,然后在兩個消費者之間入列10個任務:
static void Main() { using (TaskQueue q = new TaskQueue(2)) { for (int i = 0; i < 10; i++) q.EnqueueTask(" Task" + i); Console.WriteLine("Enqueued 10 tasks"); Console.WriteLine("Waiting for tasks to complete..."); } //使用TaskQueue的Dispose方法退出 //在所有的任務完成之后,它關閉了消費者 Console.WriteLine("\r\nAll tasks done!"); }
輸出結果:
Enqueued 10 tasks
Waiting for tasks to complete...
Task1 Task0 (pause...) Task2 Task3 (pause...) Task4 Task5 (pause...)
Task6 Task7 (pause...) Task8 Task9 (pause...)
All tasks done!
Pulse 還是 PulseAll?
這個例子中,進一步的pulse節約成本問題隨之而來,在入列一個任務之后,我們可以用調用Pulse來代替PulseAll,這不會破壞什么。讓我們看看它們的不同:對於Pulse,最多一個線程會被喚醒(重新檢查它的while-loop阻止條件); 對於PulseAll來說,所有的等待線程都被喚醒(並重新檢查它們的阻止條件)。如果我們入列一個單一的任務只有一個工作線程能夠得到它,所以我們只需要使用一個Pulse喚醒一個工作線程。這就像有一個班級的孩子 ——如果僅僅只有一個冰激凌,沒必要把他們都叫醒去排隊得到它!
在我們的例子中,我們僅僅使用了兩個消費者線程,所以我們不會有什么獲利。但是如果我們使用了10個消費者線程,使用Pulse 代替PulseAll可以讓我們可能微微獲利。這將意味着,我們每入列多個任務,我們必須Pulse多次。這可以在一個單獨lock語句中進行,像這樣:
lock (locker) { taskQ.Enqueue("task 1"); taskQ.Enqueue("task 2"); Monitor.Pulse(locker); // "發兩此信號 Monitor.Pulse(locker); // 給等待線程" }
其中一個Pulse的價值對於一個堅持工作的線程來說價值幾乎為零。這也經常出現間歇性的bug,因為它會突然出現僅僅在當一個消費線程處於Waiting狀態時,因此你可以擴充之前的信條為“如果對Pulse有疑問”為“如果對PulseAll有疑問!”。對於這個規則的可能出現的異常一般是由於判斷阻止條件是耗時的。
使用等待超時
有時候當非阻止條件發生時Pulse是不切實際或不可能的。一個可能的例子就是阻止條件調用一個周期性查詢數據庫得到信息的方法。如果反應時間不是問題,解決方案就很簡單:你可以定義一個timeout在調用Wait的時候,如下:
lock (locker) { while ( blocking condition ) Monitor.Wait (locker, timeout); }
這就強迫阻止條件被重新檢查,至少為超時定義一個正確的區間,就可以立刻接受一個pulse。阻止條件越簡單,超時越容易造成高效率。
同一系統工作的相當號如果pulse缺席,會歸咎於程序的bug!所以值得在程序中的所有同步非常復雜的Wait上加上超時——這可作為復雜的pulse錯誤最終后備支持。這也提供了一定程度的bug抗繞度,如果程序被稍后修改了Pulse部分!
競爭與確認
如果說,我們想要一個信號,一個工作線程連續5次顯示:
class Race { static object locker = new object(); static bool go; static void Main() { new Thread(SaySomething).Start(); for (int i = 0; i < 5; i++) { lock (locker) { go = true; Monitor.Pulse(locker); } } } static void SaySomething() { for (int i = 0; i < 5; i++) { lock (locker) { while (!go) Monitor.Wait(locker); go = false; } Console.WriteLine("Wassup?"); } } }
實際輸出:
Wassup?
(終止)
這個程序是有缺陷的:主線程中的for循環可以任意的執行它的5次迭代在工作線程還沒得到鎖的任何時候內,可能工作線程甚至還沒開始的時候!生產者/消費者的例子沒有這個問題,因為主線程勝過工作線程,每個請求只會排隊。但是在這個情況下,我們需要在工作線程仍然忙於之前的任務的時候,主線程阻止迭代。比較簡單的解決方案是讓主線程在每次循環后等待,直到go標志而被工作線程清除掉,這樣就需要工作線程在清除go標志后調用Pulse:
class Acknowledged { static object locker = new object(); static bool go; static void Main() { new Thread(SaySomething).Start(); for (int i = 0; i < 5; i++) { lock (locker) { go = true; Monitor.Pulse(locker); } lock (locker) { while (go) Monitor.Wait(locker); } } } static void SaySomething() { for (int i = 0; i < 5; i++) { lock (locker) { while (!go) Monitor.Wait(locker); go = false; Monitor.Pulse(locker); // Worker must Pulse } Console.WriteLine("Wassup?"); } } }
這次,Wassup? (重復了5次)
這個程序的一個重要特性是工作線程,在執行可能潛在的耗時工作之前釋放了它的鎖(此處是發生在我們的Console.WriteLine處)。這就確保了當工作線程仍在執行它的任務時調用者不會被過分的阻止,因為它已經被發過信號了(並且只有在工作線程仍忙於之前的任務時才被阻止)。
在這個例子中,只有一個線程(主線程)給工作線程發信號執行任務,如果多個線程一起發信號給工作線程—— 使用我們的主方法的邏輯——我們將出亂子的。兩個發信號線程可能彼此按序執行下面的這行代碼:
lock (locker) { go = true; Monitor.Pulse (locker); }
如果工作線程沒有發生完成處理第一個的時候,導致第二個信號丟失。我們可以在這種情形下通過一對標記來讓我們的設計更健壯些。 “ready”標記指示工作線程能夠接受一個新任務;“go”標記來指示繼續執行,就像之前的一樣。這與之前的執行相同的事情的使用兩個AutoResetEvent的例子類似,除了更多的可擴充性。下面是模式,重分解了實例字段:
public class Acknowledged { object locker = new object(); bool ready; bool go; public void NotifyWhenReady() { lock (locker) { // 等待當工作線程已在忙之前的時 while (!ready) Monitor.Wait(locker); ready = false; go = true; Monitor.PulseAll(locker); } } public void AcknowledgedWait() { // 預示我們准備處理一個請求 lock (locker) { ready = true; Monitor.Pulse(locker); } lock (locker) { while (!go) Monitor.Wait(locker); // 等待一個“go”信號 go = false; Monitor.PulseAll(locker); // 肯定信號(確認相應) } Console.WriteLine("Wassup?"); // 執行任務 } }
為了證實,我們啟動兩個並發線程,每個將通知工作線程5次,期間,主線程將等待10次報告:
public class Test { static Acknowledged a = new Acknowledged(); static void Main() { new Thread(Notify5).Start(); // Run two concurrent new Thread(Notify5).Start(); // notifiers... Wait10(); // ... and one waiter. } static void Notify5() { for (int i = 0; i < 5; i++) a.NotifyWhenReady(); } static void Wait10() { for (int i = 0; i < 10; i++) a.AcknowledgedWait(); } }
輸出:
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
在Notify方法中,當離開lock語句時ready標記被清除。這是及其重要的:它保證了兩個通告程序持續的發信號而不用重新檢查標記。為了簡單,我們在相同的lock語句中也設置了go標記並且調用PulseAll語句。
模擬等待句柄
你可能已經注意到了之前的例子里的一個模式:兩個等待循環都有下面的結構: lock (locker) { while (!flag) Monitor.Wait (locker); flag = false; ... } flag 在另一個線程里被設置為true,這個作用就是模擬AutoResetEvent。如果我們忽略flag=false,我們就相當於得到 了ManualResetEvent。使用一個整型字段,Pulse 和 Wait 也能被用於模擬Semaphore。實際上唯一用Pulse 和 Wait不能模擬的等待句柄是Mutex,因為這個功能被lock提供。 模擬跨多個等待句柄的工作的靜態方法大多數情況下是很容易的。相當於在多個EventWaitHandle間調用WaitAll,無非是 阻止條件囊括了所有用於標識用以代替等待句柄: lock (locker) { while (!flag1 && !flag2 && !flag3...) Monitor.Wait (locker); 這特別有用,假設waitall是在大多數情況由於com遺留問題不可用。模擬WaitAny更容易了,大概只要把&&操作符替換 成||操作符就可以了。
SignalAndWait 是需要技巧的。回想這個順序發信號一個句柄而同時在同一個原子操作中等待另一個。 我們情形與分布式的 數據庫事務操作類似——我們需要雙相確(commit)!假定我們想要發信號flagA同時等待flagB,我們必須分開每個標識 為2個,導致代碼看起來像這樣:
lock (locker) { flagAphase1 = true; Monitor.Pulse(locker); while (!flagBphase1) Monitor.Wait(locker); flagAphase2 = true; Monitor.Pulse(locker); while (!flagBphase2) Monitor.Wait(locker); }
如果第一個Wait語句拋出異常作為中斷或終止的結果,多半附加"rollback"邏輯到取消flagAphase1。
等待匯集
就像WaitHandle.SignalAndWait 可以用於匯集一對線程一樣,Wait和Pulse它們也可以。接下來這個例子,我們要模擬兩個ManualResetEvent(換言之,我們要定義兩個布爾標識!)並且然后執行彼此的Wait 和 Pulse,通過設置某個標識同時等待另一個。這個情形下我們在Wait 和 Pulse不需要真正的原子操作,所以我們避免需要“雙相確認”。當我們設置我們的標識為true,並且在相同的lock語句中進行等待,匯集就會工作了:
class Rendezvous { static object locker = new object(); static bool signal1, signal2; static void Main() { // Get each thread to sleep a random amount of time. Random r = new Random(); new Thread(Mate).Start(r.Next(10000)); Thread.Sleep(r.Next(10000)); lock (locker) { signal1 = true; Monitor.Pulse(locker); while (!signal2) Monitor.Wait(locker); } Console.Write("Mate! "); } // This is called via a ParameterizedThreadStart static void Mate(object delay) { Thread.Sleep((int)delay); lock (locker) { signal2 = true; Monitor.Pulse(locker); while (!signal1) Monitor.Wait(locker); } Console.Write("Mate! "); } }
結果:Mate! Mate! (幾乎同時出現)
Wait 和 Pulse vs. 等待句柄
並且阻止條件在外部為設置為false。僅有的開銷就是去掉鎖(數十納秒間),而調用WaitHandle.WaitOne要花費幾毫秒,當然這要保證鎖是無競爭的鎖。明智的原則是使用等待句柄在那些有助於它自然地完成工作的特殊結構中,否則就選擇使用Wait 和 Pulse。
三、Suspend 和 Resume
線程可以被明確的掛起和恢復通過Thread.Suspend 和 Thread.Resume 這個機制與之前討論的阻止完全分離。它們兩個是獨立的和並發的執行的。一個線程可以掛起它本身或其它的線程,調用Suspend 導致線程暫時進入了 SuspendRequested狀態,然后在達到無用單元收集的安全點之前,它進入Suspended狀態。從那時起,它只能通過另一個線程調用Resume方法恢復。Resume只對掛起的線程有用,而不是阻止的線程。
從.NET 2.0開始Suspend 和 Resume被不贊成使用了,因為任意掛起起線程本身就是危險的。如果在安全權限評估期間掛起持有鎖的線程,整個程序(或計算機)可能會死鎖。
四、終止線程
一個線程可以通過Abort方法被強制終止:
class Abort { static void Main() { Thread t = new Thread(delegate() { while (true);}); // 永遠輪詢 t.Start(); Thread.Sleep(1000); // 讓它運行幾秒... t.Abort(); // 然后終止它 } }
線程在被終止之前立即進入AbortRequested 狀態。如果它如預期一樣終止了,它進入Stopped狀態。調用者可以通過調用Join來等待這個過程完成:
class Abort { static void Main() { Thread t = new Thread(delegate() { while (true); }); Console.WriteLine(t.ThreadState); // Unstarted t.Start(); Thread.Sleep(1000); Console.WriteLine(t.ThreadState); // Running t.Abort(); Console.WriteLine(t.ThreadState); // AbortRequested t.Join(); Console.WriteLine(t.ThreadState); // Stopped } }
Abort引起ThreadAbortException 異常被拋出在目標線程上, 大多數情況下就使線程執行的那個時候。線程被終止可以選擇處理異常,但是異常會自動在catch語句最后被重新拋出(來幫助保證線程確實如期望的結束了)。盡管如此,可能避免自動地重新拋出通過調用Thread.ResetAbort在catch語句塊內。線程然后重新進入Running 狀態(由於它可能潛在被又一次終止)。在下面的例子中,每當Abort試圖終止的時候工作線程都從死恢復回來:
class Terminator { static void Main() { Thread t = new Thread(Work); t.Start(); Thread.Sleep(1000); t.Abort(); Thread.Sleep(1000); t.Abort(); Thread.Sleep(1000); t.Abort(); } static void Work() { while (true) { try { while (true); } catch (ThreadAbortException) { Thread.ResetAbort(); } Console.WriteLine("I will not die!"); } } }
ThreadAbortException被運行時處理過,導致它當沒有處理時不會引起整個程序結束,而不像其它類型的線程。Abort幾乎對處於任何狀態的線程都有效, running,blocked,suspended或 stopped。盡管掛起的線程會失敗,但ThreadStateException會被拋出,這時在正調用的線程中, 異常終止不會踢開直到線程隨后恢復,這演示了如何終止一
個掛起的線程:
try { suspendedThread.Abort(); } catch (ThreadStateException) //現在suspendedThread將被終止
Thread.Abort的復雜因素
假設一個被終止的線程沒有調用ResetAbort,你可能期待它正確地並迅速地結束。但是爭先它所發生的那樣,懂法規的線程可能駐留在死那行一段時間!有一點原因可能保持它延遲在AbortRequested狀態:
- 靜態類的構造器執行一半是不能被終止的(免得可能破壞類對於程序域內的生命周期)
- 所有的catch/finally語句塊被尊重,不會在這期間終止
- 如果線程正執行到非托管的代碼時進行終止,執行會繼續直到下次進入托管的代碼中
最后因素尤為麻煩,.NET framework本身提供了調用非托管的代碼,有時候會持續很長的時間。一個例子就是當使用網絡或數據庫類的時候。如果網資源或數據庫死了或很慢的相應,有可能執行完全的保留在非托管的代碼中,至於多長時間依賴於類的實現。在這些情況,確定你不能用Join 來終止線程——至少不能沒有超時!
終止純.NET代碼是很少有問題的,在try/finally或using語句組成合體來保證正確時,終止發生應拋出ThreadAbortException異常。但是,即使那樣,還是容易出錯。比如考慮下面的代碼:
using (StreamWriter w = File.CreateText ("myfile.txt")) w.Write ("Abort-Safe?");
C#using語句是簡潔地語法操作,可以擴充為下面的:
StreamWriter w; w = File.CreateText ("myfile.txt"); try { w.Write ("Abort-Safe"); } finally { w.Dispose(); }
有可能Abort引發在StreamWriter創建之后,但是在try 之前,也有可能它引發在StreamWriter被創建和賦值給w之間。無論哪種,在finally中的Dispose方法,導致拋棄了打開文件的句柄 —— 應用程序域結束之前一直阻止創建myfile.txt操作。
最溫和的退出工作線程是通過調用在它自己的線程上調用Abort——盡管明確地拋出異常,也可以很好的工作。這確保了線程正常終止,在執行任何catch/finally語句的時候——相當像從另一個線程調用終止,除了異常是在設計的地方拋出的:
class ProLife { public static void Main() { RulyWorker w = new RulyWorker(); Thread t = new Thread(w.Work); t.Start(); Thread.Sleep(500); w.Abort(); } public class RulyWorker { // The volatile keyword ensures abort is not cached by a thread volatile bool abort; public void Abort() { abort = true; } public void Work() { while (true) { CheckAbort(); // Do stuff... try { OtherMethod(); } finally { /* any required cleanup */ } } } void OtherMethod() { // Do stuff... CheckAbort(); } void CheckAbort() { if (abort) Thread.CurrentThread.Abort(); } } }
結束應用程序域
另一個方式實現友好的終止工作線程是通過終止持有它的應用程序域。在調用Abort后,你簡單地銷毀應用程序域,因此釋放 了任何不正確引用的資源。 嚴格地來講,第一步——終止線程——是不必要的,因為當一個應用程序域卸載之后,所有期內線程都被終止了。盡管如此, 依賴這個特性的缺點是如果被終止的線程沒有即時的退出(可能歸咎於finally的代碼,或之前討論的理由) 應用程序域不會卸載,CannotUnloadAppDomainException異常將被拋出。因此最好明確終止線程,然后在卸載應用程序域之前帶着超 時參數(受你所控)調用Join方法。 在下面的例子里,工作線程訪問一個死循環,使用非終止安全的File.CreateText方法創建並關閉一個文件。主線程然后重 復地開始和終止工作線程。它總是在一或兩次迭代中失敗,CreateText在獲取了終止部分通過它的內部實現機制,留下了一 個被拋棄的打開文件的句柄:
using System; using System.IO; using System.Threading; class Program { static void Main() { while (true) { Thread t = new Thread(Work); t.Start(); Thread.Sleep(100); t.Abort(); Console.WriteLine("Aborted"); } } static void Work() { while (true) using (StreamWriter w = File.CreateText("myfile.txt")) { } } }
輸出引發異常:
Aborted
Aborted
IOException: The process cannot access the file 'myfile.txt' because it
is being used by another process.
下面是一個經修改類似的例子,工作線程在它自己的應用程序域中運行,應用程序域在線程被終止后被卸載掉。它會永遠的運
行而沒有錯誤,因為卸載應用程序域釋放了被拋棄的文件句柄:
class Program { static void Main(string[] args) { while (true) { AppDomain ad = AppDomain.CreateDomain("worker"); Thread t = new Thread(delegate() { ad.DoCallBack(Work); }); t.Start(); Thread.Sleep(100); t.Abort(); if (!t.Join(2000)) { // 線程不會結束——這里我們可以放置一些操作, // 如果,實際上,我們不能做任何事,幸運地是 // 這種情況,我們期待*線程*總是能結束。 } AppDomain.Unload(ad); // 卸載“受污染”的應用程序域 Console.WriteLine("Aborted"); } } static void Work() { while (true) using (StreamWriter w = File.CreateText("myfile.txt")) { } } }
輸出:
Aborted
Aborted
Aborted
Aborted
...
創建和結束一個應用程序域在線程的世界里是被分類到相關耗時的操作的(數毫秒),所以應該不定期的使用它,而不是把它放入循環中。同時,實行分離,由應用程序域推出的另一項內容可以帶來有利或不利,這取決於多線程程序展示出來的實現。在單元測試方面,比如,在分離的應用程序域中運行線程,可以帶來極大的好處。
結束進程
另一個線程結束的方式是通過它的父進程被終止掉。當一個線程由於它的父進程被終止了,它突然停止,不會有finally被執行。相同的情形在一個用戶通過Windows任務管理器或一個進程被編程的方式通過Process.Kill時發生。