數據與任務的並行---Parallel類


  Parallel類是對線程的抽象,提供數據與任務的並行性。類定義了靜態方法For和ForEach,使用多個任務來完成多個作業。Parallel.For和Parallel.ForEach方法在每次迭代的時候調用相同的代碼,而Parallel.Invoke()方法允許同時調用不同的方法。Parallel.ForEach()方法用於數據的並行性,Parallel.Invoke()方法用於任務的並行性。

1、For()方法

  For()方法用於多次執行一個任務,可以並行運行迭代,但迭代的順序並沒指定。For()方法前兩個參數為定義循環的開始和結束,第三個參數為Action<int>委托。方法的返回值是ParallelLoopResult結構,它提供了是否結束的信息。如以下循環方法,不能保證輸出順序: 

static void ParallelFor()
{
    ParallelLoopResult result =
        Parallel.For(0, 10, async i =>
            {
                Console.WriteLine("{0}, task: {1}, thread: {2}", i,
                   Task.CurrentId, Thread.CurrentThread.ManagedThreadId);

                await Task.Delay(10);//異步方法,用於釋放線程供其他任務使用。完成后,可能看不到方法的輸出,因為主(前台線)程結束,所有的后台線程也將結束
                Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
            });
    Console.WriteLine("Is completed: {0}", result.IsCompleted);
}

  異步功能雖然方便,但是知道后台發生了什么仍然重要,必須留意。

提前停止For()方法

  可以根據條件提前停止For()方法,而不必完成全部的迭代。,傳入參數ParallelLoopState的對象,調用Break()方法或者Stop()方法。如調用Break()方法,當迭代值大於15的時候中斷(當前線程結束,類似於普通for的Continue),但其他任務可以同時運行,有其他值的任務也可以運行(如果當前線程是主線程,那么就等同於Stop(),結束所有線程)。Stop()方法結束的是所有操作(類似於普通for的Break)。利用LowestBreakIteration屬性可以忽略其他任務的結果:

static void ParallelFor()
{
    ParallelLoopResult result = Parallel.For(10, 40, (int i, ParallelLoopState pls) =>
         {
             Console.WriteLine("i: {0} task {1}", i, Task.CurrentId);
             Thread.Sleep(10);
             if (i > 15)
                 pls.Break();
         });
    Console.WriteLine("Is completed: {0}", result.IsCompleted);
    if (!result.IsCompleted)
        Console.WriteLine("lowest break iteration: {0}", result.LowestBreakIteration);
}

  For()方法可以使用幾個線程執行循環。如果要對每個線程進行初始化,就需要使用到For<TLocal>(int, int, Func<TLocal>, Func<int, ParallelLoopState, TLocal, TLocal> , Action<TLocal>)方法。

  • 前兩個參數是對應的循環起始和終止條件;
  • 第二個參數類型是Func<TLocal>,返回一個值,傳遞給第三個參數。
  • 第三個參數類型是Func<int, ParallelLoopState, TLocal, TLocal>,是循環體的委托,其內部的第一個參數是循環迭代,內部第二個參數允許停止迭代,內部第三個參數用於接收For()方法的前一個參數的返回值。循環體應當返回與For()循環泛型類型一致的值。
  • 第四個參數是指定的一個委托,用於執行相關后續操作。
static void ParallelFor()
{
    Parallel.For<string>(0, 20, () =>
      {
          // invoked once for each thread
          Console.WriteLine("init thread {0}, task {1}", Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
          return String.Format("t{0}", Thread.CurrentThread.ManagedThreadId);
      },
      (i, pls, str1) =>
      {
          // invoked for each member
          Console.WriteLine("body i {0} str1 {1} thread {2} task {3}", i, str1, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
          Thread.Sleep(10);
          return String.Format("i {0}", i);
      },
      (str1) =>
      {
          // final action on each thread
          Console.WriteLine("finally {0}", str1);
      });
}

2、使用ForEach()方法循環

  ForEach()方法遍歷實現了IEnumerable的集合,其方式類似於foreach語句,但是以異步方式遍歷,沒有確定的順序。如果要中斷循環,同樣可以采用ParallelLoopState參數。ForEach<TSource>有許多泛型的重載方法。

static void ParallelForeach()
{
    string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve" };

    ParallelLoopResult result = Parallel.ForEach<string>(data, s =>
             {
                 Console.WriteLine(s);
             });
    Parallel.ForEach<string>(data, (s, pls, l) =>
    {
        Console.WriteLine("{0} {1}", s, l);
    });
}

3、調用多個方法

  如果有多個任務並行,可以使用Parallel.Invoke()方法,它提供任務的並行性模式:

static void ParallelInvoke()
{
    Parallel.Invoke(Foo, Bar);
}

static void Foo()
{
    Console.WriteLine("foo");
}

static void Bar()
{
    Console.WriteLine("bar");
}    

4、For()方法的取消

  在For()方法的重載方法中,可以傳遞一個ParallelOptions類型的參數,利用此參數可以傳遞一個CancellationToken參數。使用CancellationTokenSource對象用於注冊CancellationToken,並允許調用Cancel方法用於取消操作。

  一旦取消操作,For()方法就拋出一個OperationCanceledException類型的異常,使用CancellationToken可以注冊取消操作時的信息。調用Register方法,傳遞一個在取消操作時調用的委托。通過取消操作,可以將其他的迭代操作在啟動之前取消,但已經啟動的迭代操作允許完成。取消操作是以協作方式進行的,以避免在取消迭代操作的中間泄露資源。

static void CancelParallelLoop()
{
    var cts = new CancellationTokenSource();
    cts.Token.ThrowIfCancellationRequested();
    cts.Token.Register(() => Console.WriteLine("** token cancelled"));
    // 在500ms后取消標記
    cts.CancelAfter(500);
    try
    {
        ParallelLoopResult result = Parallel.For(0, 100,
            new ParallelOptions()
            {
                CancellationToken = cts.Token
            },
               x =>
               {
                   Console.WriteLine("loop {0} started", x);
                   int sum = 0;
                   for (int i = 0; i < 100; i++)
                   {
                       Thread.Sleep(2);
                       sum += i;
                   }
                   Console.WriteLine("loop {0} finished", x);
               });
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine(ex.Message);
    }
}

 5、發現存在的問題

  使用並行循環時,若出現以下兩個問題,需要使用Partitioner(命名空間 System.Collections.Concurrent中)解決。

  1. 使用並行循環時,應確保每次迭代的工作量要明顯大於同步共享狀態的開銷。 如果循環把時間都耗在了阻塞式的訪問共享的循環變量上,那么並行執行的好處就很容易完全喪失。盡可能讓每次循環迭代都只是在局部進行,避免阻塞式訪問造成的損耗。見示例1
  2. 並行循環的每一次迭代都會生成一個委托,如果每次生成委托或方法的開銷比迭代完成的工作量大,使用並行方案就不適合了(委托會設計兩類開銷:構造開銷和調用開銷。大多數調用開銷和普通方法的調用差不多。 但委托是一種對象,構造開銷可能相當大,最好是只做一次構造,然后把對象緩存起來)。見示例2

  示例1中,求1000000000以內所有自然數開方的和。第一部分采用直接計算的方式,第二部分采用分區計算。第二部分的Partitioner 會把需要迭代的區間分拆為多個不同的空間,並存入Tuple對象中。

/*   示例1  */
public
static void PartitionerTest() { //使用計時器 System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch(); const int maxValue = 1000000000; long sum = 0; stopwatch.Restart();//開始計時 Parallel.For(0, maxValue, (i) => { Interlocked.Add(ref sum, (long )Math.Sqrt(i));//Interlocked是原子操作,多線程訪問時的線程互斥操作 }); stopwatch.Stop(); Console.WriteLine($"Parallel.For:{stopwatch.Elapsed}");//我的機器運行出的時間是:00:01:37.0391204 var partitioner = System.Collections.Concurrent.Partitioner.Create(0, maxValue);//拆分區間 sum = 0; stopwatch.Restart(); Parallel.ForEach(partitioner, (rang) => { long partialSum = 0; //迭代區間的數據 for(int i=rang.Item1;i<rang.Item2;i++) { partialSum += (long)Math.Sqrt(i); } Interlocked.Add(ref sum, partialSum);//原子操作 }); stopwatch.Stop(); Console.WriteLine($"Parallel.ForEach:{stopwatch.Elapsed}"); //我的機器運行出的時間是:00:00:02.7111666 }

  Partitioner的分區是靜態的,只要迭代分區划分完成,每個分區上都會運行一個委托。如果某一段區間的迭代次數提前完成,也不會嘗試重新分區並讓處理器分擔工作。 對於任意IEnumerable<T>類型都可以創建不指定區間的分區,但這樣就會讓每個迭代項目都創建一個委托,而不是對每個區間創建委托。創建自定義的Partitioner可以解決這個問題,代碼比較復雜。請自行參閱:http://www.writinghighperf.net/go/20

  示例2中,采用一個委托方法來計算兩個數之間的關系值。前一種是每次運行都重新構造委托,后一種是先構造出委托的方法而后每一次調用。

 //聲明一個委托
 private delegate int MathOp(int x, int y);
 private int Add(int x,int y)
 {
     return x + y;
 }

 private int DoOperation(MathOp op,int x,int y)
 {
     return op(x, y);
 }

 /*
  * 委托會設計兩類開銷:構造開銷和調用開銷。大多數調用開銷和普通方法的調用差不多。 但委托是一種對象,構造開銷可能相當大,最好是只做一次構造,然后把對象緩存起來。
  */
 public void Test()
 {
     System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch();
     stopwatch.Restart();
     for(int i=0;i<10;i++)
     {
         //每一次遍歷循環,都會產生一次構造和調用開銷
         DoOperation(Add, 1, 2);
     }
     stopwatch.Stop();
     Console.WriteLine("Construction and invocation: {0}", stopwatch.Elapsed);//00:00:00.0003812

     stopwatch.Restart();
     MathOp op = Add;//只產生一次構造開銷
     for(int i=0;i<10;i++)
     {
         DoOperation(op, 1, 2);//每一次遍歷都只產生遍歷開銷
     }
     stopwatch.Stop();
     Console.WriteLine("Once Construction and invocation: {0}", stopwatch.Elapsed);//00:00:00.0000011
 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM