線程 Z


原文:http://www.albahari.com/threading/part5.aspx

專題:C#中的多線程

1並行編程Permalink

在這一部分,我們討論 Framework 4.0 加入的多線程 API,它們可以充分利用多核處理器。

這些 API 可以統稱為 PFX(Parallel Framework,並行框架)。Parallel類與任務並行構造一起被稱為 TPL(Task Parallel Library,任務並行庫)。

Framework 4.0 也增加了一些更底層的線程構造,它們針對傳統的多線程。我們之前講過的:

在繼續閱讀前,你需要了解第 1 部分 - 第 4 部分中的基本原理,特別是線程安全

並行編程這一部分提供的所有代碼都可以在LINQPad中試驗。LINQPad 是一個 C# 代碼草稿板,可以用來測試代碼段,而無需創建類、項目或解決方案。想要獲取這些示例代碼,可以在 LINQPad 左下方的 Samples 標簽頁中點擊 Download More Samples,並且選擇 C# 4.0 in a Nutshell: More Chapters。(譯者注:現在應該是 C# 5.0 in a Nutshell 和 C# 6.0 in a Nutshell 了)

2為何需要 PFX?Permalink

近年來,CPU 時鍾頻率發展陷於停滯,制造商已經將重心轉移至增加核心數量。這對我們程序員來說是個問題,因為標准的單線程代碼無法自動利用那些增加的核心來提升程序運行速度。

利用多個核心對大多數服務端應用程序來說很容易,每個線程可以獨立處理單獨的客戶端請求,但在桌面環境下就不那么容易了,因為通常這需要你優化計算密集型代碼,按如下步驟進行:

  1. 將工作分解成塊。
  2. 多線程並行處理這些工作塊。
  3. 以線程安全和高效的方式整理結果。

盡管你可以使用傳統的多線程構造,但那比較笨拙,尤其是在分解工作和整理結果的步驟。並且,為確保線程安全,通常的策略是使用,而它在很多線程同時訪問一份數據時會導致大量競爭。

PFX 庫就是專門被設計用來為這些場景提供幫助的。

利用多核心或多處理器的編程被稱為並行編程(parallel programming)。它是多線程這個更寬泛概念的子集。

2.1PFX 概念Permalink

有兩種分解工作的策略:數據並行(data parallelism)和任務並行(task parallelism)。

當一系列任務需要處理很多數據時,可以讓每個線程都執行這一系列(相同的)任務來處理一部分數據(即所有數據的一個子集)。這樣實現的並行化稱為數據並行,因為我們是為線程分解了數據。與此相對,任務並行是指對任務進行分解,換句話說就是讓每個線程執行不同的任務。

通常,對高度並行的硬件來說,數據並行更簡單,可伸縮性也更好,因為它減少或消除了共享數據(也就減少了競爭和線程安全問題)。並且,事實上一般都是數據比任務要多,所以數據並行可以增加並發的可能。

數據並行也有利於結構化並行(structured parallelism),意思是說並行工作單元的啟動和完成是在程序中的同一位置。相對的,任務並行趨向於非結構化,就是說並行工作單元的啟動和完成可 能分散在程序各處。結構化並行比較簡單,並且不易出錯,也讓你可以把工作分解和線程協調(甚至包括結果整理)這些復雜的任務交給 PFX 庫來完成。

2.2PFX 組件Permalink

PFX 包含兩層功能。上層是由結構化數據並行 API:PLINQParallel類組成。下層包含任務並行的類,以及一組額外的構造,來幫助你實現並行編程。

並行編程組件

PLINQ 提供了最豐富的功能:它能夠自動化並行的所有步驟,包括分解工作、多線程執行、最后把結果整理成一個輸出序列。它被稱為聲明式(declarative) 的,因為你只是聲明希望並行化你的工作(構造一個 LINQ 查詢),然后讓 Framework 來處理實現細節。相對的,另一種方式是指令式(imperative)的,這種方式是需要你顯式編寫代碼來處理工作分解和結果整理。例如使用Parallel類時,你必須自己整理結果,而如果使用任務並行構造,你還必須自己分解工作。

  分解工作 整理結果
PLINQ    
Parallel   -
PFX 的任務並行 - -

並發集合自旋基元可 以幫助你實現低層次的並行編程。這很重要,因為 PFX 不僅被設計適用於當今的硬件,也適用於未來更多核心的處理器。如果你希望搬運一堆木塊,並且有 32 個工人,最麻煩的是如何讓工人們搬運木塊時不互相擋道。這與把算法分解運行在 32 個核心上類似:如果普通的鎖被用於保護公共資源,所產生的阻塞可能意味着同時只有一小部分核心真正在工作。並發集合專門針對於高並發訪問,致力於最小化或消除阻塞。PLINQ 和 Parallel類就依賴於並發集合和自旋基元來實現高效的工作管理。

PFX 與傳統的多線程Permalink

傳統多線程的場景是,即使在單核的機器上,使用多線程也有好處,而此時並沒有真正的並行發生。就像我們之前討論過的:保持用戶界面的響應以及同時下載兩個網頁。

這一部分將要講到的一些構造有時對於傳統多線程也有用。特別是:

2.3何時使用 PFXPermalink

PFX 主要用於並行編程:充分利用多核處理器來加速執行計算密集型代碼。

充分利用多個核心的挑戰在於阿姆達爾定律(Amdahl’s law),它指出通過並行化產生的最大性能提升,取決於有多少必須順序執行的代碼段。例如,如果一個算法只有三分之二的執行時間可以並行,即使有無數核心,也無法獲得超過三倍的性能提升。

因此,在使用 PFX 前,有必要先檢查可並行代碼中的瓶頸。還需要考慮下,你的代碼是否有必要是計算密集的,優化這里往往是最簡單有效的方法。然而,這也需要平衡,因為一些優化技術會使代碼難以並行化。

最容易獲益的是“不好意思不並行的問題(embarrassingly parallel problems)”:工作可以很容易地被分解為多個任務,每個任務自己可以高效執行(結構化並行非常適合這種問題)。例如:很多圖片處理任務、光線跟蹤 算法、數學和密碼學方面的暴力計算和破解。而相反的例子是:實現快速排序算法的優化版本,想把它實現得好需要一定思考,並且可能需要非結構化並行。

3PLINQPermalink

PLINQ 會自動並行化本地的 LINQ 查詢。其優勢在於使用簡單,因為將工作分解和結果整理的負擔交給了 Framework。

使用 PLINQ 時,只要在輸入序列上調用AsParallel(),然后像平常一樣繼續 LINQ 查詢就可以了。下邊的查詢計算 3 到 100,000 內的素數,這會充分利用目標機器上的所有核心。

// 使用一個簡單的(未優化)算法計算素數。 // // 注意:這一部分提供的所有代碼都可以在 LINQPad 中試驗。 IEnumerable<int> numbers = Enumerable.Range (3, 100000-3); var parallelQuery = from n in numbers.AsParallel() where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0) select n; int[] primes = parallelQuery.ToArray(); 

AsParallelSystem.Linq.ParallelEnumerable中的一個擴展方法。它使用ParallelQuery<TSource>來封裝輸入,就會將你隨后調用的 LINQ 查詢操作符綁定在ParallelEnumerable中定義的另外一組擴展方法上。它們提供了所有標准查詢操作符的並行化實現。本質上,它們就是將輸入序列進行分區,形成工作塊,並在不同的線程上執行,之后再將結果整理成一個輸出序列:

PLINQ 執行

調用AsSequential()可以拆封ParallelQuery,使隨后的查詢操作符綁定到標准查詢操作符來順序執行。在調用有副作用或非線程安全的方法前,有必要這樣做。

對於那些接受兩個輸入序列的查詢操作符(JoinGroupJoinContactUnionIntersectZip)來說,必須在這兩個輸入序列上都使用AsParallel()(否則將拋出異常)。然而,不需要為中間過程的查詢使用AsParallel,因為 PLINQ 的查詢操作符會輸出另一個ParallelQuery序列。實際上,在這個輸出序列上再次調用AsParallel會降低效率,它會強制對序列進行合並和重新分區。

mySequence.AsParallel() // 使用 ParallelQuery<int> 封裝序列 .Where (n => n > 100) // 輸出另一個 ParallelQuery<int> .AsParallel() // 不需要,會降低效率! .Select (n => n * n) 

並非所有的查詢操作符都可以被有效地並行化。對於那些不能的,PLINQ 使用了順序的實現。如果 PLINQ 認為並行化的開銷實際會使查詢變慢,它也會順序執行。

PLINQ 僅適用於本地集合:它無法在 LINQ to SQL 或 Entity Framework 中使用,因為在那些場景中,LINQ 會被翻譯成 SQL 語句,然后在數據庫服務器上執行。然而,你可以使用 PLINQ 對從數據庫查詢獲得的結果執行進一步的本地查詢。

如果 PLINQ 查詢拋出異常,它會被封裝進AggregateException重新拋出,其InnerExceptions屬性包含真正的異常。詳見使用 AggregateException

為什么 AsParallel 不是默認的?Permalink

我們知道AsParallel可以透明的並行化 LINQ 查詢,那么問題來了,“微軟為什么不直接並行化標准查詢操作符,使 PLINQ 成為默認的?”

有很多原因使其成為這種選擇使用(opt-in)的方式。首先,要使 PLINQ 有用,必須要有一定數量的計算密集型任務,它們可以被分配到多個工作線程。大多數 LINQ to Objects 的查詢執行非常快,根本不需要並行化,並行化過程中的任務分區、結果整理以及線程協調反而會使程序變慢。

其次:

  • PLINQ 查詢的輸出(默認情況下)在元素排序方面不同於 LINQ 查詢
  • PLINQ 將異常封裝在AggregateException中(能夠處理拋出的多個異常)。
  • 如果查詢引用了非線程安全的方法,PLINQ 會給出不可靠的結果。

最后,PLINQ 為了進行微調提供了一些鈎子(hook)。把這些累贅加入標准的 LINQ to Objects 的 API 會增加使用障礙。

3.1並行執行的特征Permalink

與普通的 LINQ 查詢一樣,PLINQ 查詢也是延遲估值的。這意味着只有當結果開始被使用時,查詢才會被觸發執行。通常結果是通過一個foreach循環被使用(通過轉換操作符也會觸發,例如ToArray,還有返回單個元素或值的操作符)。

當枚舉結果時,執行過程與普通的順序查詢略有不同。順序查詢完全由使用方通過“拉”的方式驅動:每個元素都在使用方需要時從輸入序列中被提取。並行 查詢通常使用獨立的線程從輸入序列中提取元素,這可能比使用方的需要稍微提前了一些(很像一個給播報員使用的提詞機,或者 CD 機中的防震緩沖區)。然后通過查詢鏈並行處理這些元素,將結果保存在一個小緩沖區中,以准備在需要的時候提供給使用方。如果使用方在枚舉過程中暫停或中 斷,查詢也會暫停或停止,這樣可以不浪費 CPU 時間或內存。

你可以通過在AsParallel之后調用WithMergeOptions來調整 PLINQ 的緩沖行為。默認值AutoBuffered通常能產生最佳的整體效果;NotBuffered禁用緩沖,如果你希望盡快看到結果可以使用這個;FullyBuffered在呈現給使用方前緩存整個查詢的輸出(OrderByReverse操作符天生以這種方式工作,取元素、聚合和轉換操作符也是一樣)。

3.2PLINQ 與排序Permalink

並行化查詢操作符的一個副作用是:當整理結果時,不一定能與它們提交時的順序保持一致,就如同之前圖中所示的那樣。換句話說,就是無法像普通的 LINQ 那樣能保證序列的正常順序。

如果你需要保持序列順序,可以通過在AsParallel后調用AsOrdered()來強制它保證:

myCollection.AsParallel().AsOrdered()... 

在大量元素的情況下調用AsOrdered會造成一定性能損失,因為 PLINQ 必須跟蹤每個元素原始位置。

之后你可以通過調用AsUnordered來取消AsOrdered的效果:這會引入一個“隨機洗牌點(random shuffle point)”,允許查詢從這個點開始更高效的執行。因此,如果你希望僅為前兩個查詢操作保持輸入序列的順序,可以這樣做:

inputSequence.AsParallel().AsOrdered() .QueryOperator1() .QueryOperator2() .AsUnordered() // 從這開始順序無關緊要 .QueryOperator3() // ... 

AsOrdered不是默認的,因為對於大多數查詢來說,原始的輸入順序無關緊要。換句話說,如果AsOrdered是默認的,你就不得不為大多數並行查詢使用AsUnordered來獲得最好的性能,這會成為負擔。

3.3PLINQ 的限制Permalink

目前,PLINQ 在能夠並行化的操作上有些實用性限制。這些限制可能會在之后的更新包或 Framework 版本中解決。

下列查詢操作符會阻止查詢的並行化,除非源元素是在它們原始的索引位置:

  • TakeTakeWhileSkipSkipWhile
  • SelectSelectManyElementAt這幾個操作符的帶索引版本

大多數查詢操作符都會改變元素的索引位置(包括可能移除元素的那些操作符,例如Where)。這意味着如果你希望使用上述操作符,就要在查詢開始的地方使用。

下列查詢操作符可以並行化,但會使用代價高昂的分區策略,有時可能比順序執行還慢。

  • JoinGroupByGroupJoinDistinctUnionIntersectExcept

Aggregate操作符的帶種子(seed)的重載是不能並行化的,PLINQ 提供了專門的重載來解決。

其它所有操作符都是可以並行化的,然而使用這些操作符並不能確保你的查詢會被並行化。如果 PLINQ 認為進行分區的開銷會導致部分查詢變慢,它也許會順序執行查詢。你可以覆蓋這個行為,方法是在AsParallel()之后調用如下代碼來強制並行化:

.WithExecutionMode (ParallelExecutionMode.ForceParallelism) 

3.4例:並行拼寫檢查Permalink

假設我們希望實現一個拼寫檢查程序,它在處理大文檔時,能夠通過充分利用所有可用的核心來快速運行。我們把算法設計成一個 LINQ 查詢,這樣就可以很容易的並行化它。

第一步是下載英文單詞字典,為了能夠高效查找,將其放在一個HashSet中:

if (!File.Exists ("WordLookup.txt")) // 包含約 150,000 個單詞 new WebClient().DownloadFile ( "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt"); var wordLookup = new HashSet<string> ( File.ReadAllLines ("WordLookup.txt"), StringComparer.InvariantCultureIgnoreCase); 

然后,使用wordLookup來創建一個測試“文檔”,該“文檔”是個包含了一百萬個隨機單詞的數組。創建完數組后,引入兩個拼寫錯誤:

var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] = "woozsh"; // 引入兩個 wordsToTest [23456] = "wubsie"; // 拼寫錯誤 

現在,通過對比wordLookup檢查wordsToTest,來完成這個並行的拼寫檢查程序。PLINQ 讓這變得很簡單:

var query = wordsToTest .AsParallel() .Select ((word, index) => new IndexedWord { Word=word, Index=index }) .Where (iword => !wordLookup.Contains (iword.Word)) .OrderBy (iword => iword.Index); query.Dump(); // 在 LINQPad 中顯示輸出 

下邊是 LINQPad 中的顯示的輸出:

LINQPad的查詢輸出

IndexedWord是一個自定義的結構體,定義如下:

struct IndexedWord { public string Word; public int Index; } 

判定器中的wordLookup.Contains方法作為查詢的主要部分,它使得這個查詢值得並行化。

我們可以使用匿名類型來代替IndexedWord結構體,從而稍微簡化下這個查詢。然而這會降低性能,因為匿名類型(是類,因此是引用類型)會產生分配堆內存的開銷,以及之后的垃圾回收。

這個區別對於順序查詢來說沒太大關系,但對於並行查詢來說,基於棧的內存分配則相當有利。這是因為基於棧的內存分配是可以高度並行化的(因為每個線程有其自己的棧),反之基於堆的內存分配會使所有線程競爭同一個堆,它是由單一的內存管理器和垃圾回收器管理的。

使用 ThreadLocal<T>Permalink

來擴展一下我們的例子,讓創建隨機測試單詞列表的過程並行化。我們把它作為 LINQ 查詢來構造,這樣事情就簡單多了。以下是順序執行版本:

string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); 

不幸的是,對Random.Next的調用不是線程安全的,所以實現並行化不是向查詢語句直接插入AsParallel()這么簡單。一個可能的解決辦法是寫個方法對random.Next加鎖,然而這會限制並發能力。更好的處理辦法是使用ThreadLocal<Random>為每個線程創建獨立的Random對象。然后我們可以使用如下代碼來並行化查詢:

var localRandom = new ThreadLocal<Random> ( () => new Random (Guid.NewGuid().GetHashCode()) ); string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel() .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)]) .ToArray(); 

在實例化Random對象的工廠方法中,我們傳遞了一個Guid的散列值,用來確保:如果兩個Random對象在很短的時間范圍內被創建,它們可以生成不同的隨機數序列。

何時使用 PLINQPermalink

在你的程序中尋找 LINQ 查詢,嘗試並行化它們貌似是很誘人的。然而這通常沒什么用,因為絕大多數明顯應該使用 LINQ 的地方執行都很快,所以並行化並沒有什么好處。更好的方法是找到 CPU 密集型工作的瓶頸,然后考慮“這能寫成 LINQ 查詢嗎?”(這樣重構的一個好處是 LINQ 通常可以使代碼變得更短,並且更具可讀性。)

PLINQ 非常適合於“不好意思不並行的問題(embarrassingly parallel problems)”。它也能很好的應用於結構化阻塞任務(structured blocking tasks),例如同時調用多個 web 服務(見調用阻塞或 I/O 密集型功能)。

對於圖像處理來說 PLINQ 是個糟糕的選擇,因為整理幾百萬個像素到輸出序列將形成瓶頸。更好的方法是把像素直接寫入數組或非托管的內存塊,然后使用Parallel類或任務並行來管理多線程。(也可以使用ForAll來繞過結果整理。如果該圖像處理算法天生適合 LINQ,這么做可能有益。)

3.5純方法Permalink

(譯者注:pure function 譯為純方法,是指一個方法 / 函數不能改變任何狀態,也不能進行任何 I/O 操作,它的返回值不能依賴任何可能被改變的狀態,並且使用相同的輸入調用就會產生相同的輸出。)

因為 PLINQ 會在並行的線程上運行查詢,因此必須注意不要執行非線程安全的操作。特別需要注意,對變量進行寫操作有副作用(side-effecting),是非線程安全的。

// 下列查詢將每個元素與其索引相乘。 // 給定一個 0 到 999 的輸入序列, 它應該輸出元素的平方。 int i = 0; var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++; 

可以通過使用鎖或Interlocked來確保i的自增是線程安全的,但是問題仍然存在,i並不能保證對應輸入元素的原始索引。並且加上AsOrdered也無法解決這個問題,因為AsOrdered僅僅確保輸出是按順序的,就像順序執行的輸出順序一樣。但這並不意味着實際的處理過程也是按順序的。

替代方法是將這個查詢重寫,使用帶索引的Select版本。

var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i); 

為了達到最佳性能,任何被查詢操作符調用的方法必須是線程安全的:不要給字段或屬性賦值(無副作用,純方法)。如果用來保證線程安全,查詢的並行能力將會受到限制。這個限制可以通過鎖定的持續時間除以花費在方法上的總時間來計算。

3.6調用阻塞或 I/O 密集型功能Permalink

有時一個查詢的長時間運行並不是因為是 CPU 密集型操作,而是因為它在等待某些東西,例如等待網頁下載或是硬件的響應。PLINQ 能夠有效地並行化這種類型的查詢,可以通過在AsParallel后調用WithDegreeOfParallelism來提示這種特征。例如,假設我們希望同時 ping 6 個網站。比起使用異步委托或手動讓 6 個線程自旋,使用 PLINQ 查詢可以輕松實現它:

from site in new[] { "www.albahari.com", "www.linqpad.net", "www.oreilly.com", "www.takeonit.com", "stackoverflow.com", "www.rebeccarey.com" } .AsParallel().WithDegreeOfParallelism(6) let p = new Ping().Send (site) select new { site, Result = p.Status, Time = p.RoundtripTime } 

WithDegreeOfParallelism強制 PLINQ 同時運行指定數量的任務。在調用阻塞方法(例如Ping.Send)時有必要這么做,否則的話,PLINQ 會認為這個查詢是 CPU 密集型的,並進行相應的任務分配。在雙核機器上,PLINQ 會默認同時運行 2 個任務,對於上述情況來說這顯然不是我們希望看到的。

線程池的影響,PLINQ 通常為每個任務分配一個線程。可以通過調用ThreadPool.SetMinThreads來加速初始線程的創建速度。

再給一個例子:假設我們要實現一個監控系統,希望它不斷將來自 4 個安全攝像頭的圖像合並成一個圖像,並在閉路電視上顯示。使用下邊的類來表示一個攝像頭:

class Camera { public readonly int CameraID; public Camera (int cameraID) { CameraID = cameraID; } // 獲取來自攝像頭的圖像: 返回一個字符串來代替圖像 public string GetNextFrame() { Thread.Sleep (123); // 模擬獲取圖像的時間 return "Frame from camera " + CameraID; } } 

要獲取一個合成圖像,我們必須分別在 4 個攝像頭對象上調用GetNextFrame。假設操作主要是受 I/O 影響的,通過並行化我們能將幀率提升 4 倍,即使是在單核機器上。PLINQ 使用一小段程序就能實現它:

Camera[] cameras = Enumerable.Range (0, 4) // 創建 4 個攝像頭對象 .Select (i => new Camera (i)) .ToArray(); while (true) { string[] data = cameras .AsParallel().AsOrdered().WithDegreeOfParallelism (4) .Select (c => c.GetNextFrame()).ToArray(); Console.WriteLine (string.Join (", ", data)); // 顯示數據... } 

GetNextFrame是一個阻塞方法,所以我們使用了WithDegreeOfParallelism來獲得期望的並發度。在我們的例子中,阻塞是在調用Sleep時發生。而在真實情況下,阻塞的發生是因為從攝像頭中獲取圖像是 I/O 密集型操作,而不是 CPU 密集型操作。

調用AsOrdered可以確保圖像按照一致的順序顯示。因為序列中只有 4 個元素,所以它對性能的影響可以忽略不計。

改變並發度Permalink

在一個 PLINQ 查詢內,僅能夠調用WithDegreeOfParallelism一次。如果你需要再次調用它,必須在查詢中通過再次調用AsParallel()強制進行查詢的合並和重新分區:

"The Quick Brown Fox" .AsParallel().WithDegreeOfParallelism (2) .Where (c => !char.IsWhiteSpace (c)) .AsParallel().WithDegreeOfParallelism (3) // 強制合並和重新分區 .Select (c => char.ToUpper (c)) 

3.7取消Permalink

當在foreach循環中使用 PLINQ 查詢的結果時,取消該查詢很簡單:使用break退出循環就可以了。查詢會被自動取消,因為枚舉器會被隱式銷毀。

對於結束一個使用轉換、取元素或聚合操作符的查詢來說,你可以在其它線程使用取消標記來取消它。在AsParallel后調用WithCancellation來添加一個標記,並把CancellationTokenSource對象的Token屬性作為參數傳遞。之后另一個線程就可以在這個CancellationTokenSource對象上調用Cancel,它會在查詢的使用方那邊拋出OperationCanceledException異常。

IEnumerable<int> million = Enumerable.Range (3, 1000000); var cancelSource = new CancellationTokenSource(); var primeNumberQuery = from n in million.AsParallel().WithCancellation (cancelSource.Token) where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0) select n; new Thread (() => { Thread.Sleep (100); // 在 100 毫秒后 cancelSource.Cancel(); // 取消查詢 } ).Start(); try { // 開始運行查詢 int[] primes = primeNumberQuery.ToArray(); // 永遠到不了這里,因為其它線程會進行取消操作。 } catch (OperationCanceledException) { Console.WriteLine ("Query canceled"); } 

PLINQ 不會直接中止線程,因為這么做是危險的。在取消時,它會等待所有工作線程處理完當前的元素,然后結束查詢。這意味着查詢調用的任何外部方法都會執行完成。

3.8優化 PLINQPermalink

輸出端優化Permalink

PLINQ 的一個優點是它能夠很容易地將並行化任務的結果整理成一個輸出序列。然而有時,最終要做的是在輸出序列的每個元素上運行一些方法:

foreach (int n in parallelQuery) DoSomething (n); 

如果是上述情況,並且不關心元素的處理順序,那么可以使用 PLINQ 的ForAll方法來提高效率。

ForAll方法在ParallelQuery的每個輸出元素上運行一個委托。它直接掛鈎(hook)到 PLINQ 內部,繞過整理和枚舉結果的步驟。舉個栗子:

"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write); 

ForAll 方法

整理和枚舉結果的開銷不是非常大,所以當有大量輸入元素且處理執行很快的時候,才能最大化ForAll優化的收益。

輸入端優化Permalink

PLINQ 有 3 種分區策略,用來分配輸入元素到線程:

策略 元素分配 相對性能
塊分區(Chunk partitioning) 動態 平均
范圍分區(Range partitioning) 靜態 差 - 極好
散列分區(Hash partitioning) 靜態

對於那些需要比較元素的查詢操作符(GroupByJoinGroupJoinIntersectExceptUnionDistinct),PLINQ 總是使用散列分區。散列分區相對低效,因為它必須預先計算每個元素的散列值(擁有同樣散列值的元素會在同一個線程中被處理)。如果發現運行太慢,唯一的選擇是調用AsSequential來禁止並行處理。

對於其它所有查詢操作符,你可以選擇使用范圍分區或塊分區。默認情況下:

  • 如果輸入序列可以通過索引訪問(數組或是IList<T>的實現),PLINQ 選用范圍分區。
  • 否則,PLINQ 選用塊分區。

概括來講,對於較長的序列且處理每個元素所需的 CPU 時間比較近似時,范圍分區更快。否則,塊分區通常更快。

如果想強制使用范圍分區:

  • 如果查詢以Enumerable.Range開始,將其替換為ParallelEnumerable.Range
  • 否則,在輸入序列上調用ToListToArray(顯然,你需要考慮在這里產生的性能開銷)。

ParallelEnumerable.Range並不是對Enumerable.Range(…).AsParallel()的簡單封裝。它通過激活范圍分區改變了查詢的性能。

如果想強制使用塊分區,就通過調用Partitioner.Create(在命名空間System.Collection.Concurrent中)來封裝輸入序列,例如:

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 }; var parallelQuery = Partitioner.Create (numbers, true).AsParallel() .Where (...) 

Partitioner.Create的第二個參數表示:希望對查詢開啟負載均衡(load-balance),這是另一個使用塊分區的動機。

塊分區的工作方式是定期從輸入序列中抓取小塊元素來處理。PLINQ 一開始會分配非常小的塊(一次 1 到 2 個元素),然后隨着查詢的進行增加塊的大小:這確保小序列能夠被有效地並行化,而大序列不會導致過多的抓取工作。如果一個工作線程碰巧拿到了一些“容易” 的元素(處理很快),它最終將拿到更多的塊。這個系統使每個線程保持均等的繁忙程度(使核心負載均衡)。唯一的不利因素是從共享的輸入序列中獲取元素需要 同步(通常使用一個排它鎖),這會產生一定的開銷和競爭。

分區策略

范圍分區會繞過正常的輸入端枚舉,並且為每個工作線程預分配相同數量的元素,避免了在輸入序列上的競爭。但是如果某些線程拿到了容易的元素並很早就 完成了處理,在其它工作線程仍在繼續工作的時候它就會是空閑的。我們之前的素數計算的例子在使用范圍分區時就性能不高。舉個范圍分區適用的例子,計算 1000 萬以內數字的平方和:

ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i)) 

ParallelEnumerable.Range返回一個ParallelQuery<T>,因此不需要在之后調用AsParallel

范圍分區不是必須把元素分成相鄰的塊,它也許會選用一種 “條紋式(striping)”策略。例如,有兩個工作線程,一個工作線程可能會處理奇數位置的元素,而另一個工作線程處理偶數位置的元素。TakeWhile操作符幾乎一定會觸發條紋式策略,用來避免處理序列后邊不必要的元素。

3.9並行化自定義聚合Permalink

PLINQ 可以在無需額外干預的情況下有效地並行化SumAverageMinMax操作符。然而,Aggregate操作符對於 PLINQ 來說是個特殊的麻煩。

如果不熟悉Aggregate操作符,你可以認為它就是一個SumAverageMinMax的泛化版本,換句話說,就是一個可以使你通過自定義的聚合算法實現非通常聚合操作的操作符。如下代碼展現了Aggregate如何實現Sum操作符的工作:

int[] numbers = { 1, 2, 3 }; int sum = numbers.Aggregate (0, (total, n) => total + n); // 6 

Aggregate的第一個參數是 seed(種子,初值),聚合操作從這里開始。第二個參數是一個用於更新聚合值的表達式,該表達式生成一個新的元素。第三個參數是可選的,用來表示如何通過聚合值生成最終的結果值。

大多數Aggregate被設計用來解決的問題都能夠使用foreach循環輕松解決,並且這也是更熟悉的語法。而Aggregate的優點在於對龐大或復雜的聚合操作可以使用 PLINQ 來進行聲明式的並行化。

無種子的聚合Permalink

調用Aggregate時可以省略種子值,這種情況下第一個元素會被隱式當作種子,之后聚合處理會從第二個元素開始進行。下邊是一個無種子的例子:

int[] numbers = { 1, 2, 3 }; int sum = numbers.Aggregate ((total, n) => total + n); // 6 

這得到了與之前相同的結果,然而實際上卻是進行了不同的計算。之前例子計算的是 0+1+2+3,而現在計算的是1+2+3。通過乘法運算來代替加法運算能夠更好地說明這個不同:

int[] numbers = { 1, 2, 3 }; int x = numbers.Aggregate (0, (prod, n) => prod * n); // 0*1*2*3 = 0 int y = numbers.Aggregate ( (prod, n) => prod * n); // 1*2*3 = 6 

如同我們馬上將要看到的,無種子的聚合的優點在於被並行化時不需要使用特殊的重載。然而,無種子的聚合存在一個陷阱:無種子的聚合方法期望使用的委 托中的計算應滿足交換律和結合律。如果用在別的情況下,結果要不然是反直覺的(普通查詢),要不然是不確定的(PLINQ 並行化查詢)。例如考慮如下函數:

(total, n) => total + n * n 

它既不滿足交換律也不滿足結合律。(例如:1+2*2 != 2+1*1)。我們來看一下使用它來對數字 2、3、4 計算平方和時會發生什么:

int[] numbers = { 2, 3, 4 }; int sum = numbers.Aggregate ((total, n) => total + n * n); // 27 

本來的計算應該是:

2*2 + 3*3 + 4*4 // 29 

但現在的計算是:

2 + 3*3 + 4*4 // 27 

可以通過多種方法解決這個問題。首先,我們可以在序列最前端加入 0 作為第一個元素:

int[] numbers = { 0, 2, 3, 4 }; 

這不僅不優雅,而且在並行執行的情況下仍然會產生錯誤的結果,因為 PLINQ 會選擇多個元素作為種子,這相當於假定了計算滿足結合律。為說明這個問題,用如下方式表示我們的聚合函數:

f(total, n) => total + n * n 

LINQ to Objects 會這樣計算:

f(f(f(0, 2),3),4) 

PLINQ 可能會這樣計算:

f(f(0,2),f(3,4)) 

結果是:

第一個分區:   a = 0 + 2*2  (= 4)
第二個分區:   b = 3 + 4*4  (= 19)
最終結果:     a + b*b  (= 365)
甚至可能是:    b + a*a  (= 35)

有兩種好的解決方案:第一種是將其轉換為有種子的聚合,使用 0 作為種子。這種方案帶來的復雜度的提升僅僅是使用 PLINQ 時,我們需要使用特殊的重載,確保查詢並行執行(馬上會看到)。

第二種解決方案是:重構查詢,使聚合函數滿足交換律和結合律:

int sum = numbers.Select (n => n * n).Aggregate ((total, n) => total + n); 

當然,在這種簡單的場景下你可以(並且應該)使用Sum操作符來代替Aggregate

int sum = numbers.Sum (n => n * n); 

實際上可以更進一步使用SumAverage。例如,可以使用Average來計算均方根(root-mean-square):

Math.Sqrt (numbers.Average (n => n * n)) 

甚至是標准差:

double mean = numbers.Average(); double sdev = Math.Sqrt (numbers.Average (n => { double dif = n - mean; return dif * dif; })); 

上述兩個方法都是安全、高效並且可完全並行化的。

並行化聚合Permalink

我們剛剛看到了無種子的聚合,提供的委托必須滿足交換律和結合律。如果違反這個規則,PLINQ 會給出錯誤的結果,因為它可能使用輸入序列中多個的元素作為種子,來同時聚合多個分區。

指定種子的聚合也許看起來像是使用 PLINQ 的安全選擇,然而不幸的是,這樣通常會導致順序執行,因為它依賴於單獨一個種子。為減緩這個問題,PLINQ 提供了另一個Aggregate的重載,允許你指定多個種子,或者是一個種子工廠方法。對每個線程,它執行這個方法來生成一個獨立的種子,這就形成了一個線程局部的累加器,通過它在聚合局部元素。

你必須再提供一個方法來指示如何合並局部累加器至主累加器。最后,Aggregate的這個重載還需要一個委托,用來對結果進行任意的最終變換(有些沒必要,你可以之后對結果運行一些代碼完成同樣操作)。所以,這里有 4 個委托,按照它們被傳遞的順序:

  • 種子工廠(seedFactory):
    返回一個新的局部累加器
  • 更新累加器方法(updateAccumulatorFunc):
    聚合元素至局部累加器
  • 合並累加器方法(combineAccumulatorFunc):
    合並局部累加器至主累加器
  • 結果選擇器(resultSelector):
    在結果上應用任意最終變換

在簡單的場景中,你可以指定一個種子值來代替種子工廠。當種子是你需要改變的引用類型時這種策略行不通,因為同一個實例將在線程間共享。

提供一個簡單的例子,下邊的代碼對numbers數組中的值進行求和:

numbers.AsParallel().Aggregate ( () => 0, // 種子工廠 (localTotal, n) => localTotal + n, // 更新累加器方法 (mainTot, localTot) => mainTot + localTot, // 合並累加器方法 finalResult => finalResult) // 結果選擇器 

這個例子有些刻意,我們可以使用更簡單的方式獲取相同的結果(例如無種子的聚合,或者更好的選擇是使用Sum操作符)。給一個更加實際的例子,假設我們要計算字符串中每個英文字母的出現頻率。簡單的順序執行方案看起來是這樣:

string text = "Let’s suppose this is a really long string"; var letterFrequencies = new int[26]; foreach (char c in text) { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) letterFrequencies [index]++; }; 

基因序列是一個輸入文本可能會非常長的例子,它的“字母表”是由字母 acgt 組成。

為了將它並行化,我們可以把foreach替換為Parallel.ForEach(在接下來的一節會講到),但這會導致共享數組上的並發問題。對數組的訪問加可以解決問題,但會降低並發的可能性。

Aggregate提供了一個好的解決方案。這種情況下,累加器是一個數組,就像是之前例子中letterFrequencies數組。使用Aggregate的順序執行版本如下:

int[] result = text.Aggregate ( new int[26], // 創建“累加器” (letterFrequencies, c) => // 聚合一個字母至累加器 { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) letterFrequencies [index]++; return letterFrequencies; }); 

下面是並行版本,它使用 PLINQ 的專門重載:

int[] result = text.AsParallel().Aggregate ( () => new int[26], // 新建局部累加器 (localFrequencies, c) => // 聚合至局部累加器 { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) localFrequencies [index]++; return localFrequencies; }, // 聚合局部累加器至主累加器 (mainFreq, localFreq) => mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(), finalResult => finalResult // 對結果進行 ); // 最終變換 

注意:局部累加方法會改動localFrequencies數組。這個優化是非常重要的,也是合法的,因為localFrequencies是每個線程的局部變量。

4Parallel 類Permalink

PFX 通過Parallel類上的三個靜態方法提供了結構化並行的基本形式:

Parallel.Invoke
並行執行一組委托
Parallel.For
C# for循環的並行版本
Parallel.ForEach
C# foreach循環的並行版本

三個方法都是在工作完成前會阻塞。類似於PLINQ,如果有未處理的異常,其它工作線程會在當前迭代完成之后停止,異常會被封裝在AggregateException中拋給調用方。

4.1Parallel.InvokePermalink

Parallel.Invoke並行執行一組Action類型的委托,然后等待它們完成。這個方法最簡單的版本如下:

public static void Invoke (params Action[] actions); 

下面是使用Parallel.Invoke來同時下載兩個網頁:

Parallel.Invoke ( () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"), () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html")); 

這表面上看起來像是創建了兩個Task對象(或異步委托)並等待它們。但是有個重要的區別:Parallel.Invoke在你傳遞一百萬個委托時仍然能高效工作。這是因為它會對大量元素進行分區(partition),形成多個塊,再對其分配底層的Task。而不是直接對每一個委托創建獨立的Task

使用Parallel上的所有方法時,都需要自行實現整理結果的代碼。這意味着你需要注意線程安全。例如,下面的代碼不是線程安全的:

var data = new List<string>(); Parallel.Invoke ( () => data.Add (new WebClient().DownloadString ("http://www.foo.com")), () => data.Add (new WebClient().DownloadString ("http://www.far.com"))); 

對添加的過程加可以解決問題,但是如果你的委托數量更多,它們每一個執行的又很快,那么鎖可能造成瓶頸。更好的解決方案是使用線程安全的集合,比如ConcurrentBag就是這里的理想方案。

Parallel.Invoke也有接受ParallelOptions對象的重載:

public static void Invoke (ParallelOptions options, params Action[] actions); 

通過ParallelOptions,你可以添加取消標記、限制最大並發數量和指定自定義任務調度器。如果要執行的委托數量(大致上)大於核心數,那么使用取消標記才有意義:在取消時,所有未啟動的委托都會被拋棄。而所有已經在執行的委托會繼續完成。對於如何使用取消標記,可以參考取消中的例子。

4.2Parallel.For 和 Parallel.ForEachPermalink

Parallel.ForParallel.ForEach與 C# forforeach類似,但會並行執行,而不是順序執行。下面是它們(最簡單的)方法簽名:

public static ParallelLoopResult For ( int fromInclusive, int toExclusive, Action<int> body) public static ParallelLoopResult ForEach<TSource> ( IEnumerable<TSource> source, Action<TSource> body) 

對於下面的for循環:

for (int i = 0; i < 100; i++) Foo (i); 

並行版本是這樣:

Parallel.For (0, 100, i => Foo (i)); 

或更簡潔的:

Parallel.For (0, 100, Foo); 

而對於下面的foreach循環:

foreach (char c in "Hello, world") Foo (c); 

並行版本是這樣:

Parallel.ForEach ("Hello, world", Foo); 

給一個實際點的例子。引入System.Security.Cryptography命名空間,然后我們可以像這樣並行生成六組密鑰對的字符串形式:

var keyPairs = new string[6]; Parallel.For (0, keyPairs.Length, i => keyPairs[i] = RSA.Create().ToXmlString (true)); 

Parallel.Invoke同樣,我們也可以讓Parallel.ForParallel.ForEach執行大量工作項,它們也會被分區,分配給任務高效執行。

上面的例子也可以使用PLINQ來實現:

string[] keyPairs = ParallelEnumerable.Range (0, 6) .Select (i => RSA.Create().ToXmlString (true)) .ToArray(); 

外循環 vs 內循環Permalink

Parallel.ForParallel.ForEach通常更適合用於外循環,而不是內循環。這是因為前者會帶來更大的分區塊,就稀釋了管理並行的開銷。一般沒有必要同時並行內外循環。對於下面的例子,我們需要 100 個核心才能讓內循環的並行有益處:

Parallel.For (0, 100, i => { Parallel.For (0, 50, j => Foo (i, j)); // 對於內循環, }); // 順序執行更好。 

帶索引的 Parallel.ForEachPermalink

有時需要獲知循環迭代的索引。在順序的foreach中這很簡單:

int i = 0; foreach (char c in "Hello, world") Console.WriteLine (c.ToString() + i++); 

然而在並行環境中,讓共享變量自增並不是線程安全的。你必須使用下面這個ForEach版本:

public static ParallelLoopResult ForEach<TSource> ( IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body) 

先忽略ParallelLoopState(下一節會講)。現在我們關注的是Action的第三個long類型的參數,它代表了循環的索引:

Parallel.ForEach ("Hello, world", (c, state, i) => { Console.WriteLine (c.ToString() + i); }); 

為了把它用到實際場景中,我們來回顧下使用 PLINQ 的拼寫檢查。下面的代碼加載了一個字典,並生成了一個用來測試的數組,有一百萬個測試項:

if (!File.Exists ("WordLookup.txt")) // 包含約 150,000 個單詞 new WebClient().DownloadFile ( "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt"); var wordLookup = new HashSet<string> ( File.ReadAllLines ("WordLookup.txt"), StringComparer.InvariantCultureIgnoreCase); var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] = "woozsh"; // 引入兩個 wordsToTest [23456] = "wubsie"; // 拼寫錯誤 

我們可以使用帶索引的Parallel.ForEach來對wordsToTest數組進行拼寫檢查,如下:

var misspellings = new ConcurrentBag<Tuple<int,string>>(); Parallel.ForEach (wordsToTest, (word, state, i) => { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); }); 

注意,必須使用線程安全的集合來整理結果:這一點是相對於使用 PLINQ 的劣勢。而優勢是我們可以避免使用帶索引的Select查詢操作符,它沒有帶索引的ForEach高效。

ParallelLoopState:提前退出循環Permalink

因為對於並行的ForForEach循環,循環體是一個委托,所以就無法使用break語句來提前退出循環。在這里,你必須使用ParallelLoopState對象上的BreakStop

public class ParallelLoopState { public void Break(); public void Stop(); public bool IsExceptional { get; } public bool IsStopped { get; } public long? LowestBreakIteration { get; } public bool ShouldExitCurrentIteration { get; } } 

獲取ParallelLoopState很容易:所有版本的ForForEach都有重載可以接受Action<TSource,ParallelLoopState>類型的循環體。所以,如果要並行化:

foreach (char c in "Hello, world") if (c == ',') break; else Console.Write (c); 

可以使用:

Parallel.ForEach ("Hello, world", (c, loopState) => { if (c == ',') loopState.Break(); else Console.Write (c); }); 

輸出:

Hlloe

從結果中可以發現,循環體會以隨機順序完成。除這點不同以外,調用Break會給出與順序循環至少相同數量的元素:在上例中總是以一定順序至少輸出 Hello 這幾個字母。而如果改為調用Stop,會強制所有線程在當前迭代完成后立即結束。在上例中,如果有些線程滯后了,調用Stop可能給出 Hello 的子集。當發現已經找到了需要的東西時,或是發現出錯了不想看結果的情況下,Stop比較適用。

Parallel.ForParallel.ForEach方法都返回一個ParallelLoopResult對象,它暴露了IsCompletedLowestBreakIteration屬性。它們可以告知循環是否完成,如果沒有完成,是在哪個迭代中斷的。

如果LowestBreakIteration返回null,意味着在循環中調用了Stop(而不是Break)。

如果你的循環體很長,可能會希望其它線程能夠在執行中途中斷循環體,來讓使用BreakStop時更快的退出。實現方法是,在代碼中多個地方查詢ShouldExitCurrentIteration屬性,它會在調用Stop后立即為true,或者是在Break后很快為true

ShouldExitCurrentIteration在請求取消或者循環中有異常拋出時也會為true

IsExceptional屬性可以告知其它線程上是否有異常產生。任何未處理的異常都會導致循環在所有線程完成當前迭代后結束:如果想要避免,必須在代碼中顯式處理異常。

使用局部值進行優化Permalink

Parallel.ForParallel.ForEach都提供了擁有TLocal泛型變量的重載。這是為了協助你優化密集迭代的循環中的數據整理工作。最簡單的形式如下:

public static ParallelLoopResult For <TLocal> ( int fromInclusive, int toExclusive, Func <TLocal> localInit, Func <int, ParallelLoopState, TLocal, TLocal> body, Action <TLocal> localFinally); 

這些方法在實際中很少用到,因為它們的目標場景基本都被PLINQ覆蓋了(好開森,因為這些重載真可怕!)。

本質上,問題在於:假設我們要計算從 1 到 10,000,000 的平方根的和。並行計算一千萬個平方根很容易,但是求和是個問題,因為必須像這樣加才能更新和值:

object locker = new object(); double total = 0; Parallel.For (1, 10000000, i => { lock (locker) total += Math.Sqrt (i); }); 

並行化的收益都被獲取一千萬個鎖的開銷抵消了,還不算導致的阻塞。

然而,實際上並不需要一千萬個鎖。想象一隊志願者撿一大堆垃圾的場景,如果大家都共享單獨一個垃圾桶,那沖突就會使整個過程極端低效。明顯的方案是每個人都有自己“局部”的垃圾桶,偶爾去一趟主垃圾桶傾倒干凈。

ForForEachTLocal版本就是這樣工作的。志願者就是內部的工作線程,局部值(local value)就是局部垃圾桶。想要讓Parallel以這種方式工作,那么必須提供兩個額外的委托:

  1. 如何初始化新的局部值
  2. 如何將局部的聚合值合並到主值

另外,循環體委托現在不能返回void,而是應該返回局部值新的聚合結果。下面是重構后的例子:

object locker = new object(); double grandTotal = 0; Parallel.For (1, 10000000, () => 0.0, // 初始化局部值 (i, state, localTotal) => // 循環體委托。注意現在 localTotal + Math.Sqrt (i), // 返回新的局部值 localTotal => // 把局部值 { lock (locker) grandTotal += localTotal; } // 加入主值 ); 

我們還是需要鎖,但是只需要鎖定將局部和加入總和的過程。這讓處理效率有了極大的提升。

前面說過,PLINQ 一般更適合這些場景。我們的例子如果使用 PLINQ 來並行會很簡單:

ParallelEnumerable.Range(1, 10000000) .Sum (i => Math.Sqrt (i)) 

(注意我們使用了ParallelEnumerable來強制范圍分區:在這里可以提高性能,因為對所有數字的計算都是相等時間的。)

更復雜的場景中,你可能會用到 LINQ 的Aggregate操作符而不是Sum。如果指定了局部種子工廠,那情況就和使用局部值的Parallel.For差不多了。

5任務並行Permalink

任務並行(task parallelism)是 PFX 中最底層的並行方式。這一層次的類定義在System.Threading.Tasks命名空間中,如下所示:

作用
Task 管理工作單元
Task<TResult> 管理有返回值的工作單元
TaskFactory 創建任務
TaskFactory<TResult> 創建有相同返回類型的任務和任務延續
TaskScheduler 管理任務調度
TaskCompletionSource 手動控制任務的工作流

本質上,任務是用來管理可並行工作單元的輕量級對象。任務使用 CLR 的線程池來避免啟動獨立線程的開銷:它和ThreadPool.QueueUserWorkItem使用的是同一個線程池,在 CLR 4.0 中這個線程池被調節過,讓Task工作的更有效率(一般來說)。

需要並行執行代碼時都可以使用任務。然而,它們是為了充分利用多核而調節的:事實上,Parallel類和PLINQ內部就是基於任務並行構建的。

任務並不只是提供了簡單高效的使用線程池的方式。它們還提供了一些強大的功能來管理工作單元,包括:

任務也實現了局部工作隊列(local work queues),這個優化能夠讓你高效的創建很多快速執行的子任務,而不會帶來單一工作隊列會導致的競爭開銷。

TPL 可以讓你使用極小的開銷創建幾百個(甚至幾千個)任務,但如果你要創建上百萬個任務,那需要把這些任務分成大一些的工作單元才能有效率。Parallel類和 PLINQ 可以自動實現這種工作分解。

Visual Studio 2010 提供了一個新的窗口來監視任務(調試 | 窗口 | 並行任務)。它和線程窗口類似,只是用於任務。並行棧窗口也有一個專門的模式用於任務。

5.1創建與啟動任務Permalink

如同我們在第 1 部分線程池的討論中那樣,你可以調用Task.Factory.StartNew,並給它傳遞一個Action委托來創建並啟動Task

Task.Factory.StartNew (() => Console.WriteLine ("Hello from a task!")); 

泛型的版本Task<TResult>Task的子類)可以讓你在任務結束時獲得返回的數據:

Task<string> task = Task.Factory.StartNew<string> (() => // 開始任務 { using (var wc = new System.Net.WebClient()) return wc.DownloadString ("http://www.linqpad.net"); }); RunSomeOtherMethod(); // 我們可以並行的做其它工作... string result = task.Result; // 等待任務結束並獲取結果 

Task.Factory.StartNew是一步創建並啟動任務。你也可以分解它,先創建Task實例,再調用Start

var task = new Task (() => Console.Write ("Hello")); // ... task.Start(); 

使用這種方式創建的任務也可以同步運行(在當前線程上):使用RunSynchronously替代Start

可以使用Status屬性來追蹤任務的執行狀態。

指定狀態對象Permalink

當創建任務實例或調用Task.Factory.StartNew時,可以指定一個狀態對象(state object),它會被傳遞給目標方法。如果你希望直接調用方法而不是 lambda 表達式,則可以使用它。

static void Main() { var task = Task.Factory.StartNew (Greet, "Hello"); task.Wait(); // 等待任務結束 } static void Greet (object state) { Console.Write (state); } // 打印 "Hello" 

因為 C# 中有 lambda 表達式,我們可以更好的使用狀態對象,用它來給任務賦予一個有意義的名字。然后就可以使用AsyncState屬性來查詢這個名字:

static void Main() { var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting"); Console.WriteLine (task.AsyncState); // 打印 "Greeting" task.Wait(); } static void Greet (string message) { Console.Write (message); } 

Visual Studio 會在並行任務窗口顯示每個任務的AsyncState屬性,所以指定有意義的名字可以很大程度的簡化調試。

TaskCreationOptionsPermalink

在調用StartNew(或實例化Task)時,可以指定一個TaskCreationOptions枚舉來調節線程的執行。TaskCreationOptions是一個按位組合的枚舉,它有下列(可組合的)值:LongRunningPreferFairnessAttachedToParent

LongRunning向調度器建議為任務使用一個獨立的線程。這對長時間運行的任務有好處,因為它們可能會“霸占”隊列,強迫短時間任務等待過長的時間后才能被調度。LongRunning對於會阻塞的任務也有好處。

由於任務調度器一般會試圖保持剛好足夠數量的任務在線程上運行,來保持所有 CPU 核心都工作。所以不要超額分配(oversubscribing) CPU,或者說不要使用過多的活動線程,以避免由於操作系統被迫進行大量耗時的時間切片和上下文切換導致的性能下降。

PreferFairness讓調度器試圖確保任務以它們啟動的順序被調度。默認情況下是使用另一種方式,因為內部使用了局部工作竊取隊列來優化任務調度。這個優化對於非常小的(細粒度)任務有實際的好處。

AttachedToParent用來創建子任務。

子任務Permalink

當一個任務啟動另一個任務時,你可以通過指定TaskCreationOptions.AttachedToParent選擇性地建立父子關系:

Task parent = Task.Factory.StartNew (() => { Console.WriteLine ("I am a parent"); Task.Factory.StartNew (() => // 分離的任務 { Console.WriteLine ("I am detached"); }); Task.Factory.StartNew (() => // 子任務 { Console.WriteLine ("I am a child"); }, TaskCreationOptions.AttachedToParent); }); 

子任務的特殊之處在於,當你等待父任務結束時,也同樣會等待所有子任務。這對於子任務是一個延續任務時非常有用,稍后我們會看到。

5.2等待任務Permalink

有兩種方式可以顯式等待任務完成:

  • 調用Wait方法(可選擇指定超時時間)
  • 訪問Result屬性(當使用Task<TResult>時)

也可以同時等待多個任務:通過靜態方法Task.WaitAll(等待所有指定任務完成)和Task.WaitAny(等待任意一個任務完成)。

WaitAll和依次等待每個任務類似,但它更高效,因為它只需要(至多)一次上下文切換。並且,如果有一個或多個任務拋出未處理的異常,WaitAll仍然能夠等待所有任務,並在之后重新拋出一個AggregateException異常,它聚合了所有出錯任務的異常,功能相當於下面的代碼:

// 假設 t1、t2 和 t3 是任務: var exceptions = new List<Exception>(); try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } if (exceptions.Count > 0) throw new AggregateException (exceptions); 

調用WaitAny相當於在一個ManualResetEventSlim上等待,每個任務結束時都對它發信號。

除了使用超時時間,你也可以傳遞一個取消標記Wait方法:這樣可以取消等待。注意這不是取消任務。

5.3異常處理Permalink

當你等待一個任務結束時(通過調用Wait方法或訪問其Result屬性),所有未處理的異常都會用一個AggregateException對象封裝,方便重新拋給調用方。一般就無需在任務代碼中處理異常,而是這么做:

int x = 0; Task<int> calc = Task.Factory.StartNew (() => 7 / x); try { Console.WriteLine (calc.Result); } catch (AggregateException aex) { Console.Write (aex.InnerException.Message); // 試圖以 0 為除數 } 

你仍然需要對獨立的任務(無父任務並且沒有在等待它)進行異常處理,以免當任務失去作用域被垃圾回收時(見以下注釋)有未處理的異常,那會導致程序結束。如果對任務的等待指定了超時時間,那也是如此,因為所有超時時間過后拋出的異常都是未處理的。

TaskScheduler.UnobservedTaskException靜態事件提供了應對未處理的任務異常的最后手段。通過掛接這個事件,你就可以攔截這些原本會導致程序結束的異常,並且使用自己的邏輯對它們進行處理。

對於有父子關系的任務,在父任務上等待也會隱式的等待子任務,所有子任務的異常也會傳遞出來。

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; var parent = Task.Factory.StartNew (() => { Task.Factory.StartNew (() => // 子 { Task.Factory.StartNew (() => { throw null; }, atp); // 孫 }, atp); }); // 下面的調用會拋出 NullReferenceException 異常 (封裝在 // 嵌套的 AggregateExceptions 中): parent.Wait(); 

有趣的是,如果你在任務拋出異常后檢查它的Exception屬性,這個讀取屬性的動作會防止因為該異常導致程序結束。基本原則是:PFX 的設計者不希望你忽略異常,只要采取某種方式接收異常,就不會受到結束程序的懲罰。

任務中的未處理異常不會導致程序立即結束:它會延遲直到垃圾回收器處理到這個任務,並調用它的析構方法時。這個延遲是因為在進行垃圾回收前,還無法判斷是否會調用Wait,或檢查ResultException屬性。它有時也會誤導你對錯誤源頭的判斷(Visual Studio 的調試器如果開啟了在首個異常處中斷,可以幫助進行判斷)。

馬上我們會看到處理異常的另一種策略,就是使用任務延續

5.4取消任務Permalink

啟動任務時可以可選的傳遞一個取消標記(cancellation token)。它可以讓你通過協作取消模式取消任務,像之前描述的那樣:

var cancelSource = new CancellationTokenSource(); CancellationToken token = cancelSource.Token; Task task = Task.Factory.StartNew (() => { // 做些事情... token.ThrowIfCancellationRequested(); // 檢查取消請求 // 做些事情... }, token); // ... cancelSource.Cancel(); 

如果要檢測任務取消,可以用如下方式捕捉AggregateException,並檢查它的內部異常:

try { task.Wait(); } catch (AggregateException ex) { if (ex.InnerException is OperationCanceledException) Console.Write ("Task canceled!"); } 

如果希望顯式的拋出OperationCanceledException異常(而不是通過調用ThrowIfCancellationRequested),那么必須把取消標記傳遞給OperationCanceledException的構造方法。如果不這么做,這個任務就不會以TaskStatus.Canceled狀態結束,並且也不會觸發使用OnlyOnCanceled條件的任務延續。

如果任務在啟動前被取消,它就不會被調度,而是直接在任務中拋出OperationCanceledException

因為取消標記也可以被其它 API 識別,所以可以在其它構造中無縫使用:

var cancelSource = new CancellationTokenSource(); CancellationToken token = cancelSource.Token; Task task = Task.Factory.StartNew (() => { // 傳遞取消標記給 PLINQ 查詢: var query = someSequence.AsParallel().WithCancellation (token)... // ... enumerate query ... }); 

調用cancelSource上的Cancel方法就可以取消該 PLINQ 查詢,它會在任務中拋出OperationCanceledException異常,從而取消該任務。

也可以給WaitCancelAndWait這類方法傳遞取消標記,它可以讓你取消等待操作,而不是任務本身。

5.5任務延續Permalink

有時,在一個任務完成(或失敗)后馬上啟動另一個任務會很有用。Task類上的ContinueWith方法正是實現了這種功能:

Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant..")); Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation")); 

一旦task1(前項,antecedent)完成、失敗或取消,task2(延續,continuation)會自動啟動。(如果task1在運行第二行代碼前已經結束,那么task2會被立即調度執行。)傳遞給延續的 lambda 表達式的ant參數是對前項任務的引用。

我們的例子演示了最簡單的延續,它和以下代碼功能類似:

Task task = Task.Factory.StartNew (() => { Console.Write ("antecedent.."); Console.Write ("..continuation"); }); 

但是通過延續的方式可以更加靈活,比如先等待task1完成,之后再等待task2。如果task1返回數據,這樣就非常有用。

另一個(不明顯的)差異是:默認情況下,前項和延續任務可能是在不同的線程上執行。你可以在調用ContinueWith時指定TaskContinuationOptions.ExecuteSynchronously來強制它們在同一個線程執行:如果延續是非常細粒度的,這樣做可以通過減少開銷來提升性能。

延續和 Task<TResult>Permalink

像普通任務一樣,延續也可以使用Task<TResult>類型並返回數據。下面的例子中,我們使用鏈狀任務來計算Math.Sqrt(8*2)並打印結果:

Task.Factory.StartNew<int> (() => 8) .ContinueWith (ant => ant.Result * 2) .ContinueWith (ant => Math.Sqrt (ant.Result)) .ContinueWith (ant => Console.WriteLine (ant.Result)); // 4 

我們的例子比較簡單,實際應用中,這些 lambda 表達式可能會調用計算密集型的方法。

延續與異常Permalink

延續可以通過前項的Exception屬性來獲取前項拋出的異常。下面的代碼會輸出NullReferenceException信息:

Task task1 = Task.Factory.StartNew (() => { throw null; }); Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception)); 

如果前項拋出了異常但延續沒有檢查前項的Exception屬性(並且也沒有在等待前項),那么異常會被認為是未處理的,就會導致程序結束(除非使用TaskScheduler.UnobservedTaskException進行了處理)。

安全的模式是重新拋出前項的異常。只要延續被Wait等待,異常就能夠傳播並重新拋出給等待方。

Task continuation = Task.Factory.StartNew (() => { throw null; }) .ContinueWith (ant => { if (ant.Exception != null) throw ant.Exception; // 繼續處理... }); continuation.Wait(); // 異常被拋回調用方 

另一種處理異常的方法是為異常和正常情況指定不同的延續。需要用到TaskContinuationOptions

Task task1 = Task.Factory.StartNew (() => { throw null; }); Task error = task1.ContinueWith (ant => Console.Write (ant.Exception), TaskContinuationOptions.OnlyOnFaulted); Task ok = task1.ContinueWith (ant => Console.Write ("Success!"), TaskContinuationOptions.NotOnFaulted); 

這種模式在結合子任務使用時非常有用,我們馬上會看到

下面的擴展方法會“吞掉”任務的未處理異常:

public static void IgnoreExceptions (this Task task) { task.ContinueWith (t => { var ignore = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted); } 

(可以添加對異常的日志記錄來進一步改進它。)以下是用法:

Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions(); 

延續與子任務Permalink

延續的一個強大功能是它僅在所有子任務都完成時才會啟動。這時,所有子任務拋出的異常都會被封送給延續。

接下來的例子中,我們啟動三個子任務,每個都拋出NullReferenceException。然后使用父任務的延續來一次性捕捉這些異常:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; Task.Factory.StartNew (() => { Task.Factory.StartNew (() => { throw null; }, atp); Task.Factory.StartNew (() => { throw null; }, atp); Task.Factory.StartNew (() => { throw null; }, atp); }) .ContinueWith (p => Console.WriteLine (p.Exception), TaskContinuationOptions.OnlyOnFaulted); 

延續

條件延續Permalink

默認情況下,延續是被無條件調度的,也就是說無論前項是完成、拋出異常還是取消,延續都會執行。你可以通過設置TaskContinuationOptions枚舉中的標識(可組合)來改變這種行為。三種控制條件延續的核心標識是:

NotOnRanToCompletion = 0x10000, NotOnFaulted = 0x20000, NotOnCanceled = 0x40000, 

這些標識是做減法的,也就是組合的越多,延續越不可能被執行。為了方便使用,也提供了以下預先組合好的值:

OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled, OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled, OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted 

(組合所有Not*標識[NotOnRanToCompletion, NotOnFaulted, NotOnCanceled]沒有意義,這會導致延續始終被取消。)

RanToCompletion代表前項成功完成,沒有被取消,也沒有未處理的異常。

Faulted代表前項中有未處理的異常拋出。

Canceled代表以下兩種情況之一:

  • 前項通過其取消標記被取消。換句話說,OperationCanceledException在前項中拋出,它的CancellationToken屬性與啟動時傳遞給前項的標記取消匹配。

  • 前項被隱式的取消,因為無法滿足指定的延續條件。

特別需要注意的是,如果這些標識導致延續無法執行,延續並不是被忘記或拋棄,而是被取消。這意味着所有延續任務上的延續就會開始運行,除非你指定了NotOnCanceled。例如:

Task t1 = Task.Factory.StartNew (...); Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"), TaskContinuationOptions.OnlyOnFaulted); Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3")); 

像之前說的一樣,t3始終會被調度,即使是t1沒有拋出異常也是如此。因為t1成功完成,fault任務會被取消,而t3上並沒有定義任何限制延續的條件,所以t3就會被無條件執行。

條件延續

如果希望僅在fault真正運行的情況下執行t3,需要把代碼改成:

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"), TaskContinuationOptions.NotOnCanceled); 

(此外,也可以指定OnlyOnRanToCompletion,不同之處就是t3fault拋出異常的情況下不會執行。)

多前項的延續Permalink

延續的另一個有用的功能是它可以在多個前項完成后調度執行。ContinueWhenAll是在多個前項都完成后調度,而ContinueWhenAny是在任意一個前項完成后調度。這兩個方法都定義在TaskFactory類上:

var task1 = Task.Factory.StartNew (() => Console.Write ("X")); var task2 = Task.Factory.StartNew (() => Console.Write ("Y")); var continuation = Task.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks => Console.WriteLine ("Done")); 

上面的例子會在打印 “ XY “ 或 “ YX “ 之后打印 “ Done “。Lambda 表達式中的tasks參數可以用來訪問完成的任務數組,當前項返回數據時可以用到。下面的例子對兩個前項返回的數字求和:

// 真實場景中 task1 和 task2 可能調用復雜的功能: Task<int> task1 = Task.Factory.StartNew (() => 123); Task<int> task2 = Task.Factory.StartNew (() => 456); Task<int> task3 = Task<int>.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result)); Console.WriteLine (task3.Result); // 579 

在這個例子中,我們使用了<int>類型參數來調用Task.Factory是為了演示獲得了一個泛型的任務工廠。這個類型參數不是必須的,它可以被編譯器推斷。

單前項的多個延續Permalink

對一個任務調用一次以上的ContinueWith會創建單前項的多個延續。當該前項完成時,所有延續會一起啟動(除非指定了TaskContinuationOptions.ExecuteSynchronously,這會導致延續順序執行)。

下面的代碼會等待一秒,然后打印 “ XY “ 或者 “ YX “:

var t = Task.Factory.StartNew (() => Thread.Sleep (1000)); t.ContinueWith (ant => Console.Write ("X")); t.ContinueWith (ant => Console.Write ("Y")); 

5.6任務調度器與 UIPermalink

任務調度器(task scheduler)為任務分配線程,其由抽象類TaskScheduler類代表,所有任務都會和一個任務調度器關聯。Framework 提供了兩種具體實現:默認調度器(default scheduler)是使用 CLR 線程池工作,還有同步上下文調度器(synchronization context scheduler),它(主要)是為了對於使用 WPF 和 Windows Forms 的場景提供幫助,這里的線程模型需要 UI 控件只能在創建它們的線程上訪問。例如,假設我們需要在后台從一個 web 服務獲取數據,然后使用它更新一個叫做lblResult的 WPF 標簽。這可以分解為兩個任務:

  1. 調用方法從 web 服務獲取數據(前項任務)。
  2. 使用結果更新lblResult延續任務)。

如果對延續任務指定了窗口創建時獲取的同步上下文調度器,那么就可以安全的更新lblResult

public partial class MyWindow : Window { TaskScheduler _uiScheduler; // 定義一個字段以便於 // 在類中使用 public MyWindow() { InitializeComponent(); // 從創建窗口的線程獲取 UI 調度器: _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext(); Task.Factory.StartNew<string> (SomeComplexWebService) .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler); } string SomeComplexWebService() { ... } } 

也可以實現自己的任務調度器(通過繼承TaskScheduler),但是一般只會在非常特殊的場景下才會這么做。對於自定義調度,需要經常使用TaskCompletionSource,我們馬上會講到。

5.7TaskFactoryPermalink

當調用Task.Factory時,就是通過Task上的靜態屬性獲取了默認的TaskFactory對象。這個任務工廠的作用就是創建任務,具體的說,有三種任務:

  • 普通任務(通過StartNew
  • 多前項的延續(通過ContinueWhenAllContinueWhenAny
  • 封裝了異步編程模型(APM)的任務(通過FromAsync

有趣的是,TaskFactory是創建后兩種任務的唯一方法。而對於StartNewTaskFactory純粹是為了方便,技術上說是多余的,這完全等同於創建Task對象然后調用其Start方法。

創建自己的任務工廠Permalink

TaskFactory不是抽象工廠:你可以實例化這個類,在希望重復使用同樣的(非默認的)TaskCreationOptions值、TaskContinuationOptions值或者TaskScheduler時有用。例如,如果希望重復創建長時間運行的子任務,我們可以這樣創建一個自定義工廠:

var factory = new TaskFactory ( TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); 

然后創建任務就可以僅調用這個工廠上的StartNew

Task task1 = factory.StartNew (Method1); Task task2 = factory.StartNew (Method2); // ... 

在調用ContinueWhenAllContinueWhenAny時,自定義的延續選項會被應用。

5.8TaskCompletionSourcePermalink

Task類做了兩件事情:

  • 它可以調度一個委托到線程池線程上運行。
  • 它提供了管理工作項的豐富功能(延續、子任務、異常封送等等)。

有趣的是,這兩件事可以是分離的:可以只利用任務的管理工作項的功能而不讓它調度到線程池上運行。TaskCompletionSource類開啟了這個模式。

使用TaskCompletionSource時,就創建它的實例。它暴露一個Task屬性來返回一個任務,你可以對其等待或附加延續,就和對一般的任務一樣。然而這個任務可以通過TaskCompletionSource對象的下列方法進行完全控制:

public class TaskCompletionSource<TResult> { public void SetResult (TResult result); public void SetException (Exception exception); public void SetCanceled(); public bool TrySetResult (TResult result); public bool TrySetException (Exception exception); public bool TrySetCanceled(); // ... } 

如果調用多次,SetResultSetExceptionSetCanceled會拋出異常,而Try*方法會返回false

TResult對應任務的返回類型,所以TaskCompletionSource<int>會給你一個Task<int>。如果需要不返回結果的任務,可以使用object類型來創建TaskCompletionSource,並在調用SetResult時傳遞null。可以把Task<object>轉換為Task類型來使用。

下面的代碼在等待五秒之后打印 “ 123 “:

var source = new TaskCompletionSource<int>(); new Thread (() => { Thread.Sleep (5000); source.SetResult (123); }) .Start(); Task<int> task = source.Task; // 我們的“奴隸”任務 Console.WriteLine (task.Result); // 123 

稍后,我們會展示使用如何BlockingCollection寫一個生產者 / 消費者隊列。然后會演示使用TaskCompletionSource來改進這個方案,它可以使隊列中的工作項可以被等待和取消。

6使用 AggregateExceptionPermalink

如前所屬,PLINQParallel類和Task都會自動封送異常給使用者。為了明白這么做的重要性,考慮以下 LINQ 查詢,它在第一次迭代時會拋出DivideByZeroException

try { var query = from i in Enumerable.Range (0, 1000000) select 100 / i; // ... } catch (DivideByZeroException) { // ... } 

如果我們使用 PLINQ 來並行化查詢而假設它並沒有進行異常處理,那么DivideByZeroException可能會在一個線程中被拋出,就會無視catch塊從而導致程序結束。

因此,異常會被自動捕捉並重新拋給調用方。然而不幸的是,情況並不是就像捕捉一個DivideByZeroException那般簡單。因為這些類庫會利用很多線程,很可能有兩個或更多的異常被同時拋出。為了確保能夠報告所有異常,就使用了AggregateException作為容器來封裝它們,並通過InnerExceptions屬性來暴露:

try { var query = from i in ParallelEnumerable.Range (0, 1000000) select 100 / i; // 對查詢進行枚舉 // ... } catch (AggregateException aex) { foreach (Exception ex in aex.InnerExceptions) Console.WriteLine (ex.Message); } 

PLINQ 和Parallel類都會在遇到第一個異常時停止查詢或循環執行,它使用的方式是不處理之后的元素或循環體。而在本輪循環結束前,還有可能拋出更多的異常。第一個異常可以通過AggregateException上的InnerException屬性獲取。

6.1Flatten 和 HandlePermalink

AggregateException類提供了一對方法來簡化異常處理:FlattenHandle

FlattenPermalink

AggregateException經常會包含其它的AggregateException。比如在子任務拋出異常時就可能如此。你可以通過調用Flatten來消除任意層級的嵌套以簡化處理。這個方法會返回一個新的AggregateException,它的InnerExceptions就是展平之后的結果:

catch (AggregateException aex) { foreach (Exception ex in aex.Flatten().InnerExceptions) myLogWriter.LogException (ex); } 

HandlePermalink

有時只需要捕捉特定類型的異常,並重新拋出其它類型的異常。AggregateException上的Handle方法提供了一個快捷方案。它接受一個異常判定器,來對所有封裝的異常進行判定:

public void Handle (Func<Exception, bool> predicate) 

如果判定器返回true,則該異常被認為是“已處理”。對於所有異常都運行判定之后,接下來會發生:

  • 如果所有異常都“已處理”(判定器返回true),則不會重新拋出異常。
  • 如果有異常被判定為false(“未處理”),則會生成一個新的AggregateException來封裝這些異常,並重新拋出。

例如,下面的代碼最后會重新拋出一個AggregateException,並且其中僅包含一個NullReferenceException

var parent = Task.Factory.StartNew (() => { // 我們使用 3 個子任務同時拋出 3 個異常: int[] numbers = { 0 }; var childFactory = new TaskFactory (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); childFactory.StartNew (() => 5 / numbers[0]); // 除數為零 childFactory.StartNew (() => numbers [1]); // 索引越界 childFactory.StartNew (() => { throw null; }); // 空引用 }); try { parent.Wait(); } catch (AggregateException aex) { aex.Flatten().Handle (ex => // 注意這里還是需要調用 Flatten { if (ex is DivideByZeroException) { Console.WriteLine ("Divide by zero"); return true; // 該異常“已處理” } if (ex is IndexOutOfRangeException) { Console.WriteLine ("Index out of range"); return true; // 該異常“已處理” } return false; // 其它所有異常會被重新拋出 }); } 

7並發集合Permalink

Framework 4.0 在System.Collections.Concurrent命名空間中提供了一組新的集合。它們都是完全線程安全的:

並發集合 對應的非並發集合
ConcurrentStack<T> Stack<T>
ConcurrentQueue<T> Queue<T>
ConcurrentBag<T> ( none )
BlockingCollection<T> ( none )
ConcurrentDictionary<TKey,TValue> Dictionary<TKey,TValue>

在一般的多線程場景中,需要線程安全的集合時可能會用到這些並發集合。但是,有些注意事項:

  • 並發集合是為了並行編程而調整的。除了高並發場景,傳統的集合都比它們更高效。
  • 線程安全的集合並不能確保使用它的代碼也是線程安全的
  • 如果在對並發集合進行枚舉的同時有其它線程修改了集合,並不會產生異常,而是會得到一個新舊內容的混合結果。
  • 沒有List<T>的並發版本。
  • 並發的棧、隊列和包(bag)類內部都是使用鏈表實現的。這使得它們的空間效率不如非並發的StackQueue類,但是這對於並發訪問更好,因為鏈表有助於實現無鎖或更少的鎖。(這是因為向鏈表中插入一個節點只需要更新兩個引用,而對於List<T>這種結構插入一個元素可能需要移動幾千個已存在的元素。)

換句話說,這些集合並不是提供了加鎖使用普通集合的快捷辦法。為了演示這一點,如果我們在單一線程上執行以下代碼:

var d = new ConcurrentDictionary<int,int>(); for (int i = 0; i < 1000000; i++) d[i] = 123; 

它會比下面的代碼慢三倍:

var d = new Dictionary<int,int>(); for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123; 

(但是對ConcurrentDictionary讀取會更快,因為讀是無鎖的。)

並發集合與普通集合的另一個不同之處是它們暴露了一些特殊的方法,來進行原子的檢查並行動(test-and-act)的操作,例如TryPop。這些方法中的大部分都是由IProducerConsumerCollection<T>接口統一的。

7.1IProducerConsumerCollection<T>Permalink

生產者 / 消費者集合有兩個主要用例:

  • 添加一個元素(“生產”)
  • 獲取一個元素並移除它(“消費”)

典型的例子是棧和隊列。生產者 / 消費者集合在並行編程中非常重要,因為它有助於高效的無鎖實現。

IProducerConsumerCollection<T>接口代表了線程安全的生產者 / 消費者集合。以下類實現了該接口:ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>

IProducerConsumerCollection<T>擴展自ICollection,並加入了以下方法:

void CopyTo (T[] array, int index); T[] ToArray(); bool TryAdd (T item); bool TryTake (out T item); 

TryAddTryTake方法檢查是否能進行添加 / 移除操作,如果可以,就進行添加 / 移除。檢查和操作是原子的,所以無需像普通集合那樣使用鎖:

int result; lock (myStack) if (myStack.Count > 0) result = myStack.Pop(); 

TryTake在集合為空時返回falseTryAdd在三種實現中都總會成功並返回true。而如果你要寫自己的不允許重復元素的並發集合,就可以在元素已存在時讓TryAdd返回false(比如自己寫並發集(set))。

TryTake移除的具體元素是在子類中定義的:

  • 對於棧,TryTake移除最新添加的元素。
  • 對於隊列,TryTake移除最早添加的元素。
  • 對於包,TryTake移除可以最快移除的元素。

這三個具體類基本都是顯式實現了TryTakeTryAdd方法,也通過更具體的的名字暴露了同樣的功能,比如TryDequeueTryPop

7.2ConcurrentBag<T>Permalink

ConcurrentBag<T>用來存儲一組無序的對象(允許重復)。它適用於你不關心調用TakeTryTake會返回哪個元素的場景。

ConcurrentBag<T>相比並發隊列和棧的好處是它的Add方法被很多線程同時調用時幾乎沒有競爭沖突。而對於並發隊列和棧,並行調用Add會有一些競爭沖突(但是比對非並發集合加鎖的方式要小得多)。並發包的Take方法也非常高效,只要每個線程不要拿出比它添加的數量更多的元素。

在並發包的內部,每一個線程都有其私有的鏈表。元素會加入到調用Add的線程對應的私有鏈表中,就消除了競爭沖突。在對包進行枚舉時,枚舉器會遍歷所有線程的私有鏈表,返回其中的每一個元素。

調用Take時,包會首先檢查當前線程的私有鏈表。如果其中有至少一個元素,就可以沒有沖突的輕松完成任務(大多數情況都是如此)。但是如果鏈表沒有元素,它就必須從其它線程的私有鏈表中“偷”一個元素,就可能導致競爭沖突。

所以,准確的說,調用Take會返回當前線程最新添加的元素,如果當前線程沒有對應的元素,就會隨機取一個其它線程,返回它最新添加的元素。

如果你的並行操作基本都是在添加元素,或者每個線程的AddTake是平衡的,那么使用並發包就很理想。我們來看前面的一個例子,是使用Parallel.ForEach來實現並行拼寫檢查:

var misspellings = new ConcurrentBag<Tuple<int,string>>(); Parallel.ForEach (wordsToTest, (word, state, i) => { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); }); 

對於實現生產者 / 消費者隊列,並發包就不是一個好的選擇,因為元素是在不同的線程進行添加和移除的。

7.3BlockingCollection<T>Permalink

如果在ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>這些生產者 / 消費者集合上調用TryTake時,集合為空,該方法會返回false。這種場景下,有時可能等待一個元素被添加會更有用。

與其重載TryTake方法來實現這個功能(如果還要允許取消和超時就可能需要大量成員),不如使用 PFX 的設計者已經實現好的BlockingCollection<T>類。阻塞集合可以封裝任意實現了IProducerConsumerCollection<T>接口的對象,就可以調用這個封裝上面的Take方法,它在沒有元素時會阻塞。

阻塞集合也可以讓你限制集合的大小,如果超過限制就阻塞生產者。這樣限制了大小的集合被稱為有界阻塞集合(bounded blocking collection)。

使用BlockingCollection<T>時:

  1. 創建其實例,可選的指定一個IProducerConsumerCollection<T>來封裝,還有集合的最大大小(上界)。
  2. 調用AddTryAdd來對底層集合添加元素。
  3. 調用TakeTryTake來移除(消費)底層集合中的元素。

如果調用構造方法的時候沒有指定目標集合,就會自動使用一個ConcurrentQueue<T>的實例。進行生成和消費的方法都可以指定取消標記和超時時間。AddTryAdd在集合有界時可能會阻塞,TakeTryTake在集合為空時會阻塞。

另一種消費元素的方式是調用GetConsumingEnumerable。它會返回一個(可能)無限的序列,當有元素時就可以返回它。你可以調用CompleteAdding來強行結束這個序列,它也會阻止之后再添加元素。

前面我們寫過一個使用 WaitPulse的生產者 / 消費者隊列。這里使用BlockingCollection<T>來重構同一個類(不考慮異常處理):

public class PCQueue : IDisposable { BlockingCollection<Action> _taskQ = new BlockingCollection<Action>(); public PCQueue (int workerCount) { // 為每個消費者創建並啟動單獨的任務: for (int i = 0; i < workerCount; i++) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public void EnqueueTask (Action action) { _taskQ.Add (action); } void Consume() { // 沒有元素時,對序列的枚舉就會被阻塞, // 而調用 CompleteAdding 可以結束枚舉。 foreach (Action action in _taskQ.GetConsumingEnumerable()) action(); // 進行任務 } } 

因為沒有給BlockingCollection的構造方法傳遞任何參數,所以會自動創建一個並發隊列。而如果傳遞一個ConcurrentStack,我們就會得到生產者 / 消費者棧。

BlockingCollection還提供了AddToAnyTakeFromAny這些靜態方法,它們可以讓你對指定的多個阻塞集合進行添加或移除元素。操作會對第一個能夠進行操作的集合進行。

利用 TaskCompletionSourcePermalink

我們之前實現的生產者 / 消費者模式還不夠靈活,因為工作項添加后無法追蹤它們。如果能夠實現以下功能會更好:

  • 能夠獲知工作項的完成。
  • 取消未啟動的工作項。
  • 優雅的處理工作項拋出的異常。

理想的解決方案是讓EnqueueTask方法返回一個對象,來提供我們上面描述的功能。好消息是這個類已經存在,正是Task類。我們需要做的只是通過TaskCompletionSource來操控它:

public class PCQueue : IDisposable { class WorkItem { public readonly TaskCompletionSource<object> TaskSource; public readonly Action Action; public readonly CancellationToken? CancelToken; public WorkItem ( TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancelToken) { TaskSource = taskSource; Action = action; CancelToken = cancelToken; } } BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>(); public PCQueue (int workerCount) { // 為每個消費者創建並啟動單獨的任務: for (int i = 0; i < workerCount; i++) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public Task EnqueueTask (Action action) { return EnqueueTask (action, null); } public Task EnqueueTask (Action action, CancellationToken? cancelToken) { var tcs = new TaskCompletionSource<object>(); _taskQ.Add (new WorkItem (tcs, action, cancelToken)); return tcs.Task; } void Consume() { foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable()) if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) { workItem.TaskSource.SetCanceled(); } else try { workItem.Action(); workItem.TaskSource.SetResult (null); // 表示完成 } catch (OperationCanceledException ex) { if (ex.CancellationToken == workItem.CancelToken) workItem.TaskSource.SetCanceled(); else workItem.TaskSource.SetException (ex); } catch (Exception ex) { workItem.TaskSource.SetException (ex); } } } 

EnqueueTask中,我們入隊一個工作項,它封裝了目標委托和任務完成源,從而讓我們之后可以控制返回給消費者的任務。

Consume中,我們在出隊一個工作項后先檢查任務是否被取消。如果沒有,就運行委托然后調用任務完成源上的SetResult來表示任務完成。

下面是如何使用這個類:

var pcQ = new PCQueue (1); Task task = pcQ.EnqueueTask (() => Console.WriteLine ("Easy!")); // ... 

我們現在可以對task等待、附加延續、讓延續中的異常傳播給父任務等等。換句話說,我們獲得了任務模型的豐富功能,同時也相當於自行實現了一個調度器。

8SpinLock 和 SpinWaitPermalink

在並行編程中,短暫的自旋經常比阻塞更好,因為它避免了上下文切換和內核模式轉換的開銷。SpinLockSpinWait被設計用來在這種場景下提供幫助。它們的主要用途是實現自定義的同步構造。

SpinLockSpinWait是結構體而不是類!這個設計是一種避免間址和垃圾回收的極限優化技術。它意味着你必須小心,不能不經意地復制了實例,比如不使用ref修飾符把它們傳遞給另一個方法,或者把它們定義成了readonly的字段。這在使用SpinLock時十分重要。

8.1SpinLockPermalink

SpinLock結構體可以讓你進行鎖定,而無需上下文切換的開銷,它的代價是保持一個線程自旋(空忙)。這種方式適用於高競爭的場景下鎖定非常短暫的情況(比如,從頭寫一個線程安全的鏈表)。

如果讓自旋鎖等待的太久(最多是幾毫秒),它會和普通的鎖一樣出讓其時間片,導致上下文切換。再被重新調度后,它會繼續出讓,就這樣不斷的“自旋出讓”。這比完全使用自旋消耗的 CPU 資源要少得多,但是比阻塞要高。

在單核的機器上,自旋鎖在遇到競爭時會立即開始“自旋出讓”。

使用SpinLock和普通的鎖差不多,除了以下幾點:

  • 自旋鎖是結構體(前面有提到)。
  • 自旋鎖不可重入,意味着不能在一個線程上連續兩次調用同一個SpinLock上的Enter方法。如果違反了這條規則,要不然會拋出異常(啟用所有者追蹤(owner tracking)時),要不然會死鎖(禁用所有者追蹤時)。在構造自旋鎖時,可以指定是否啟用所有者追蹤,啟用會影響性能。
  • SpinLock可以讓你通過IsHeld屬性查詢鎖是否已被獲取,如果啟用了所有者追蹤,那么使用IsHeldByCurrentThread屬性。
  • 沒有像 C# 的lock語句那樣的語法糖來簡化SpinLock的使用。

另一個不同之處是當調用Enter時,你必須遵循提供 lockTaken 參數的健壯模式(幾乎總是使用try / finally一起實現)。

下面是個例子:

var spinLock = new SpinLock (true); // 啟用所有者追蹤 bool lockTaken = false; try { spinLock.Enter (ref lockTaken); // 做些事情... } finally { if (lockTaken) spinLock.Exit(); } 

和普通的鎖一樣,當(且僅當)Enter方法拋出異常並且鎖沒有被獲取時,lockTaken會為false。這種場景非常罕見(當調用該線程的Abort,或者OutOfMemoryException異常被拋出時),但可以讓你確定之后是否需要調用Exit

SpinLock也提供了接受超時時間的TryEnter方法。

由於SpinLock笨拙的值類型語義和缺乏語法支持,幾乎每次想用它都是受罪!在替換掉普通的鎖前請三思。

SpinLock在需要寫自己的可重用同步構造時最有意義。即便如此,自旋鎖也不像看上去那么有用。它仍然限制了並發。並且會什么都不做的浪費 CPU 時間。經常更好的選擇都是把時間花在一些“投機”的事情上,並使用SpinWait來輔助。(譯者注:這里“投機”是指先進行操作並檢測搶占,如果發現被搶占就重試,詳見SpinWait

8.2SpinWaitPermalink

SpinWait可以幫助實現無鎖的代碼,使用自旋而非阻塞。它實現了安全措施來避免普通自旋可能會造成的資源飢餓和優先級倒置。

使用SpinWait的無鎖編程是多線程中最難的,它是為了應對沒有其它高層構造可以使用的場景。先決條件是理解非阻塞同步

為什么需要 SpinWaitPermalink

假設我們寫了一個純粹基於一個簡單標識的自旋信號系統:

bool _proceed; void Test() { // 自旋直到其它線程把 _proceed 設置為 true: while (!_proceed) Thread.MemoryBarrier(); // ... } 

如果Test運行時_proceed已經為true,或者幾次循環內就能變為true,那么這個實現就會很高效。但是現在假設_proceed在幾秒內保持false,並且有四個線程同時調用Test。這個自旋就會完全占用一個四核的 CPU!這會導致其它線程運行緩慢(資源飢餓),包括那個會把_proceed設置為true的線程(優先級倒置)。在單核機器上,狀況會進一步惡化,因為自旋幾乎總是導致優先級倒置。(雖然現在單核機器已經很少見了,可是單核的虛擬機並不少。)

SpinWait使用兩種方法解決這個問題。首先,它會限制消耗 CPU 的自旋,在經過一定次數的自旋后,就會每次循環都出讓其時間片(通過調用Thread.YieldThread.Sleep),從而減少資源消耗。其次,它會檢測是否是在單核機器上運行,如果是,就會每次循環都出讓其時間片。

如何使用 SpinWaitPermalink

有兩種方式使用SpinWait。第一種是調用靜態方法SpinUntil。這個方法接受一個判定器(和一個可選的超時時間):

bool _proceed; void Test() { SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; }); // ... } 

另一種(更靈活)的方式是創建SpinWait結構體的實例,並在循環中調用SpinOnce

bool _proceed; void Test() { var spinWait = new SpinWait(); while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); } // ... } 

前者就是使用后者提供的快捷方式。

SpinWait 如何工作Permalink

在當前的實現中,SpinWait會在出讓之前進行 10 次消耗 CPU 的迭代。但它並不會在每次迭代后立即返回調用方,而是調用Thread.SpinWait來 通過 CLR(最終是通過操作系統)再自旋一定時間。這個時間最初是幾十納秒,每次迭代都會加倍,直到 10 次迭代結束。這在一定程度上保證了消耗 CPU 的自旋階段的總時間的可預測性,CLR 和操作系統可以根據情況來調節。一般來說,這會在幾十微秒的區間,很小,但是要大於上下文切換的開銷。

在單核機器上,SpinWait每次迭代都會出讓。可以通過NextSpinWillYield屬性來檢查SpinWait在下一次自旋時會不會出讓。

如果SpinWait在自旋出讓模式保持了很久(大概 20 次),就會定期Sleep幾微秒來進一步節約資源給其它線程使用。

使用 SpinWait 和 Interlocked.CompareExchange 進行無鎖更新Permalink

結合SpinWaitInterlocked.CompareExchange可以原子的更新一個通過自己的值進行計算的字段(讀 - 改 - 寫)。例如,假設我們要把字段 x 乘 10。非線程安全的簡單代碼就是:

x = x * 10; 

它不是線程同步的原因就和我們在非阻塞同步中看到的對字段自增不是線程同步的原因一樣。

正確的無鎖方式如下:

  1. 使用局部變量獲取 x 的一個“快照”。
  2. 計算新值(這里就是將快照乘 10)。
  3. 如果快照還是最新的,就將計算后的值寫回(這一步必須是原子的,通過調用Interlocked.CompareExchange實現)。
  4. 如果快照過期了,自旋並返回第 1 步。

例如:

int x; void MultiplyXBy (int factor) { var spinWait = new SpinWait(); while (true) { int snapshot1 = x; Thread.MemoryBarrier(); int calc = snapshot1 * factor; int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1); if (snapshot1 == snapshot2) return; // 沒有被搶占 spinWait.SpinOnce(); } } 

我們可以去掉對Thread.MemoryBarrier的調用來略微提高性能。這是因為CompareExchange也會生成內存屏障。最壞的情況就是如果snapshot1在第一次迭代時就讀取了一個過期的值,那么會多進行一次自旋。

Interlocked.CompareExchange是在字段的當前值與第三個參數相等時使用指定的值來更新字段。它會返回字段的舊值,就可以用來與原快照比較,檢查是否過期。如果值不相等,意味着被另一個線程搶占,就需要自旋重試。

CompareExchange也有重載可以對於object類型使用。我們可以利用這個重載來實現對所有引用類型的無鎖更新方法:

static void LockFreeUpdate<T> (ref T field, Func <T, T> updateFunction) where T : class { var spinWait = new SpinWait(); while (true) { T snapshot1 = field; T calc = updateFunction (snapshot1); T snapshot2 = Interlocked.CompareExchange (ref field, calc, snapshot1); if (snapshot1 == snapshot2) return; spinWait.SpinOnce(); } } 

下面是如何使用這個方法來寫一個無鎖的線程安全的事件(實際上,這是 C# 4.0 的編譯器對於事件默認的處理):

EventHandler _someDelegate; public event EventHandler SomeEvent { add { LockFreeUpdate (ref _someDelegate, d => d + value); } remove { LockFreeUpdate (ref _someDelegate, d => d - value); } } 

SpinWait vs SpinLockPermalink

我們也可以通過把對共享的字段的訪問放進SpinLock里來解決上面的問題。問題是自旋鎖同一時間只允許一個線程進入,盡管它(通常)能夠消除上下文切換的開銷。而使用SpinWait時,我們可以假設沒有競爭,投機的運行。如果被搶占就重試。花費 CPU 時間做事情也許比在自旋鎖中浪費 CPU 時間好!

最后,考慮下面的類:

class Test { ProgressStatus _status = new ProgressStatus (0, "Starting"); class ProgressStatus // 不可變類 { public readonly int PercentComplete; public readonly string StatusMessage; public ProgressStatus (int percentComplete, string statusMessage) { PercentComplete = percentComplete; StatusMessage = statusMessage; } } } 

我們可以使用LockFreeUpdate方法來增加_statusPercentComplete字段的值:

LockFreeUpdate (ref _status, s => new ProgressStatus (s.PercentComplete + 1, s.StatusMessage)); 

注意我們基於現有值創建了新的ProgressStatus對象。要感謝LockFreeUpdate方法,讀取PercentComplete的值、增加它並寫回的操作不會被不安全的搶占:任何搶占都可以被可靠的檢測到,觸發自旋重試。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM