線程基礎
創建線程

static void Main(string[] args) { Thread t = new Thread(PrintNumbers); t.Start();//線程開始執行 PrintNumbers(); Console.ReadKey(); } static void PrintNumbers() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Console.WriteLine(i); } }
暫停線程

class Program { static void Main(string[] args) { Thread t = new Thread(PrintNumbersWithDelay); t.Start(); PrintNumbers(); Console.ReadKey(); } static void PrintNumbers() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Console.WriteLine(i); } } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2));//暫停2S Console.WriteLine(i); } } }
工作原理
當程序運行時,會創建一個線程,該線程會執行PrintNumbersWithDelay方法中的代碼。然后會立即執行PrintNumbers方法。關鍵之處在於在PrintNumbersWithDelay方法中加入了Thread.Sleep方法調用。這將導致線程執行該代碼時,在打印任何數字之前會等待指定的時間(本例中是2秒鍾),當線程處於休眠狀態時,它會占用盡可能少的CPU時間。結果我們4·會發現通常后運行的PrintNumbers方法中的代碼會比獨立線程中的PrintNumbersWithDelay方法中的代碼先執行。
線程等待

class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithDelay); t.Start(); t.Join(); Console.WriteLine("Thread completed"); } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
工作原理
當程序運行時,啟動了一個耗時較長的線程來打印數字,打印每個數字前要等待兩秒。但我們在主程序中調用了t.Join方法,該方法允許我們等待直到線程t完成。當線程t完成 "時,主程序會繼續運行。借助該技術可以實現在兩個線程間同步執行步驟。第一個線程會等待另一個線程完成后再繼續執行。第一個線程等待時是處於阻塞狀態(正如暫停線程中調用 Thread.Sleep方法一樣),
終止線程

class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithDelay); t.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); t.Abort(); Console.WriteLine("A thread has been aborted"); } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
工作原理
當主程序和單獨的數字打印線程運行時,我們等待6秒后對線程調用了t.Abort方法。這給線程注入了ThreadAbortException方法,導致線程被終結。這非常危險,因為該異常可以在任何時刻發生並可能徹底摧毀應用程序。另外,使用該技術也不一定總能終止線程。目-標線程可以通過處理該異常並調用Thread.ResetAbort方法來拒絕被終止。因此並不推薦使用,Abort方法來關閉線程。可優先使用一些其他方法,比如提供一個CancellationToken方法來,取消線程的執行。
監測線程狀態

class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithStatus); Thread t2 = new Thread(DoNothing); Console.WriteLine(t.ThreadState.ToString()); t2.Start(); t.Start(); for (int i = 1; i < 30; i++) { Console.WriteLine(t.ThreadState.ToString()); } Thread.Sleep(TimeSpan.FromSeconds(6)); t.Abort(); Console.WriteLine("A thread has been aborted"); Console.WriteLine(t.ThreadState.ToString()); Console.WriteLine(t2.ThreadState.ToString()); Console.ReadKey(); } static void DoNothing() { Thread.Sleep(TimeSpan.FromSeconds(2)); } static void PrintNumbersWithStatus() { Console.WriteLine("Starting..."); Console.WriteLine(Thread.CurrentThread.ThreadState.ToString()); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
工作原理
當主程序啟動時定義了兩個不同的線程。一個將被終止,另一個則會成功完成運行。線,.程狀態位於Thread對象的ThreadState屬性中。ThreadState屬性是一個C#枚舉對象。剛開始線程狀態為ThreadState.Unstarted,然后我們啟動線程,並估計在一個周期為30次迭代的,區間中,線程狀態會從ThreadState.Running變為ThreadState. WaitSleepJoin。
請注意始終可以通過Thread.CurrentThread靜態屬性獲得當前Thread對象。
如果實際情況與以上不符,請增加迭代次數。終止第一個線程后,會看到現在該線程狀態為ThreadState.Aborted,程序也有可能會打印出ThreadState.AbortRequested狀態。這充分說明了同步兩個線程的復雜性。請記住不要在程序中使用線程終止。我在這里使用它只是為 ,了展示相應的線程狀態。
最后可以看到第二個線程t2成功完成並且狀態為ThreadState.Stopped。另外還有一些其,他的線程狀態,但是要么已經被棄用,要么沒有我們實驗過的幾種狀態有用。
線程優先級

class Program { static void Main(string[] args) { Console.WriteLine("Current thread priority: {0}", Thread.CurrentThread.Priority); Console.WriteLine("Running on all cores available"); RunThreads(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Running on a single core"); Process.GetCurrentProcess().ProcessorAffinity = new IntPtr(1); RunThreads(); } static void RunThreads() { var sample = new ThreadSample(); var threadOne = new Thread(sample.CountNumbers); threadOne.Name = "ThreadOne"; var threadTwo = new Thread(sample.CountNumbers); threadTwo.Name = "ThreadTwo"; threadOne.Priority = ThreadPriority.Highest; threadTwo.Priority = ThreadPriority.Lowest; threadOne.Start(); threadTwo.Start(); Thread.Sleep(TimeSpan.FromSeconds(2)); sample.Stop(); Console.ReadKey(); } class ThreadSample { private bool _isStopped = false; public void Stop() { _isStopped = true; } public void CountNumbers() { long counter = 0; while (!_isStopped) { counter++; } Console.WriteLine("{0} with {1,11} priority " + "has a count = {2,13}", Thread.CurrentThread.Name, Thread.CurrentThread.Priority, counter.ToString("N0")); } } }
工作原理
當主程序啟動時定義了兩個不同的線程。第一個線程優先級為ThreadPriority.Highest,即具有最高優先級。第二個線程優先級為ThreadPriority.Lowest,即具有最低優先級。我們先, ,打印出主線程的優先級值,然后在所有可用的CPU核心上啟動這兩個線程。如果擁有一個1以上的計算核心,將在兩秒鍾內得到初步結果。最高優先級的線程通常會計算更多的迭代.但是兩個值應該很接近。然而,如果有其他程序占用了所有的CPU核心運行負載,結果則會截然不同。
為了模擬該情形,我們設置了ProcessorAffinity選項,讓操作系統將所有的線程運,行在單個CPU核心(第一個核心)上。現在結果完全不同,並且計算耗時將超過2秒鍾。 .這是因為CPU核心大部分時間在運行高優先級的線程,只留給剩下的線程很少的時間來,運行。
請注意這是操作系統使用線程優先級的一個演示。通常你無需使用這種行為編寫程序。
前台線程和后台線程

class Program { static void Main(string[] args) { var sampleForeground = new ThreadSample(10); var sampleBackground = new ThreadSample(20); var threadOne = new Thread(sampleForeground.CountNumbers); threadOne.Name = "ForegroundThread"; var threadTwo = new Thread(sampleBackground.CountNumbers); threadTwo.Name = "BackgroundThread"; threadTwo.IsBackground = true; threadOne.Start(); threadTwo.Start(); Console.ReadKey(); } class ThreadSample { private readonly int _iterations; public ThreadSample(int iterations) { _iterations = iterations; } public void CountNumbers() { for (int i = 0; i < _iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } } }
工作原理
當主程序啟動時定義了兩個不同的線程。默認情況下,顯式創建的線程是前台線程。通過手動的設置threadTwo對象的IsBackground屬性為ture來創建一個后台線程。通過配置來實現第一個線程會比第二個線程先完成。然后運行程序。
第一個線程完成后,程序結束並且后台線程被終結。這是前台線程與后台線程的主要區,別:進程會等待所有的前台線程完成后再結束工作,但是如果只剩下后台線程,則會直接結束工作。
一個重要注意事項是如果程序定義了一個不會完成的前台線程,主程序並不會正常結束。
向線程傳遞參數

class Program { static void Main(string[] args) { var sample = new ThreadSample(10); var threadOne = new Thread(sample.CountNumbers); threadOne.Name = "ThreadOne"; threadOne.Start(); threadOne.Join(); Console.WriteLine("--------------------------"); var threadTwo = new Thread(Count); threadTwo.Name = "ThreadTwo"; threadTwo.Start(8); threadTwo.Join(); Console.WriteLine("--------------------------"); var threadThree = new Thread(() => CountNumbers(12)); threadThree.Name = "ThreadThree"; threadThree.Start(); threadThree.Join(); Console.WriteLine("--------------------------"); int i = 10; var threadFour = new Thread(() => PrintNumber(i)); i = 20; var threadFive = new Thread(() => PrintNumber(i)); threadFour.Start(); threadFive.Start(); } static void Count(object iterations) { CountNumbers((int)iterations); } static void CountNumbers(int iterations) { for (int i = 1; i <= iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } static void PrintNumber(int number) { Console.WriteLine(number); } class ThreadSample { private readonly int _iterations; public ThreadSample(int iterations) { _iterations = iterations; } public void CountNumbers() { for (int i = 1; i <= _iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } } }
工作原理
當主程序啟動時,首先創建了ThreadSample類的一個對象,並提供了一個迭代次數。然后使用該對象的CountNumbers方法啟動線程。該方法運行在另一個線程中,但是使用數 ,字10,該數字是通過ThreadSample對象的構造函數傳入的。因此,我們只是使用相同的間接方式將該迭代次數傳遞給另一個線程。
另一種傳遞數據的方式是使用Thread.Start方法。該方法會接收一個對象,並將該對象,傳遞給線程。為了應用該方法,在線程中啟動的方法必須接受object類型的單個參數。在創建threadTwo線程時演示了該方式。我們將8作為一個對象傳遞給了Count方法,然后 Count方法被轉換為整型。
接下來的方式是使用lambda表達式。lambda表達式定義了一個不屬於任何類的方法。我們創建了一個方法,該方法使用需要的參數調用了另一個方法,並在另一個線程中運行該 ,方法。當啟動threadThree線程時,打印出了12個數字,這正是我們通過lambda表達式傳遞,的數字。
使用lambda表達式引用另一個C#對象的方式被稱為閉包。當在lambda表達式中使用任何局部變量時, C#會生成一個類,並將該變量作為該類的一個屬性。所以實際上該方式與 threadOne線程中使用的一樣,但是我們無須定義該類, C#編譯器會自動幫我們實現。
這可能會導致幾個問題。例如,如果在多個lambda表達式中使用相同的變量,它們會共享該變量值。在前一個例子中演示了這種情況。當啟動threadFour和threadFive線程時,.它們都會打印20,因為在這兩個線程啟動之前變量被修改為20。
使用C#中的lock關鍵字

class Program { static void Main(string[] args) { Console.WriteLine("Incorrect counter"); var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}",c.Count); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); var c1 = new CounterWithLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c1.Count); Console.ReadKey(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { public int Count { get; private set; } public override void Increment() { Count++; } public override void Decrement() { Count--; } } class CounterWithLock : CounterBase { private readonly object _syncRoot = new Object(); public int Count { get; private set; } public override void Increment() { lock (_syncRoot) { Count++; } } public override void Decrement() { lock (_syncRoot) { Count--; } } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } }
工作原理
當主程序啟動時,創建了一個Counter類的對象。該類定義了一個可以遞增和遞減的簡,單的計數器。然后我們啟動了三個線程。這三個線程共享同一個counter實例,在一個周期中進行一次遞增和一次遞減。這將導致不確定的結果。如果運行程序多次,則會打印出多個不同的計數器值。結果可能是0,但大多數情況下則不是0.
這是因為Counter類並不是線程安全的。當多個線程同時訪問counter對象時,第一個線程得到的counter值10並增加為11,然后第二個線程得到的值是11並增加為12,第一個線程得到counter值12,但是遞減操作發生前,第二個線程得到的counter值也是12,然后 , 第一個線程將12遞減為11並保存回counter中,同時第二個線程進行了同樣的操作。結果,我們進行了兩次遞增操作但是只有一次遞減操作,這顯然不對。這種情形被稱為競爭條件, (race condition),競爭條件是多線程環境中非常常見的導致錯誤的原因。
為了確保不會發生以上情形,必須保證當有線程操作counter對象時,所有其他線程必須等待直到當前線程完成操作。我們可以使用lock關鍵字來實現這種行為。如果鎖定了一個對象,需要訪問該對象的所有其他線程則會處於阻塞狀態,並等待直到該對象解除鎖定。這,可能會導致嚴重的性能問題,在第2章中將會進一步學習該知識點。
使用Monitor類鎖定資源

class Program { static void Main(string[] args) { object lock1 = new object(); object lock2 = new object(); new Thread(() => LockTooMuch(lock1, lock2)).Start(); lock (lock2) { Thread.Sleep(1000); Console.WriteLine("Monitor.TryEnter allows not to get stuck, returning false after a specified timeout is elapsed"); if (Monitor.TryEnter(lock1, TimeSpan.FromSeconds(5))) { Console.WriteLine("Acquired a protected resource succesfully"); } else { Console.WriteLine("Timeout acquiring a resource!"); } } new Thread(() => LockTooMuch(lock1, lock2)).Start(); Console.WriteLine("----------------------------------"); lock (lock2) { Console.WriteLine("This will be a deadlock!"); Thread.Sleep(1000); lock (lock1) { Console.WriteLine("Acquired a protected resource succesfully"); } } Console.ReadKey(); } static void LockTooMuch(object lock1, object lock2) { lock (lock1) { Thread.Sleep(1000); lock (lock2); } } }
工作原理
先看看LockTooMuch方法。在該方法中我們先鎖定了第一個對象,等待一秒后鎖定了 ,第二個對象。然后在另一個線程中啟動該方法。最后嘗試在主線程中先后鎖定第二個和第一個對象。
如果像該示例的第二部分一樣使用lock關鍵字,將會造成死鎖。第一個線程保持對, lock1對象的鎖定,等待直到lock2對象被釋放。主線程保持對lock2對象的鎖定並等待直到。lock1對象被釋放,但lock1對象永遠不會被釋放。
實際上lock關鍵字是Monitor類用例的一個語法糖。如果我們分解使用了lock關鍵字的代碼,將會看到它如下面代碼片段所示:

bool acquiredLock = false; try { Monitor.Enter(lockObject, ref acquiredLock); } finally { if (acquiredLock) { Monitor.Exit(lockObject); } }
因此,我們可以直接使用Monitor類。其擁有TryEnter方法,該方法接受一個超時, "參數。如果在我們能夠獲取被lock保護的資源之前,超時參數過期,則該方法會返回 false.
處理異常

class Program { static void Main(string[] args) { var t = new Thread(FaultyThread); t.Start(); t.Join(); try { t = new Thread(BadFaultyThread); t.Start(); } catch (Exception ex) { Console.WriteLine("We won't get here!"); } } static void BadFaultyThread() { Console.WriteLine("Starting a faulty thread..."); Thread.Sleep(TimeSpan.FromSeconds(2)); throw new Exception("Boom!"); } static void FaultyThread() { try { Console.WriteLine("Starting a faulty thread..."); Thread.Sleep(TimeSpan.FromSeconds(1)); throw new Exception("Boom!"); } catch (Exception ex) { Console.WriteLine("Exception handled: {0}", ex.Message); } } }
工作原理
當主程序啟動時,定義了兩個將會拋出異常的線程。其中一個對異常進行了處理,另一個則沒有。可以看到第二個異常沒有被包裹啟動線程的try/catch代碼塊捕獲到。所以如果直接使用線程,一般來說不要在線程中拋出異常,而是在線程代碼中使用try/catch代碼塊。
在較老版本的.NET Framework中(1.0和1.1),該行為是不一樣的,未被捕獲的異常不會強制應用程序關閉。可以通過添加一個包含以下代碼片段的應用程序配置文件(比如app config)來使用該策略。

<configuration> <runtime> <legacyUnhandledExceptionPolicy enable="1" /> </runtime> </configuration>
線程同步
正如前面所看到的一樣,多個線程同時使用共享對象會造成很多問題。同步這些線程使得對共享對象的操作能夠以正確的順序執行是非常重要的。在使用C#中的lock關鍵字,我們遇到了一個叫作競爭條件的問題。導致這問題的原因是多線程的執行並沒有正確同步。當一個線程執行遞增和遞減操作時,其他線程需要依次等待。這種常見問題通常被稱為線程同步。
有多種方式來實現線程同步。首先,如果無須共享對象,那么就無須進行線程同步。令,人驚奇的是大多數時候可以通過重新設計程序來除移共享狀態,從而去掉復雜的同步構造。請盡可能避免在多個線程間使用單一對象。
如果必須使用共享的狀態,第二種方式是只使用原子操作。這意味着一個操作只占用一個量子的時間,一次就可以完成。所以只有當前操作完成后,其他線程才能執行其他操作。因此,你無須實現其他線程等待當前操作完成,這就避免了使用鎖,也排除了死鎖的情況。
如果上面的方式不可行,並且程序的邏輯更加復雜,那么我們不得不使用不同的方式來,協調線程。方式之一是將等待的線程置於阻塞狀態。當線程處於阻塞狀態時,只會占用盡可能少的CPU時間。然而,這意味着將引入至少一次所謂的上下文切換( context switch),上下文切換是指操作系統的線程調度器。該調度器會保存等待的線程的狀態,並切換到另一個.線程,依次恢復等待的線程的狀態。這需要消耗相當多的資源。然而,如果線程要被掛起很,長時間,那么這樣做是值得的。這種方式又被稱為內核模式(kernel-mode),因為只有操作系,統的內核才能阻止線程使用CPU時間。
萬一線程只需要等待一小段時間,最好只是簡單的等待,而不用將線程切換到阻塞狀,態。雖然線程等待時會浪費CPU時間,但我們節省了上下文切換耗費的CPU時間。該方式又被稱為用戶模式(user-mode),該方式非常輕量,速度很快,但如果線程需要等待較長時間則會浪費大量的CPU時間。
為了利用好這兩種方式,可以使用混合模式(hybrid),混合模式先嘗試使用用戶模式等,待,如果線程等待了足夠長的時間,則會切換到阻塞狀態以節省CPU資源。
執行基本的原子操作(Interlocked)
本節將展示如何對對象執行基本的原子操作,從而不用阻塞線程就可避免競爭條件。

internal class Program { private static void Main(string[] args) { Console.WriteLine("Incorrect counter"); var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c.Count); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); var c1 = new CounterNoLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c1.Count); Console.ReadKey(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { private int _count; public int Count { get { return _count; } } public override void Increment() { _count++; } public override void Decrement() { _count--; } } class CounterNoLock : CounterBase { private int _count; public int Count { get { return _count; } } public override void Increment() { Interlocked.Increment(ref _count); } public override void Decrement() { Interlocked.Decrement(ref _count); } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } }
工作原理
當程序運行時,會創建三個線程來運行TestCounter方法中的代碼。該方法對一個對象,按序執行了遞增或遞減操作。起初的Counter對象不是線程安全的,我們會遇到競爭條件。所以第一個例子中計數器的結果值是不確定的。我們可能會得到數字0,然而如果運行程序多次,你將最終得到一些不正確的非零結果。在第1部分中,我們通過鎖定對象解決了這個問題。在一個線程獲取舊的計數器值並計,算后賦予新的值之前,其他線程都被阻塞了。然而,如果我們采用上述方式執行該操作中途不能停止。而借助於Interlocked類,我們無需鎖定任何對象即可獲取到正確的結果。Interlocked提供了Increment, Decrement和Add等基本數學操作的原子方法,從而幫助我們,在編寫Counter類時無需使用鎖。
使用Mutex類
本節將描述如何使用Mutex類來同步兩個單獨的程序。Mutex是一種原始的同步方式,其只對一個線程授予對共享資源的獨占訪問。

class Program { static void Main(string[] args) { const string MutexName = "CSharpThreadingCookbook"; using (var m = new Mutex(false, MutexName)) { if (!m.WaitOne(TimeSpan.FromSeconds(5), false)) { Console.WriteLine("Second instance is running!"); } else { Console.WriteLine("Running!"); Console.ReadLine(); m.ReleaseMutex(); } } } }
工作原理
當主程序啟動時,定義了一個指定名稱的互斥量,設置initialOwner標志為false。這意.味着如果互斥量已經被創建,則允許程序獲取該互斥量。如果沒有獲取到互斥量,程序則簡單地顯示Running,等待直到按下了任何鍵,然后釋放該互斥量並退出。
如果再運行同樣一個程序,則會在5秒鍾內嘗試獲取互斥量。如果此時在第一個程序中,按下了任何鍵,第二個程序則會開始執行。然而,如果保持等待5秒鍾,第二個程序將無法,獲取到該瓦斥量。
使用SemaphoreSlim類
本節將展示SemaphoreSlim類是如何作為Semaphore類的輕量級版本的。該類限制了同時訪問同一個資源的線程數量。

class Program { static void Main(string[] args) { for (int i = 1; i <= 6; i++) { string threadName = "Thread " + i; int secondsToWait = 2 + 2 * i; var t = new Thread(() => AccessDatabase(threadName, secondsToWait)); t.Start(); } } static SemaphoreSlim _semaphore = new SemaphoreSlim(4); static void AccessDatabase(string name, int seconds) { Console.WriteLine("{0} waits to access a database", name); _semaphore.Wait(); Console.WriteLine("{0} was granted an access to a database", name); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} is completed", name); _semaphore.Release(); } }
工作原理
當主程序啟動時,創建了SemaphoreSlim的一個實例,並在其構造函數中指定允許的並發線程數量。然后啟動了6個不同名稱和不同初始運行時間的線程。
每個線程都嘗試獲取數據庫的訪問,但是我們借助於信號系統限制了訪問數據庫的並發,數為4個線程。當有4個線程獲取了數據庫的訪問后,其他兩個線程需要等待,直到之前線,程中的某一個完成工作並調用semaphore.Release方法來發出信號。
這里我們使用了混合模式,其允許我們在等待時間很短的情況下無需使用上下文切換。然而,有一個叫作Semaphore的SemaphoreSlim類的老版本。該版本使用純粹的內核時間 ( kernel-time)方式。一般沒必要使用它,除非是非常重要的場景。我們可以創建一個具名的semaphore,就像一個具名的mutex一樣,從而在不同的程序中同步線程。SemaphoreSlim並不使用Windows內核信號量,而且也不支持進程間同步。所以在跨程序同步的場景下可以使用Semaphore.
使用AutoResetEvent類
本示例借助於AutoResetEvent類來從一個線程向另一個線程發送通知。AutoResetEvent類可以通知等待的線程有某事件發生。

class Program { static void Main(string[] args) { var t = new Thread(() => Process(10)); t.Start(); Console.WriteLine("Waiting for another thread to complete work"); _workerEvent.WaitOne(); Console.WriteLine("First operation is completed!"); Console.WriteLine("Performing an operation on a main thread"); Thread.Sleep(TimeSpan.FromSeconds(5)); _mainEvent.Set(); Console.WriteLine("Now running the second operation on a second thread"); _workerEvent.WaitOne(); Console.WriteLine("Second operation is completed!"); Console.ReadKey(); } private static AutoResetEvent _workerEvent = new AutoResetEvent(false); private static AutoResetEvent _mainEvent = new AutoResetEvent(false); static void Process(int seconds) { Console.WriteLine("Starting a long running work..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); _workerEvent.Set(); Console.WriteLine("Waiting for a main thread to complete its work"); _mainEvent.WaitOne(); Console.WriteLine("Starting second operation..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); _workerEvent.Set(); } }
工作原理
當主程序啟動時,定義了兩個AutoResetEvent實例。其中一個是從子線程向主線程發信號,另一個實例是從主線程向子線程發信號。我們向AutoResetEvent構造方法傳人false,定義了這兩個實例的初始狀態為unsignaled。這意味着任何線程調用這兩個對象中的任何一個的WaitOne方法將會被阻塞,直到我們調用了Set方法。如果初始事件狀態為true,那么 AutoResetEvent實例的狀態為signaled,如果線程調用WaitOne方法則會被立即處理。然后事件狀態自動變為unsignaled,所以需要再對該實例調用一次Set方法,以便讓其他的線程對,該實例調用WaitOne方法從而繼續執行。
然后我們創建了第二個線程,其會執行第一個操作10秒鍾,然后等待從第二個線程發,出的信號。該信號意味着第一個操作已經完成。現在第二個線程在等待主線程的信號。我們對主線程做了一些附加工作,並通過調用mainEvent.Set方法發送了一個信號。然后等待從第二個線程發出的另一個信號。
AutoResetEvent類采用的是內核時間模式,所以等待時間不能太長。使用ManualResetEventslim類更好,因為它使用的是混合模式。
使用ManualResetEventSlim類
本節將描述如何使用ManualResetEventSlim類來在線程間以更靈活的方式傳遞信號。

class Program { static void Main(string[] args) { var t1 = new Thread(() => TravelThroughGates("Thread 1", 5)); var t2 = new Thread(() => TravelThroughGates("Thread 2", 6)); var t3 = new Thread(() => TravelThroughGates("Thread 3", 12)); t1.Start(); t2.Start(); t3.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); Console.WriteLine("The gates are now open!"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); _mainEvent.Reset(); Console.WriteLine("The gates have been closed!"); Thread.Sleep(TimeSpan.FromSeconds(10)); Console.WriteLine("The gates are now open for the second time!"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("The gates have been closed!"); _mainEvent.Reset(); Console.ReadKey(); } static void TravelThroughGates(string threadName, int seconds) { Console.WriteLine("{0} falls to sleep", threadName); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} waits for the gates to open!", threadName); _mainEvent.Wait(); Console.WriteLine("{0} enters the gates!", threadName); } static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false); }
工作原理
當主程序啟動時,首先創建了ManualResetEventSlim類的一個實例。然后啟動了三個線程,等待事件信號通知它們繼續執行。
ManualResetEvnetSlim的整個工作方式有點像人群通過大門。而AutoResetEvent事件像一個旋轉門,一次只允許一人通過。ManualResetEventSlim是ManualResetEvent的混合版本,一直保持大門敞開直到手動調用Reset方法。當調用mainEvent.Set時,相當於打開了大門從而允許准備好的線程接收信號並繼續工作。然而線程3還處於睡眠 "狀態,沒有趕上時間。當調用mainEvent.Reset相當於關閉了大門。最后一個線程已經准備好執行,但是不得不等待下一個信號,即要等待好幾秒鍾。
使用CountdownEvent類
本節將描述如何使用CountdownEvent信號類來等待直到一定數量的操作完成。

class Program { static void Main(string[] args) { Console.WriteLine("Starting two operations"); var t1 = new Thread(() => PerformOperation("Operation 1 is completed", 4)); var t2 = new Thread(() => PerformOperation("Operation 2 is completed", 8)); t1.Start(); t2.Start(); _countdown.Wait(); Console.WriteLine("Both operations have been completed."); _countdown.Dispose(); Console.ReadKey(); } static CountdownEvent _countdown = new CountdownEvent(2); static void PerformOperation(string message, int seconds) { Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine(message); _countdown.Signal(); } }
工作原理
當主程序啟動時,創建了一個CountdownEvent實例,在其構造函數中指定了當兩個操,作完成時會發出信號。然后我們啟動了兩個線程,當它們執行完成后會發出信號。一旦第二個線程完成,主線程會從等待CountdownEvent的狀態中返回並繼續執行。針對需要等待多,個異步操作完成的情形,使用該方式是非常便利的。
然而這有一個重大的缺點。如果調用countdown.Signal()沒達到指定的次數,那么-countdown. Wait()將一直等待。請確保使用CountdownEvent時,所有線程完成后都要調用,Signal方法。
使用Barrier類
本節將展示另一種有意思的同步方式,被稱為Barrier, Barrier類用於組織多個線程及時, 在某個時刻碰面。其提供了一個回調函數,每次線程調用了SignalAndWait方法后該回調函數會被執行。

class Program { static void Main(string[] args) { var t1 = new Thread(() => PlayMusic("the guitarist", "play an amazing solo", 5)); var t2 = new Thread(() => PlayMusic("the singer", "sing his song", 2)); t1.Start(); t2.Start(); Console.ReadKey(); } static Barrier _barrier = new Barrier(2,b => Console.WriteLine("End of phase {0}", b.CurrentPhaseNumber + 1)); static void PlayMusic(string name, string message, int seconds) { for (int i = 1; i < 3; i++) { Console.WriteLine("----------------------------------------------"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} starts to {1}", name, message); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} finishes to {1}", name, message); _barrier.SignalAndWait(); } } }
工作原理
我們創建了Barrier類,指定了我們想要同步兩個線程。在兩個線程中的任何一個調用了-barrier.SignalAndWait方法后,會執行一個回調函數來打印出階段。
每個線程將向Barrier發送兩次信號,所以會有兩個階段。每次這兩個線程調用Signal AndWait方法時, Barrier將執行回調函數。這在多線程迭代運算中非常有用,可以在每個迭代,結束前執行一些計算。當最后一個線程調用SignalAndWait方法時可以在迭代結束時進行交互。
使用ReaderWriterLockSlim類
本節將描述如何使用ReaderWriterLockSlim來創建一個線程安全的機制,在多線程中對,一個集合進行讀寫操作。ReaderWriterLockSlim代表了一個管理資源訪問的鎖,允許多個線程同時讀取,以及獨占寫。

class Program { static void Main(string[] args) { new Thread(Read){ IsBackground = true }.Start(); new Thread(Read){ IsBackground = true }.Start(); new Thread(Read){ IsBackground = true }.Start(); new Thread(() => Write("Thread 1")){ IsBackground = true }.Start(); new Thread(() => Write("Thread 2")){ IsBackground = true }.Start(); Thread.Sleep(TimeSpan.FromSeconds(30)); Console.ReadKey(); } static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static Dictionary<int, int> _items = new Dictionary<int, int>(); static void Read() { Console.WriteLine("Reading contents of a dictionary"); while (true) { try { _rw.EnterReadLock(); foreach (var key in _items.Keys) { Thread.Sleep(TimeSpan.FromSeconds(0.1)); } } finally { _rw.ExitReadLock(); } } } static void Write(string threadName) { while (true) { try { int newKey = new Random().Next(250); _rw.EnterUpgradeableReadLock(); if (!_items.ContainsKey(newKey)) { try { _rw.EnterWriteLock(); _items[newKey] = 1; Console.WriteLine("New key {0} is added to a dictionary by a {1}", newKey, threadName); } finally { _rw.ExitWriteLock(); } } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } finally { _rw.ExitUpgradeableReadLock(); } } } }
工作原理
當主程序啟動時,同時運行了三個線程來從字典中讀取數據,還有另外兩個線程向該字典中寫入數據。我們使用ReaderWriterLockSlim類來實現線程安全,該類專為這樣的場景而設計。
這里使用兩種鎖:讀鎖允許多線程讀取數據,寫鎖在被釋放前會阻塞了其他線程的所,有操作。獲取讀鎖時還有一個有意思的場景,即從集合中讀取數據時,根據當前數據而決,定是否獲取一個寫鎖並修改該集合。一旦得到寫鎖,會阻止閱讀者讀取數據,從而浪費大量的時間,因此獲取寫鎖后集合會處於阻塞狀態。為了最小化阻塞浪費的時間,可以使用 EnterUpgradeableReadLock和ExitUpgradeableReadLock方法。先獲取讀鎖后讀取數據。如果發現必須修改底層集合,只需使用EnterWriteLock方法升級鎖,然后快速執行一次寫操作.最后使用ExitWriteLock釋放寫鎖。
在本例中,我們先生成一個隨機數。然后獲取讀鎖並檢查該數是否存在於字典的鍵集合中。如果不存在,將讀鎖更新為寫鎖然后將該新鍵加入到字典中。始終使用tyr/finaly代碼塊來確保在捕獲鎖后一定會釋放鎖,這是一項好的實踐。所有的線程都被創建為后台線程。
主線程在所有后台線程完成后會等待30秒。
使用SpinWait類
本節將描述如何不使用內核模型的方式來使線程等待。另外,我們介紹了SpinWait,它, ,是一個混合同步構造,被設計為使用用戶模式等待一段時間,然后切換到內核模式以節省CPU時間。

class Program { static void Main(string[] args) { var t1 = new Thread(UserModeWait); var t2 = new Thread(HybridSpinWait); Console.WriteLine("Running user mode waiting"); t1.Start(); Thread.Sleep(20); _isCompleted = true; Thread.Sleep(TimeSpan.FromSeconds(1)); _isCompleted = false; Console.WriteLine("Running hybrid SpinWait construct waiting"); t2.Start(); Thread.Sleep(5); _isCompleted = true; Console.ReadKey(); } static volatile bool _isCompleted = false; static void UserModeWait() { while (!_isCompleted) { Console.Write("."); } Console.WriteLine(); Console.WriteLine("Waiting is complete"); } static void HybridSpinWait() { var w = new SpinWait(); while (!_isCompleted) { w.SpinOnce(); Console.WriteLine(w.NextSpinWillYield); } Console.WriteLine("Waiting is complete"); } }
工作原理
當主程序啟動時,定義了一個線程,將執行一個無止境的循環,直到20毫秒后主線程,設置_isCompleted變量為true,我們可以試驗運行該周期為20-30秒,通過Windows任務管理器測量CPU的負載情況。取決於CPU內核數量,任務管理器將顯示一個顯著的處理時間。
我們使用volatile關鍵字來聲明isCompleted靜態字段。Volatile關鍵字指出一個字段可能會被同時執行的多個線程修改。聲明為volatile的字段不會被編譯器和處理器優化為只能被單個線程訪問。這確保了該字段總是最新的值。
然后我們使用了SpinWait版本,用於在每個迭代打印一個特殊標志位來顯示線程是否切換為阻塞狀態。運行該線程5毫秒來查看結果。剛開始, SpinWait嘗試使用用戶模式,在9 個迭代后,開始切換線程為阻塞狀態。如果嘗試測量該版本的CPU負載,在Windows任務管理器將不會看到任何CPU的使用。
使用線程池
簡介
在之前的章節中我們討論了創建線程和線程協作的幾種方式。現在考慮另一種情況,即只花費極少的時間來完成創建很多異步操作。創建線程是昂貴的操作,所以為每個短暫的異步操作創建線程會產生顯著的開銷。
為了解決該問題,有一個常用的方式叫做池( pooling),線程池可以成功地適應於任何需要大量短暫的開銷大的資源的情形。我們事先分配一定的資源,將這些資源放入到資源池。每次需要新的資源,只需從池中獲取一個,而不用創建一個新的。當該資源不再被使用,時,就將其返回到池中。
.NET線程池是該概念的一種實現。通過System.Threading.ThreadPool類型可以使用線程池。線程池是受,NET通用語言運行時( Common Language Runtime,簡稱CLR)管理的。這意味着每個CLR都有一個線程池實例。ThreadPool類型擁有一個QueueUserWorkItem靜態方法。該靜態方法接受一個委托,代表用戶自定義的一個異步操作。在該方法被調用后,委,托會進入到內部隊列中。如果池中沒有任何線程,將創建一個新的工作線程( worker thread) 並將隊列中第一個委托放入到該工作線程中。如果想線程池中放入新的操作,當之前的所有操作完成后,很可能只需重用一個線程來執行這些新的操作。然而,如果放置新的操作過快,線程池將創建更多的線程來執行這些操,作。創建太多的線程是有限制的,在這種情況下新的操作將在隊列中等待直到線程池中的工作線程有能力來執行它們。
當停止向線程池中放置新操作時,線程池最終會刪除一定時間后過期的不再使用的線程。這將釋放所有那些不再需要的系統資源。我想再次強調線程池的用途是執行運行時間短的操作。使用線程池可以減少並行度耗費,及節省操作系統資源。
我們只使用較少的線程,但是以比平常更慢的速度來執行異步操作, ,使用一定數量的可用的工作線程批量處理這些操作。如果操作能快速地完成則比較適用線程!池,但是執行長時間運行的計算密集型操作則會降低性能。
另一個重要事情是在ASPNET應用程序中使用線程池時要相當小心。ASPNET基礎設施使用自己的線程池,如果在線程池中浪費所有的工作線程, Web服務器將不能夠服務新的請求。在ASPNET中只推薦使用輸入/輸出密集型的異步操作,因為其使用了一個不同的方式,叫做IO線程。
在本章中,我們將學習使用線程池來執行異步操作。本章將覆蓋將操作放入線程池的不,,同方式,以及如何取消一個操作,並防止其長時間運行。
保持線程中的操作都是短暫的是非常重要的。不要在線程池中放入長時間運行的操作,或者阻塞工作線程。這將導致所有工作線程變得繁忙,從而無法服務用戶操作。這會導致性能問題和非常難以調試的錯誤。
請注意線程池中的工作線程都是后台線程。這意味着當所有的前台線程(包括主程序線程)完成后,所有的后台線程將停止工作。
在線程池中調用委托
本節將展示在線程池中如何異步的執行委托。另外,我們將討論一個叫做異步編程模型(Asynchronous Programming Model,簡稱APM)的方式,這是NET歷史中第一個異步編程模式。

class Program { static void Main(string[] args) { int threadId = 0; RunOnThreadPool poolDelegate = Test; var t = new Thread(() => Test(out threadId)); t.Start(); t.Join(); Console.WriteLine("Thread id: {0}", threadId); IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); r.AsyncWaitHandle.WaitOne(); string result = poolDelegate.EndInvoke(out threadId, r); Console.WriteLine("Thread pool worker thread id: {0}", threadId); Console.WriteLine(result); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.ReadKey(); } private delegate string RunOnThreadPool(out int threadId); private static void Callback(IAsyncResult ar) { Console.WriteLine("Starting a callback..."); Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); } private static string Test(out int threadId) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); threadId = Thread.CurrentThread.ManagedThreadId; return string.Format("Thread pool worker thread id was: {0}", threadId); } }
工作原理
當程序運行時,使用舊的方式創建了一個線程,然后啟動它並等待完成。由於線程的構造函數只接受一個無任何返回結果的方法,我們使用了lambda表達式來將對Test方法的調用包起來。我們通過打印出Thread. CurrentThread.IsThreadPoolThread屬性值來確,保該線程不是來自線程池。我們也打印出了受管理的線程ID來識別代碼是被哪個線程執行的。
然后定義了一個委托並調用Beginlnvoke方法來運行該委托。BeginInvoke方法接受一個回調函數。該回調函數會在異步操作完成后會被調用,並且一個用戶自定義的狀態會傳給該回調函數。該狀態通常用於區分異步調用。結果,我們得到了一個實現了IAsyncResult接口的result對象。BeginInvoke立即返回了結果,當線程池中的工作線程在執行異步操作時,仍允許我們繼續其他工作。當需要異步操作的結果時,可以使用BeginInvoke方法調用返回的result對象。我們可以使用result對象的IsCompleted屬性輪詢結果。但是在本例子中,使用的是AsyncWaitHandle屬性來等待直到操作完成。當操作完成后,會得到一個結果,可以通過委托調用EndInvoke方法,將IAsyncResult對象傳遞給委托參數。
事實上使用AsyncWaitHandle並不是必要的。如果注釋掉r.AsyncWaitHandle.WaitOne,代碼照樣可以成功運行, 因為EndInvoke方法事實上會等待異步操作完成。調用 "EndInvoke方法(或者針對其他異步API的EndOperationName方法)是非常重要的, '因為該方法會將任何未處理的異常拋回到調用線程中。當使用這種異步API時,請確保始終調用了Begin和End方法。
當操作完成后,傳遞給BeginInvoke方法的回調函數將被放置到線程池中,確切地說是,一個工作線程中。如果在Main方法定義的結尾注釋掉Thread.Sleep方法調用,回調函數將不,會被執行。這是因為當主線程完成后,所有的后台線程會被停止,包括該回調函數。對委托和回調函數的異步調用很可能會被同一個工作線程執行。通過工作線程ID可以容易地看出。使用BeginOperationName/EndOperationName方法和.NET中的IAsyncResult對象等方 ,式被稱為異步編程模型(或APM模式),這樣的方法對被稱為異步方法。該模式也被應用於多個,NET類庫的API中,但在現代編程中,更推薦使用任務並行庫( Task Parallel Library,簡稱TPL)來組織異步API
向線程池中放入異步操作

class Program { static void Main(string[] args) { const int x = 1; const int y = 2; const string lambdaState = "lambda state 2"; ThreadPool.QueueUserWorkItem(AsyncOperation); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem(AsyncOperation, "async state"); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem( state => { Console.WriteLine("Operation state: {0}", state); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); ThreadPool.QueueUserWorkItem( _ => { Console.WriteLine("Operation state: {0}, {1}", x+y, lambdaState); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.ReadKey(); } private static void AsyncOperation(object state) { Console.WriteLine("Operation state: {0}", state ?? "(null)"); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); } }
工作原理
首先定義了AsyncOperation方法,其接受單個object類型的參數。然后使用QueueUser WorkItem方法將該方法放到線程池中。接着再次放入該方法,但是這次給方法調用傳入了一個狀態對象。該對象將作為狀態參數傳遞給AsynchronousOperation方法。
在操作完成后讓線程睡眠一秒鍾,從而讓線程池擁有為新操作重用線程的可能性。如果注釋掉所有的Thread.Sleep調用,那么所有打印出的線程ID多半是不一樣的。如果ID是一樣的,那很可能是前兩個線程被重用來運行接下來的兩個操作。
首先將一個lambda表達式放置到線程池中。這里沒什么特別的。我們使用了labmbda表達式語法,從而無須定義一個單獨的方法。
然后,我們使用閉包機制,從而無須傳遞lambda表達式的狀態。閉包更靈活,允許我,們向異步操作傳遞一個以上的對象而且這些對象具有靜態類型。所以之前介紹的傳遞對象給,方法回調的機制既冗余又過時。在C#中有了閉包后就不再需要使用它了。
線程池與並行度
本節將展示線程池如何工作於大量的異步操作,以及它與創建大量單獨的線程的方式有何不同。

class Program { static void Main(string[] args) { const int numberOfOperations = 500; var sw = new Stopwatch(); sw.Start(); UseThreads(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); sw.Reset(); sw.Start(); UseThreadPool(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); Console.ReadKey(); } static void UseThreads(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Scheduling work by creating threads"); for (int i = 0; i < numberOfOperations; i++) { var thread = new Thread(() => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); thread.Start(); } countdown.Wait(); Console.WriteLine(); } } static void UseThreadPool(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Starting work on a threadpool"); for (int i = 0; i < numberOfOperations; i++) { ThreadPool.QueueUserWorkItem( _ => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); } countdown.Wait(); Console.WriteLine(); } } }
工作原理
當主程序啟動時,創建了很多不同的線程,每個線程都運行一個操作。該操作打印出線,程ID並阻塞線程100毫秒。結果我們創建了500個線程,全部並行運行這些操作。雖然在,我的機器上的總耗時是300毫秒,但是所有線程消耗了大量的操作系統資源。
然后我們使用了執行同樣的任務,只不過不為每個操作創建一個線程,而將它們放入到線程池中。然后線程池開始執行這些操作。線程池在快結束時創建更多的線程,但是仍然花,費了更多的時間,在我機器上是12秒。我們為操作系統節省了內存和線程數,但是為此付,出了更長的執行時間。
實現一個取消選項
.本節將通過一個示例來展示如何在線程池中取消異步操作。

class Program { static void Main(string[] args) { using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } Thread.Sleep(TimeSpan.FromSeconds(2)); } static void AsyncOperation1(CancellationToken token) { Console.WriteLine("Starting the first task"); for (int i = 0; i < 5; i++) { if (token.IsCancellationRequested) { Console.WriteLine("The first task has been canceled."); return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The first task has completed succesfully"); } static void AsyncOperation2(CancellationToken token) { try { Console.WriteLine("Starting the second task"); for (int i = 0; i < 5; i++) { token.ThrowIfCancellationRequested(); Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The second task has completed succesfully"); } catch (OperationCanceledException) { Console.WriteLine("The second task has been canceled."); } } private static void AsyncOperation3(CancellationToken token) { bool cancellationFlag = false; token.Register(() => cancellationFlag = true); Console.WriteLine("Starting the third task"); for (int i = 0; i < 5; i++) { if (cancellationFlag) { Console.WriteLine("The third task has been canceled."); return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The third task has completed succesfully"); } }
工作原理
本節中介紹了CancellationTokenSource和CancellationToken兩個新類。它們在.NET4.0被引人, 目前是實現異步操作的取消操作的事實標准。由於線程池已經存在了很長時間,並,沒有特殊的API來實現取消標記功能,但是仍然可以對線程池使用上述API。
在本程序中使用了三種方式來實現取消過程。第一個是輪詢來檢查CancellationToken.IsCancellationRequested屬性。如果該屬性為true,則說明操作需要被取消,我們必須放棄該操作。
第二種方式是拋出一個OperationCancelledException異常。這允許在操作之外控制取消過程,即需要取消操作時,通過操作之外的代碼來處理。
最后一種方式是注冊一個回調函數。當操作被取消時,在線程池將調用該回調函數。這允許鏈式傳遞一個取消邏輯到另一個異步操作中。
在線程池中使用等待事件處理器及超時
本節將描述如何在線程池中對操作實現超時,以及如何在線程池中正確地等待。

class Program { static void Main(string[] args) { RunOperations(TimeSpan.FromSeconds(5)); RunOperations(TimeSpan.FromSeconds(7)); } static void RunOperations(TimeSpan workerOperationTimeout) { using (var evt = new ManualResetEvent(false)) using (var cts = new CancellationTokenSource()) { Console.WriteLine("Registering timeout operations..."); var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state, isTimedOut) => WorkerOperationWait(cts, isTimedOut), null, workerOperationTimeout, true); Console.WriteLine("Starting long running operation..."); ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt)); Thread.Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2))); worker.Unregister(evt); } } static void WorkerOperation(CancellationToken token, ManualResetEvent evt) { for(int i = 0; i < 6; i++) { if (token.IsCancellationRequested) { return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } evt.Set(); } static void WorkerOperationWait(CancellationTokenSource cts, bool isTimedOut) { if (isTimedOut) { cts.Cancel(); Console.WriteLine("Worker operation timed out and was canceled."); } else { Console.WriteLine("Worker operation succeded."); } } }
工作原理
線程池還有一個有用的方法: ThreadPool.RegisterWaitForSingleObject,該方法允許我們將回調函數放入線程池中的隊列中。當提供的等待事件處理器收到信號或發生超時時,該回調函數將被調用。這允許我們為線程池中的操作實現超時功能。
首先按順序向線程池中放入一個耗時長的操作。它運行6秒鍾然后一旦成功完成,會設置一個ManualResetEvent信號類。其他的情況下,比如需要取消操作,則該操作會被丟棄。 .
然后我們注冊了第二個異步操作。當從ManualResetEvent對象接受到一個信號后,該異步操作會被調用。如果第一個操作順利完成,會設置該信號量。另一種情況是第一個操作還未完成就已經超時。如果發生了該情況,我們會使用CancellationToken來取消第一個操作。
最后,為操作提供5秒的超時時間是不夠的。這是因為操作會花費6秒來完成,只能取消該操作。所以如果提供7秒的超時時間是可行的,該操作會順利完成。
當有大量的線程必須處於阻塞狀態中等待一些多線程事件發信號時,以上方式非常有,用。借助於線程池的基礎設施,我們無需阻塞所有這樣的線程。可以釋放這些線程直到信號事件被設置。在服務器端應用程序中這是個非常重要的應用場景,因為服務器端應用程序要求高伸縮性及高性能。
使用計時器
本節將描述如何使用System.Threading. Timer對象來在線程池中創建周期性調用的異步

class Program { static void Main(string[] args) { Console.WriteLine("Press 'Enter' to stop the timer..."); DateTime start = DateTime.Now; _timer = new Timer(_ => TimerOperation(start), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)); Thread.Sleep(TimeSpan.FromSeconds(6)); _timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4)); Console.ReadLine(); _timer.Dispose(); Console.ReadKey(); } static Timer _timer; static void TimerOperation(DateTime start) { TimeSpan elapsed = DateTime.Now - start; Console.WriteLine("{0} seconds from {1}. Timer thread pool thread id: {2}", elapsed.Seconds, start,Thread.CurrentThread.ManagedThreadId); } }
工作原理
我們首先創建了一個Timer實例。第一個參數是一個1ambda表達式,將會在線程池中被執行。我們調用TimerOperation方法並給其提供一個起始時間。由於無須使用用戶狀態對象,所以第二個參數為null,然后指定了什么時候會第一次運行TimerOperation,以及之后 "再次調用的間隔時間。所以第一個值實際上說明一秒后會啟動第一次操作,然后每隔兩秒再,次運行。
之后等待6秒后修改計時器。在調用timer.Change方法一秒后啟動TimerOperation,然后每隔4秒再次運行。
計時器還可以更復雜:可以以更復雜的方式使用計時器。比如,可以通過Timeout.Infinet值提供給計時器個間隔參數來只允許計時器操作一次。然后在計時器異步操作內,能夠設置下一次計,時器操作將被執行的時間。具體時間取決於自定義業務邏輯。
使用BackgroundWorker組件

class Program { static void Main(string[] args) { var bw = new BackgroundWorker(); bw.WorkerReportsProgress = true; bw.WorkerSupportsCancellation = true; bw.DoWork += Worker_DoWork; bw.ProgressChanged += Worker_ProgressChanged; bw.RunWorkerCompleted += Worker_Completed; bw.RunWorkerAsync(); Console.WriteLine("Press C to cancel work"); do { if (Console.ReadKey(true).KeyChar == 'C') { bw.CancelAsync(); } } while(bw.IsBusy); } static void Worker_DoWork(object sender, DoWorkEventArgs e) { Console.WriteLine("DoWork thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); var bw = (BackgroundWorker) sender; for (int i = 1; i <= 100; i++) { if (bw.CancellationPending) { e.Cancel = true; return; } if (i%10 == 0) { bw.ReportProgress(i); } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } e.Result = 42; } static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e) { Console.WriteLine("{0}% completed. Progress thread pool thread id: {1}", e.ProgressPercentage, Thread.CurrentThread.ManagedThreadId); } static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e) { Console.WriteLine("Completed thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); if (e.Error != null) { Console.WriteLine("Exception {0} has occured.", e.Error.Message); } else if (e.Cancelled) { Console.WriteLine("Operation has been canceled."); } else { Console.WriteLine("The answer is: {0}", e.Result); } } }
工作原理
當程序啟動時,創建了一個BackgroundWorker組件的實例。顯式地指出該后台工作線,程支持取消操作及該操作進度的通知。
接下來是最有意思的部分。我們沒有使用線程池和委托,而是使用了另一個C#語法,稱為事件。事件表示了一些通知的源或當通知到達時會有所響應的一系列訂閱者。在本例中,我們將訂閱三個事件,當這些事件發生時,將調用相應的事件處理器。當事件通知其訂,閱者時,具有特殊的定義簽名的方法將被調用。
因此,除了將異步API組織為Begin/End方法對,還可以只啟動一個異步操作然后訂閱給不同的事件。這些事件在該操作執行時會被觸發。這種方式被稱為基於事件的異步模式, ( Event-based Asynchronous Pattern,簡稱EAP)。這是歷史上第二種用來構造異步程序的方,式,現在更推薦使用TPL
我們共定義了三個事件。第一個是oWork事件。當一個后台工作對象通過RunWorkerAsync方法啟動一個異步操作時,該事件處理器將被調用。該事件處理器將會運行在線程池中。如果需要取消操作,則這里是主要的操作點來取消執行。同時也可以提供該操作的運行進程信,息。最后,得到結果后,將結果設置給事件參數,然后RunWorkerCompleted事件處理器將,被調用。在該方法中,可以知道操作是成功完成,還是發生錯誤,抑或被取消。
基於此, BackgroundWorker組件實際上被使用於Windows窗體應用程序(Windows Forms Applications,簡稱WPF)中。該實現通過后台工作事件處理器的代碼可以直接與UI控制器交互。與線程池中的線程與UI控制器交互的方式相比較,使用BackgroundWorker組件的方式更加自然和好用。
使用任務並行庫
簡介
我們在之前的章節中學習了什么是線程,如何使用線程,以及為什么需要線程池。使用線程池可以使我們在減少並行度花銷時節省操作系統資源。我們可以認為線程池是一個抽象層,其向程序員隱藏了使用線程的細節,使我們專心處理程序邏輯,而不是各種線程,問題。
然而使用線程池也相當復雜。從線程池的工作線程中獲取結果並不容易。我們需要實現,自定義方式來獲取結果,而且萬一有異常發生,還需將異常正確地傳播到初始線程中。除此,以外,創建一組相關的異步操作,以及實現當前操作執行完成后下一操作才會執行的邏輯也不容易。在嘗試解決這些問題的過程中,創建了異步編程模型及基於事件的異步模式。在第3章中提到過基於事件的異步模式。這些模式使得獲取結果更容易,傳播異常也更輕松,但是組,合多個異步操作仍需大量工作,需要編寫大量的代碼。
為了解決所有的問題, Net Framework4.0引入了一個新的關於異步操作的API,它叫做.任務並行庫( Task Parallel Library,簡稱TPL), .Net Framework 4.5版對該API進行了輕微的改進,使用更簡單。在本書的項目中將使用最新版的TPL,即.Net Framework 4.5版中的 API, TPL可被認為是線程池之上的又一個抽象層,其對程序員隱藏了與線程池交互的底層代碼,並提供了更方便的細粒度的APL, TPL的核心概念是任務。一個任務代表了一個異步操作,該操作可以通過多種方式運行,可以使用或不使用獨立線程運行。在本章中將探究任務的所有使用細節。
默認情況下,程序員無須知道任務實際上是如何執行的。TPL通過向用戶隱藏任務的實現細節從而創建一個抽象層。遺憾的是,有些情況下這會導致詭秘的錯誤,比如試圖獲取任務的結果時程序被掛起。本章有助於理解TPL底層的原理,以及如何避免不恰當的使用方式。
一個任務可以通過多種方式和其他任務組合起來。例如,可以同時啟動多個任務,等待所有任務完成,然后運行一個任務對之前所有任務的結果進行一些計算。TPL與之前的模式相比,其中一個關鍵優勢是其具有用於組合任務的便利的API,
處理任務中的異常結果有多種方式。由於一個任務可能會由多個其他任務組成,這些任,務也可能依次擁有各自的子任務,所以有一個AggregateException的概念。這種異常可以捕獲底層任務內部的所有異常,並允許單獨處理這些異常。
而且,最后但並不是最不重要的, C# 5.0已經內置了對TPL的支持,允許我們使用新的 await和async關鍵字以平滑的、舒服的方式操作任務。
在本章中我們將學習使用TPL來執行異步操作。我們將學習什么是任務,如何用不同的,方式創建任務,以及如何將任務組合在一起。我們會討論如何將遺留的APM和EAP模式轉換為使用任務,還有如何正確地處理異常,如何取消任務,以及如何使多個任務同時執行。另外,還將講述如何在Windows GUI應用程序中正確地使用任務。
創建任務

class Program { static void Main(string[] args) { var t1 = new Task(() => TaskMethod("Task 1")); var t2 = new Task(() => TaskMethod("Task 2")); t2.Start(); t1.Start(); Task.Run(() => TaskMethod("Task 3")); Task.Factory.StartNew(() => TaskMethod("Task 4")); Task.Factory.StartNew(() => TaskMethod("Task 5"), TaskCreationOptions.LongRunning); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static void TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
工作原理
當程序運行時,我們使用Task的構造函數創建了兩個任務。我們傳入一個lambda表達式作為Action委托。這可以使我們給TaskMethod提供一個string參數。然后使用Start方法運行這些任務。
請注意只有調用了這些任務的Start方法,才會執行任務。很容易忘記真正啟動任務。
然后使用Task.Run和Task.Factory.StartNew方法來運行了另外兩個任務。與使用Task構造函數的不同之處在於這兩個被創建的任務會立即開始工作,所以無需顯式地調用這些任務的Start方法。從Task 1到Task 4的所有任務都被放置在線程池的工作線程中並以未指定,的順序運行。如果多次運行該程序,就會發現任務的執行順序是不確定的。
Task.Run方法只是Task.Factory.StartNew的一個快捷方式,但是后者有附加的選項。通!常如果無特殊需求,則可使用前一個方法,如Task 5所示。我們標記該任務為長時間運行,結果該任務將不會使用線程池,而在單獨的線程中運行。然而,根據運行該任務的當前的任務調度程序( task scheduler)運行方式有可能不同。
使用任務執行基本的操作
本節將描述如何從任務中獲取結果值。我們將通過幾個場景來了解在線程池中和主線程中運行任務的不同之處。

class Program { static void Main(string[] args) { TaskMethod("Main Thread Task"); Task<int> task = CreateTask("Task 1"); task.Start(); int result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 2"); task.RunSynchronously(); result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 3"); Console.WriteLine(task.Status); task.Start(); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); result = task.Result; Console.WriteLine("Result is: {0}", result); Console.ReadKey(); } static Task<int> CreateTask(string name) { return new Task<int>(() => TaskMethod(name)); } static int TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); return 42; } }
工作原理
首先直接運行TaskMethod方法,這里並沒有把它封裝到一個任務中。結果根據它提供給我們的主線程的信息可以得知該方法是被同步執行的。很顯然它不是線程池中的線程。
然后我們運行了Task 1,使用Start方法啟動該任務並等待結果。該任務會被放置在線程池中,並且主線程會等待,直到任務返回前一直處於阻塞狀態。
Task 2和Task 1類似,除了Task 2是通過RunSynchronously()方法運行的。該任務會運行在主線程中,該任務的輸出與第一個例子中直接同步調用TaskMethod的輸出完全一樣。這是個非常好的優化,可以避免使用線程池來執行非常短暫的操作。
我們用以運行Task 1相同的方式來運行Task 3,但這次沒有阻塞主線程,只是在該任務完成前循環打印出任務狀態。結果展示了多種任務狀態,分別是Creatd, Running和 RanToCompletion.
組合任務
本節將展示如何設置相互依賴的任務。我們將學習如何創建一個任務,使其在父任務完成后才會被運行。另外,將探尋為非常短暫的任務節省線程開銷的可能性。

class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); firstTask.ContinueWith( t => Console.WriteLine("The first answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); Task continuation = secondTask.ContinueWith( t => Console.WriteLine("The second answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); continuation.GetAwaiter().OnCompleted( () => Console.WriteLine("Continuation Task Completed! Thread id {0}, is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread)); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(); firstTask = new Task<int>(() => { var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", 5), TaskCreationOptions.AttachedToParent); innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent); return TaskMethod("First Task", 2); }); firstTask.Start(); while (!firstTask.IsCompleted) { Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(10)); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
工作原理
當主程序啟動時,我們創建了兩個任務,並為第一個任務設置了一個后續操作( continuation,一個代碼塊,會在當前任務完成后運行),然后啟動這兩個任務並等待4秒,這個時間足夠兩個任務完成。然后給第二個任務運行另一個后續操作,並通過指定TaskContinuationOptions."ExecuteSynchronously選項來嘗試同步執行該后續操作。如果后續操作耗時非常短暫,使用以上方式是非常有用的,因為放置在主線程中運行比放置在線程池中運行要快。可以實現這一點是因為第二個任務恰好在那刻完成。如果注釋掉4秒的Thread.Sleep方法,將會看到該代碼被放置到線程池中,這是因為還未從之前的任務中得到結果。
最后我們為之前的后續操作也定義了一個后續操作,但這里使用了一個稍微不同的方式,即使用了新的GetAwaiter和OnCompleted方法。這些方法是C# 5.0語言中異步機制中的方法。
本節示例的最后部分與父子線程有關。我們創建了一個新任務,當運行該任務時,通過提供一個TaskCreationOptions.AttachedToParent選項來運行一個所謂的子任務。
子任務必須在父任務運行時創建,並正確的附加給父任務!
這意味着只有所有子任務結束工作,父任務才會完成。通過提供一個TaskContinuation Options選項也可以給在子任務上運行后續操作。該后續操作也會影響父任務,並且直到最后一個子任務結束它才會運行完成。
將APM模式轉換成任務
本節將說明如何將過時的APM API轉換為任務。多個示例覆蓋了轉換過程中可能發生的不同情況。

class Program { private static void Main(string[] args) { int threadId; AsynchronousTask d = Test; IncompatibleAsynchronousTask e = Test; Console.WriteLine("Option 1"); Task<string> task = Task<string>.Factory.FromAsync( d.BeginInvoke("AsyncTaskThread", Callback, "a delegate asynchronous call"), d.EndInvoke); task.ContinueWith(t => Console.WriteLine("Callback is finished, now running a continuation! Result: {0}", t.Result)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.WriteLine("----------------------------------------------"); Console.WriteLine(); Console.WriteLine("Option 2"); task = Task<string>.Factory.FromAsync( d.BeginInvoke, d.EndInvoke, "AsyncTaskThread", "a delegate asynchronous call"); task.ContinueWith(t => Console.WriteLine("Task is completed, now running a continuation! Result: {0}", t.Result)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.WriteLine("----------------------------------------------"); Console.WriteLine(); Console.WriteLine("Option 3"); IAsyncResult ar = e.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); task = Task<string>.Factory.FromAsync(ar, _ => e.EndInvoke(out threadId, ar)); task.ContinueWith(t => Console.WriteLine("Task is completed, now running a continuation! Result: {0}, ThreadId: {1}", t.Result, threadId)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } private delegate string AsynchronousTask(string threadName); private delegate string IncompatibleAsynchronousTask(out int threadId); private static void Callback(IAsyncResult ar) { Console.WriteLine("Starting a callback..."); Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); } private static string Test(string threadName) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); Thread.CurrentThread.Name = threadName; return string.Format("Thread name: {0}", Thread.CurrentThread.Name); } private static string Test(out int threadId) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); threadId = Thread.CurrentThread.ManagedThreadId; return string.Format("Thread pool worker thread id was: {0}", threadId); } }
工作原理
這里我們定義了兩種委托。其中一個使用了out參數,因此在將APM模式轉換為任務,時,與標准的TPLAPI是不兼容的。這樣的轉換有三個示例。
將APM轉換為TPL的關鍵點是Task<T>.Factory.FromAsync方法, T是異步操作結果的類型。該方法有數個重載。在第一個例子中傳人了IAsyncResult和Func<lAsyncResult, string?,這是一個將IAsyncResult的實現作為參數並返回一個字符串的方法。由於第一個委托類型提供的EndMethod與該簽名是兼容的,所以將該委托的異步調用轉換為任務沒有任何問題。
第二個例子做的事與第一個非常相似,但是使用了不同的FromAsync方法重載,該重載 ,並不允許指定一個將會在異步委托調用完成后被調用的回調函數。但我們可以使用后續操作,替代它。但如果回調函數很重要,可以使用第一個例子所示的方法。
最后一個例子展示了一個小技巧。這次IncompatibleAsynchronousTask委托的 EndMethod使用了out參數,與FromAsync方法重載並不兼容。然而,可以很容易地將 EndMethod調用封裝到一個lambda表達式中,從而適合任務工廠方法。
可以在等待異步操作結果過程中打印出任務狀態,從而了解底層任務的運行情況。可以看到第一個任務的狀態為WaitingForActivation,這意味着TPL基礎設施實際上還未啟動該任務。
將EAP模式轉換成任務
本節將描述如何將基於事件的異步操作轉換為任務。在本節中,你將發現有一個可靠的模式可適用於.Net Framework類庫中的所有基於事件的異步API.

class Program { static void Main(string[] args) { var tcs = new TaskCompletionSource<int>(); var worker = new BackgroundWorker(); worker.DoWork += (sender, eventArgs) => { eventArgs.Result = TaskMethod("Background worker", 5); }; worker.RunWorkerCompleted += (sender, eventArgs) => { if (eventArgs.Error != null) { tcs.SetException(eventArgs.Error); } else if (eventArgs.Cancelled) { tcs.SetCanceled(); } else { tcs.SetResult((int)eventArgs.Result); } }; worker.RunWorkerAsync(); int result = tcs.Task.Result; Console.WriteLine("Result is: {0}", result); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
工作原理
這是一個將EAP模式轉換為任務的既簡單又優美的示例。關鍵點在於使用TaskCompletionSource<T>類型, T是異步操作結果類型。
不要忘記將tcs.SetResult調用封裝在try-catch代碼塊中,從而保證錯誤信息始終會設置給任務完成源對象。也可以使用TrySetResult方法來替代SetResult方法,以保證結果能被成功設置。
實現取消選項
本節是關於如何給基於任務的異步操作實現取消流程。我們將學習如何正確的使用取消標志,以及在任務真正運行前如何得知其是否被取消。

class Program { private static void Main(string[] args) { var cts = new CancellationTokenSource(); var longTask = new Task<int>(() => TaskMethod("Task 1", 10, cts.Token), cts.Token); Console.WriteLine(longTask.Status); cts.Cancel(); Console.WriteLine(longTask.Status); Console.WriteLine("First task has been cancelled before execution"); cts = new CancellationTokenSource(); longTask = new Task<int>(() => TaskMethod("Task 2", 10, cts.Token), cts.Token); longTask.Start(); for (int i = 0; i < 5; i++ ) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } cts.Cancel(); for (int i = 0; i < 5; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } Console.WriteLine("A task has been completed with result {0}.", longTask.Result); Console.ReadKey(); } private static int TaskMethod(string name, int seconds, CancellationToken token) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); for (int i = 0; i < seconds; i ++) { Thread.Sleep(TimeSpan.FromSeconds(1)); if (token.IsCancellationRequested) return -1; } return 42*seconds; } }
工作原理
第3章中我們已經討論了取消標志概念,你已經相當熟悉了。而本節又是一個關於為TPL任務實現取消選項的簡單例子。
首先仔細看看longTask的創建代碼。我們將給底層任務傳遞一次取消標志,然后給任務構造函數再傳遞一次。為什么需要提供取消標志兩次呢?
答案是如果在任務實際啟動前取消它,該任務的TPL基礎設施有責任處理該取消操作,因為這些代碼根本不會執行。通過得到的第一個任務的狀態可以知道它被取消了。如果嘗試對該任務調用Start方法,將會得到InvalidOperationException異常。
然后需要自己寫代碼來處理取消過程。這意味着我們對取消過程全權負責,並且在取消,任務后,任務的狀態仍然是RanToCompletion,因為從TPL的視角來看,該任務正常完成了它的工作。辨別這兩種情況是非常重要的,並且需要理解每種情況下職責的不同。
處理任務中的異常
本節將描述異步任務中處理異常這一重要的主題。我們將討論任務中拋出異常的不同情況及如何獲取這些異常信息

class Program { static void Main(string[] args) { Task<int> task; try { task = Task.Run(() => TaskMethod("Task 1", 2)); int result = task.Result; Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); try { task = Task.Run(() => TaskMethod("Task 2", 2)); int result = task.GetAwaiter().GetResult(); Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); var t1 = new Task<int>(() => TaskMethod("Task 3", 3)); var t2 = new Task<int>(() => TaskMethod("Task 4", 2)); var complexTask = Task.WhenAll(t1, t2); var exceptionHandler = complexTask.ContinueWith(t => Console.WriteLine("Exception caught: {0}", t.Exception), TaskContinuationOptions.OnlyOnFaulted ); t1.Start(); t2.Start(); Thread.Sleep(TimeSpan.FromSeconds(5)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); throw new Exception("Boom!"); return 42 * seconds; } }
工作原理
當程序啟動時,創建了一個任務並嘗試同步獲取任務結果。Result屬性的Get部分會使,當前線程等待直到該任務完成,並將異常傳播給當前線程。在這種情況下,通過catch代碼塊可以很容易地捕獲異常,但是該異常是一個被封裝的異常,叫做AggregateException。在本例中,它里面包含一個異常,因為只有一個任務拋出了異常。可以訪問InnerException屬性來得到底層異常。
第二個例子與第一個非常相似,不同之處是使用GetAwaiter和GetResult方法來訪問任務結果。這種情況下,無需封裝異常,因為TPL基礎設施會提取該異常。如果只有一個底層,任務,那么一次只能獲取一個原始異常,這種設計非常合適。
最后一個例子展示了兩個任務拋出異常的情形。現在使用后續操作來處理異常。只有之前,的任務完成前有異常時,該后續操作才會被執行。通過給后續操作傳遞TaskContinuationOptions.OnlyOnFaulted選項可以實現該行為。結果打印出了AggregateException,其內部封裝了兩個任,務拋出的異常。
並行運行任務
本節展示了如何同時運行多個異步任務。我們將學習當所有任務都完成或任意一個任務,完成了工作時,如何高效地得到通知。

class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); var whenAllTask = Task.WhenAll(firstTask, secondTask); whenAllTask.ContinueWith(t => Console.WriteLine("The first answer is {0}, the second is {1}", t.Result[0], t.Result[1]), TaskContinuationOptions.OnlyOnRanToCompletion ); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); var tasks = new List<Task<int>>(); for (int i = 1; i < 4; i++) { int counter = i; var task = new Task<int>(() => TaskMethod(string.Format("Task {0}", counter), counter)); tasks.Add(task); task.Start(); } while (tasks.Count > 0) { var completedTask = Task.WhenAny(tasks).Result; tasks.Remove(completedTask); Console.WriteLine("A task has been completed with result {0}.", completedTask.Result); } Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
工作原理
當程序啟動時,創建了兩個任務。然后借助於Task.WhenAll方法,創建了第三個任務,該任務將會在所有任務完成后運行。該任務的結果提供了一個結果數組,第一個元素是第.個任務的結果,第二個元素是第二個任務的結果,以此類推。
然后我們創建了另外一系列任務,並使用Task.WhenAny方法等待這些任務中的任何一 ,個完成。當有一個完成任務后,從列表中移除該任務並繼續等待其他任務完成,直到列表為, 4空。獲取任務的完成進展情況或在運行任務時使用超時,都可以使用Task.WhenAny方法。例如,我們等待一組任務運行,並且使用其中一個任務用來記錄是否超時。如果該任務先完,成,則只需取消掉其他還未完成的任務。
使用TaskScheduler配置任務的執行
1、新建一個C# WPF應用程序項目
2、在MainWindow.xaml文件中,將下面的標記代碼加入到一個網格元素中(即<Grid和<Grid>標簽間):

<TextBlock Name="ContentTextBlock" HorizontalAlignment="Left" Margin="44,134,0,0" VerticalAlignment="Top" Width="425" Height="40"/> <Button Content="Sync" HorizontalAlignment="Left" Margin="45,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonSync_Click"/> <Button Content="Async" HorizontalAlignment="Left" Margin="165,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonAsync_Click"/> <Button Content="Async OK" HorizontalAlignment="Left" Margin="285,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonAsyncOK_Click"/>
3、在MainWindow.xaml.cs文件中使用以下using指令;

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Input;
4、在MainWindow構造函數下面加入以下代碼片段:

void ButtonSync_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; try { //string result = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()).Result; string result = TaskMethod().Result; ContentTextBlock.Text = result; } catch (Exception ex) { ContentTextBlock.Text = ex.InnerException.Message; } } void ButtonAsync_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; Mouse.OverrideCursor = Cursors.Wait; Task<string> task = TaskMethod(); task.ContinueWith(t => { ContentTextBlock.Text = t.Exception.InnerException.Message; Mouse.OverrideCursor = null; }, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.FromCurrentSynchronizationContext()); } void ButtonAsyncOK_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; Mouse.OverrideCursor = Cursors.Wait; Task<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()); task.ContinueWith(t => Mouse.OverrideCursor = null, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.FromCurrentSynchronizationContext()); } Task<string> TaskMethod() { return TaskMethod(TaskScheduler.Default); } Task<string> TaskMethod(TaskScheduler scheduler) { Task delay = Task.Delay(TimeSpan.FromSeconds(5)); return delay.ContinueWith(t => { string str = string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); ContentTextBlock.Text = str; return str; }, scheduler); }
工作原理
本例中引人了很多新鮮的東西。首先,創建了一個WPF應用程序,而不是一個命令行,程序。這是很有必要的,因為我們需要一個擁有消息循環的用戶界面線程來演示異步運行任,務的不同情形。
TaskScheduler是一個非常重要的抽象。該組件實際上負責如何執行任務。默認的任務調度程序將任務放置到線程池的工作線程中。這是非常常見的場景,所以TPL將其作為默認選項並不用奇怪。我們已經知道了如何同步運行任務,以及如何將任務附加到父任務上從而一起運行。現在讓我們看看使用任務的其他方式。
當程序啟動時,創建了一個包含三個按鈕的窗口。第一個按鈕調用了一個同步任務的執行。該代碼被放置在ButtonSync Click方法中。當任務運行時,我們甚至無法移動應用程序,窗口。當用戶界面線程忙於運行任務時,整個用戶界面被完全凍結,在任務完成前無法響應任何消息循環。對於GUI窗口程序來說這是一個相當不好的實踐,我們需要找到一個方式來,解決該問題 ,
第二個問題是我們嘗試從其他線程訪問UI控制器。圖形用戶界面控制器從沒有被設計,為可被多線程使用,並且為了避免可能的錯誤,不允許從創建UI的線程之外的線程中訪問, U1組件。當我們嘗試這樣做時,得到了一個異常,該異常信息5秒后打印到了主窗口中。
為了解決第一個問題,我們嘗試異步運行任務。第二個按鈕就是這樣做的。該代碼被,.放置在ButtonAsync Click方法中。當使用調試模式運行該任務時,將會看到該任務被放置,在線程池中,最后將得到同樣的異常。然而,當任務運行時用戶界面一直保持響應。這是好事,但是我們仍需要除掉異常。
其實我們已經解決了該問題。給TaskScheduler.FromCurrentSynchronizationContext選項提供一個后續操作用於輸出錯誤信息。如果不這樣做,我們將無法看到錯誤信息,因為可能會得到在任務中產生的相同異常。該選項驅使TPL基礎設施給U1線程的后續操作中放入代碼,並借助UI線程消息循環來異步運行該代碼。這解決了從其他線程訪問UI控制器並仍保持U1處於響應狀態的問題。
為了檢查是否真的是這樣,可以按下最后一個按鈕來運行ButtonAsyncOK-Click方法中的代碼。與其余例子不同之處在於我們將UI線程任務調度程序提供給了該任務。你將看到 ,任務以異步的方式運行在UI線程中。U1依然保持響應。甚至盡管等待光標處於激活狀態,你仍可以按下另一個按鈕,
然而使用U1線程運行任務有一些技巧。如果回到同步任務代碼,取消對使用UI線程任務調度程序獲取結果的代碼行的注釋,我們將永遠得不到任何結果。這是一個經典的死鎖情,況:我們在UI線程隊列中調度了一個操作, U1線程等待該操作完成,但當等待時,它又無法運行該操作,這將永不會結束(甚至永不會開始),如果在任務中調用Wait方法也會發生死鎖。為了避免死鎖,絕對不要通過任務調度程序在U1線程中使用同步操作,請使用C# 5.0中的ContinueWith或async/await方法。
使用C#5.0
簡介
到現在為止,我們學習了任務並行庫,這是微軟提供的最新的異步編程基礎設施。它允許我們以模塊化的方式設計程序,來組合不同的異步操作。
遺憾的是,當閱讀此類程序時仍然非常難理解程序的實際執行順序。在大型程序中將會,.有許多相互依賴的任務和后續操作,用於運行其他后續操作的后續操作,處理異常的后續操,作,並且它們都出現在程序代碼中不同的地方。因此了解程序的先后執行次序變成了一個極具挑戰性的問題。
另一個需要關注的問題是,能夠接觸用戶界面控制器的每個異步任務是否得到了正確的,同步上下文。程序只允許通過UI線程使用這些控制器,否則將會得到多線程訪問異常。
說到異常,我們不得不使用單獨的后續操作任務來處理在之前的異步操作中發生的錯誤。這又導致了分散在代碼的不同部分的復雜的處理錯誤的代碼,邏輯上無法相互關聯。
為了解決這些問題, C#5.0的作者引入了新的語言特性,稱為異步函數(asynchronous function),它是TPL之上的更高級別的抽象,真正簡化了異步編程。正如在第4章提到的,抽象隱藏了主要的實現細節,使得程序員無須考慮許多重要的事情,從而使異步編程更容易。了解異步函數背后的概念是非常重要的,有助於我們編寫健壯的高擴展性的應用程序。
要創建一個異步函數,首先需要用async關鍵字標注一個方法。如果不先做這個,就不可能擁有async屬性或事件訪問方法和構造函數。代碼如下所示:
另一個重要的事實是,異步函數必須返回Task或Task<T>類型。可以使用async void方法,但是更推薦使用async Task方法。使用async void方法唯一合理的地方是在程序中使,用頂層UI控制器事件處理器的時候。
使用async關鍵字標注的方法內部,可以使用await操作符。該操作符可與TPL的任務,一起工作,並獲取該任務中異步操作的結果。在本章中稍后會講述細節。在async方法外不能使用await關鍵字,否則會有編譯錯誤。另外,異步函數在其代碼中至少要擁有一個await操作符。然而,如果沒有只會導致編譯警告,而不是編譯錯誤。
需要注意的是,在執行完await調用的代碼行后該方法會立即返回。如果是同步執行,執行線程將會阻塞兩秒然后返回結果。這里當執行完await操作后,立即將工作線程,放回線程池的過程中,我們會異步等待。2秒后,我們又一次從線程池中得到工作線程並繼續運行其中剩余的異步方法。這允許我們在等待2秒時重用工作線程做些其他事,這對提高應用程序的可伸縮性非常重要。借助於異步函數我們擁有了線性的程序控制流,但它,的執行依然是異步的。這雖然好用,但是難以理解。本章將幫助你學習異步函數所有重要的方面。
以我的自身經驗而言,如果程序中有兩個連續的await操作符,此時程序如何工作有一個常見的誤解。很多人認為如果在另一個異步操作之后使用await函數,它們將會並行運行。然而,事實上它們是順序運行的,即第一個完成后第二個才會開始運行。記住這一點很重要,在本章中稍后會覆蓋該細節。
在C# 5.0中關聯async和await有一定的限制。例如,不能把控制台程序的Main方法標,記為async,不能在catch, finally, lock或unsafe代碼塊中使用await操作符。不允許對任何異步函數使用ref或out參數。還有其他微妙的地方,但是以上已經包括了主要的需要注意的,地方。
異步函數會被C#編譯器在后台編譯成復雜的程序結構。這里我不會說明該細節。生,成的代碼與另一個C#構造很類似,稱為迭代器。生成的代碼被實現為一種狀態機。盡管很多程序員幾乎開始為每個方法使用async修飾符,我還是想強調如果方法本來無需異步 ,或並行運行,那么將該方法標注為async是沒有道理的。調用async方法會有顯著的性能。損失,通常的方法調用比使用async關鍵字的同樣的方法調用要快上40~50倍。請注意這一點。
在本章中我們將學習如何使用C# 5.0中的async和await關鍵字實現異步操作。本章將講述如何使用await按順序或並行地執行異步操作,還將討論如何在lambda表達式中使,用await,如何處理異常,以及在使用async void方法時如何避免陷阱。在本章結束前,我們會深入探究同步上下文傳播機制並學習如何創建自定義的awaitable對象,從而無需使用任務。
使用await操作符獲取異步任務結果
.本節將講述使用異步函數的基本場景。我們將比較使用TPL和使用await操作符獲取異步操作結果的不同之處。

class Program { static void Main(string[] args) { Task t = AsynchronyWithTPL(); t.Wait(); t = AsynchronyWithAwait(); t.Wait(); Console.ReadKey(); } static Task AsynchronyWithTPL() { Task<string> t = GetInfoAsync("Task 1"); Task t2 = t.ContinueWith(task => Console.WriteLine(t.Result), TaskContinuationOptions.NotOnFaulted); Task t3 = t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted); return Task.WhenAny(t2, t3); } async static Task AsynchronyWithAwait() { try { string result = await GetInfoAsync("Task 2"); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine(ex); } } async static Task<string> GetInfoAsync(string name) { await Task.Delay(TimeSpan.FromSeconds(2)); //throw new Exception("Boom!"); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
工作原理
當程序運行時運行了兩個異步操作。其中一個是標准的TPL模式的代碼,第二個使用了 C#的新特性async和awaito。AsynchronyWithTPL方法啟動了一個任務,運行兩秒后返回關於工作線程信息的字符串。然后我們定義了一個后續操作,用於在異步操作完成后打印出該 "操作結果,還有另一個后續操作,用於萬一有錯誤發生時打印出異常的細節。最終,返回了一個代表其中一個后續操作任務的任務,並等待其在Main函數中完成。
在AsynchronyWithAwait方法中,我們對任務使用await並得到了相同的結果。這和編寫通常的同步代碼的風格一樣,即我們獲取任務的結果,打印出結果,如果任務完成時帶有 "錯誤則捕獲異常。關鍵不同的是這實際上是一個異步程序。使用await后, C#立即創建了一 1個任務,其有一個后續操作任務,包含了await操作符后面的所有剩余代碼。這個新任務也處理了異常傳播。然后,將該任務返回到主方法中並等待其完成
請注意根據底層異步操作的性質和當前異步的上下文,執行異步代碼的具體方式可能會不同。稍后在本章中會解釋這一點。
因此可以看到程序的第一部分和第二部分在概念上是等同的,但是在第二部分中C# ,編譯器隱式地處理了異步代碼。事實上,第二部分比第一部分更復雜,接下來我們將講述,細節。
請記住在Windows GUI或ASPNET之類的環境中不推薦使用Task.Wait和Task.Result方法。如果程序員不是百分百地清楚代碼在做什么,很可能會導致死鎖。在第4章的4.10節中,在WPF應用程序中使用Task.Result時已經演示了該一點。
請取消對GetInfoAsync方法的throw new Exception代碼行的注釋來測試異常處理是否工作。
在lambda表達式中使用await操作符
本節將展示如何在lambda表達式中使用await,我們將編寫一個使用了await的匿名方法,並且獲取異步執行該方法的結果。

class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { Func<string, Task<string>> asyncLambda = async name => { await Task.Delay(TimeSpan.FromSeconds(2)); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); }; string result = await asyncLambda("async lambda"); Console.WriteLine(result); } }
工作原理
首先,由於不能在Main方法中使用async,我們將異步函數移到了Asynchronous Processing方法中。然后使用async關鍵字聲明了一個lambda表達式。由於任何lambda表達式的類型都不能通過lambda自身來推斷,所以不得不顯式向C#編譯器指定它的類型。在本例中,該類型說明該lambda表達式接受一個字符串參數,並返回一個Task<string>對象。
接着,我們定義了lambda表達式體。有個問題是該方法被定義為返回一個Task<string>對象,但實際上返回的是字符串,卻沒有編譯錯誤!這是因為C#編譯器自動產生一個任務,並返回給我們。
最后一步是等待異步lambda表達式執行並打印出結果。
對連續的異步任務使用await操作符
本節將展示當代碼中有多個連續的await方法時程序的實際流程是怎樣的。我們將學習如何閱讀有await方法的代碼,以及理解為什么await調用是異步操作。

class Program { static void Main(string[] args) { Task t = AsynchronyWithTPL(); t.Wait(); t = AsynchronyWithAwait(); t.Wait(); Console.ReadKey(); } static Task AsynchronyWithTPL() { var containerTask = new Task(() => { Task<string> t = GetInfoAsync("TPL 1"); t.ContinueWith(task => { Console.WriteLine(t.Result); Task<string> t2 = GetInfoAsync("TPL 2"); t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Result), TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent); t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); }, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent); t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); }); containerTask.Start(); return containerTask; } async static Task AsynchronyWithAwait() { try { string result = await GetInfoAsync("Async 1"); Console.WriteLine(result); result = await GetInfoAsync("Async 2"); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine(ex); } } async static Task<string> GetInfoAsync(string name) { Console.WriteLine("Task {0} started!", name); await Task.Delay(TimeSpan.FromSeconds(2)); if(name == "TPL 2") throw new Exception("Boom!"); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
工作原理
當程序運行時,與上節一樣運行了兩個異步操作。然而這次從AsynchronyWithAwait方法講起。它看起來仍然像平常的同步代碼,唯一不同之處是使用了兩個await聲明。最重要的一點是該代碼依然是順序執行的, Async2任務只有等之前的任務完成后才會開始執行。當閱讀該代碼時,程序流很清晰,可以看到什么先運行,什么后運行。但該程序如何是異步程序呢?首先,它不總是異步的。當使用await時如果一個任務已經完成,我們會異步地得到該任務結果。否則,當在代碼中看到await聲明時,通常的行為是方法執行到該await代碼行時將立即返回,並且剩下的代碼將會在一個后續操作任務中運行。因此等待操作結果時並沒有阻塞程序執行,這是一個異步調用。當AsynchronyWithAwait方法中的代碼在執行時,除了在Main方法中調用t.Wait外,我們可以執行任何其他任務。然而, "主線程必須等待直到所有異步操作完成,否則主線程完成后所有運行異步操作的后台線程! ",會停止運行。
AsynchronyWithTPL方法模仿了AsynchronyWithAwait的程序流。我們需要一個容器任務來處理所有相互依賴的任務。然后啟動主任務,給其加了一組后續操作。當該任務完成后,會打印出其結果。然后又啟動了一個任務,在該任務完成后會依次運行更多的后續操"作。為了測試對異常的處理,當運行第二個任務時故意拋出一個異常,並打印出異常信息。這組后續操作創建了與第一個方法中一樣的程序流。如果用它與await方法比較,可以看到它更容易閱讀和理解。唯一的技巧是請記住異步並不總是意味着並行執行。
對並行執行的異步任務使用await操作符
本節將學習如何使用await來並行地運行異步任務,而不是采用常用的順序執行。

class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); } async static Task AsynchronousProcessing() { Task<string> t1 = GetInfoAsync("Task 1", 3); Task<string> t2 = GetInfoAsync("Task 2", 5); string[] results = await Task.WhenAll(t1, t2); foreach (string result in results) { Console.WriteLine(result); } Console.ReadKey(); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); //await Task.Run(() => Thread.Sleep(TimeSpan.FromSeconds(seconds))); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
工作原理
這里定義了兩個異步任務,分別運行3秒和5秒。然后使用Task.WhenAll輔助方法創!建了另一個任務,該任務只有在所有底層任務完成后才會運行。之后我們等待該組合任務的,結果。5秒后,我們獲取了所有結果,說明了這些任務是同時運行的。
然而這里觀察到一個有意思的現象。當運行該程序時,你可能注意到這兩個任務似平是,被線程池中的同一個工作線程執行的。當我們並行運行任務時怎么可能發生這樣的事情呢?為了讓事情更有趣,我們來注釋掉GetIntroAsync方法中的await Task.Delay代碼行,並解除,對await Task.Run代碼行的注釋,然后再次運行程序。
我們會看到該情況下兩個任務會被不同的工作線程執行。不同之處是Task.Delay在幕后使用了一個計時器,過程如下:從線程池中獲取工作線程,它將等待Task.Delay方法返回結,果。然后, Task.Delay方法啟動計時器並指定一塊代碼,該代碼會在計時器時間到了Task.Delay方法中指定的秒數后被調用。之后立即將工作線程返回到線程池中。當計時器事件運,行時,我們又從線程池中任意獲取一個可用的工作線程(可能就是運行一個任務時使用的線,程)並運行計時器提供給它的代碼。
當使用Task.Run方法時,從線程池中獲取了一個工作線程並將其阻塞幾秒,具體秒數,由Thread.Sleep方法提供。然后獲取了第二個工作線程並且也將其阻塞。在這種場景下.我們消費了兩個工作線程,而它們絕對什么事沒做,因為在它們等待時不能執行任何其他,操作。
我們將在第9章中討論第一個場景的細節。在第9章我們將討論用大量的異步操作進行,數據輸入和輸出。盡可能地使用第一種方式是創建高伸縮性的服務器程序的關鍵。
處理異步操作中的異常
本節將描述在C#中使用異步函數時如何處理異常。我們將學習對多個並行的異步操作,使用await時如何聚合異常。

class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); } async static Task AsynchronousProcessing() { Console.WriteLine("1. Single exception"); try { string result = await GetInfoAsync("Task 1", 2); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine("Exception details: {0}", ex); } Console.WriteLine(); Console.WriteLine("2. Multiple exceptions"); Task<string> t1 = GetInfoAsync("Task 1", 3); Task<string> t2 = GetInfoAsync("Task 2", 2); try { string[] results = await Task.WhenAll(t1, t2); Console.WriteLine(results.Length); } catch (Exception ex) { Console.WriteLine("Exception details: {0}", ex); } Console.WriteLine(); Console.WriteLine("2. Multiple exceptions with AggregateException"); t1 = GetInfoAsync("Task 1", 3); t2 = GetInfoAsync("Task 2", 2); Task<string[]> t3 = Task.WhenAll(t1, t2); try { string[] results = await t3; Console.WriteLine(results.Length); } catch { var ae = t3.Exception.Flatten(); var exceptions = ae.InnerExceptions; Console.WriteLine("Exceptions caught: {0}", exceptions.Count); foreach (var e in exceptions) { Console.WriteLine("Exception details: {0}", e); Console.WriteLine(); } } Console.ReadKey(); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); throw new Exception(string.Format("Boom from {0}!", name)); } }
工作原理
我們運行了三個場景來展示在C#中使用async和await時關於錯誤處理的最常見情況。第一種情況是最簡單的,並且與常見的同步代碼幾乎完全一樣。我們只使用try/catch聲明即 ,可獲取異常細節。
一個很常見的錯誤是對一個以上的異步操作使用await時還使用以上方式。如果仍像第一種情況一樣使用catch代碼塊,則只能從底層的AggregateException對象中得到第一個異常。
為了收集所有異常信息,可以使用await任務的Exception屬性。在第三種情況中,我們使用AggregateException的Flatten方法將層級異常放入一個列表,並且從中提取出所有的底層異常。
避免使用捕獲的同步上下文
本節描述了當使用await來獲取異步操作結果時,同步上下文行為的細節。我們將學習,如何以及何時關閉同步上下文流。
加入對Windows Presentation Foundation庫的引用。
(1)右鍵點擊項目中的引用文件夾,選擇添加引用菜單選項。
(2)添加對PresentationCore, PresentationFramework, System.Xaml及Windows.Base庫的引用。

class Program { [STAThread] static void Main(string[] args) { var app = new Application(); var win = new Window(); var panel = new StackPanel(); var button = new Button(); _label = new Label(); _label.FontSize = 32; _label.Height = 200; button.Height = 100; button.FontSize = 32; button.Content = new TextBlock { Text = "Start asynchronous operations" }; button.Click += Click; panel.Children.Add(_label); panel.Children.Add(button); win.Content = panel; app.Run(win); Console.ReadLine(); } async static void Click(object sender, EventArgs e) { _label.Content = new TextBlock { Text = "Calculating..." }; TimeSpan resultWithContext = await Test(); TimeSpan resultNoContext = await TestNoContext(); //TimeSpan resultNoContext = await TestNoContext().ConfigureAwait(false); var sb = new StringBuilder(); sb.AppendLine(string.Format("With the context: {0}", resultWithContext)); sb.AppendLine(string.Format("Without the context: {0}", resultNoContext)); sb.AppendLine(string.Format("Ratio: {0:0.00}", resultWithContext.TotalMilliseconds / resultNoContext.TotalMilliseconds)); _label.Content = new TextBlock { Text = sb.ToString() }; } async static Task<TimeSpan> Test() { const int iterationsNumber = 100000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < iterationsNumber; i++) { var t = Task.Run(() => { }); await t; } sw.Stop(); return sw.Elapsed; } async static Task<TimeSpan> TestNoContext() { const int iterationsNumber = 100000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < iterationsNumber; i++) { var t = Task.Run(() => { }); await t.ConfigureAwait( continueOnCapturedContext: false); } sw.Stop(); return sw.Elapsed; } private static Label _label; }
工作原理
在本例中,我們將學習異步函數默認行為的最重要的方面之一。我們已經從第4章中了解了任務調度程序和同步上下文。默認情況下, await操作符會嘗試捕獲同步上下文,並在其中執行代碼。我們已經知道這有助於我們編寫與用戶界面控制器協作的異步代碼。另外,使用await不會發生在之前章節中描述過的死鎖情況,因為當等待結果時並不會阻塞UI線程。
這是合理的,但是讓我們看看潛在會發生什么事。在本例中,我們使用編程方式創建了·一個Windows Presentation Foundation應用程序並訂閱了它的按鈕點擊事件。當點擊該按鈕!時,運行了兩個異步操作。其中一個使用了一個常規的await操作符,另一個使用了帶false參數值的ConfigureAwait方法。false參數明確指出我們不能對其使用捕獲的同步上下文來運行后續操作代碼。在每個操作中,我們測量了執行完成花費的時間,然后將各自的時間和比例顯示在主屏幕上。
結果看到常規的await操作符花費了更多的時間來完成。這是因為我們向UI線程中放,入了成百上千個后續操作任務,這會使用它的消息循環來異步地執行這些任務。在本例中,我們無需在UI線程中運行該代碼,因為異步操作並未訪問UI組件。使用帶false參數值的, ConfigureAwait方法是一個更高效的方案。
還有一件事值得一提。嘗試運行程序並只點擊按鈕然后等待結果,然后再這樣做一次,但是這次點擊按鈕后嘗試隨機地拖拽應用程序窗口從一側到另一側。你將注意到在捕獲的同步上下文中的代碼執行速度變慢了!這個有趣的副作用完美演示了異步編程是多么危險。經歷類似的情況是非常容易的,而且如果你之前從未經歷過這樣的情況,那么幾乎不可能通過,調試來找出問題所在。
公平起見,讓我們來看看相反的情況。在前面的代碼片段中,在Click方法中,取消注,釋的代碼行,並注釋掉緊挨着它的前一行代碼。當運行程序時,我們將得到多線程控制訪問異常,因為設置Label控制器文本的代碼不會放置到捕捉的上下文中,而是在線程池的工作,線程中執行。
使用 async void 方法
本節描述了為什么使用async void方法非常危險。我們將學習以及如何盡可能地替代該方法。在哪種情況下可使用該方,

class Program { static void Main(string[] args) { Task t = AsyncTask(); t.Wait(); AsyncVoid(); Thread.Sleep(TimeSpan.FromSeconds(3)); t = AsyncTaskWithErrors(); while(!t.IsFaulted) { Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine(t.Exception); //try //{ // AsyncVoidWithErrors(); // Thread.Sleep(TimeSpan.FromSeconds(3)); //} //catch (Exception ex) //{ // Console.WriteLine(ex); //} int[] numbers = new[] {1, 2, 3, 4, 5}; Array.ForEach(numbers, async number => { await Task.Delay(TimeSpan.FromSeconds(1)); if (number == 3) throw new Exception("Boom!"); Console.WriteLine(number); }); Console.ReadLine(); } async static Task AsyncTaskWithErrors() { string result = await GetInfoAsync("AsyncTaskException", 2); Console.WriteLine(result); } async static void AsyncVoidWithErrors() { string result = await GetInfoAsync("AsyncVoidException", 2); Console.WriteLine(result); } async static Task AsyncTask() { string result = await GetInfoAsync("AsyncTask", 2); Console.WriteLine(result); } private static async void AsyncVoid() { string result = await GetInfoAsync("AsyncVoid", 2); Console.WriteLine(result); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); if(name.Contains("Exception")) throw new Exception(string.Format("Boom from {0}!", name)); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
工作原理
當程序啟動時,我們通過調用AsyncTask和AsyncVoid這兩個方法啟動了兩個異步操作。第一個方法返回一個Task對象,而另一個由於被聲明為async void所以沒有返回值。由於它們都是異步的所以都會立即返回。但是第一個方法通過返回的任務狀態或對其調用, Wait方法從而很容易實現監控。等待第二個方法完成的唯一方式是確切地等待多長時間,因為我們沒有聲明任何對象可以監控該異步操作的狀態。當然可以使用某種共享的狀態變量,將其設置到async void方法中,並從調用方法中檢查其值,但返回一個Task對象的方式更好些。
最危險的部分是異常處理。使用async void方法,異常處理方法將被放置到當前的同步上下文中,在本例中即線程池中。線程池中未被處理的異常會終結整個進程。使用 AppDomain.UnhandledException事件可以攔截未被處理的異常,但不能從攔截的地方恢復進程。為了重現該場景,可以取消Main方法中對try/catch代碼塊的注釋,然后運行,程序,
關於使用async void lambda表達式的另一個事實是:它們與Action類型是兼容的,而 Action類型在標准.NET Framework類庫中的使用非常廣泛。在lambda表達式中很容易忘記對異常的處理,這將再次導致程序崩潰。可以取消在Main方法中第二個被注釋的代碼塊的,注釋來重現該場景。
強烈建議只在UI事件處理器中使用async void方法。在其他所有的情況下,請使用返,回Task的方法。
設計一個自定義的 awaitable 類型
本節將展示如何設計一個與await操作符兼容的非常基礎的awaitable類型。

class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { var sync = new CustomAwaitable(true); string result = await sync; Console.WriteLine(result); var async = new CustomAwaitable(false); result = await async; Console.WriteLine(result); } class CustomAwaitable { public CustomAwaitable(bool completeSynchronously) { _completeSynchronously = completeSynchronously; } public CustomAwaiter GetAwaiter() { return new CustomAwaiter(_completeSynchronously); } private readonly bool _completeSynchronously; } class CustomAwaiter : INotifyCompletion { private string _result = "Completed synchronously"; private readonly bool _completeSynchronously; public bool IsCompleted { get { return _completeSynchronously; } } public CustomAwaiter(bool completeSynchronously) { _completeSynchronously = completeSynchronously; } public string GetResult() { return _result; } public void OnCompleted(Action continuation) { ThreadPool.QueueUserWorkItem( state => { Thread.Sleep(TimeSpan.FromSeconds(1)); _result = GetInfo(); if (continuation != null) continuation(); }); } private string GetInfo() { return string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } }
工作原理
為了與await操作符保持兼容,類型應當遵守在C#5.0規格說明中的規定的一些要,求。如果你安裝了Visual Studio 2012,那么可以在C:Program FilesMicrosoft Visual Studio11.0VC#Specifications\1033 (假設你使用的是默認安裝路徑)目錄中找到該規格說明文檔。
在規格說明文檔的7.7.7.1節,我們發現了awaitable表達式的定義:
Await表達式的任務被要求是awaitable,如果一個表達式t滿足下面任意一條則認為是, awaitable的:
-
- t是動態編譯時的類型
- t有一個名為GetAwaiter的可訪問的實例或擴展方法該方法沒有參數和類型參數,並且返回值類型A滿足以下所有條件:
- A實現了System.Runtime.CompilerServices.INotifyCompletion接口(為簡單起見, '以后簡稱為INotifyCompletion)
- A有一個可訪問的、可讀的類型為bool的實例屬性IsCompleted
- A有一個名為GetResult的可訪問的實例方法,該方法沒有任何參數和類型參數。
這些信息足夠我們開始了。首先我們定義一個awaitable類型CustomAwaitable,並實現GetAwaiter方法,該方法返回一個CustomAwaiter類型的實例。CustomAwaiter實現了 .INotifyCompletion接口,擁有類型為bool的IsCompleted屬性,並且有GetResult方法,該方法返回一個字符串類型。最后,我們寫了一些代碼來創建兩個CustomAwaitable對象並對,其使用await關鍵字。
現在我們應該理解await表達式執行的方式了。這里並沒有引用規格說明文檔,以免陷入不必要的細節。基本上,如果IsCompleted屬性返回true,則只需同步調用GetResult方法。這種做法防止了該操作已經完成后我們仍然為執行異步任務而分配資源。通過給 CustomAwaitable對象的構造函數傳遞completeSynchronously參數來展示該場景。
另外,我們給CustomAwaiter的OnCompleted方法注冊了一個回調函數並啟動該異步操作。當操作完成時,就會調用提供的回調函數,該回調函數將會通過調用CustomAwaiter對象的GetResult方法來獲取結果。
對動態類型使用 await
本節展示了如何設計一個非常基本的類型,該類型能夠與await操作符和動態C#類型兼容。
請執行以下步驟來添加對Impromptulnterface NuGet包的引用:
(1)右鍵點擊項目中的引用文件夾,並選擇管理NuGet包 菜單選項。
(2)添加對你喜歡的Impromptulnterface NuGet包的引用。可以使用管理NuGet包對話框的搜索功能

class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { string result = await GetDynamicAwaitableObject(true); Console.WriteLine(result); result = await GetDynamicAwaitableObject(false); Console.WriteLine(result); } static dynamic GetDynamicAwaitableObject(bool completeSynchronously) { dynamic result = new ExpandoObject(); dynamic awaiter = new ExpandoObject(); awaiter.Message = "Completed synchronously"; awaiter.IsCompleted = completeSynchronously; awaiter.GetResult = (Func<string>)(() => awaiter.Message); awaiter.OnCompleted = (Action<Action>) ( callback => ThreadPool.QueueUserWorkItem(state => { Thread.Sleep(TimeSpan.FromSeconds(1)); awaiter.Message = GetInfo(); if (callback != null) callback(); }) ); IAwaiter<string> proxy = Impromptu.ActLike(awaiter); result.GetAwaiter = (Func<dynamic>) ( () => proxy ); return result; } static string GetInfo() { return string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } public interface IAwaiter<T> : INotifyCompletion { bool IsCompleted { get; } T GetResult(); }
工作原理
這里我們重復了5.9節的技巧,但是這次借助於動態表達式,可以使用NuGet來實現該目標。NuGet是一個包含了很多有用的庫的包管理器。這次我們將使用一個庫來動態地創建,封裝對象,實現我們需要的接口。
首先我們創建了ExpandoObject類型的兩個實例,並把它們分配給動態的局部變量。這些變量將成為awaitable和awaiter對象。由於一個awaitable對象只需要擁有GetAwaiter方,法,提供該方法沒有問題。使用dynamic關鍵字組合ExpandoOibect允許我們自定義該對象,並通過分配相應的值來添加屬性和方法。事實上它是一個字典類型的集合,鍵類型是string,值類型是object,如果你很熟悉JavaScript編程語言,你可能會注意到它與JavaScript對象很相似。
由於dynamic關鍵字允許我們跳過C#的編譯時檢查。ExpandObject是以這樣的方式編,寫的:當你給屬性分配值時, ExpandObject創建了一個字典條目,鍵是屬性名,值是賦予的任何值。當嘗試獲取屬性值時,會在字典中查找並提供存儲在相應的字典條目中的值。如果該值是Action或Func類型,我們實際上存儲了一個委托,它可以當做方法使用。因此, ExpandoObject與dynamic類型的組合允許我們創建一個對象並動態地賦予其屬性和方法。
現在我們需要構造自定義的awaiter和awaitable對象。先從awaiter開始。首先提供一個名為Message的屬性並賦予初始值,然后使用Func<string>類型定義了GetResult方法.並分配一個lambda表達式,該表達式返回Message屬性值。接下來實現IsCompleted屬性。如果其值為true,則跳過剩下的工作並處理存儲在result局部變量中的awaitable對象。我們只需要添加一個方法用於返回該dynamic對象並從該對象返回awaiter對象。我們可以使用 result作為await表達式。然而,它將會同步運行。
主要的挑戰是在動態對象中實現異步處理。C#語言規格說明規定awaiter必須實現, INotifyCompletion或ICriticalNotifyCompletion接口,但是ExpandoObject卻沒有。甚至當我們動態地實現OnCompleted方法並添加到awaiter對象時,這仍然行不通,因為該對象沒有,實現上面提到的任何一個接口。
為了解決該問題,我們使用了NuGet提供的Impromptulnterface庫。它允許我們使用 Impromptu.ActLike方法來動態地創建代理對象,該對象將實現任何需要的接口。如果我們嘗試創建一個實現了INotifyCompletion接口的代理,仍然行不通,因為該代理對象不再是動態的,並且該接口只有OnCompleted方法,但沒有IsCompleted屬性或GetResult方法。作為最后的解決辦法,我們定義了一個泛型接口, IAwaiter<T>,它實現了INotifyCompletion並添加了所有需要的屬性和方法。現在,我們使用它生成代理並修改result對象來從GetAwaiter方法返回一個代理,而不是返回awaiter對象。現在程序可以工作了,我們構造了一個在運行時完全動態的awaitable對象。
使用並發集合
簡介
編程需要對基本的數據結構和算法有所了解。程序員為並發情況選擇最合適的數據結構,那就需要知道很多事情,例如算法運行時間、空間復雜度,以及大寫0標記法等。在不同的廣為人知的場景中,我們總知道哪種數據結構更高效。
對於並行計算,我們需要使用適當的數據結構。這些數據結構具備可伸縮性,盡可能地, "避免鎖,同時還能提供線程安全的訪問。.NET framework版本4引入了System.Collections.Concurrent命名空間,其中包含了一些數據結構。在本章中,我們將展示這些數據結構並通過簡單的例子來說明如何使用它們。
先從ConcurrentQueue開始。該集合使用了原子的比較和交換(Compare and Swap,簡稱CAS)操作,以及SpinWait來保證線程安全。它實現了一個先進先出( First In FirstOut,簡稱FIFO)的集合,這意味着元素出隊列的順序與加入隊列的順序是一致的。可以調用Enqueue方法向隊列中加入元素。TryDequeue方法試圖取出隊列中的第一個元素,而 TryPeek方法則試圖得到第一個元素但並不從隊列中刪除該元素。
ConcurrentStack的實現也沒有使用任何鎖,只采用了CAS操作。它是一個后進先出, (Last In First Out,簡稱LIFO)的集合,這意味着最近添加的元素會先返回。可以使用Push和PushRange方法添加元素,使用TryPop和TryPopRange方法獲取元素,以及使用TryPeek方法檢查元素。
ConcurrentBag是一個支持重復元素的無序集合。它針對這樣以下情況進行了優化,即多個線程以這樣的方式工作:每個線程產生和消費自己的任務,極少與其他線程的任務交互 (如果要交互則使用鎖),添加元素使用Add方法,檢查元素使用TryPeek方法,獲取元素使,用TryTake方法。
請避免使用上面提及的集合的Count屬性。實現這些集合使用的是鏈表, Count操作的時間復雜度為0(N)。如果想檢查集合是否為空,請使用IsEmpty屬性,其時間復雜度為0(1),
ConcurrentDictionary是一個線程安全的字典集合的實現。對於讀操作無需使用鎖。但是對於寫操作則需要鎖。該並發字典使用多個鎖,在字典桶之上實現了一個細粒度的鎖模型。使用參數concurrencyLevel可以在構造函數中定義鎖的數量,這意味着預估的線程數量將並發地更新該字典。
由於並發字典使用鎖,所以一些操作需要獲取該字典中的所有鎖。如果沒必要請避免使用以下操作: Count, IsEmpty, Keys, Values, CopyTo及ToArray。
BlockingCollection是對IProducerConsumerCollection泛型接口的實現的一個高級封裝。它有很多先進的功能來實現管道場景,即當你有一些步驟需要使用之前步驟運行的結果時。BlockingCollectione類支持如下功能:分塊、調整內部集合容量、取消集合操作、從多個塊集合中獲取元素。
使用 ConcurrentDictionary
本節展示了一個非常簡單的場景,比較在單線程環境中使用通常的字典集合與使用並發字典的性能。

class Program { static void Main(string[] args) { var concurrentDictionary = new ConcurrentDictionary<int, string>(); var dictionary = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { dictionary[i] = Item; } } sw.Stop(); Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { concurrentDictionary[i] = Item; } sw.Stop(); Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { CurrentItem = dictionary[i]; } } sw.Stop(); Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { CurrentItem = concurrentDictionary[i]; } sw.Stop(); Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed); Console.ReadKey(); } const string Item = "Dictionary item"; public static string CurrentItem; }
工作原理
當程序啟動時我們創建了兩個集合,其中一個是標准的字典集合,另一個是新的並發字典集合。然后采用鎖的機制向標准的字典中添加元素,並測量完成100萬次迭代的時間。同樣也采用同樣的場景來測量ConcurrentDictionary的性能,最后比較從兩個集合中獲取值的性能。
通過這個非常簡單的場景,我們發現ConcurrentDictionary寫操作比使用鎖的通常的字典要慢得多,而讀操作則要快些。因此如果對字典需要大量的線程安全的讀操作, ConcurrentDictionary是最好的選擇。
如果你對字典只需要多線程訪問只讀元素,則沒必要執行線程安全的讀操作。在此場景中最好只使用通常的字典或ReadOnlyDictionary集合。
ConcurrentDictionary的實現使用了細粒度鎖( fine-grained locking)技術,這在多線程寫入方面比使用鎖的通常的字典(也被稱為粗粒度鎖)的可伸縮性更好。正如本例中所示,當只用一個線程時,並發字典非常慢,但是擴展到5到6個線程(如果有足夠的CPU核心來同時運行它們),並發字典的性能會更好。
使用 ConcurrentQueue 實現異步處理
本節將展示創建能被多個工作者異步處理的一組任務的例子

class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); } static async Task RunProgram() { var taskQueue = new ConcurrentQueue<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskQueue)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i-1] = Task.Run( () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask {Id = i}; queue.Enqueue(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentQueue<CustomTask> queue, string name, CancellationToken token) { CustomTask workItem; bool dequeueSuccesful = false; await GetRandomDelay(); do { dequeueSuccesful = queue.TryDequeue(out workItem); if (dequeueSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
工作原理
當程序運行時,我們使用ConcurrentQueue集合實例創建了一個任務隊列。然后創建了一個取消標志,它是用來在我們將任務放入隊列后停止工作的。接下來啟動了一個單獨的工,作線程來將任務放入任務隊列中。該部分為異步處理產生了工作量。
現在定義該程序中消費任務的部分。我們創建了四個工作者,它們會隨機等待一段時,間,然后從任務隊列中獲取一個任務,處理該任務,一直重復整個過程直到我們發出取消標志信號。最后,我們啟動產生任務的線程,等待該線程完成。然后使用取消標志給消費者發信號我們完成了工作。最后一步將等待所有的消費者完成。
我們看到隊列中的任務按從前到后的順序被處理,但一個后面的任務是有可能會比前面的任務先處理的,因為我們有四個工作者獨立地運行,而且任務處理時間並不是恆定的。我,們看到訪問該隊列是線程安全的,沒有一個元素會被提取兩次。
改變 ConcurrentStack 異步處理順序
.本節是前一小節的細微修改版。我們又一次創建了被多個工作者異步處理的一組任務,但是這次使用ConcurrentStack來實現並看看有什么不同。

class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); } static async Task RunProgram() { var taskStack = new ConcurrentStack<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskStack)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentStack<CustomTask> stack) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; stack.Push(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentStack<CustomTask> stack, string name, CancellationToken token) { await GetRandomDelay(); do { CustomTask workItem; bool popSuccesful = stack.TryPop(out workItem); if (popSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
工作原理
當程序運行時,我們創建了一個ConcurrentStack集合的實侈e其余的代碼與前一小節中幾乎一樣,唯一不同之處是我們對並發堆棧使用Push和TryPop方法,而對並發隊列使用Enqueue和TryDequeue方法。
現在可以看到任務處理的順序被改變了。堆棧是一個LIFO集合,工作者先處理最近的,任務。在並發隊列中,任務被處理的順序與被添加的順序幾乎一致。這意味着根據工作者的!數量,我們必將在一定時間窗內處理先被創建的任務。而在堆棧中,早先創建的任務具有較低的優先級,而且直到生產者停止向堆棧中放入更多任務后,該任務才有可能被處理。這種行為是確定的,最好在該場景下使用隊列。
使用 ConcurrentBag 創建一個爬蟲
本節展示了在多個獨立的既可生產工作又可消費工作的工作者間如何擴展工作量。

class Program { static void Main(string[] args) { CreateLinks(); Task t = RunProgram(); t.Wait(); } static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>(); static async Task RunProgram() { var bag = new ConcurrentBag<CrawlingTask>(); string[] urls = new[] {"http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/"}; var crawlers = new Task[4]; for (int i = 1; i <= 4; i++) { string crawlerName = "Crawler " + i.ToString(); bag.Add(new CrawlingTask { UrlToCrawl = urls[i-1], ProducerName = "root"}); crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName)); } await Task.WhenAll(crawlers); Console.ReadKey(); } static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName) { CrawlingTask task; while (bag.TryTake(out task)) { IEnumerable<string> urls = await GetLinksFromContent(task); if (urls != null) { foreach (var url in urls) { var t = new CrawlingTask { UrlToCrawl = url, ProducerName = crawlerName }; bag.Add(t); } } Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!", task.UrlToCrawl, task.ProducerName, crawlerName); } } static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task) { await GetRandomDelay(); if (_contentEmulation.ContainsKey(task.UrlToCrawl)) return _contentEmulation[task.UrlToCrawl]; return null; } static void CreateLinks() { _contentEmulation["http://microsoft.com/"] = new [] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" }; _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" }; _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" }; _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" }; _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" }; _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" }; _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" }; _contentEmulation["http://facebook.com/"] = new [] { "http://facebook.com/a.html", "http://facebook.com/b.html" }; _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" }; _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" }; _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" }; _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" }; _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" }; _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" }; _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" }; _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" }; } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(150, 200); return Task.Delay(delay); } class CrawlingTask { public string UrlToCrawl { get; set; } public string ProducerName { get; set; } } }
工作原理
該程序模擬了使用多個網絡爬蟲進行網頁索引的場景。網絡爬蟲是這樣一個程序:它使用網頁地址打開一個網頁,索引該網頁內容,嘗試訪問該頁面包含的所有鏈接,並且也索引這些鏈接頁面。剛開始,我們定義了一個包含不同網頁URL的字典。該字典模擬了包含其,他頁面鏈接的網頁。該實現非常簡單,並不關心索引已經訪問過的頁面,但正因為它如此簡單我們才可以關注並行工作負載。
接着創建了一個並發包,其中包含爬蟲任務。我們創建了四個爬蟲,並且給每個爬蟲都提供了一個不同的網站根URL,然后等待所有爬蟲完成工作。現在每個爬蟲開始檢索提供給,它的網站URL,我們通過等待一個隨機事件來模擬網絡10處理。如果頁面包含的URL越多,爬蟲向包中放入的任務也會越多。然后檢查包中是否還有任何需要爬蟲處理的任務,如果沒有說明爬蟲完成了工作。
如果檢查前四個根URL后的第一行輸出內容,我們將看到被爬蟲N放置的任務通常會,被同一個爬蟲處理。然而,接下來的行則會不同。這是因為ConcurrentBag內部針對多個線程既可以添加元素又可以刪除元素的場景進行了優化。實現方式是每個線程使用自己的本地,隊列的元素,所以使用該隊列時無需任何鎖。只有當本地隊列中沒有任何元素時,我們才執,行一些鎖定操作並嘗試從其他線程的本地隊列中“偷取”工作。這種行為有助於在所有工作,者間分發工作並避免使用鎖。
使用 BlockingCollection 進行異步處理
本節將描述如何使用BlockingCollection來簡化實現異步處理的工作負載。

class Program { static void Main(string[] args) { Console.WriteLine("Using a Queue inside of BlockingCollection"); Console.WriteLine(); Task t = RunProgram(); t.Wait(); Console.WriteLine(); Console.WriteLine("Using a Stack inside of BlockingCollection"); Console.WriteLine(); t = RunProgram(new ConcurrentStack<CustomTask>()); t.Wait(); } static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null) { var taskCollection = new BlockingCollection<CustomTask>(); if(collection != null) taskCollection= new BlockingCollection<CustomTask>(collection); var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = "Processor " + i; processors[i - 1] = Task.Run( () => TaskProcessor(taskCollection, processorId)); } await taskSource; await Task.WhenAll(processors); } static async Task TaskProducer(BlockingCollection<CustomTask> collection) { for (int i = 1; i <= 20; i++) { await Task.Delay(20); var workItem = new CustomTask { Id = i }; collection.Add(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } collection.CompleteAdding(); } static async Task TaskProcessor( BlockingCollection<CustomTask> collection, string name) { await GetRandomDelay(); foreach (CustomTask item in collection.GetConsumingEnumerable()) { Console.WriteLine("Task {0} has been processed by {1}", item.Id, name); await GetRandomDelay(); } } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
工作原理
先說第一個場景,這里我們使用了BlockingCollection類,它帶來了很多優勢。首先,我們能夠改變任務存儲在阻塞集合中的方式。默認情況下它使用的是ConcurrentQueue容器,但是我們能夠使用任何實現了IProducerConsumerCollection泛型接口的集合。為了演示該點,我們運行了該程序兩次,第二次時使用ConcurrentStack作為底層集合。
工作者通過對阻塞集合迭代調用GetConsumingEnumerable方法來獲取工作項。如果在該集合中沒有任何元素,迭代器會阻塞工作線程直到有元素被放置到集合中。當生產者調用集合的CompleteAdding時該迭代周期會結束。這標志着工作完成了。
這里很容易犯一個錯誤,即對BlockingCollection進行迭代,因為它自身實現了IEnumerable接口。不要忘記使用GetConsumingEnumerable,否則你迭代的只是集合的“快照”,這並不是期望的程序行為。
工作量生產者將任務插入到BlockingCollection然后調用CompleteAdding方法,這會使所有工作者完成工作。現在在程序輸出中我們看到兩個結果序列,演示了並發隊列和堆棧集合的不同之處。
使用 PLINQ
簡介
NET Framework庫中有個子集叫做並行庫,通常被稱為並行框架擴展( Parallel Framework Extensions,簡稱PFX),這是這些庫非常早期的版本的名稱。並行庫隨着.NET Framework 4.0一起發布,包含三大主要部分:
- 任務並行庫(TPL)
- 並發集合
- 並行LINQ (或PLINQ)
事實上我們將 "程序分割成一組任務並使用不同的線程來運行不同的任務。這種方式被稱為任務並行( task parallelism), 目前我們只學習了任務並行。.
想象一下我們有一個程序針對一組大數據進行重量級運算。並行運行該程最容易的方式,是將該組數據分割成較小的數據塊,對這些數據塊進行並行計算,然后聚合這些計算結果。這種編程模型稱為數據並行(data parallelism)。
任務並行是最底層的抽象層。我們將程序定義為任務的組合,顯式地定義這些任務如何組合。由此方式組成的程序會非常復雜和細節化。並行操作被定義在該程序的不同位置,隨着並行操作的增長,程序變得越來越難理解和維護。采用這種方式來並行程序被稱為無結構的並行(unstructured parallelism),這就是我們為復雜的並行邏輯付出的代價。
然而,當我們有較簡單的程序邏輯時,我們可以將更多的並行細節推給PFX庫和C#編譯器。例如,我們可以說, “我想以並行方式運行這三個方法,但我不關心是如何實現並行的,讓NET基礎設施決定細節。”這產生了一個抽象層使得我們不用提供一個關於如何實現並行的細節描述。這種方式被稱為結構並行( structured parallelism),因為並行通常是一組聲明,並且在程序中每個並行情況並定義在確切的地方。
這可能導致一種印象,即無結構並行是一種不好的實踐,應該始終使用結構並行替代它。我想強調這一點是不對的。結構並行確實更易維護,應該盡可能地使用,但是它並不是萬能的。通常有很多情況我們不能簡單地使用結構並行,那么以非結構化的方式使用TPL任務並行也是完全可以的。
任務並行庫中有一個名為Parallel的類,其提供了一組API用來實現結構並行。它仍然是TPL的一部分,我們在本章介紹它的原因是它是從較低的抽象層向較高的抽象層過渡的完美例子。當使用Parallel類的API時,我們無需提供分割工作的細節。但是我們仍要顯式定義如何從分割的結果中得到單個結果。
PLINQ具有最高級抽象。它自動將數據分割為數據塊,並且決定是否真的需要並行化查詢,或者使用通常的順序查詢處理更高效。PLINO基礎設施會將分割任務的執行結果組合到一起。有很多選項可供程序員來優化查詢,使用盡可能高的性能獲取結果。
在本章中我們將涵蓋Parallel類的用法以及很多不同的PLINQ選項,例如讓LINQ查詢並行化,設置異常模型及設置PLINQ查詢的並行等級,處理查詢項的順序,以及處理, PLINQ異常。我們也會學習如何管理PLINO查詢的數據分割。
使用 Parallel 類
本節展示了如何使用Parallel類的API,我們將學習如何並行地調用方法,如何執行並, "行的循環,以及調整並行機制。

class Program { static void Main(string[] args) { Parallel.Invoke( () => EmulateProcessing("Task1"), () => EmulateProcessing("Task2"), () => EmulateProcessing("Task3") ); var cts = new CancellationTokenSource(); var result = Parallel.ForEach( Enumerable.Range(1, 30), new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = Environment.ProcessorCount, TaskScheduler = TaskScheduler.Default }, (i, state) => { Console.WriteLine(i); if (i == 20) { state.Break(); Console.WriteLine("Loop is stopped: {0}", state.IsStopped); } }); Console.WriteLine("---"); Console.WriteLine("IsCompleted: {0}", result.IsCompleted); Console.WriteLine("Lowest break iteration: {0}", result.LowestBreakIteration); Console.ReadKey(); } static string EmulateProcessing(string taskName) { Thread.Sleep(TimeSpan.FromMilliseconds( new Random(DateTime.Now.Millisecond).Next(250, 350))); Console.WriteLine("{0} task was processed on a thread id {1}", taskName, Thread.CurrentThread.ManagedThreadId); return taskName; } }
工作原理
該程序演示了Parallel類的不同功能。與在任務並行庫中定義任務的方式相比,調用 "Invoke方法可以免去很多麻煩就可實現並行地運行多個任務。Invoke方法會阻塞其他線程直到所有的任務都被完成,這是一個非常常見的方面使用Invoke方法的場景。
下一個功能是並行循環,使用For和ForEach方法來定義循環。由ForEach方法與For方法非常相似,我們將仔細講解ForEach方法。並行ForEach循環可以通過給每個集合項應用一個action委托的方式,實現並行地處理任何IEnumerable集合。我們可以提供幾種選項,自定義並行行為,並得到一個結果來說明循環是否成功完成。
可以給ForEach方法提供一個ParallelOptions類的實例來控制並行循環。其允許我們使用CancellationToken取消循環,限制最大並行度(並行運行的最大操作數),還可以提供一個自定義的TaskScheduler類來調度任務。Action可以接受一個附加的ParallelLoopState參數.可用於從循環中跳出或者檢查當前循環的狀態。
使用ParallelLoopState有兩種方式停止並行循環。既可以使用Break方法,也可以使用Stop方法。Stop方法告訴循環停止處理任何工作,並設置並行循環狀態屬性, IsStopped值為true, Break方法停止其之后的迭代,但之前的迭代還要繼續工作。在那,種情況下,循環結果的LowestBreaklteration屬性將會包含當Break方法被調用時的最低,循環次數。
並行化 LINQ 查詢
本節將描述如何使用PLINQ來並行化查詢,以及如何將並行查詢改為順序處理。

class Program { static void Main(string[] args) { var parallelQuery = from t in GetTypes().AsParallel() select EmulateProcessing(t); var cts = new CancellationTokenSource(); cts.CancelAfter(TimeSpan.FromSeconds(3)); try { parallelQuery .WithDegreeOfParallelism(Environment.ProcessorCount) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithMergeOptions(ParallelMergeOptions.Default) .WithCancellation(cts.Token) .ForAll(Console.WriteLine); } catch (OperationCanceledException) { Console.WriteLine("---"); Console.WriteLine("Operation has been canceled!"); } Console.WriteLine("---"); Console.WriteLine("Unordered PLINQ query execution"); var unorderedQuery = from i in ParallelEnumerable.Range(1, 30) select i; foreach (var i in unorderedQuery) { Console.WriteLine(i); } Console.WriteLine("---"); Console.WriteLine("Ordered PLINQ query execution"); var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered() select i; foreach (var i in orderedQuery) { Console.WriteLine(i); } Console.ReadKey(); } static string EmulateProcessing(string typeName) { Thread.Sleep(TimeSpan.FromMilliseconds( new Random(DateTime.Now.Millisecond).Next(250,350))); Console.WriteLine("{0} type was processed on a thread id {1}", typeName, Thread.CurrentThread.ManagedThreadId); return typeName; } static IEnumerable<string> GetTypes() { return from assembly in AppDomain.CurrentDomain.GetAssemblies() from type in assembly.GetExportedTypes() where type.Name.StartsWith("Web") orderby type.Name.Length select type.Name; } }
工作原理
當程序運行時,我們創建了一個LINQ查詢,其使用反射API來查詢加載到當前應用程,序域中的所有組件中名稱以“Web"開頭的類型。我們使用EmulateProcessing方法模擬處理每個項時間的延遲,並使用PrintInfo方法打印結果。我們也使用了Stopwatch類來測量每個查詢的執行時間。
首先我們運行了一個通常的順序LINQ查詢。此時並沒有並行化,所有任何操作都運,行在當前線程。該查詢的第二版顯式地使用了ParallelEnumerable類。ParallelEnumerable包含了PLINO的邏輯實現,並且作為IEnumerable集合功能的一組擴展方法。通常無需顯式,地使用該類,在這里是為了演示PLINQ的實際工作方式。第二個版本以並行的方式運行, "EmulateProcessing操作。然而,默認情況下結果會被合並到單個線程中,所以查詢的執行時,間應該比第一個版本少幾秒。
第三個版本展示了如何使用AsParallel方法來將LINO查詢按聲明的方式並行化運行。這里我們並不關心實現細節,只是為了說明我們想以並行的方式運行。然而,該版本的關鍵不同處是我們使用了ForAll方法來打印查詢結果。打印結果操作與任務被處理的線程是同一個線程,跳過了結果合並步驟。它允許我們也能以並行的方式運行PrintInfo方法,甚至該版本運行速度比之前的版本更快。
最后一個例子展示了如何使用AsSequential方法將PLINQ查詢以順序方式運行。可以看到該查詢運行方式與第一個示例完全一樣。
使用異步I/O
簡介
如果在客戶端運行程序,最重要的事情之一是有一個響應的用戶界面。這意味着無論應用程序發生什么,所有的用戶界面元素(比如按鈕和進度條)都要保持快速運行,用戶能夠從應用程序得到快速響應。達到該點並不容易!如果你嘗試在Windows系統中打開記事本編輯器並加載一個有幾個兆字節大小的文檔,應用程序窗口將凍結一段顯著的時間,因為整個文檔要先從硬盤中加載,然后程序才能開始處理用戶輸入。
這是一個非常重要的問題,在該情況下,唯一方案是無論如何都要避免阻塞UI線程。這反過來意味着為了防止阻塞UI線程,每個與UI有關的API必須只被允許異步調用。這是Window 8操作系統重新升級API的關鍵原因,其幾乎把每個方法替換為異步方式。但是如果應用程序使用多線程來達到此目的會影響性能嗎?當然會!然而考慮到只有一個用戶,那么這是划算的。如果應用程序可以使用電腦的所有能力從而變得更加高效,而且該能力只為運行程序的唯一用戶服務,這是好事。
接下來看看第二種情況。如果程序運行在服務器端,則是完全不同的情形。可伸縮性是最高優先級,這意味着單個用戶消耗越少的資源越好。如果為每個用戶創建多個線程,則!可伸縮性並不好。以高效的方式來平衡應用程序資源的消耗是個非常復雜的問題。例如,在ASPNET (其是微軟提供的web應用程序平台)中,我們使用工作線程池來服務客戶端請求。該池的工作線程數是有限的,所以不得不最小化每個工作線程的使用時間以便達到高伸縮性。這意味着需要把工作線程越快越好地放回到池中,從而可以服務下一個請求。如果我們啟動了一個需要計算的異步操作,則整個工作流程會很低效。首先從線程池中取出一個工作線程用以服務客戶端請求。然后取出另一個工作線程並開始處理異步操作。現在有兩個工作線程都在處理請求,如果第一個線程能做些有用的事則非常好!遺憾的是,通常情況是我們簡單等待異步操作完成,但是我們卻消費了兩個工作線程,而不是一個。在該場景中,異步比同步執行實際上更糟糕!我們不需要使用所有CPU核心,因為我們已經在服務很多客戶端,它們已經使用了CP的所有計算能力。我們無須保持第一個線程響應,因為這沒有用戶界面。那么為什么我們應該在服務器端使用異步呢?
答案是只有異步輸人/輸出操作才應該使用異步。目前,現代計算機通常有一個磁盤驅動器來存儲文件,一塊網卡來通過網絡發送與接收數據。所有這些設備都有自己的微型計算機,以非常底層的方式來管理輸入/輸出操作並發信號給操作系統結果。這又是一個非常復雜的主題。但為了讓概念清楚,我們可以這樣說,有一種方式讓程序員開始一個輸人/輸出,操作,並提供給操作系統一段代碼,當操作完成后被該代碼會被調用。在啟動I/O任務與完我之間,並不需要CPU工作。這是由相應的磁盤和網絡控制器的微型計算機完成的。這種執行I/O任務的方式被稱為I/O線程。實現時使用的是,NET線程池,並且使用了一個來自操作系統的基礎設施,叫做I/O完成端口。
在APSNET中,一旦有一個異步的I/O操作在工作線程中開始時,它會被立即返回到線程池中。當該操作繼續運行時,該線程可以服務其他的客戶端。最終,當操作發出信號完成時, ASPNET基礎設施從線程池中獲取一個空閑的工作線程(該線程可能與操作開始時的!線程不同),然后會完成該操作。
好的,我們現在了解了I/O線程對服務器應用程序的重要性。遺憾的是,很難看出,哪些API在底層使用了I/O線程。除了學習源代碼外,唯一的方式是簡單知道哪個NET , Framework類庫對I/O線程進行了優化。在本章中,我們將學習如何使用一些這樣的API,我們將學習如何異步操作文件,如何使用網絡I/O來創建一個HTTP服務器並調用Windows Communication Foundation服務,以及如何使用異步API來查詢數據庫。
另一個需要考慮的重要問題是並行。由於一些原因,集中地並行磁盤操作可能導致很低的性能。請記住並行I/O操作經常非常低效,順序執行I/O要好一些,但是要以異步的方式執行。
異步的使用文件
本節講述了如何創建一個文件,並且以異步的方式讀寫數據。

internal class Program { static void Main(string[] args) { var t = ProcessAsynchronousIO(); t.GetAwaiter().GetResult(); Console.ReadKey(); } const int BUFFER_SIZE = 4096; async static Task ProcessAsynchronousIO() { using (var stream = new FileStream("test1.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE)) { Console.WriteLine("1. Uses I/O Threads: {0}", stream.IsAsync); byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent()); var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null); await writeTask; } using (var stream = new FileStream("test2.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous)) { Console.WriteLine("2. Uses I/O Threads: {0}", stream.IsAsync); byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent()); var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null); await writeTask; } using (var stream = File.Create("test3.txt", BUFFER_SIZE, FileOptions.Asynchronous)) using (var sw = new StreamWriter(stream)) { Console.WriteLine("3. Uses I/O Threads: {0}", stream.IsAsync); await sw.WriteAsync(CreateFileContent()); } using (var sw = new StreamWriter("test4.txt", true)) { Console.WriteLine("4. Uses I/O Threads: {0}", ((FileStream)sw.BaseStream).IsAsync); await sw.WriteAsync(CreateFileContent()); } Console.WriteLine("Starting parsing files in parallel"); Task<long>[] readTasks = new Task<long>[4]; for (int i = 0; i < 4; i++) { readTasks[i] = SumFileContent(string.Format("test{0}.txt", i + 1)); } long[] sums = await Task.WhenAll(readTasks); Console.WriteLine("Sum in all files: {0}", sums.Sum()); Console.WriteLine("Deleting files..."); Task[] deleteTasks = new Task[4]; for (int i = 0; i < 4; i++) { string fileName = string.Format("test{0}.txt", i + 1); deleteTasks[i] = SimulateAsynchronousDelete(fileName); } await Task.WhenAll(deleteTasks); Console.WriteLine("Deleting complete."); } static string CreateFileContent() { var sb = new StringBuilder(); for (int i = 0; i < 100000; i++) { sb.AppendFormat("{0}", new Random(i).Next(0, 99999)); sb.AppendLine(); } return sb.ToString(); } async static Task<long> SumFileContent(string fileName) { using (var stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous)) using (var sr = new StreamReader(stream)) { long sum = 0; while (sr.Peek() > -1) { string line = await sr.ReadLineAsync(); sum += long.Parse(line); } return sum; } } static Task SimulateAsynchronousDelete(string fileName) { return Task.Run(() => File.Delete(fileName)); } }
工作原理
當程序運行時,我們以不同的方式創建了4個文件,並且填充了隨機數據。在第一個例 子中,使用的是FileStream類以及其方法,將異步編程模型API轉換成任務。第二個例子中也一樣,但是給FileStream構造函數提供了FileStrearn.Asynchronous參數。
使用FileOptions.Asynchronous選項是非常重要的。如果忽略該選項,我們依然可以以異步的方式使用該文件,但這只是在線程池中的異步委托調用。只有提供了該選項(或者在另一個構造函數重載中使用bool useAsync),才能對FileStream類使用異步1O,
第三個例子使用了一些簡化的API,比如File.Create方法和StreamWrite類。它也使用 1/0線程,我們可以使用Stream.IsAsync屬性來檢查。最后一個例子說明了過分簡化也不好。這里我們借助於異步委托調用來模擬異步1O,其實並沒有使用異步1O。
接着並行地異步地從所有文件中讀取數據,統計每個文件內容,然后求總和。最后,刪除所有文件。由於在任何非Windows商店應用程序中並沒有異步刪除文件的API,我們使用 Task.Run工廠方法來模擬異步刪除文件。
編寫一個異步的HTTP服務器和客戶端
本節展示了如何編寫一個簡單的異步HTTP服務器。

class Program { static void Main(string[] args) { var server = new AsyncHttpServer(portNumber: 1234); var t = Task.Run(() => server.Start()); Console.WriteLine("Listening on port 1234. Open http://localhost:1234 in your browser."); Console.WriteLine("Trying to connect:"); Console.WriteLine(); GetResponseAsync("http://localhost:1234").GetAwaiter().GetResult(); Console.WriteLine(); Console.WriteLine("Press Enter to stop the server."); Console.ReadLine(); server.Stop().GetAwaiter().GetResult(); Console.ReadKey(); } static async Task GetResponseAsync(string url) { using (var client = new HttpClient()) { HttpResponseMessage responseMessage = await client.GetAsync(url); string responseHeaders = responseMessage.Headers.ToString(); string response = await responseMessage.Content.ReadAsStringAsync(); Console.WriteLine("Response headers:"); Console.WriteLine(responseHeaders); Console.WriteLine("Response body:"); Console.WriteLine(response); } } class AsyncHttpServer { readonly HttpListener _listener; const string RESPONSE_TEMPLATE = "<html><head><title>Test</title></head><body><h2>Test page</h2><h4>Today is: {0}</h4></body></html>"; public AsyncHttpServer(int portNumber) { _listener = new HttpListener(); _listener.Prefixes.Add(string.Format("http://+:{0}/", portNumber)); } public async Task Start() { _listener.Start(); while (true) { var ctx = await _listener.GetContextAsync(); Console.WriteLine("Client connected..."); var response = string.Format(RESPONSE_TEMPLATE, DateTime.Now); using (var sw = new StreamWriter(ctx.Response.OutputStream)) { await sw.WriteAsync(response); await sw.FlushAsync(); } } } public async Task Stop() { _listener.Abort(); } } }
工作原理
這里我們通過HttpListener類實現了一個非常簡單的web服務器。也使用了TcpListener類進行TCP套接字10操作。我們配置該監聽器接收任何主機到本地機器1234端口的連接。然后在單獨的工作線程中啟動該監聽器,從而在主線程中可以控制該監聽器。
當使用GetContextAsync方法時會發生異步I/O操作。遺憾的是,其並不接收, CancellationToken從而實現取消功能。所以如果想關閉該服務器,只需調用listener.Abort.方法,這將丟棄所有連接並關閉該服務器。
為了對該服務器執行一個異步請求,我們使用了統一命名空間下的System.Net.Http集合中的HttpClient類。我們使用Get.Async方法來發起一個異步的HTTP GET請求。還有其他的方法用於發起其他HTTP請求,比如POST, DELETE以及PUT, HttpClient還有很多其他,的選項,比如使用不同的格式(比如XML和JSON)來序列化和反序列化對象,指定代理服,務器地址,認證以及其他配置。
當運行該程序時,可以看到該服務器被啟動起來。在服務器端代碼中,我們使用, GetContextAsync方法來接收新的客戶端連接。當有新的客戶端連接時該方法就會返回,我,們簡單的輸出一個包含當前日期和時間的非常基礎的HTML作為響應。然后我們請求服務器,並打印出響應頭和內容。你也可以打開瀏覽器訪問http://localhost:1234/地址。你將看到相同的響應結果顯示在瀏覽器窗口。
異步操作數據庫
本節演示了創建數據庫,以及異步地操作數據、讀取數據的過程。

class Program { static void Main(string[] args) { const string dataBaseName = "CustomDatabase"; var t = ProcessAsynchronousIO(dataBaseName); t.GetAwaiter().GetResult(); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } async static Task ProcessAsynchronousIO(string dbName) { try { const string connectionString = @"Data Source=(LocalDB)\v11.0;Initial Catalog=master;Integrated Security=True"; string outputFolder = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); string dbFileName = Path.Combine(outputFolder, string.Format(@".\{0}.mdf", dbName)); string dbLogFileName = Path.Combine(outputFolder, string.Format(@".\{0}_log.ldf", dbName)); string dbConnectionString = string.Format(@"Data Source=(LocalDB)\v11.0;AttachDBFileName={1};Initial Catalog={0};Integrated Security=True;", dbName, dbFileName); using (var connection = new SqlConnection(connectionString)) { await connection.OpenAsync(); if (File.Exists(dbFileName)) { Console.WriteLine("Detaching the database..."); var detachCommand = new SqlCommand("sp_detach_db", connection); detachCommand.CommandType = CommandType.StoredProcedure; detachCommand.Parameters.AddWithValue("@dbname", dbName); await detachCommand.ExecuteNonQueryAsync(); Console.WriteLine("The database was detached succesfully."); Console.WriteLine("Deleteing the database..."); if(File.Exists(dbLogFileName)) File.Delete(dbLogFileName); File.Delete(dbFileName); Console.WriteLine("The database was deleted succesfully."); } Console.WriteLine("Creating the database..."); string createCommand = String.Format("CREATE DATABASE {0} ON (NAME = N'{0}', FILENAME = '{1}')", dbName, dbFileName); var cmd = new SqlCommand(createCommand, connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("The database was created succesfully"); } using (var connection = new SqlConnection(dbConnectionString)) { await connection.OpenAsync(); var cmd = new SqlCommand("SELECT newid()", connection); var result = await cmd.ExecuteScalarAsync(); Console.WriteLine("New GUID from DataBase: {0}", result); cmd = new SqlCommand(@"CREATE TABLE [dbo].[CustomTable]( [ID] [int] IDENTITY(1,1) NOT NULL, [Name] [nvarchar](50) NOT NULL, CONSTRAINT [PK_ID] PRIMARY KEY CLUSTERED ([ID] ASC) ON [PRIMARY]) ON [PRIMARY]", connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("Table was created succesfully."); cmd = new SqlCommand(@"INSERT INTO [dbo].[CustomTable] (Name) VALUES ('John'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('Peter'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('James'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('Eugene');", connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("Inserted data succesfully"); Console.WriteLine("Reading data from table..."); cmd = new SqlCommand(@"SELECT * FROM [dbo].[CustomTable]", connection); using (SqlDataReader reader = await cmd.ExecuteReaderAsync()) { while (await reader.ReadAsync()) { var id = reader.GetFieldValue<int>(0); var name = reader.GetFieldValue<string>(1); Console.WriteLine("Table row: Id {0}, Name {1}", id, name); } } } } catch(Exception ex) { Console.WriteLine("Error: {0}", ex.Message); } } }
工作原理
該程序使用了一個軟件,叫做SOL Server 2012 LocalDb,安裝Visual Studio 2012時會附帶安裝它,應該能正常使用。但是如果有什么錯誤,你可以通過安裝向導來修復該組件。
先要配置數據庫文件的存放路徑。我們將數據庫文件放置在應用程序執行目錄中。有兩個文件,一個是數據庫本身,另一個是事務日志文件。我們也配置了兩個連接字符串來定義如何連接數據庫。第一個字符串是連接到LocalDb引擎來分離數據庫。如果數據庫已經存在、則刪除並重建。當打開連接以及單獨使用OpenAsync和ExecuteNonQueryAsync方法執,行SQL命令時、我們使用了10異步操作。
在該任務完成后,我們附加了一個最新創建的數據庫。我們創建了一張新的表並插入了一些數據。除了之前提到的方法,我們還使用了ExecuteScalarAsync來異步地從數據庫引擎中得到一個標量值,並且使用SqIDataReaderReadAsync方法來從數據庫表中異步地讀取數據行。
如果在數據庫有一個大數據量的表,里面數據行中包含大數據量的二進制值,可以使用CommandBehavior.SequentialAcess枚舉來創建數據閱讀器異步地通過數據閱讀器獲取大字段值。,並使用GetFieldValueAsync方法
異步調用 WCF 服務
本節描述了如何創建一個WCF服務,並宿主在命令行應用程序中。客戶端可以訪問服務元數據,並以異步的方式消費它
請執行以下步驟來了解如何使用WCF服務:
- 新建一個C#命令行應用程序項目。
- 添加對System.ServiceModel庫的引用。右鍵點擊該項目的引用目錄,選擇添加引用.菜單選項。添加對System.ServiceModel庫的引用。
- 在Program.cs文件中加入以下using指令:
using System; using System.ServiceModel; using System.ServiceModel.Description; using System.Threading.Tasks;
- 在Program類中加入以下代碼片段:
const string SERVICE_URL = "http://localhost:1234/HelloWorld"; static async Task RunServiceClient() { var endpoint = new EndpointAddress(SERVICE_URL); var channel = ChannelFactory<IHelloWorldServiceClient>.CreateChannel(new BasicHttpBinding(), endpoint); var greeting = await channel.GreetAsync("Eugene"); Console.WriteLine(greeting); } [ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")] public interface IHelloWorldService { [OperationContract] string Greet(string name); } [ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")] public interface IHelloWorldServiceClient { [OperationContract] string Greet(string name); [OperationContract] Task<string> GreetAsync(string name); } public class HelloWorldService : IHelloWorldService { public string Greet(string name) { return string.Format("Greetings, {0}!", name); } }
- 在Main方法中加人以下代碼片段:
ServiceHost host = null; try { host = new ServiceHost(typeof (HelloWorldService), new Uri(SERVICE_URL)); var metadata = host.Description.Behaviors.Find<ServiceMetadataBehavior>(); if (null == metadata) { metadata = new ServiceMetadataBehavior(); } metadata.HttpGetEnabled = true; metadata.MetadataExporter.PolicyVersion = PolicyVersion.Policy15; host.Description.Behaviors.Add(metadata); host.AddServiceEndpoint(ServiceMetadataBehavior.MexContractName, MetadataExchangeBindings.CreateMexHttpBinding(),"mex"); var endpoint = host.AddServiceEndpoint(typeof (IHelloWorldService), new BasicHttpBinding(), SERVICE_URL); host.Faulted += (sender, e) => Console.WriteLine("Error!"); host.Open(); Console.WriteLine("Greeting service is running and listening on:"); Console.WriteLine("{0} ({1})", endpoint.Address, endpoint.Binding.Name); var client = RunServiceClient(); client.GetAwaiter().GetResult(); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine("Error in catch block: {0}", ex); } finally { if (null != host) { if (host.State == CommunicationState.Faulted) { host.Abort(); } else { host.Close(); } } }
工作原理
Windows Communication Foundation (簡稱WCF)是一個框架,用於以不同的方式調用,遠程服務。其中一個有一段時間非常流行,用於通過HTTP使用基於XML的協議來調用遠,程服務,它叫做簡單對象訪問協議(Simple Object Access Protocol,簡稱SOAP)。
Visual Studio 2012對WCF服務有着非常豐富的支持。例如,你可以使用添加服務引用,菜單項給這樣的服務添加引用。你也可對本節中的服務使用此功能,因為我們提供了服務元數據。
為了創建這樣的服務,我們需要使用ServiceHost類來宿主我們的服務。我們通過提供,一個服務實現類型和服務地址URL來描述如何宿主服務。然后配置了元數據終端和服務終,端。最后,使用Faulted事件來處理錯誤,並運行該宿主服務。
為了消費該服務,我們創建了一個客戶端,這是主要的技巧所在。在服務器端,我們有,.一個服務,是一個普通的同步方法,叫做Greet,服務契約1HelloWorldService定義了該方,法。然而,如果想使用異步網絡1O,我們需要異步地調用該方法。可以通過使用匹配的命名空間和服務名來創建一個新的服務契約,然后同時定義同步方法和基於任務的異步方法。盡管事實上在服務器端我們沒有異步方法,但是如果我們遵循命名約定, WCF基礎設施明白,我們想創建一個異步的代理方法。
因此,當我們創建一個1HelloworldServiceClient代理渠道, WCF會正確地路由一個異步調用到該服務器端同步方法。如果你運行程序,然后打開瀏覽器並使用該服務的URL http://localhost: 1234/Helloworld來訪問該服務。你會看到該服務的描述,還可以瀏覽XML元數據,該元數據可用於從Visual Studio 2012添加服務引用。如果你嘗試生成引用,將看到稍,微有點復雜的代碼,但它是自動創建的,並且易於使用。