並行編程(Parallel Framework)


前言

並行編程:通過編碼方式利用多核或多處理器稱為並行編程,多線程概念的一個子集。

並行處理:把正在執行的大量的任務分割成小塊,分配給多個同時運行的線程。多線程的一種。

並行編程分為如下幾個結構

1.並行的LINQPLINQ

2.Parallel

3.任務並行結構

4.並發集合

5.SpinLockSpinWait

這些是.NET 4.0引入的功能,一般被稱為PFX(Parallel Framework,並行框架)

Parallel類和任務並行結構稱為TPL(Task Parallel Library,任務並行庫)

 

並行框架(PFX)

1.並行框架基礎

當前CPU技術達到瓶頸,而制造商將關注重點轉移到提高內核技術上,而標准單線程代碼並不會因此而自動提高運行速度。
利用多核提升程序性能通常需要對計算密集型代碼進行一些處理:
1.將代碼划分成塊。
2.通過多線程並行執行這些代碼塊。
3.結果變為可用后,以線程安全和高性能的方式整合這些結果。
傳統多線程結構雖然實現功能,但難度頗高且不方便,特別是划分和整理的步驟(本質問題是:多線程同時使用相同數據時,出於線程安全考慮進行鎖定的常用策略會引發大量競爭)。
並行框架(Parallel Framework)專門用於在這些應用場景中提供幫助。

2.並行框架組成

PFX:高層由兩個數據並行API組成:PLINQ或Parallel類。底層包含任務並行類和一組另外的結構為並行編程提供幫助。

 

基礎並行語言集成查詢(PLINQ)

語言集成查詢(Language Integrated Query,LINQ)提供了一個簡捷的語法來查詢數據集合。而這種由一個線程順序處理數據集合的方式我們稱為順序查詢(sequential query)

並行語言集成查詢(Parallel LINQ)LINQ並行版。它將順序查詢轉換為並行查詢,在內部使用任務,將集合中數據項的處理工作分散到多個CPU上,以並發處理多個數據項。

PLINQ將自動並行化本地的LINQ查詢System.Linq.ParallelEnumerable類(它定義在System.Core.dll中,需要引用System.Linq)公開了所有標准LINQ操作符的並行版本。這些所有方法是依據System.Linq.ParallelQuery<T>擴展而來。

1.LINQ to PLINQ

要讓LINQ查詢調用並行版本,必須將自己的順序查詢(基於IEnumerable或IEnumerable<T>)轉換成並行查詢(基於ParallelQuery或ParallelQuery<T>),使用ParallelEnumerableAsParallel方法實現,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             ParallelQuery parallelQuery =
 7                 from n in numbers.AsParallel()//轉換為並行
 8                 where n > 3
 9                 select n;
10             foreach (var item in parallelQuery)
11             {
12                 Console.WriteLine(item);
13             }
14             Console.ReadKey();
15         }
16     }

結果如下:使用Enumerable.Range生成的集合是順序的,但是經過並行查詢后順序被打亂。

2.PLINQ to LINQ

 將執行並行查詢的操作切換回執行順序查詢(並不常用),通過ParalleIEnumerableAsSequential實現。此時操作只由一個線程執行。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             IEnumerable<int> enumerable = numbers.AsParallel().AsSequential().Where(c => c > 3);
 7             foreach (var item in enumerable)
 8             {
 9                 Console.WriteLine(item);
10             }
11             Console.ReadKey();
12         }
13     }

3.整合結果集(ForAll)

通常,一個LINQ查詢的結果數據是讓某個線程執行一個foreach來處理,此時只有一個線程遍歷查詢的所有結果,如果希望以並行方式處理查詢結果,通過ParalleIEnumerableForAll方法處理查詢,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             (from n in numbers.AsParallel() where n > 3 select n).ForAll((d) =>
 7              {
 8                  d = d + 1000;
 9                  Console.WriteLine(d);//Console在此回損害性能,因為內部回對線程進行同步,此處因演示所以暫且一用 10              });
11             Console.ReadKey();
12         }
13     }

 執行結果如下:

 

解析PLINQ

1.PLINQ執行模型

如圖所示:

2.異常處理

PLINQ的報錯將以AggregateException形式被重拋,其中InnerExceeptions屬性包含一個或多個真正異常,示例可看 異步編程(async&await)內的異常處理部分。

3.PLINQ結果的排序

 並行化查詢當整理結果時不能保持初始化數據的原始次序。如果要保持序列的原始序列,可以通過在AsParallel之后調用AsOrdered來強制實現:

1             IEnumerable<int> numbers = Enumerable.Range(1, 10000);
2             var enumerable = numbers.AsParallel().Where(c => c > 3);

調用AsOrdered時,因為PLINQ要保持跟蹤每個元素的原始位置,會導致性能損失。

調用AsUnordered,可以在后續的查詢中低效AsOrdered產生的副作用,允許查詢從調用AsUnordered時起更高效的執行。

4.PLINQ存在的局限與限制

1.若要使PLINQ發揮作用,必須具有一定數量的計算密集型工作可分配給工作者線程。大多數的LINQ to Objects查詢執行速度很快,不僅沒有必要並行化,而且划分、整理和協調額外線程的開銷實際上會降低執行速度。而且查詢若調用了非線程安全的方法,PLINQ的結果有可能不正確。

2.PLINQ能夠並行化的內容還有些限制,以下查詢運算符防止查詢被並行化,除非源元素位於他們的元素索引位置:Take、TakeWhile、Skip和SkipWhileSelect、SelectMany和ElementAt的索引版本。

3.以下查詢運算符是並行化的,但所使用的復雜划分策略有時可能比順序處理的速度還要低:Join、GroupBy、GroupJonin、Distinct、Union、Intersect和Except。

5.PLINQ的結果

和普通LINQ查詢一樣,PLINQ查詢也是延遲求值的。意味着執行只在開始使用時觸發。但是列舉結果集時和普通順序查詢有區別:

順序查詢:完全由使用者從輸入序列中“拉取”每個元素。

並行查詢:通常使用獨立線程來獲取序列中的元素,時間上比使用者需要它們時要提前,再通過查詢鏈並行處理元素后將結果保存在一塊緩存中,以便使用者按需取用。

注意:過早暫停結果列舉,查詢處理器也會暫停或結束,目的是不浪費CPU的時間或內存。在調用AsParallel之后調用WithMergeOptions可以調節PLINQ的緩沖行為。

6.如何使用PLINQ

為何優化將LINQ都並行化是不可取的,因為LINQ能解決大多數問題,執行速度也很快,因此無法從並行化中收益。

一種更好的方式是找出CPU密集的瓶頸,然后考慮通過LINQ的形式表達(這類重構,LINQ往往會使代碼量變少,而且增強可讀性)。

PLINQ十分適用於易並行問題。他還可以很好地處理結構化的阻塞任務。

PLINQ不適於鏡像制作,因為將數百萬元素整理為一個輸出序列將帶來瓶頸,相反將元素寫入一個數組或托管內存塊中,然后使用Parallel類或任務並行管理多線程是更好的選擇。

 

Parallel類

Parallel類是對線程的一個很好的抽象。該類位於System.Threading.Tasks命名空間中,提供了數據和任務並行性

PFX通過Parallel類中的三個靜態方法,提供了一種基本形式的結構化並行機制:

1.Parallel.Invoke

Parallel.Invoke用於並行執行一組委托,示例如下:

1         static void Main(string[] args)
2         {
3             Parallel.Invoke(
4                 () => Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId}"),
5                 () => Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId}")
6                 );
7             Console.ReadKey();
8         }

執行結果

Parallel.Invoke方法並行執行一組Action委托,然后等待它們完成。

1 public static void Invoke(params Action[] actions);

示例看起來像是創建和等待兩個Task對象的一種捷徑。但兩者存在重要的區別:
如果傳入一個包含數據量非常大的委托數組時,Parallel.Invoke方法仍然能高效工作,這是因為在底層,Parallel.Invoke方法是將大量元素划分成較小的塊,分配給底層的Task執行,而不是每個委托創建一個獨立Task

2.Parallel.For

Parallel.For執行C# for循環的並行化等價循環,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             //順序循環
 6             {
 7                 for (int i = 0; i < 10; i++)
 8                 {
 9                     Test(i);
10                 }
11             }
12             Console.WriteLine("並行化for開始");
13             //順序執行轉換為並行化
14             {
15                 Parallel.For(0, 10, i => Test(i));
16             }
17             //順序執行轉換為並行化(更簡單的方式)
18             {
19                 Parallel.For(0, 10, Test);
20             }
21             Console.ReadKey();
22         }
23         static void Test(int i)
24         {
25             Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{i}");
26         }
27     }

結果如下:

3.Parallel.ForEach

Parallel.ForEach執行C# foreach循環的並行化等價循環,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             //順序循環
 7             {
 8                 foreach (string num in data)
 9                 {
10                     Test(num);
11                 }
12             }
13             Console.WriteLine("並行化foreach開始");
14             //順序執行轉換為並行化
15             {
16                 Parallel.ForEach(data, num => Test(num));
17             }
18             Console.ReadKey();
19         }
20         static void Test(string str)
21         {
22             Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
23         }
24     }

執行結果:

注意:以上三個方法都會引發阻塞直到所有工作完成為止。和PLINQ一樣,在出現未處理的異常之后,余下的工作者在它們當前的迭代之后停止,而一場將被拋回給調用者,並封裝在一個AggregateException中。

4.索引&跳出(ParallelLoopState)

有時迭代索引很有用處,但是切忌不可像順序循環的用法使用共享變量(循環內i++)的方式使用,因為共享變量值在並行上下文中是線程不安全的

同樣的,因為並行ForForEach中的循環體是一個委托,所以無法使用break語句提前退出循環,必須調用ParallelLoopState對象上的BreakStop方法。

ForEach為例,ForEach重載的其中之一如下,它包含Acton的其中有三個參數(TSourec=子元素,ParallelLoopState=並行循環狀態,long=索引):

1 public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)

所以,想要得到索引和提前跳出的正確方式如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             Parallel.ForEach(data, (num, state, i) =>
 7             {
 8                 Console.WriteLine($"當前索引為:{i},狀態為:{state}");
 9                 Test(num);
10                 if (num == "six")
11                     state.Break();
12             });
13             Console.ReadKey();
14         }
15         static void Test(string str)
16         {
17             Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
18         }
19     }

結果如下:

For的版本如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             Parallel.For(0, data.Length, (i, state) =>
 7             {
 8                 Console.WriteLine($"當前索引為:{i},狀態為:{state}");
 9                 Test(data[i]);
10                 if (data[i] == "six")
11                     state.Break();
12             });
13             Console.ReadKey();
14         }
15         static void Test(string str)
16         {
17             Console.WriteLine($"當前線程Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
18         }
19     }

 

任務並行

對於任務並行的內容,請戳 任務(Task)異步編程(async&await)

 

並發集合概述

.NET 4.0在System.Collections.Concurrent命名空間中提供了一組新的集合。所有這些集合都完全是線程安全的:

這些集合不僅是為使用帶鎖的普通集合提供了快捷方式,而且可以在一般的多線程中使用並發集合,但需要注意
1.並發集合針對並行編程進行了調整。只有在高度並發的應用場景中,傳統集合的性能才能勝過它們

2.線程安全的集合不能確保使用它的代碼也是安全的

3.如果枚舉一個並發集合的同時,另一個線程要修改它,不會拋出任何異常,相反,得到舊內容與新內容的混合。

4.不存在任何List<T>的並發版本。

5.它們的內存利用率沒有非並發的Stack和Queue類高效,但對於並發訪問的效果更好。

1.結構概述

這些並發集合與傳統集合的區別是:它們公開了特殊方法來執行原子測試和行動操作,而這些方法都是通過IProducerConsumerCollection<T>接口提供的。

IProducerConsumerCollection<T>接口代表一個線程安全的生產者/消費者集合,這三個類繼承並實現了IProducerConsumerCollection<T>接口:

ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>

它們實現的TryAddTryTake方法用於測試一個添加/刪除操作能否執行,如果可以,則執行添加/刪除操作。測試與行動不需要對傳統集合上鎖。

ConcurrentBag<T>用於保存對象的無需集合,適用於調用Take或TryTake時不關心獲取那個元素的額情況。

相對於並發隊列或堆棧,在多線程同時調用一個ConcurrentBag的Add時,不存在競爭,但隊列或堆棧並行調用Add會引起一些競爭,所以ConcurrentBag上調用Take方法非常高效。

BlockingCollection<T>類似阻塞集合,適用於等待新元素的出現,可以把它看作一個容器,使用一個阻塞集合封裝所有實現IProducerConsumerCollection<T>的集合,並且允許從封裝的集合中去除元素,若沒有元素,操作會阻塞

2.基礎方法

常用的一些方法,整理自 zy__ :

ConcurrentQueue:完全無鎖,但面臨資源競爭失敗時可能會陷入自旋並重試操作。

Enqueue:在隊尾插入元素

TryDequeue:嘗試刪除隊頭元素,並通過out參數返回

TryPeek:嘗試將對頭元素通過out參數返回,但不刪除該元素。

ConcurrentStack:完全無鎖,但面臨資源競爭失敗時可能會陷入自旋並重試操作。

Push:向棧頂插入元素

TryPop:從棧頂彈出元素,並且通過out 參數返回

TryPeek:返回棧頂元素,但不彈出。

ConcurrentBag:一個無序的集合,程序可以向其中插入元素,或刪除元素。在同一個線程中向集合插入,刪除元素的效率很高。

Add:向集合中插入元素 

TryTake:從集合中取出元素並刪除 

TryPeek:從集合中取出元素,但不刪除該元素。

BlockingCollection:一個支持界限和阻塞的容器

Add :向容器中插入元素

TryTake:從容器中取出元素並刪除

TryPeek:從容器中取出元素,但不刪除。

CompleteAdding:告訴容器,添加元素完成。此時如果還想繼續添加會發生異常。

IsCompleted:告訴消費線程,生產者線程還在繼續運行中,任務還未完成。

ConcurrentDictionary對於讀操作是完全無鎖的,當很多線程要修改數據時,它會使用細粒度的鎖。

AddOrUpdate:如果鍵不存在,方法會在容器中添加新的鍵和值,如果存在,則更新現有的鍵和值。

GetOrAdd:如果鍵不存在,方法會向容器中添加新的鍵和值,如果存在則返回現有的值,並不添加新值。
TryAdd:嘗試在容器中添加新的鍵和值。

TryGetValue:嘗試根據指定的鍵獲得值。

TryRemove:嘗試刪除指定的鍵。

TryUpdate:有條件的更新當前鍵所對應的值。

GetEnumerator:返回一個能夠遍歷整個容器的枚舉器。

 

結語

根據ConcurrentBag編寫線程安全的生產者消費者請戳:這里 。

說實在的寫這篇文章挺煩的,主要涉及的知識點太多講的太細篇幅會很長況且我自己有些也還沒用過,所以是概述性文章,對PFX有個基本的認識,當需要具體深入使用某些知識時再查詢相關文檔。

關於 並發編程(Concurrent programming)更新到這里基本已經完結,謝謝大家的支持

因個人的興趣,所以准備沉淀下來專攻 數據結構和算法,然后研究 人工智能(Microsoft的人工智能平台Windows ML不會涉及,選擇研究Google的第二代人工智能學習系統TensorFlow )。

接下來會對LinuxPython進行基礎的學習並更新文章。

但是最核心的還是數據結構&算法使用那種編程語言並不重要

感興趣的朋友可以關注。

 

參考文獻

CLR via C#(第4版) Jeffrey Richter

C#高級編程(第10版) C# 6 & .NET Core 1.0   Christian Nagel  

果殼中的C# C#5.0權威指南  Joseph Albahari

...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM