.NET Parallel Programming (1): Data Parallelism with the TPL


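Preface

Many personal computers and workstations have multiple CPU cores and can therefore run several threads at the same time. To take advantage of this hardware, code can be parallelized so that work is distributed across the processors.
[Figure: architecture of parallel programming in the .NET Framework]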

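Use cases

  • Bulk file upload

Upload the individual files in parallel. A single file can also be split into several segments that are uploaded separately to speed up the transfer.

  • Batch computation over data

For example, several million records can be split into many independent parts that are processed in parallel and aggregated at the end.

  • Data push

Here, too, the data is split up first and then pushed in parallel.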
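The "split, compute in parallel, then aggregate" pattern from the list above can be sketched roughly as follows. This is only an illustration, not code from the article: the names BatchSum and SumInBatches and the batch size are hypothetical, and the sketch assumes the batches really are independent of each other.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class BatchSum
{
    // Hypothetical helper: splits `data` into independent batches,
    // sums each batch in parallel, then aggregates the partial sums.
    // Assumes batchSize > 0.
    public static long SumInBatches(int[] data, int batchSize)
    {
        int batchCount = (data.Length + batchSize - 1) / batchSize;
        long[] partialSums = new long[batchCount];

        // Each iteration writes only to its own slot, so no locking is needed.
        Parallel.For(0, batchCount, b =>
        {
            int start = b * batchSize;
            int end = Math.Min(start + batchSize, data.Length);
            long sum = 0;
            for (int i = start; i < end; i++) sum += data[i];
            partialSums[b] = sum;
        });

        // The final aggregation runs sequentially once all batches are done.
        return partialSums.Sum();
    }

    public static void Main()
    {
        int[] data = Enumerable.Range(1, 1000000).ToArray();
        Console.WriteLine(SumInBatches(data, 100000));
    }
}
```

Giving every batch its own result slot keeps the parallel part free of shared writes; only the short aggregation step at the end touches all the partial results.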

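Task Parallel Library: data parallelism

If a loop does only a small amount of work per iteration, or does not run many iterations, the overhead of parallelization can make the code run slower. Before using parallel loops you should have a basic understanding of threading issues (locks, deadlocks, race conditions).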
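As a small reminder of why race conditions matter in parallel loops, the following sketch contrasts an unsynchronized counter with one updated through Interlocked. The class and method names (RaceDemo, CountUnsafe, CountSafe) are made up for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RaceDemo
{
    // Unsafe: `count++` is a read-modify-write sequence, so concurrent
    // iterations can overwrite each other's updates and lose increments.
    public static int CountUnsafe(int iterations)
    {
        int count = 0;
        Parallel.For(0, iterations, i => { count++; });
        return count; // may be less than `iterations`
    }

    // Safe: Interlocked.Increment performs the update atomically.
    public static int CountSafe(int iterations)
    {
        int count = 0;
        Parallel.For(0, iterations, i => { Interlocked.Increment(ref count); });
        return count; // equals `iterations`
    }

    public static void Main()
    {
        Console.WriteLine("unsafe: " + CountUnsafe(1000000));
        Console.WriteLine("safe:   " + CountSafe(1000000));
    }
}
```

The unsafe result varies from run to run and from machine to machine, which is exactly what makes this kind of bug hard to spot.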

Parallel.For

    // Assumed field (not shown in the original listing): the timer shared by the samples.
    private readonly Stopwatch stopwatch = new Stopwatch();

    /// <summary>
    /// Ordinary sequential loop
    /// </summary>
    public void FormalDirRun()
    {
        long totalSize = 0;
        var dir = @"E:\LearnWall\orleans"; // args[1];
        String[] files = Directory.GetFiles(dir);
        stopwatch.Restart();
        for (var i = 0; i < files.Length; i++)
        {
            FileInfo fi = new FileInfo(files[i]);
            long size = fi.Length;
            Interlocked.Add(ref totalSize, size);
        }
        stopwatch.Stop();
        Console.WriteLine($"FormalDirRun------{files.Length} files, {totalSize} bytes,time:{stopwatch.ElapsedMilliseconds},Dir:{dir}");
    }

    /// <summary>
    /// Parallel loop
    /// </summary>
    public void ParallelForDirRun()
    {
        long totalSize = 0;
        var dir = @"E:\LearnWall\orleans"; // args[1];
        String[] files = Directory.GetFiles(dir);
        stopwatch.Restart();
        Parallel.For(0, files.Length, index =>
        {
            FileInfo fi = new FileInfo(files[index]);
            long size = fi.Length;
            // totalSize is shared by all iterations, so update it atomically.
            Interlocked.Add(ref totalSize, size);
        });
        stopwatch.Stop();
        Console.WriteLine($"ParallelForDirRun-{files.Length} files, {totalSize} bytes,time:{stopwatch.ElapsedMilliseconds},Dir:{dir}");
    }

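As the two results below show, when the loop body takes very little time to execute, the parallel version actually takes longer. This point is covered in more detail later in the article.

    FormalDirRun------20 files, 255618 bytes,time:0,Dir:E:\LearnWall\orleans
    ParallelForDirRun-20 files, 255618 bytes,time:6,Dir:E:\LearnWall\orleans

Next we add a delay to each iteration with Thread.Sleep. This is not a great example, but it is enough to demonstrate the effect.

Thread.Sleep(1000);

With a one-second blocking delay in the loop body, the two versions differ by a factor of almost seven (20011 ms versus 3007 ms).

    FormalDirRun------20 files, 255618 bytes,time:20011,Dir:E:\LearnWall\orleans
    ParallelForDirRun-20 files, 255618 bytes,time:3007,Dir:E:\LearnWall\orleans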

Matrix and stopwatch example

Parallel.ForEach

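To get the most out of parallel execution, access to shared resources inside the parallel body (Console.Write, file logging and so on) should be kept to a minimum. It is used here only to make the effect visible.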

    public void ParallelForEachDirRun()
    {
        long totalSize = 0;
        var dir = @"E:\LearnWall\orleans"; // args[1];
        String[] files = Directory.GetFiles(dir);
        stopwatch.Restart();
        Parallel.ForEach(files, (current) =>
        {
            FileInfo fi = new FileInfo(current);
            long size = fi.Length;
            Interlocked.Add(ref totalSize, size);
            Console.WriteLine($"name:{fi.Name}");
        });
        stopwatch.Stop();
        Console.WriteLine($"ParallelForEachDirRun-{files.Length} files, {totalSize} bytes,Time:{stopwatch.ElapsedMilliseconds}");
    }

    name:.gitignore
    name:build.sh
    .
    .
    .
    name:TestAll.cmd
    ParallelForEachDirRun-20 files, 255618 bytes,Time:17
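One way to follow the advice about avoiding shared resources is to let each iteration store its result in its own slot and to write to the console only once, after the loop has finished. The sketch below illustrates the idea; DeferredOutputDemo and FormatAll are hypothetical names, and the file names are placeholders taken from the output above.

```csharp
using System;
using System.Threading.Tasks;

public static class DeferredOutputDemo
{
    // Formats each item in parallel but defers all console output
    // until the loop has finished.
    public static string[] FormatAll(string[] items)
    {
        var lines = new string[items.Length];

        // Each iteration writes only to its own index, so there is no
        // contention and the original order is preserved.
        Parallel.For(0, items.Length, i =>
        {
            lines[i] = "name:" + items[i];
        });

        return lines;
    }

    public static void Main()
    {
        string[] names = { ".gitignore", "build.sh", "TestAll.cmd" };

        // A single write to the shared console, outside the parallel body.
        Console.WriteLine(string.Join(Environment.NewLine, FormatAll(names)));
    }
}
```

Compared with calling Console.WriteLine inside the loop, this keeps the iterations from queuing up behind the console's internal lock.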

Parallel.For with thread-local variables

    public void ParallelForForThreadLocalVariables()
    {
        int[] nums = Enumerable.Range(0, 1000000).ToArray();
        long total = 0;

        // Use type parameter to make subtotal a long, not an int
        Parallel.For<long>(0, nums.Length, () => 0, (j, loop, subtotal) =>
        {
            subtotal += nums[j];
            return subtotal;
        },
            (x) => Interlocked.Add(ref total, x)
        );

        Console.WriteLine("The total is {0:N0}", total);
        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

The result is:

    The total is 499,999,500,000

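The first two parameters of every For method specify the start and end iteration values. In this overload, the third parameter is where the local state is initialized. In this context, local state means a variable whose lifetime extends from just before the first iteration of the loop on the current thread to just after the last iteration.

The type of the third parameter is Func<TResult>, where TResult is the type of the variable that stores the thread-local state. Its type is defined by the generic type argument supplied when calling the generic For<TLocal>(Int32, Int32, Func<TLocal>, Func<Int32, ParallelLoopState, TLocal, TLocal>, Action<TLocal>) method, which in this case is Int64. The type argument tells the compiler the type of the temporary variable that will be used to store the thread-local state. In this example, the expression () => 0 (or Function() 0 in Visual Basic) initializes the thread-local variable to zero. If the generic type argument is a reference type or a user-defined value type, the expression would look like this:

() => new MyClass() 

This is rather long-winded. In one sentence: the first two parameters are the start and end values, and the third produces the initial value, whose type is given by the type argument of For. I did not fully understand this part myself. The .NET Framework source is shown below; I have not checked the .NET Core version: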

    public static ParallelLoopResult For<TLocal>(
        int fromInclusive, int toExclusive,
        Func<TLocal> localInit,
        Func<int, ParallelLoopState, TLocal, TLocal> body,
        Action<TLocal> localFinally)
    {
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }
        if (localInit == null)
        {
            throw new ArgumentNullException("localInit");
        }
        if (localFinally == null)
        {
            throw new ArgumentNullException("localFinally");
        }

        return ForWorker(
            fromInclusive, toExclusive, s_defaultParallelOptions,
            null, null, body, localInit, localFinally);
    }

    /// <typeparam name="TLocal">The type of the local data.</typeparam>
    /// <param name="fromInclusive">The loop start index.</param>
    /// <param name="toExclusive">The loop end index.</param>
    /// <param name="parallelOptions">The options.</param>
    /// <param name="body">The loop body.</param>
    /// <param name="bodyWithState">The loop body overload that takes a ParallelLoopState.</param>
    /// <param name="bodyWithLocal">The loop body overload that takes thread-local state.</param>
    /// <param name="localInit">A selector function that returns new thread-local state.</param>
    /// <param name="localFinally">A cleanup function that disposes of the thread-local state.</param>
    /// <remarks>Only one of the body arguments may be supplied (i.e. they are exclusive).</remarks>
    /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns>
    private static ParallelLoopResult ForWorker<TLocal>(
        int fromInclusive, int toExclusive,
        ParallelOptions parallelOptions,
        Action<int> body,
        Action<int, ParallelLoopState> bodyWithState,
        Func<int, ParallelLoopState, TLocal, TLocal> bodyWithLocal,
        Func<TLocal> localInit, Action<TLocal> localFinally)
    {
        . . .
    }
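To make the roles of the five arguments more concrete, here is a small sketch that labels each one and counts how often the localInit and localFinally delegates run. The body (fourth argument) receives the index, the loop state and the current local value and returns the updated local value; localFinally (fifth argument) receives the last local value of each worker so it can be merged into the shared total. LocalStateDemo and Run are hypothetical names, and the exact number of delegate calls depends on how the runtime schedules the loop.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LocalStateDemo
{
    // Sums 0..n-1 and reports how often localInit and localFinally were called.
    public static void Run(int n, out long total, out int initCalls, out int finallyCalls)
    {
        long sum = 0;
        int inits = 0, finals = 0;

        Parallel.For<long>(
            0, n,                                   // 1 and 2: iterate over [0, n)
            () =>                                   // 3: localInit, creates the local state
            {
                Interlocked.Increment(ref inits);
                return 0L;
            },
            (i, state, subtotal) => subtotal + i,   // 4: body, returns the updated local state
            subtotal =>                             // 5: localFinally, merges the local state
            {
                Interlocked.Increment(ref finals);
                Interlocked.Add(ref sum, subtotal);
            });

        total = sum;
        initCalls = inits;
        finallyCalls = finals;
    }

    public static void Main()
    {
        long total;
        int inits, finals;
        Run(1000000, out total, out inits, out finals);
        Console.WriteLine("total={0:N0}, localInit calls={1}, localFinally calls={2}", total, inits, finals);
    }
}
```

On a typical machine the two counters come out far smaller than the number of iterations, which is the point of the overload: the shared total is touched once per worker instead of once per element.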

Parallel.ForEach with thread-local variables

    /// <summary>
    /// Parallel.ForEach with a thread-local subtotal
    /// </summary>
    public void ParallelForEachThreadLocalVariables()
    {
        int[] nums = Enumerable.Range(0, 1000000).ToArray();
        long total = 0;

        // First type parameter is the type of the source elements
        // Second type parameter is the type of the thread-local variable (partition subtotal)
        Parallel.ForEach<int, long>(nums, // source collection
            () => 0, // method to initialize the local variable
            (j, loop, subtotal) => // method invoked by the loop on each iteration
            {
                subtotal += j; // modify local variable
                return subtotal; // value to be passed to next iteration
            },
            // Method to be executed when each partition has completed.
            // finalResult is the final value of subtotal for a particular partition.
            (finalResult) => Interlocked.Add(ref total, finalResult)
        );

        Console.WriteLine("The total from Parallel.ForEach is {0:N0}", total);
    }
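The same local-state overload also works for aggregations other than sums. As a rough sketch, the following finds the maximum of a sequence: each worker tracks its own local maximum, and the local results are merged under a lock at the end. ParallelMaxDemo, Max and gate are names invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelMaxDemo
{
    // Returns the largest element of `source`, or int.MinValue if it is empty.
    public static int Max(IEnumerable<int> source)
    {
        int max = int.MinValue;
        object gate = new object();

        Parallel.ForEach<int, int>(
            source,
            () => int.MinValue,                                   // local maximum starts at the lowest value
            (item, state, localMax) => Math.Max(item, localMax),  // update the local maximum, no locking
            localMax =>                                           // merge once per worker
            {
                lock (gate)
                {
                    if (localMax > max) max = localMax;
                }
            });

        return max;
    }

    public static void Main()
    {
        Console.WriteLine(Max(new[] { 3, 9, -2, 7 }));
    }
}
```

Because the lock is taken only in the final merge step, it is contended a handful of times rather than once per element.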

The source of ForEach is as follows (this is the simplest overload, without thread-local state):

    /// <summary>
    /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/>
    /// in which iterations may run in parallel.
    /// </summary>
    /// <typeparam name="TSource">The type of the data in the source.</typeparam>
    /// <param name="source">An enumerable data source.</param>
    /// <param name="body">The delegate that is invoked once per iteration.</param>
    /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
    /// argument is null.</exception>
    /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
    /// argument is null.</exception>
    /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
    /// thrown from one of the specified delegates.</exception>
    /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
    /// that contains information on what portion of the loop completed.</returns>
    /// <remarks>
    /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
    /// enumerable. It is provided with the current element as a parameter.
    /// </remarks>
    public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }

        return ForEachWorker<TSource, object>(
            source, s_defaultParallelOptions, body, null, null, null, null, null, null);
    }

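Cancelling Parallel.ForEach or Parallel.For

Obtain a token from a CancellationTokenSource:

CancellationTokenSource cts = new CancellationTokenSource();

The cancellation state is controlled through the ParallelOptions.CancellationToken property:

ParallelOptions po = new ParallelOptions();

po.CancellationToken = cts.Token;

Pass the ParallelOptions instance to Parallel.For or Parallel.ForEach so that the work inside the loop can be cancelled.

The code is as follows: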

    int[] nums = Enumerable.Range(0, 10000000).ToArray();
    CancellationTokenSource cts = new CancellationTokenSource();

    // Use ParallelOptions instance to store the CancellationToken
    ParallelOptions po = new ParallelOptions();
    po.CancellationToken = cts.Token;
    po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
    Console.WriteLine("Press any key to start. Press 'c' to cancel.");
    Console.ReadKey();

    // Run a task so that we can cancel from another thread.
    Task.Factory.StartNew(() =>
    {
        var s = Console.ReadKey().KeyChar;
        if (s == 'c')
            cts.Cancel();
        Console.WriteLine("press any key to exit111");
    });

    try
    {
        Parallel.ForEach(nums, po, (num) =>
        {
            double d = Math.Sqrt(num);
            Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
            po.CancellationToken.ThrowIfCancellationRequested();
        });
    }
    catch (OperationCanceledException e)
    {
        Console.WriteLine(e.Message);
    }
    finally
    {
        cts.Dispose();
    }

    Console.ReadKey();

The output is shown below. When c is typed on the keyboard, the parallel loop is cancelled.

    1937.41838537782 on 7
    2739.95711645274 on 8
    2501.40660429287 on 9
    2958.47798707376 on 10
    .
    .
    .
    press any key to exit111
    The operation was canceled.
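The example above depends on a key press, which makes it awkward to try in an automated run. As a rough alternative, the sketch below requests cancellation from inside the loop at a chosen index, so the outcome does not depend on user input. CancelDemo and RunWithCancelAt are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    // Runs a parallel loop over [0, count) and requests cancellation when
    // the iteration with index `cancelAt` executes.
    // Returns true if the loop ended with an OperationCanceledException.
    public static bool RunWithCancelAt(int count, int cancelAt)
    {
        using (var cts = new CancellationTokenSource())
        {
            var po = new ParallelOptions
            {
                CancellationToken = cts.Token,
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };

            try
            {
                Parallel.For(0, count, po, i =>
                {
                    if (i == cancelAt) cts.Cancel();
                });
                return false; // the loop ran to completion
            }
            catch (OperationCanceledException)
            {
                return true;  // the loop observed the cancelled token
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(RunWithCancelAt(100000, 10));
        Console.WriteLine(RunWithCancelAt(1000, -1));
    }
}
```

Wrapping the CancellationTokenSource in a using block has the same effect as the explicit cts.Dispose() in the finally block of the original example.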

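Catching exceptions thrown inside the parallel body

The sample method uses a ConcurrentQueue to collect the exceptions and finally throws them together as a single AggregateException.

var exceptions = new ConcurrentQueue<Exception>();

exceptions.Enqueue(e);

The caller uses the AggregateException.Flatten method to get at the exception information.

This gives me a useful pattern for catching exceptions in the future.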

    /// <summary>
    /// Catch exceptions thrown inside the parallel body
    /// </summary>
    public void HandleExceptionParallelLoop()
    {
        // Create some random data to process in parallel.
        // There is a good probability this data will cause some exceptions to be thrown.
        byte[] data = new byte[5000];
        Random r = new Random();
        r.NextBytes(data);

        try
        {
            ProcessDataInParallel(data);
        }
        catch (AggregateException ae)
        {
            var ignoredExceptions = new List<Exception>();
            // This is where you can choose which exceptions to handle.
            foreach (var ex in ae.Flatten().InnerExceptions)
            {
                if (ex is ArgumentException)
                    Console.WriteLine(ex.Message);
                else
                    ignoredExceptions.Add(ex);
            }
            if (ignoredExceptions.Count > 0) throw new AggregateException(ignoredExceptions);
        }

        Console.WriteLine("Press any key to exit.");
        Console.ReadKey();
    }

    private void ProcessDataInParallel(byte[] data)
    {
        // Use ConcurrentQueue to enable safe enqueueing from multiple threads.
        var exceptions = new ConcurrentQueue<Exception>();

        // Execute the complete loop and capture all exceptions.
        Parallel.ForEach(data, d =>
        {
            try
            {
                // Cause a few exceptions, but not too many.
                if (d < 3)
                    throw new ArgumentException($"Value is {d}. Value must be greater than or equal to 3.");
                else
                    Console.Write(d + " ");
            }
            // Store the exception and continue with the loop.
            catch (Exception e)
            {
                exceptions.Enqueue(e);
            }
        });
        Console.WriteLine();

        // Throw the exceptions here after the loop completes.
        if (exceptions.Count > 0) throw new AggregateException(exceptions);
    }
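For contrast, here is a sketch of what happens without the try/catch inside the loop body: the exception escapes the delegate, the loop stops scheduling further work as soon as it can, and Parallel.ForEach itself throws an AggregateException. In that case some elements may never be processed, which is why the queue-based approach above is useful when every element must be visited. LoopExceptionDemo and CollectFailures are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class LoopExceptionDemo
{
    // Returns the messages of the exceptions that escaped the loop body.
    public static string[] CollectFailures(int[] values)
    {
        try
        {
            Parallel.ForEach(values, v =>
            {
                // No try/catch here: the exception leaves the delegate.
                if (v < 0) throw new ArgumentException("Negative value: " + v);
            });
            return new string[0];
        }
        catch (AggregateException ae)
        {
            // The loop wraps whatever was thrown in an AggregateException.
            return ae.Flatten().InnerExceptions.Select(e => e.Message).ToArray();
        }
    }

    public static void Main()
    {
        foreach (string message in CollectFailures(new[] { 1, 2, -3, 4 }))
            Console.WriteLine(message);
    }
}
```

With several failing elements, the number of collected exceptions can vary between runs, because iterations that have not started yet may be skipped once the first exception is thrown.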

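Speeding up small loop bodies

When a Parallel.For loop has a very fast body, it may run more slowly than the equivalent sequential loop. The slower performance is caused by the overhead of partitioning the data and the cost of invoking a delegate on every loop iteration. To address this, the Partitioner class provides the Partitioner.Create method, which lets you supply a sequential loop as the delegate body so that the delegate is invoked only once per partition rather than once per iteration.

var rangePartitioner = Partitioner.Create(0, source.Length);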

    /// <summary>
    /// Speed up a small loop body
    /// </summary>
    public void SpeedUpMicroParallelBody()
    {
        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach (double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
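Partitioner.Create also has an overload that takes an explicit range size, which gives some control over how coarse the chunks are. The sketch below uses it to sum squares, with one Interlocked.Add per chunk instead of one per element. RangeSumDemo and SumOfSquares are hypothetical names, and the range size of 100 is an arbitrary value picked for the example, not a recommendation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class RangeSumDemo
{
    // Sums the squares of `source` chunk by chunk.
    // Note: Partitioner.Create throws if the range is empty or rangeSize <= 0,
    // so this sketch assumes a non-empty array and a positive range size.
    public static long SumOfSquares(int[] source, int rangeSize)
    {
        long total = 0;

        // Each partition is a Tuple<int, int> holding [Item1, Item2).
        var ranges = Partitioner.Create(0, source.Length, rangeSize);

        Parallel.ForEach(ranges, range =>
        {
            // Plain sequential loop inside the chunk: no delegate call per element.
            long local = 0;
            for (int i = range.Item1; i < range.Item2; i++)
                local += (long)source[i] * source[i];

            // One synchronized update per chunk.
            Interlocked.Add(ref total, local);
        });

        return total;
    }

    public static void Main()
    {
        int[] data = Enumerable.Range(1, 1000).ToArray();
        Console.WriteLine(SumOfSquares(data, 100));
    }
}
```

Larger ranges mean fewer delegate invocations but also less opportunity for load balancing, so the best size is something to measure for the workload at hand.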

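Source code

CsharpFanDemo

Summary

This article followed the steps in Microsoft's official documentation to get familiar with the first part, data parallelism:

Parallel loops with Parallel.For and Parallel.ForEach.

Thread-local variables in Parallel.For and Parallel.ForEach.

Cancelling a parallel loop with ParallelOptions.CancellationToken.

Catching exceptions: collect the exceptions thrown inside the parallel body in a ConcurrentQueue and handle them outside the loop.

Speeding up small loop bodies with Partitioner.Create.

Thanks for reading!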

