.NET Framework 3.5 中引入了語言集成查詢 (LINQ),它具有統一的模型,以類型安全方式查詢任何 System.Collections.IEnumerable或 System.Collections.Generic.IEnumerable<T> 數據源。關於LINQ函數的使用,可以參考:https://www.cnblogs.com/zhaotianff/p/6236062.html
並行 LINQ (PLINQ) 是 LINQ 模式的並行實現
注意:以下示例代碼僅是演示用,其運行速度可能不如等效的順序LINQ查詢快
1、如何進行PLINQ查詢
ParallelEnumerable類提供了一個擴展方法AsParallel()可以並行執行LINQ查詢
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 }; 14 15 var result = list.AsParallel().Where(x => x > 30); 16 } 17 } 18 }
2、PLINQ查詢的參數
WithDegreeOfParallelism:指定 PLINQ 應用於並行化查詢的處理器的最大數量。
WithExecutionMode:指定 PLINQ 應如何並行化查詢(即使是當默認行為是按順序運行查詢時)。
WithMergeOptions:提供有關 PLINQ 應如何(如果可能)將並行結果合並回使用線程上的一個序列的提示。
ParallelMergeOptions 枚舉值如下:
AutoBuffered | 2 | 利用系統選定大小的輸出緩沖區進行合並。 在向查詢使用者提供結果之前,會先將結果累計到輸出緩沖區中。 |
Default | 0 | 使用默認合並類型,即 AutoBuffered。 |
FullyBuffered | 3 | 利用整個輸出緩沖區進行合並。 在向查詢使用者提供任何結果之前,系統會先累計所有結果。 |
NotBuffered | 1 | 不利用輸出緩沖區進行合並。 一旦計算出結果元素,就向查詢使用者提供這些元素。 |
WithCancellation:指定 PLINQ 應定期監視請求取消時所提供的取消標記的狀態以及取消執行。(這個操作和線程的取消操作是一致的,如果不理解線程中的取消操作,可以訪問:https://docs.microsoft.com/zh-cn/dotnet/standard/threading/cancellation-in-managed-threads)
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var cts = new System.Threading.CancellationTokenSource(); 14 15 try 16 { 17 var result2 = list.AsParallel().Where(x => x > 30) 18 .WithDegreeOfParallelism(Environment.ProcessorCount) 19 .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 20 .WithMergeOptions(ParallelMergeOptions.Default) 21 .WithCancellation(cts.Token); 22 23 PrintResult(result2); 24 } 25 catch(OperationCanceledException) 26 { 27 Console.WriteLine("並行查詢已被取消"); 28 } 29 } 30 31 32 static void PrintResult(IEnumerable<int> collection) 33 { 34 foreach (var item in collection) 35 { 36 Console.WriteLine(item); 37 } 38 } 39 } 40 }
3、PLINQ查詢中常用的函數
AsOrdered:指定 PLINQ 應為查詢的其余部分保留源序列的排序,或直到例如通過使用 orderby(在 Visual Basic 中為 Order By)子句更改排序為止。
AsUnordered:指定保留源序列的排序不需要查詢其余部分的 PLINQ。
AsSequential:指定查詢的其余部分應像非並行的 LINQ 查詢一樣按順序運行。
1 using System;
2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static Func<int, bool> query => x => x > 30;
12 static void Main(string[] args) 13 { 14 var orderResult = list.AsParallel().AsOrdered().Where(query);//asc 15 //list.AsParallel().Where(query).OrderBy(x=>x);//asc 16 //list.AsParallel().Where(query).OrderByDescending(x => x);//desc 17 PrintResult(orderResult); 18 19 var unorderResult = list.AsParallel().AsUnordered().Where(query); 20 PrintResult(unorderResult); 21 22 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x)); 23 Console.WriteLine("\n\n"); 24 25 var linqResult = list.Where(query); 26 PrintResult(linqResult); 27 28 var pinqSequentialResult = list.AsParallel().AsSequential().Where(query); 29 PrintResult(pinqSequentialResult); 30 31 var forceExecutionMode = list.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).AsSequential().Where(query); 32 PrintResult(forceExecutionMode); 33 } 34 35 36 static void PrintResult(IEnumerable<int> collection) 37 { 38 foreach (var item in collection) 39 { 40 Console.WriteLine(item); 41 } 42 43 Console.WriteLine("\n\n"); 44 } 45 } 46 }
4、ForAll
ForAll是一種多線程枚舉方法,與循環訪問查詢結果不同,它允許在不首先合並回使用者線程的情況下並行處理結果。
如果並行執行查詢,PLINQ 對源序列進行分區,以便多個線程能夠並發處理不同部分,通常是在不同的線程中。 如果要在一個線程(例如,foreach)中使用結果,必須將每個線程的結果合並回一個序列中。 PLINQ 執行的合並類型具體視查詢中的運算符而定。 例如,對結果強制施加新順序的運算符必須緩沖所有線程中的全部元素。 從使用線程(以及應用用戶)的角度來看,完全緩沖查詢可能會運行很長時間,才能生成第一個結果。 默認情況下,其他運算符進行部分緩沖,並分批生成結果。 默認不緩沖的一個運算符是 ForAll。 它會立即生成所有線程中的所有元素。
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static Func<int, bool> query => x => x > 30; 12 static void Main(string[] args) 13 { 14 var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 }; 15 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x)); 16 Console.WriteLine("\n\n"); 17 } 18 } 19 }
5、處理PLINQ查詢中常用的異常
1 using System;
2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 //普通 LINQ 14 try 15 { 16 tempList.Select(x => 100 / x).ToList(); 17 } 18 catch(DivideByZeroException ex) 19 { 20 Console.WriteLine(ex.Message); 21 } 22 23 //PLINQ 24 try 25 { 26 tempList.AsParallel().Select(x => 100 / x).ToList(); 27 } 28 catch(DivideByZeroException ex) 29 { 30 Console.WriteLine(ex.Message); 31 } 32 catch(AggregateException ex) 33 { 34 Console.WriteLine(ex.Message); 35 } 36 37 } 38 } 39 }
使用順序LINQ查詢:當除以0時,得到了DevideByZeroException異常
使用並行LINQ查詢:當使用Asparallel()運行,除以0時,得到了AggregateException,因為現在是並行的方式運行,AggregateException將包含運行PLINQ查詢期間的所有異常,可以使用Flatten和Handle方法來處理內部的DivideByZeroException類
示例如下:
1 try 2 { 3 tempList.AsParallel().Select(x => 100 / x).ToList(); 4 } 5 catch(DivideByZeroException ex) 6 { 7 Console.WriteLine(ex.Message); 8 } 9 catch(AggregateException ex) 10 { 11 var exceptions = ex.Flatten().InnerExceptions; 12 13 foreach (var item in exceptions) 14 { 15 Console.WriteLine(item); 16 } 17 }
6、數據分區
若要並行執行對數據源的操作,關鍵步驟之一是,將數據源分區 成多個部分,以供多個線程同時訪問。 PLINQ 和任務並行庫 (TPL) 提供了默認分區程序,在用戶編寫並行查詢或 ForEach 循環時透明運行。 對於更高級的方案,可以插入自己的分區程序。
這里有點難,還需要再學習理解一下,后面再更新