並發編程概述--C#並發編程經典實例


優秀軟件的一個關鍵特征就是具有並發性。過去的幾十年,我們可以進行並發編程,但是難度很大。以前,並發性軟件的編寫、調試和維護都很難,這導致很多開發人員為圖省事放棄了並發編程。新版.NET 中的程序庫和語言特征,已經讓並發編程變得簡單多了。隨着Visual Studio 2012 的發布,微軟明顯降低了並發編程的門檻。以前只有專家才能做並發編程,而今天,每一個開發人員都能夠(而且應該)接受並發編程。

1.1簡介

首先,我來解釋幾個貫穿本書始終的術語。先來介紹並發。

  • 並發 
    同時做多件事情

這個解釋直接表明了並發的作用。終端用戶程序利用並發功能,在輸入數據庫的同時響應用戶輸入。服務器應用利用並發,在處理第一個請求的同時響應第二個請求。只要你希望程序同時做多件事情,你就需要並發。幾乎每個軟件程序都會受益於並發。大多數開發人員一看到“並發”就會想到“多線程”。對這兩個概念,需要做一下區分。

  • 多線程 
    並發的一種形式,它采用多個線程來執行程序。

從字面上看,多線程就是使用多個線程,多線程是並發的一種形式,但不是唯一的形式。實際上,直接使用底層線程類型在現代程序中基本不起作用。比起老式的多線程機制,采用高級的抽象機制會讓程序功能更加強大、效率更高因此,這里盡量不涉及一些過時的技術。書中所有多線程的方法都采用高級類型,而不是Thread或BackgroundWorker。

一旦你輸入new Thread(),那就糟糕了,說明項目中的代碼太過時了。

但是,不要認為多線程已經徹底被淘汰了!因為線程池要求多線程繼續存在。線程池存放任務的隊列,這個隊列能夠根據需要自行調整。相應地,線程池產生了另一個重要的並發形式:並行處理

  • 並行處理 
    把正在執行的大量的任務分割成小塊,分配給多個同時運行的線程。

為了讓處理器的利用效率最大化,並行處理(或並行編程)采用多線程。當現代多核CPU執行大量任務時,若只用一個核執行所有任務,而其他核保持空閑,這顯然是不合理的。並行處理把任務分割成小塊並分配給多個線程,讓它們在不同的核上獨立運行。並行處理是多線程的一種,而多線程是並發的一種。在現代程序中,還有一種非常重要但很多人還不熟悉的並發類型:異步編程

  • 異步編程 
    並發的一種形式,它采用future 模式或回調(callback)機制,以避免產生不必要的線程。

一個future(或promise)類型代表一些即將完成的操作。在.NET 中,新版future 類型有Task 和Task。在老式異步編程API 中,采用回調或事件(event),而不是future。異步編程的核心理念是異步操作:啟動了的操作將會在一段時間后完成。這個操作正在執行時,不會阻塞原來的線程。啟動了這個操作的線程,可以繼續執行其他任務。當操作完成時,會通知它的future,或者調用回調函數,以便讓程序知道操作已經結束。異步編程是一種功能強大的並發形式,但直至不前,實現異步編程仍需要特別復雜的代碼。VS2012 支持async 和await,這讓異步編程變得幾乎和同步(非並發)編程一樣容易。並發編程的另一種形式是響應式編程(reactive programming)。異步編程意味着程序啟動一個操作,而該操作將會在一段時間后完成。響應式編程與異步編程非常類似,不過它是基於異步事件(asynchronous event)的,而不是異步操作(asynchronous operation)。異步事件可以沒有一個實際的“開始”,可以在任何時間發生,並且可以發生多次,例如用戶輸入。

  • 響應式編程 
    一種聲明式的編程模式,程序在該模式中對事件做出響應。

如果把一個程序看作一個大型的狀態機,則該程序的行為便可視為它對一系列事件做出響應,即每換一個事件,它就更新一次自己的狀態。這聽起來很抽象和空洞,但實際上並非如此。利用現代的程序框架,響應式編程已經在實際開發中廣泛使用。響應式編程不一定是並發的,但它與並發編程聯系緊密,因此本書介紹了響應式編程的基礎知識。通常情況下,一個並發程序要使用多種技術。大多數程序至少使用了多線程(通過線程池)和異步編程。要大膽地把各種並發編程形式進行混合和匹配,在程序的各個部分使用合適的工具。

 

1.2 異步編程簡介

異步編程有兩大好處。第一個好處是對於面向終端用戶的GUI程序:異步編程提高了響應能力。我們都遇到過在運行時會臨時鎖定界面的程序,異步編程可以使程序在執行任務時仍能響應用戶的輸入。第二個好處是對於服務器端應用:異步編程實現了可擴展性。服務器應用可以利用線程池滿足其可擴展性,使用異步編程后,可擴展性通常可以提高一個數量級。

現代的異步.NET程序使用兩個關鍵字:asyncawaitasync關鍵字加在方法聲明上,它的主要目的是使方法內的await關鍵字生效(為了保持向后兼容,同時引入了這兩個關鍵字)。如果async方法有返回值,應返回Task<T>;如果沒有返回值,應返回Task。這些task類型相當於future,用來在異步方法結束時通知主程序。

圖像說明文字不要用void作為async方法的返回類型!async方法可以返回void,但是這僅限於編寫事件處理程序。一個普通的async方法如果沒有返回值,要返回Task,而不是void

有了上述背景知識,我們來快速看一個例子:

async Task DoSomethingAsync() { int val = 13; //異步方式等待1秒 await Task.Delay(TimeSpan.FromSeconds(1)); val *= 2; //異步方式等待1秒 await Task.Delay(TimeSpan.FromSeconds(1)); Trace.WriteLine(val); }

和其他方法一樣,async方法在開始時以同步方式執行。在async方法內部,await關鍵字對它的參數執行一個異步等待。它首先檢查操作是否已經完成,如果完成了,就繼續運行(同步方式)。否則,它會暫停async方法,並返回,留下一個未完成的task。一段時間后,操作完成,async方法就恢復運行。

一個async方法是由多個同步執行的程序塊組成的,每個同步程序塊之間由await語句分隔。第一個同步程序塊在調用這個方法的線程中運行,但其他同步程序塊在哪里運行呢?情況比較復雜。

最常見的情況是,用await語句等待一個任務完成,當該方法在await處暫停時,就可以捕捉上下文(context)。如果當前SynchronizationContext不為空,這個上下文就是當前SynchronizationContext。如果當前SynchronizationContext為空,則這個上下文為當前TaskScheduler。該方法會在這個上下文中繼續運行。一般來說,運行UI線程時采用UI上下文,處理ASP.NET請求時采用ASP.NET請求上下文,其他很多情況下則采用線程池上下文。

因此,在上面的代碼中,每個同步程序塊會試圖在原始的上下文中恢復運行。如果在UI線程中調用DoSomethingAsync,這個方法的每個同步程序塊都將在此UI線程上運行。但是,如果在線程池線程中調用,每個同步程序塊將在線程池線程上運行。

要避免這種錯誤行為,可以在await中使用ConfigureAwait方法,將參數continueOnCapturedContext設為false。接下來的代碼剛開始會在調用的線程里運行,在被await暫停后,則會在線程池線程里繼續運行:

async Task DoSomethingAsync() { int val = 13; //異步方式等待1秒 await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); val *= 2; //異步方式等待1秒 await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); Trace.WriteLine(val.ToString()); }

圖像說明文字最好的做法是,在核心庫代碼中一直使用ConfigureAwait。在外圍的用戶界面代碼中,只在需要時才恢復上下文。

關鍵字await不僅能用於任務,還能用於所有遵循特定模式的awaitable類型。例如,Windows Runtime API定義了自己專用的異步操作接口。這些接口不能轉化為Task類型,但確實遵循了可等待的(awaitable)模式,因此可以直接使用await。這種awaitable類型在Windows應用商店程序中更加常見,但是在大多數情況下,await使用TaskTask<T>

有兩種基本的方法可以創建Task實例。有些任務表示CPU需要實際執行的指令,創建這種計算類的任務時,使用Task.Run(如需要按照特定的計划運行,則用TaskFactory.StartNew)。其他的任務表示一個通知(notification),創建這種基於事件的任務時,使用TaskCompletionSource<T>。大部分I/O型任務采用TaskCompletionSource<T>

使用asyncawait時,自然要處理錯誤。在下面的代碼中,PossibleExceptionAsync會拋出一個NotSupportedException異常,而TrySomethingAsync方法可很順利地捕捉到這個異常。這個捕捉到的異常完整地保留了棧軌跡,沒有人為地將它封裝進TargetInvocationExceptionAggregateException類:

async Task TrySomethingAsync() { try { await PossibleExceptionAsync(); } catch(NotSupportedException ex) { LogException(ex); throw; } }

一旦異步方法拋出(或傳遞出)異常,該異常會放在返回的Task對象中,並且這個Task對象的狀態變為“已完成”。當await調用該Task對象時,await會獲得並(重新)拋出該異常,並且保留着原始的棧軌跡。因此,如果PossibleExceptionAsync是異步方法,以下代碼就能正常運行:

async Task TrySomethingAsync() { //發生異常時,任務結束。不會直接拋出異常。 Task task = PossibleExceptionAsync(); try { //Task對象中的異常,會在這條await語句中引發 await task; } catch(NotSupportedException ex) { LogException(ex); throw; } }

關於異步方法,還有一條重要的准則:你一旦在代碼中使用了異步,最好一直使用。調用異步方法時,應該(在調用結束時)用await等待它返回的task對象。一定要避免使用Task.WaitTask<T>.Result方法,因為它們會導致死鎖。參考一下下面這個方法:

async Task WaitAsync() { //這里awati會捕獲當前上下文…… await Task.Delay(TimeSpan.FromSeconds(1)); // ……這里會試圖用上面捕獲的上下文繼續執行 } void Deadlock() { //開始延遲 Task task = WaitAsync(); //同步程序塊,正在等待異步方法完成 task.Wait(); }

如果從UI或ASP.NET的上下文調用這段代碼,就會發生死鎖。這是因為,這兩種上下文每次只能運行一個線程。Deadlock方法調用WaitAsync方法,WaitAsync方法開始調用delay語句。然后,Deadlock方法(同步)等待WaitAsync方法完成,同時阻塞了上下文線程。當delay語句結束時,await試圖在已捕獲的上下文中繼續運行WaitAsync方法,但這個步驟無法成功,因為上下文中已經有了一個阻塞的線程,並且這種上下文只允許同時運行一個線程。這里有兩個方法可以避免死鎖:在WaitAsync中使用ConfigureAwait(false)(導致await忽略該方法的上下文),或者用await語句調用WaitAsync方法(讓Deadlock變成一個異步方法)。

圖像說明文字如果使用了async,最好就一直使用它。

若想更全面地了解關於異步編程的知識,可參閱Alex Davies(O'Reilly)編寫的Async in C# 5.0,這本書非常不錯。另外,微軟公司有關異步編程的在線文檔也很不錯,建議你至少讀一讀“async overview”和“Task-based Asynchronous Pattern(TAP) overview”這兩篇。如果要深入了解,官方FAQ和博客上也有大量的信息。

1.3 並行編程簡介

如果程序中有大量的計算任務,並且這些任務能分割成幾個互相獨立的任務塊,那就應該使用並行編程。並行編程可臨時提高CPU利用率,以提高吞吐量,若客戶端系統中的CPU經常處於空閑狀態,這個方法就非常有用,但通常並不適合服務器系統。大多數服務器本身具有並行處理能力,例如ASP.NET可並行地處理多個請求。某些情況下,在服務器系統中編寫並行代碼仍然有用(如果你知道並發用戶數量會一直是少數)。但通常情況下,在服務器系統上進行並行編程,將降低本身的並行處理能力,並且不會有實際的好處。

並行的形式有兩種:數據並行(data parallelism)和任務並行(task parallelim)。數據並行是指有大量的數據需要處理,並且每一塊數據的處理過程基本上是彼此獨立的。任務並行是指需要執行大量任務,並且每個任務的執行過程基本上是彼此獨立的。任務並行可以是動態的,如果一個任務的執行結果會產生額外的任務,這些新增的任務也可以加入任務池。

實現數據並行有幾種不同的做法。一種做法是使用Parallel.ForEach方法,它類似於foreach循環,應盡可能使用這種做法。在3.1節將會詳細介紹Parallel.ForEach方法。Parallel類也提供Parallel.For方法,這類似於for循環,當數據處理過程基於一個索引時,可使用這個方法。下面是使用Parallel.ForEach的代碼例子:

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees) { Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees)); }

另一種做法是使用PLINQ(Parallel LINQ),它為LINQ查詢提供了AsParallel擴展。跟PLINQ相比,Parallel對資源更加友好,Parallel與系統中的其他進程配合得比較好,而PLINQ會試圖讓所有的CPU來執行本進程。Parallel的缺點是它太明顯。很多情況下,PLINQ的代碼更加優美。PLINQ在3.5節有詳細介紹:

IEnumerable<bool> PrimalityTest(IEnumerable<int> values) { return values.AsParallel().Select(val => IsPrime(val)); }

不管選用哪種方法,在並行處理時有一個非常重要的准則。

圖像說明文字每個任務塊要盡可能的互相獨立。

只要任務塊是互相獨立的,並行性就能做到最大化。一旦你在多個線程中共享狀態,就必須以同步方式訪問這些狀態,那樣程序的並行性就變差了。第11章將詳細講述同步。

有多種方式可以控制並行處理的輸出。可以把結果存在某些並發集合,或者對結果進行聚合。聚合在並行處理中很常見,Parallel類的重載方法,也支持這種map/reduce函數。關於聚合的詳細內容在3.2節。

下面講任務並行。數據並行重點在處理數據,任務並行則關注執行任務。

Parallel類的Parallel.Invoke方法可以執行“分叉/聯合”(fork/join)方式的任務並行。3.3節將詳細介紹這個方法。調用該方法時,把要並行執行的委托(delegate)作為傳入參數:

void ProcessArray(double[] array) { Parallel.Invoke( () => ProcessPartialArray(array, 0, array.Length / 2), () => ProcessPartialArray(array, array.Length / 2, array.Length) ); } void ProcessPartialArray(double[] array, int begin, int end) { // CPU密集型的操作…… }

現在Task這個類也被用於異步編程,但當初它是為了任務並行而引入的。任務並行中使用的一個Task實例表示一些任務。可以使用Wait方法等待任務完成,還可以使用ResultException屬性來檢查任務執行的結果。直接使用Task類型的代碼比使用Parallel類要復雜,但是,如果在運行前不知道並行任務的結構,就需要使用Task類型。如果使用動態並行機制,在開始處理時,任務塊的個數是不確定的,只有繼續執行后才能確定。通常情況下,一個動態任務塊要啟動它所需的所有子任務,然后等待這些子任務執行完畢。為實現這個功能,可以使用Task類型中的一個特殊標志:TaskCreationOptions.AttachedToParent。動態並行機制在3.4節中詳述。

跟數據並行一樣,任務並行也強調任務塊的獨立性。委托(delegate)的獨立性越強,程序的執行效率就越高。在編寫任務並行程序時,要格外留意下閉包(closure)捕獲的變量。記住閉包捕獲的是引用(不是值),因此可以在結束時以不明顯地方式地分享這些變量。

對所有並行處理類型來講,錯誤處理的方法都差不多。由於操作是並行執行的,多個異常就會同時發生,系統會把這些異常封裝在AggregateException類里,在程序中拋給代碼。這一特點對所有方法都是一樣的,包括Parallel.ForEachParalle.InvokeTask.Wait等。AggregateException類型有幾個實用的FlattenHandle方法,用來簡化錯誤處理的代碼:

try { Parallel.Invoke(() => { throw new Exception(); }, () => { throw new Exception(); }); } catch (AggregateException ex) { ex.Handle(exception => { Trace.WriteLine(exception); return true; // "已經處理" }); }

通常情況下,沒必要關心線程池處理任務的具體做法。數據並行和任務並行都使用動態調整的分割器,把任務分割后分配給工作線程。線程池在需要的時候會增加線程數量。線程池線程使用工作竊取隊列(work-stealing queue)。微軟公司為了讓每個部分盡可能高效,做了很多優化。要讓程序得到最佳的性能,有很多參數可以調節。只要任務時長不是特別短,采用默認設置就會運行得很好。

圖像說明文字任務不要特別短,也不要特別長。

如果任務太短,把數據分割進任務和在線程池中調度任務的開銷會很大。如果任務太長,線程池就不能進行有效的動態調整以達到工作量的平衡。很難確定“太短”和“太長”的判斷標准,這取決於程序所解決問題的類型以及硬件的性能。根據一個通用的准則,只要沒有導致性能問題,我會讓任務盡可能短(如果任務太短,程序性能會突然降低)。更好的做法是使用Parallel類型或者PLINQ,而不是直接使用任務。這些並行處理的高級形式,自帶有自動分配任務的算法(並且會在運行時自動調整)。

要更深入的了解並行編程,這方面最好的書是Colin Campbell等人編寫的Parallel Programming with Microsoft.NET(微軟出版社)。

1.4 響應式編程簡介

跟並發編程的其他形式相比,響應式編程的學習難度較大。如果對響應式編程不是非常熟悉,代碼維護相對會更難一點。一旦你學會了,就會發現響應式編程的功能特別強大。響應式編程可以像處理數據流一樣處理事件流。根據經驗,如果事件中帶有參數,那么最好采用響應式編程,而不是常規的事件處理程序。

響應式編程基於“可觀察的流”(observable stream)這一概念。你一旦申請了可觀察流,就可以收到任意數量的數據項(OnNext),並且流在結束時會發出一個錯誤(OnError)或一個“流結束”的通知(OnCompleted)。有些可觀察流是不會結束的。實際的接口就像這樣:

interface IObserver<in T> { void OnNext(T item); void OnCompleted(); void OnError(Exception error); } interface IObservable<out T> { IDisposable Subscribe(IObserver<T> observer); }

不過,開發人員不需要實現這些接口。微軟的Reactive Extensions(Rx)庫已經實現了所有接口。響應式編程的最終代碼非常像LINQ,可以認為它就是“LINQ to events”。下面的代碼中,前面是我們不熟悉的操作符(IntervalTimestamp),最后是一個Subscribe,但是中間部分是我們在LINQ中熟悉的操作符:WhereSelect。LINQ具有的特性,Rx也都有。Rx在此基礎上增加了很多它自己的操作符,特別是與時間有關的操作符:

Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(x => x.Value % 2 == 0) .Select(x => x.Timestamp) .Subscribe(x => Trace.WriteLine(x));

上面的代碼中,首先是一個延時一段時間的計數器(Interval),隨后為每個事件加了一個時間戳(Timestamp)。接着對事件進行過濾,只包含偶數值(Where),選擇了時間戳的值(Timestamp),然后當每個時間戳值到達時,把它輸入調試器(Subscribe)。如果沒有理解上述新的操作符(例如Interval),不要緊,我們會在后面講述。現在只要記住這是一個LINQ查詢,與你以前見過的LINQ查詢很類似。主要區別在於:LINQ to Object和LINQ to Entity使用“拉取”模式,LINQ的枚舉通過查詢拉出數據。而LINQ to event(Rx)使用“推送”模式,事件到達后就自行穿過查詢。

可觀察流的定義和其訂閱是互相獨立的。上面最后一個例子與下面的代碼等效:

IObservable<DateTimeOffset> timestamps = Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(x => x.Value % 2 == 0) .Select(x => x.Timestamp); timestamps.Subscribe(x => Trace.WriteLine(x));

一種常規的做法是把可觀察流定義為一種類型,然后將其作為IObservable<T>資源使用。其他類型可以訂閱這些流,或者把這些流與其他操作符組合,創建另一個可觀察流。

Rx的訂閱也是一個資源。Subscribe操作符返回一個IDisposable,即表示訂閱完成。當你響應了那個可觀察流,就得處理這個訂閱。

對於hot observable(熱可觀察流)和cold observable(冷可觀察流)這兩種對象,訂閱的做法各有不同。一個hot observable對象是指一直在發生的事件流,如果在事件到達時沒有訂閱者,事件就丟失了。例如,鼠標的移動就是一個hot observable對象。cold observable對象是始終沒有輸入事件(不會主動產生事件)的觀察流,它只會通過啟動一個事件隊列來響應訂閱。例如,HTTP下載是一個cold observable對象,只有在訂閱后才會發出HTTP請求。

同樣,所有Subscribe操作符都需要有處理錯誤的參數。前面的例子沒有錯誤處理參數。下面則是一個更好的例子,在可觀察流發生錯誤時,它能正確處理:

Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(x => x.Value % 2 == 0) .Select(x => x.Timestamp) .Subscribe(x => Trace.WriteLine(x), ex => Trace.WriteLine(ex));

在進行Rx實驗性編程時,Subject<T>這個類型很有用。這個“subject”就像手動實現一個可觀察流。可以在代碼中調用OnNextOnErrorOnCompleted,這個subject會把這些調用傳遞給訂閱者。Subject<T>用於實驗時效果非常不錯,但在實際產品開發時,應該使用第5章介紹的操作符。

Rx的操作符非常多,本書只介紹了一部分。想了解關於Rx的更多信息,建議閱讀優秀的在線圖書Introduction to Rx。

1.5 數據流簡介

TPL數據流很有意思,它把異步編程和並行編程這兩種技術結合起來。如果需要對數據進行一連串的處理,TPL數據流就很有用。例如,需要從一個URL上下載數據,接着解析數據,然后把它與其他數據一起做並行處理。TPL數據流通常作為一個簡易的管道,數據從管道的一端進入,在管道中穿行,最后從另一端出來。不過,TPL數據流的功能比普通管道要強大多了。對於處理各種類型的網格(mesh),在網格中定義分叉(fork)、連接(join)、循環(loop)的工作,TPL數據流都能正確地處理。當然了,大多數時候TPL數據流網格還是被用作管道。

數據流網格的基本組成單元是數據流塊(dataflow block)。數據流塊可以是目標塊(接收數據)或源塊(生成數據),或兩者皆可。源塊可以連接到目標塊,創建網格。連接的具體內容在4.1節介紹。數據流塊是半獨立的,當數據到達時,數據流塊會試圖對數據進行處理,並且把處理結果推送給下一個流程。使用TPL數據流的常規方法是創建所有的塊,再把它們鏈接起來,然后開始在一端填入數據。然后,數據會自行從另一端出來。再強調一次,數據流的功能比這要強大得多,數據穿過的同時,可能會斷開連接、創建新的塊並加入到網格,不過這是非常高級的使用場景。

目標塊帶有緩沖區,用來存放收到的數據。因此,在還來不及處理數據的時候,它仍能接收新的數據項,這就讓數據可以持續地在網格上流動。在有分叉的情況下,一個源塊鏈接了兩個目標塊,這種緩沖機制就會產生問題。當源塊有數據需要傳遞下去時,它會把數據傳給與它鏈接的塊,並且一次只傳一個數據。默認情況下,第一個目標塊會接收數據並緩存起來,而第二個目標塊就收不到任何數據。解決這個問題的方法是把目標塊設置為“非貪婪”模式,以限制緩沖區的數量,這部分將在4.4節介紹。

如果某些步驟出錯,例如委托在處理數據項時拋出異常,數據流塊就會出錯。數據流塊出錯后就會停止接收數據。默認情況下,一個塊出錯不會摧毀整個網格。這讓程序有能力重建部分網格,或者對數據重新定向。然而這是一個高級用法。通常來講,你是希望這些錯誤通過鏈接傳遞給目標塊。數據流也提供這個選擇,唯一比較難辦的地方是當異常通過鏈接傳遞時,它就會被封裝在AggregateException類中。因此,如果管道很長,最后異常的嵌套層次會非常多,這時就可以使用AggregateException.Flatten方法:

try { var multiplyBlock = new TransformBlock<int, int>(item => { if (item == 1) throw new InvalidOperationException("Blech."); return item * 2; }); var subtractBlock = new TransformBlock<int, int>(item => item - 2); multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true }); multiplyBlock.Post(1); subtractBlock.Completion.Wait(); } catch (AggregateException exception) { AggregateException ex = exception.Flatten(); Trace.WriteLine(ex.InnerException); }

數據流錯誤的處理方法將在4.2節詳細介紹。

數據流網格給人的第一印象是與可觀察流非常類似,實際上它們確實有很多共同點。網格和流都有“數據項”這一概念,數據項從網格或流的中間穿過。還有,網格和流都有“正常完成”(表示沒有更多數據需要接收時發出的通知)和“不正常完成”(在處理數據中發生錯誤時發出的通知)這兩個概念。但是,Rx和TPL數據流的性能並不相同。如果執行需要計時的任務,最好使用Rx的observable對象,而不是數據流塊。如果進行並行處理,最好使用數據流塊,而不是Rx的observable對象。從概念上說,Rx更像是建立回調函數:observable對象中的每個步驟都會直接調用下一步。相反,數據流網格中的每一塊都是互相獨立的。Rx和TPL數據流有各自的應用領域,也有一些交叉的領域。另一方面,Rx和TPL數據流也非常適合同時使用。Rx和TPL數據流的互操作性將在7.7節詳細介紹。

最常用的塊類型有TransformBlock<TInput, TOutput>(與LINQ的Select類似)、TransformManyBlock<TInput, Toutput>(與LINQ的SelectMany類似)和ActionBlock<T>(為每個數據項運行一個委托)。要了解TPL數據流的更多知識,建議閱讀MSDN的文檔和Guide to Implementing Custom TPL Dataflow Blocks。

1.6 多線程編程簡介

線程是一個獨立的運行單元,每個進程內部有多個線程,每個線程可以各自同時執行指令。每個線程有自己獨立的棧,但是與進程內的其他線程共享內存。對某些程序來說,其中有一個線程是特殊的,例如用戶界面程序有一個UI線程,控制台程序有一個main線程。

每個.NET程序都有一個線程池,線程池維護着一定數量的工作線程,這些線程等待着執行分配下來的任務。線程池可以隨時監測線程的數量。配置線程池的參數多達幾十個,但是建議采用默認設置,線程池的默認設置是經過仔細調整的,適用於絕大多數現實中的應用場景。

應用程序幾乎不需要自行創建新的線程。你若要為COM interop程序創建STA線程,就得創建線程,這是唯一需要線程的情況。

線程是低級別的抽象,線程池是稍微高級一點的抽象,當代碼段遵循線程池的規則運行時,線程池就會在需要時創建線程。本書介紹的技術抽象級別更高:並行和數據流的處理隊列會根據情況遵循線程池運行。抽象級別更高,正確代碼的編寫就更容易。

基於這個原因,本書根本不介紹ThreadBackgroundWorker這兩種類型。它們曾經非常流行,但那個時代已經過去了。

1.7 並發編程的集合

並發編程所用到的集合有兩類:並發集合和不可變集合。這兩種類別的集合將在第8章詳細介紹。多個線程可以用安全的方式同時更新並發集合。大多數並發集合使用快照(snapshot),當一個線程在增加或刪除數據時,另一個線程也能枚舉數據。比起給常規集合加鎖以保護數據的方式,采用並發集合的方式要高效得多。

不可變集合則有些不同。不可變集合實際上是無法修改的。要修改一個不可變集合,需要建立一個新的集合來代表這個被修改了的集合。這看起來效率非常低,但是不可變集合的各個實例之間盡可能多地共享存儲區,因此實際上效率沒想象得那么差。不可變集合的優點之一,就是所有的操作都是簡潔的,因此特別適合在函數式代碼中使用。

1.8 現代設計

大多數並發編程技術有一個類似點:它們本質上都是函數式(functional)的。這里“functional”的意思不是“實用,能完成任務”1,而是把它作為一種基於函數組合的編程模式。如果你接受函數式的編程理念,並發編程的設計就會簡單得多。

1英文中“函數式”和“實用”是同一個單詞functional。——譯者注

函數式編程的一個原則就是簡潔(換言之,就是避免副作用)。解決方案中的每一個片段都用一些值作為輸入,生成一些值作為輸出。應該盡可能避免讓這些段落依賴於全局(或共享)變量,或者修改全局(或共享)數據結構。不論這個片段是異步方法、並行任務、Rx操作還是數據流塊,都應該這么做。當然了,具體做法遲早會受到計算內容的影響,但如果能用簡潔的段落來處理,然后用結果來執行更新,代碼就會更加清晰。

函數式編程的另一個原則是不變性。不變性是指一段數據是不能被修改的。在並發編程中使用不可變數據的原因之一,是程序永遠不需要對不可變數據進行同步。數據不能修改,這一事實讓同步變得沒有必要。不可變數據也能避免副作用。在編寫本書時(2014年),雖然不可變數據還沒有被廣泛接受,但本書中有幾節會介紹不可變數據結構。

1.9 技術要點總結

在.NET剛推出時,就對異步編程提供了一定的支持。但是異步編程一直是很難的,直到2012年.NET 4.5(同時發布C# 5.0和VB 2012)引入asyncawait這兩個關鍵字。本書中的異步編程方法,將全部采用現代的async/await。同時介紹一些方法,來實現async和老式異步編程模式的交互。要支持老式平台的話,需要下載NuGet包Microsoft.Bcl.Async

圖像說明文字不要在基於.NET 4.0的ASP.NET代碼中使用Microsoft.Bcl.Async進行異步編程!在.NET中,ASP.NET管道已經進行修改以支持async。對於異步ASP.NET項目,必須使用.NET 4.5或更高版本。

.NET 4.0引入了任務並行庫(TPL),完全支持數據並行和任務並行。但是一些資源較少的平台(例如手機),通常不支持TPL。TPL是.NET框架自帶的。

Reactive Extensions團隊已經讓它盡可能多地支持多種平台。Reactive Extensions和asyncawait一樣,對所有類型的應用都有好處,包括客戶端和服務器端應用。Rx在NuGet包Rx-Main中。

TPL數據流庫只支持較新的平台,它的官方版本在NuGet包Microsoft.Tpl.Dataflow中。

並發編程的集合是.NET框架的一部分,但是不可變集合在NuGet包Microsoft.Bcl.Immutable中。表1-1列出了各主流平台對各種技術的支持情況。

表1-1:各平台對並發編程的支持

平台

async

並行編程

Rx

數據流

並發集合

不可變集合

.NET 4.5

.NET 4.0

×

×

Mono iOS/Droid

Windows Store

Windows Phone Apps 8.1

Windows Phone SL 8.0

×

×

Windows Phone SL 7.1

×

×

×

×

Silverlight 5

×

×

×

×

目錄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM