C#中的PLINQ(並行LINQ)


.NET Framework 3.5 中引入了語言集成查詢 (LINQ),它具有統一的模型,以類型安全方式查詢任何 System.Collections.IEnumerable或 System.Collections.Generic.IEnumerable<T> 數據源。關於LINQ函數的使用,可以參考:https://www.cnblogs.com/zhaotianff/p/6236062.html

並行 LINQ (PLINQ) 是 LINQ 模式的並行實現

注意:以下示例代碼僅是演示用,其運行速度可能不如等效的順序LINQ查詢快

 

1、如何進行PLINQ查詢

ParallelEnumerable類提供了一個擴展方法AsParallel()可以並行執行LINQ查詢

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace PLINQ_demo
 8 {
 9     class Program
10     {
11         static void Main(string[] args)
12         {
13             var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 };
14 
15             var result = list.AsParallel().Where(x => x > 30);
16         }
17     }
18 }

 

2、PLINQ查詢的參數

WithDegreeOfParallelism:指定 PLINQ 應用於並行化查詢的處理器的最大數量。

WithExecutionMode:指定 PLINQ 應如何並行化查詢(即使是當默認行為是按順序運行查詢時)。

WithMergeOptions:提供有關 PLINQ 應如何(如果可能)將並行結果合並回使用線程上的一個序列的提示。

ParallelMergeOptions 枚舉值如下:

 

AutoBuffered 2

利用系統選定大小的輸出緩沖區進行合並。 在向查詢使用者提供結果之前,會先將結果累計到輸出緩沖區中。

Default 0

使用默認合並類型,即 AutoBuffered。

FullyBuffered 3

利用整個輸出緩沖區進行合並。 在向查詢使用者提供任何結果之前,系統會先累計所有結果。

NotBuffered 1

不利用輸出緩沖區進行合並。 一旦計算出結果元素,就向查詢使用者提供這些元素。

WithCancellation:指定 PLINQ 應定期監視請求取消時所提供的取消標記的狀態以及取消執行。(這個操作和線程的取消操作是一致的,如果不理解線程中的取消操作,可以訪問:https://docs.microsoft.com/zh-cn/dotnet/standard/threading/cancellation-in-managed-threads

 

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace PLINQ_demo
 8 {
 9     class Program
10     {
11         static void Main(string[] args)
12         {           
13             var cts = new System.Threading.CancellationTokenSource();
14 
15             try
16             {
17                 var result2 = list.AsParallel().Where(x => x > 30)
18                 .WithDegreeOfParallelism(Environment.ProcessorCount)
19                 .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
20                 .WithMergeOptions(ParallelMergeOptions.Default)
21                 .WithCancellation(cts.Token);
22 
23                 PrintResult(result2);
24             }
25             catch(OperationCanceledException)
26             {
27                 Console.WriteLine("並行查詢已被取消");
28             }
29         }
30 
31 
32         static void PrintResult(IEnumerable<int> collection)
33         {
34             foreach (var item in collection)
35             {
36                 Console.WriteLine(item);
37             }
38         }
39     }
40 }

 

3、PLINQ查詢中常用的函數

AsOrdered:指定 PLINQ 應為查詢的其余部分保留源序列的排序,或直到例如通過使用 orderby(在 Visual Basic 中為 Order By)子句更改排序為止。

AsUnordered:指定保留源序列的排序不需要查詢其余部分的 PLINQ。

AsSequential:指定查詢的其余部分應像非並行的 LINQ 查詢一樣按順序運行。

 1 using System;
 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10  { 11 static Func<int, bool> query => x => x > 30;
12 static void Main(string[] args) 13 { 14 var orderResult = list.AsParallel().AsOrdered().Where(query);//asc 15 //list.AsParallel().Where(query).OrderBy(x=>x);//asc 16 //list.AsParallel().Where(query).OrderByDescending(x => x);//desc 17 PrintResult(orderResult); 18 19 var unorderResult = list.AsParallel().AsUnordered().Where(query); 20 PrintResult(unorderResult); 21 22 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x)); 23 Console.WriteLine("\n\n"); 24 25 var linqResult = list.Where(query); 26 PrintResult(linqResult); 27 28 var pinqSequentialResult = list.AsParallel().AsSequential().Where(query); 29 PrintResult(pinqSequentialResult); 30 31 var forceExecutionMode = list.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).AsSequential().Where(query); 32 PrintResult(forceExecutionMode); 33 } 34 35 36 static void PrintResult(IEnumerable<int> collection) 37 { 38 foreach (var item in collection) 39 { 40 Console.WriteLine(item); 41 } 42 43 Console.WriteLine("\n\n"); 44 } 45 } 46 }

 

4、ForAll

ForAll是一種多線程枚舉方法,與循環訪問查詢結果不同,它允許在不首先合並回使用者線程的情況下並行處理結果。

如果並行執行查詢,PLINQ 對源序列進行分區,以便多個線程能夠並發處理不同部分,通常是在不同的線程中。 如果要在一個線程(例如,foreach)中使用結果,必須將每個線程的結果合並回一個序列中。 PLINQ 執行的合並類型具體視查詢中的運算符而定。 例如,對結果強制施加新順序的運算符必須緩沖所有線程中的全部元素。 從使用線程(以及應用用戶)的角度來看,完全緩沖查詢可能會運行很長時間,才能生成第一個結果。 默認情況下,其他運算符進行部分緩沖,並分批生成結果。 默認不緩沖的一個運算符是 ForAll。 它會立即生成所有線程中的所有元素。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace PLINQ_demo
 8 {
 9     class Program
10     {
11         static Func<int, bool> query => x => x > 30;
12         static void Main(string[] args)
13         {
14             var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 };
15             list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x));
16             Console.WriteLine("\n\n");        
17         }
18     }
19 }

 

5、處理PLINQ查詢中常用的異常

 1 using System;
 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10  { 11 static void Main(string[] args) 12  { 13 //普通 LINQ 14 try 15  { 16 tempList.Select(x => 100 / x).ToList(); 17  } 18 catch(DivideByZeroException ex) 19  { 20  Console.WriteLine(ex.Message); 21  } 22 23 //PLINQ 24 try 25  { 26 tempList.AsParallel().Select(x => 100 / x).ToList(); 27  } 28 catch(DivideByZeroException ex) 29  { 30  Console.WriteLine(ex.Message); 31  } 32 catch(AggregateException ex) 33  { 34  Console.WriteLine(ex.Message); 35  } 36 37  } 38  } 39 }

使用順序LINQ查詢:當除以0時,得到了DevideByZeroException異常 

使用並行LINQ查詢:當使用Asparallel()運行,除以0時,得到了AggregateException,因為現在是並行的方式運行,AggregateException將包含運行PLINQ查詢期間的所有異常,可以使用Flatten和Handle方法來處理內部的DivideByZeroException

示例如下:

 

 1 try
 2             {
 3                 tempList.AsParallel().Select(x => 100 / x).ToList();              
 4             }
 5             catch(DivideByZeroException ex)
 6             {
 7                 Console.WriteLine(ex.Message);
 8             }
 9             catch(AggregateException ex)
10             {
11                 var exceptions = ex.Flatten().InnerExceptions;
12 
13                 foreach (var item in exceptions)
14                 {
15                     Console.WriteLine(item);
16                 }
17             }

 6、數據分區

若要並行執行對數據源的操作,關鍵步驟之一是,將數據源分區 成多個部分,以供多個線程同時訪問。 PLINQ 和任務並行庫 (TPL) 提供了默認分區程序,在用戶編寫並行查詢或 ForEach 循環時透明運行。 對於更高級的方案,可以插入自己的分區程序。

這里有點難,還需要再學習理解一下,后面再更新


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM