System.Threading.Tasks.Parallel類提供了Parallel.Invoke,Parallel.For,Parallel.ForEach這三個靜態方法。
1 Parallel.Invoke
盡可能並行執行所提供的每個操作,除非用戶取消了操作。
方法:
1)public static void Invoke(params Action[] actions);
2)public static void Invoke(ParallelOptions parallelOptions,
params Action[] actions);
參數:
parallelOptions:一個對象,用於配置此操作的行為。
Actions:要執行的操作數組
異常:
對方法1:
System.ArgumentNullException: actions 參數為 null。
System.AggregateException:當 actions 數組中的任何操作引發異常時引發的異常。
System.ArgumentException:actions數組包含 null 個元素。
對方法2除上述異常外還包括:
System.OperationCanceledException:parallelOptions 設置了System.Threading.CancellationToken。
System.ObjectDisposedException:在 parallelOptions 中與 System.Threading.CancellationToken 關聯的System.Threading.CancellationTokenSource已被釋放。
說明:
1)Invoke方法只有在actions全部執行完才會返回,即使在執行過程中出現異常也會完成。
2)不能保證actions中的所有操作同時執行。比如actions大小為4,但硬件線程數為2,那么同時運行的操作數最多為2。
3)actions中的操作並行的運行且與順序無關,若編寫與運行順序有關的並發代碼,應選擇其他方法。
4)如果使用Invoke加載多個操作,多個操作運行時間迥異,總的運行時間以消耗時間最長操作為基准,這會導致很多邏輯內核長時間處於空閑狀態。
5)受限的並行可擴展性,這源於Invoke所調用的委托數目是固定的。
2 Parallel.For
可能會並行運行迭代,可以監視和操作循環的狀態。Parallel.For有多個重載的方法,下面列舉部分方法。
方法:
1)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);
2)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);
3)public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);
4)public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
參數:
fromInclusive:開始索引(含)。
toExclusive:結束索引(不含)。
body:將被每個迭代調用一次的委托。
parallelOptions:一個對象,用於配置此操作的行為。
localInit:一個委托,用於返回每個任務的本地數據的初始狀態。
localFinally:一個委托,用於對每個任務的本地狀態執行一個最終操作。
返回結果:
ParallelLoopResult :包含有關已完成的循環部分的信息。
異常:
System.ArgumentNullException:body 參數為 null,或 localInit 參數為 null,或 localFinally 參數為 null,或 parallelOptions 參數為 null。 System.AggregateException:包含在所有線程上引發的全部單個異常的異常。
對於方法3)和4)除包含以上異常外還包括:
System.OperationCanceledException:在 parallelOptions 設置了參數 System.Threading.CancellationToken。
System.ObjectDisposedException:在 parallelOptions 中與 System.Threading.CancellationToken 關聯的 System.Threading.CancellationTokenSource已被釋放。
說明:
1)不支持浮點和步進。
2)無法保證迭代的執行順序。
3)如果fromInclusive大於或等於toExclusive,方法立即返回而不會執行任何迭代。
4)對於body參數中含有的ParallelLoopState實例,其作用為提早中斷並行循環。
5)只有在迭代全部完成以后才會返回結果,否則循環將一直阻塞。
3 Parallel.ForEach
方法
1)public static ParallelLoopResult ForEach(IEnumerable<TSource> source, Action<TSource> body);
2)public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);
3)public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body);
參數:
source:數據源
body:將被每個迭代調用一次的委托。
parallelOptions:一個對象,用於配置此操作的行為。
返回結果:
ParallelLoopResult :包含有關已完成的循環部分的信息。
異常:
System.ArgumentNullException:source 參數為 null。-或- 方body 參數為 null。
System.AggregateException:包含了所有線程上引發的全部單個異常。
對於方法2)還包括:
System.OperationCanceledException:在 parallelOptions 設置了參數 System.Threading.CancellationToken。
System.ObjectDisposedException:在 parallelOptions 中與 System.Threading.CancellationToken 關聯的 System.Threading.CancellationTokenSource已被釋放。
對於3)包括的異常為:
System.ArgumentNullException:source 參數為 null。-或- 方body 參數為 null。
System.InvalidOperationException:source 分區程序中的 System.Collections.Concurrent.Partitioner<TSource>.SupportsDynamicPartitions 屬性返回 false。或 在 source 分區程序中的任何方法返回 null 時引發異常。或在source 分區程序中的 System.Collections.Concurrent.Partitioner<TSource>.GetPartitions(System.Int32)方法不返回正確數目的分區。
說明:
1)對於body參數中含有的ParallelLoopState實例,其作用為提早中斷並行循環。
2)Parallel.ForEach方法不保證執行順序,它不像foreach循環那樣總是順序執行。
3)對於方法3)中的source,它的類型是Partitioner<TSource>。可以使用Partitioner.Create方法創建分區,該方法的幾個重整方法為:
l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive);
l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize);
fromInclusive為范圍下限(含),toExclusive為范圍下限(不含),rangeSize為每個子范圍的大小。
使用Partitioner創建的子范圍大小默認大約是計算機內核的三倍,而當使用rangeSize指定范圍大小時,那么子范圍大小為指定值。
4)只有在迭代全部完成以后才會返回結果,否則循環將一直阻塞。
4 ParallelOptions
定義:
存儲選項,用於配置 System.Threading.Tasks.Parallel 類的方法。
ParallelOptions屬性:
1)public CancellationToken CancellationToken { get; set; }
獲取或設置傳播有關應取消操作的通知。
2)public int MaxDegreeOfParallelism { get; set; }
獲取或設置此 ParallelOptions 實例所允許的最大並行度。
3)public TaskScheduler TaskScheduler { get; set; }
獲取或設置與此 System.Threading.Tasks.ParallelOptions 實例關聯的 System.Threading.Tasks.TaskScheduler
說明:
1)通過設置CancellationToken來取消並行循環,當前正在運行的迭代會執行完,然后拋出System.OperationCanceledException類型的異常。
2)TPL的方法總是會試圖利用所有可用內核以達到最好的效果,但是很可能.NET Framework內部使用的啟發式算法所得到的注入和使用的線程數比實際需要的多(通常都會高於硬件線程數,這樣會更好地支持CPU和I/O混合型的工作負載)。
通常將最大並行度設置為小於等於邏輯內核數。如果設置為等於邏輯內核數,那么要確保不會影響其他程序的執行。設置為小於邏輯內核數是為了有空閑內核來處理其他緊急的任務。
用途:
1)從循環外部取消並行循環
2)指定並行度
3)指定自定義任務調度程序
5 ParallelLoopState
定義:
可使並行循環迭代與其他迭代交互。 此類的實例由 Parallel 類提供給每個循環;不能在用戶代碼中創建實例。
方法:
1)Break()方法:通知並行循環在執行完當前迭代之后盡快停止執行,可確保低索引步驟完成。且可確保正在執行的迭代繼續運行直到完成。
2)Stop()方法:通知並行循環盡快停止執行。對於尚未運行的迭代不能會嘗試執行低索引迭代。不保證所有已運行的迭代都執行完。
用途:提早退出並行循環。
說明:
1)不能同時在同一個並行循環中同時使用Break和Stop。
2)Stop比Break更常用。break語句用在並行循環中的效果和用在串行循環中不同。Break用在並行循環中,委托的主體方法在每次迭代的時候被調用,退出委托的主體方法對並行循環的執行沒有影響。Stop停止循環比Break快。
6 ParallelLoopResult結構
定義:
並行循環運行結果的信息。
屬性:
1)public bool IsCompleted { get; }
如果該循環已運行完成(該循環的所有迭代均已執行,並且該循環沒有收到提前結束的請求),則為 true;否則為 false。
2)public long? LowestBreakIteration { get; }
返回一個表示從中調用 Break 語句的最低迭代的整數
用途:判斷當並行循環結束時,是否因調用了break方法或stop方法而提前退出並行循環,或所有迭代均已執行。
判斷依據:
| 條件 |
|
| IsCompleted |
運行完成 |
| !IsCompleted && LowestBreakIteration==null |
使用了Stop語句而提前終止 |
| !IsCompleted && LowestBreakIteration!=null |
使用了Break語句而提前終止 |
7 捕獲並行循環中的異常
原則:
1)異常優先於從循環外部取消和使用Break()方法或Stop()方法提前退出並行循環。
2)並行循環體拋出一個未處理的異常,並行循環就不能再開始新的迭代。
3)默認情況下當某次迭代拋出一個未處理異常,那么正在執行的迭代如果沒拋出異常,正在執行的迭代會執行完。當所有迭代都執行完(有可能其他的迭代在執行的過程中也拋出異常),並行循環將在調用它的線程中拋出異常。
並行循環運行的過程中,可能有多個迭代拋出異常,所以一般使用AggregateException來捕獲異常。AggregateException繼承自Exception。為了防止僅使用AggregateException未能捕獲某些異常,使用AggregateException的同時還要使用Exception。
8 使用模式
8.1 Parallel.Invoke
1 public static void DemonstrateInvoke() 2 { 3 //使用Lambda 4 Parallel.Invoke( 5 () => 6 { 7 //具體操作1 8 }, 9 () => 10 { 11 //具體操作2 12 }); 13 14 //不使用lambda 15 Parallel.Invoke(Operation1, Operation2); 16 } 17 18 private static void Operation1() 19 { 20 //具體操作1 21 } 22 23 private static void Operation2() 24 { 25 //具體操作2 26 }
8.2 Parallel.For
1 串行循環: 2 int toExclusive = ...; 3 for(int i =0;i<=toExclusive;i++){}; 4 5 對應的並行循環: 6 Parallel.For(0, toExclusive+1, (i) => 7 { 8 //具體操作 9 });
8.3 Parallel.ForEach
1 一般用法 2 IEnumerable<string> coll = ...; 3 Parallel.ForEach(coll,(str)=> 4 { 5 //具體操作 6 }); 7 8 基於分區的模式 9 優化分區數,使其最接近系統邏輯內核數: 10 子分區范圍 = 對“待處理集合大小/系統邏輯內核數”取整+1。 11 int logicalCores =...; 12 IEnumerable<string> collP = ...; 13 int fromInclusive = ...; 14 int toExclusiv = ...; 15 int rangeSize = (int)((toExclusiv-fromInclusive )/logicalCores) +1; 16 Parallel.ForEach(Partitioner.Create(fromInclusive, toExclusiv, rangeSize), range => 17 { 18 for (int i = range.Item1; i < range.Item2; i++) 19 { 20 //使用集合:collection[i] 21 } 22 });
8.4 從循環外部取消並行循環
注意:不使用IsCancellationRequested或ThrowIfCancellationRequested()的情況下無法捕獲類型為AggregateException的異常。
1)對於Parallel.For
使用IsCancellationRequested屬性
1 public static void CancelFromExternal() 2 { 3 CancellationTokenSource cts = new CancellationTokenSource(); 4 //其他操作... 5 6 //異步執行Operation方法 7 Task.Factory.StartNew(()=>{Operation(cts);}); 8 //異步執行condition的計算過程 9 Task.Factory.StartNew(()=>{ 10 bool condition = ...; 11 if (condition) cts.Cancel(); 12 } 13 14 //其他操作... 15 } 16 17 private static void Operation(CancellationTokenSource cts) 18 { 19 CancellationToken ct = cts.Token; 20 ParallelOptions op = new ParallelOptions { CancellationToken = ct }; 21 int toExclusive = ...; 22 Parallel.For(0, toExclusive, op, (i) => 23 { 24 25 //其他操作... 26 27 //return只對當前子線程有效 28 if (ct.IsCancellationRequested) 29 { return; } 30 31 //其他操作... 32 }); 33 }
使用ThrowIfCancellationRequested()方法拋出異常
將上面的並行循環部分替換為下面的代碼:
1 Parallel.For(0, toExclusive, op, (i) => 2 { 3 4 //其他操作... 5 6 ct.ThrowIfCancellationRequested(); 7 8 //其他操作... 9 });
不使用IsCancellationRequested和ThrowIfCancellationRequested()方法
將Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代碼去掉
2)對於Parallel.ForEach
使用IsCancellationRequested屬性
1 public static void CancelFromExternal() 2 { 3 //同1)中CancelFromExternal方法 4 } 5 6 private static void Operation(CancellationTokenSource cts) 7 { 8 CancellationToken ct = cts.Token; 9 ParallelOptions op = new ParallelOptions { CancellationToken = ct }; 10 IEnumerable<string> coll = new List<string> { "str1", "str2" }; 11 Parallel.ForEach(coll, op,(str, loopState) => 12 { 13 //其他操作... 14 15 //return只對當前子線程有效 16 if (ct.IsCancellationRequested) 17 { return; } 18 19 //其他操作... 20 }); 21 }
使用ThrowIfCancellationRequested()方法拋出異常
將Operation方法中的:
if (ct.IsCancellationRequested)
{ return; }
替換為:
ct.ThrowIfCancellationRequested();
不使用IsCancellationRequested和ThrowIfCancellationRequested()方法
將Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代碼去掉
8.5 指定並行度
1 int maxDegreeOfParallelism = Environment.ProcessorCount - 1; 2 ParallelOptions op = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }; 3 IEnumerable<string> coll = new List<string> { }; 4 Parallel.ForEach(coll, op ,(str) => 5 { 6 //具體操作 7 });
8.6 提早退出並行循環
1)對於Parallel.For
1 int toExclusive = 10; 2 Parallel.For(0, toExclusive, (i, loopState) => 3 { 4 //其他操作... 5 //計算condition 6 bool condition = ...; 7 if (condition) 8 { 9 loopState.Break();//或使用loopState.Stop 10 return; 11 } 12 13 //其他操作 14 });
2)對於Parallel.ForEach
1 IEnumerable<string> coll = new List<string> {"str1","str2" }; 2 Parallel.ForEach(coll, (str, loopState) => 3 { 4 //其他操作... 5 6 //計算condition 7 bool condition = ...; 8 if (condition) 9 { 10 loopState.Break();//或使用loopState.Stop 11 return; 12 } 13 14 //其他操作 15 16 });
9 異常處理模式
基本形式
在確保使用AggregateException 能夠捕捉到所有的異常時,可以省去catch(Exception e)的部分。
1 try 2 { 3 //Do something 4 } 5 catch(AggregateException e) 6 { 7 Foreach(Exception ex in e.InnerExceptions) 8 { 9 //Do something 10 } 11 } 12 catch(Exception e) 13 { 14 //Do something 15 }
為上述並行循環使用模式添加異常處理機制
一種方式是把並行循環放入try塊中,另一種方式是在每次迭代的過程中捕獲異常。
-----------------------------------------------------------------------------------------
轉載與引用請注明出處。
時間倉促,水平有限,如有不當之處,歡迎指正。
