【讀書筆記】.Net並行編程高級教程(二)-- 任務並行


      前面一篇提到例子都是數據並行,但這並不是並行化的唯一形式,在.Net4之前,必須要創建多個線程或者線程池來利用多核技術。現在只需要使用新的Task實例就可以通過更簡單的代碼解決命令式任務並行問題。

1.Task及它的生命周期

      一個Task表示一個異步操作,它的創建和執行都是獨立的,因此可以對相關操作的執行擁有完全的控制權;當有很多異步操作作為Task實例加載的時候,為了充分利用運行時的邏輯內核,任務調度器會嘗試並行的運行這些任務,當然任務都是有額外的開銷,雖然要小於添加線程的開銷;

      對Task實例的生命周期的理解非常重要。一個Task的執行,取決於底層硬件和運行時可用的資源。因此Task實例的狀態會不斷的發生改變,而一個Task實例只會完成其生命周期一次,當Task到達它三種可能的最終狀態只后,它就回不去之前的任何狀態了。

      

      Task實例有三種可能的初始狀態,Created是Task構造函數創建實例的初始狀態,WaitForActivation是子任務依賴其他任務完成后等待調度的初始狀態,WaitingToRun是通過TaskFactory.StartNew所創建任務的初始狀態。表示正在等待調度器挑選自己並運行。

      任務開始執行,狀態就變為TaskStatus.Runing。如果還有子任務,主任務的狀態會轉變到TaskStatus.WaitingForChildrenToComplete狀態。並最終到達,Canceled,Faulted和RunToCompletion 三種狀態。從字面理解就是任務取消,出錯和完成。

2.任務並行。

     前面我們通過Parallel.Invoke來並行加載方法。

  Parallel.Invoke(GenerateAESKeys,GenerateMD5Has);

     通過Task實例也能完成同樣的工作。

      var t1 = new Task(GenerateAESKeys);
      var t2 = new Task(GenerateMD5Has);
      t1.Start();
      t2.Start();
      Task.WaitAll(t1, t2);

 Start方法對委托進行初始化。 WaitAll方法會等待兩個任務的執行完成之后再往下走。

可以看見,執行過程中,任務的狀態不斷的發生變化。可以給WaitFor方法加上毫秒數。看任務是否會在指定時間內完成。

  if(!Task.WaitAll(new[]{t1,t2},3000))
            {
                Console.WriteLine("任務執行超過3秒");
                Console.WriteLine(t1.Status.ToString());
                Console.WriteLine(t2.Status.ToString());
 }

即使到達了指定時間,任務還是繼續執行。

同樣任務本身也是可以等待

  if (t1.Wait(3000))
   {
    Console.WriteLine("任務t1執行超過3秒");
    Console.WriteLine(t1.Status.ToString());
    }

3.通過取消標記取消任務。

    可以通過CancellationToken 來中斷任務的執行。這需要再委托中添加一些代碼,創建可以取消的任務。

  private static void GenerateMD5HasCancel(CancellationToken ct)
        {
            ct.ThrowIfCancellationRequested(); var sw = Stopwatch.StartNew();
            for (int i = 0; i < NUM_AES_KEYS; i++)
            {
                var md5M = MD5.Create();
                byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
                byte[] result = md5M.ComputeHash(data);
                string hexString = ConverToHexString(result);
                ct.ThrowIfCancellationRequested();
            }
            Console.WriteLine("MD5:" + sw.Elapsed.ToString());
        }
            Console.WriteLine("任務開始...");
            var cts = new CancellationTokenSource();
            var ct = cts.Token;
            var sw = Stopwatch.StartNew();
            var t1 = Task.Factory.StartNew(() => GenerateMD5HasCancel(ct), ct);
            var t2 = Task.Factory.StartNew(() => GenerateAESKeysCancel(ct), ct);
            
            //1秒后取消任務
            Thread.Sleep(1000);

            cts.Cancel();

            try
            {
                if (!Task.WaitAll(new[] { t1,t2}, 1000))
                {
                    Console.WriteLine("任務執行超過1秒");
                    Console.WriteLine(t1.Status.ToString());
                }
            }
            catch (AggregateException ex)
            {
                foreach (var exc in ex.InnerExceptions)
                {
                    Console.WriteLine(exc.ToString());
                }
                if (t1.IsCanceled)
                {
                    Console.WriteLine("任務1取消了...");
                }
                Console.WriteLine(sw.Elapsed.ToString());
                Console.WriteLine("結束");
            }
CancellationTokenSource能夠初始化取消的請求,而CancellationToken能將這些請求傳遞給異步操作;上面的方法通過Task類的Factory方法得到一個TaskFactory實例,相比Task直接創建任務,這個實例可以使用更多的功能。而StartNew 等價於用Task構造函數創建一個Task並調用Start方法執行。

直接在Debug下面運行,程序會在異常的地方中斷。直接運行exe得到上面的結果。 

ThrowIfCancellationRequested在每一次循環迭代都會執行,內部是判斷任務取消后拋出一個OperationCanceledException的異常,來避免運行不必要的循環和其他命令。

  public void ThrowIfCancellationRequested()
        {
            if (IsCancellationRequested) 
                ThrowOperationCanceledException();
        }
 private void ThrowOperationCanceledException()
        {
            throw new OperationCanceledException(Environment.GetResourceString("OperationCanceled"), this);
        }

如果有代碼正在等待取消,還會自動拋出一個TaskCanceledException異常。會包含在AggregateException中。

4.處理異常。

  修改上面的方法拋出一個異常。

 private static void GenerateMD5HasCancel(CancellationToken ct)
        {
            ct.ThrowIfCancellationRequested();
             //....if (sw.Elapsed.TotalSeconds > 0.5)
                {
                    throw new TimeoutException("超時異常0.5秒");
                }
                ct.ThrowIfCancellationRequested();
            }
            Console.WriteLine("MD5:" + sw.Elapsed.ToString());
        }

修改Main方法的Catch。

                if (t1.IsFaulted)
                {
                    foreach (var exc in ex.InnerExceptions)
                    {
                        Console.WriteLine(exc.ToString());
                    }
                    Console.WriteLine(t1.Status.ToString());
                }

執行結果:

當出現異常時,任務的狀態就會轉換為Faulted。並不會影響另外一個任務的執行。

5.從任務返回值。

前面的方法都是沒有返回值,得到任務的返回值需要使用Task<TResult>實例,TResult要替換為返回的類型。修改AES方法。返回一個指定前綴的List<String>

GenerateMD5HasList:
 private static List<string> GenerateMD5HasList(CancellationToken ct, char prefix)
        {
            ct.ThrowIfCancellationRequested();
            var sw = Stopwatch.StartNew();
            var list = new List<string>();
            for (int i = 0; i < NUM_AES_KEYS; i++)
            {
                var md5M = MD5.Create();
                byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
                byte[] result = md5M.ComputeHash(data);
                string hexString = ConverToHexString(result);
                if (hexString[0] == prefix)
                {
                    list.Add(hexString);
                }
                ct.ThrowIfCancellationRequested();
            }
            Console.WriteLine("MD5:" + sw.Elapsed);
            return list;
        }
View Code
  Console.WriteLine("任務開始...");
            var cts = new CancellationTokenSource();
            var ct = cts.Token;
            
            var t1 = Task.Factory.StartNew(() => GenerateMD5HasList(ct,'A'), ct);
            //等待執行完成
            t1.Wait();

            var res = t1.Result;
            for (int i = 0; i < res.Count; i++)
            {
                Console.WriteLine(res[i]);
            }

而這時的StartNew創建的類型是Task<List<String>>.StartNew源碼如下:

  public Task<TResult> StartNew<TResult>(Func<TResult> function)
        {
            StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
            Task currTask = Task.InternalCurrent;
            return Task<TResult>.StartNew(currTask, function, m_defaultCancellationToken,
                m_defaultCreationOptions, InternalTaskOptions.None, GetDefaultScheduler(currTask), ref stackMark);
        }

我們還可以將任務串聯起來。比如上面的代碼。避免寫太多代碼來檢查前面一個任務是否完成。而ContinueWith這個方法可以用來串聯多個任務。

            var t1 = Task.Factory.StartNew(() => GenerateMD5HasList(ct,'A'), ct);
            var t2 = t1.ContinueWith((t) =>
            {
                for (int i = 0; i < t.Result.Count; i++)
                {
                    Console.WriteLine(t.Result[i]);
                                    }
            });
            //可以等待t2執行完成
            t2.Wait();

如果需要設置繼續的條件,就要用到TaskContinuationOptions,它是一個枚舉類型,用來控制另一個任務執行和調度的可選行為

 var t2 = t1.ContinueWith((t) => OtherMethod(t), TaskContinuationOptions.NotOnCanceled);

 NotOnCanceled,就是表示上個任務不取消的情況下執行。例如還有NotOnFaulted.如果上一個任務拋出了異常,那么就不會執行。這里就不一一例舉了。

 

 小結:這一章主要是將了基於任務的編程模型,學習了任務的創建、狀態,以及如何取消、捕獲異常和獲得返回值,並能串行任務,任務的延續不僅能簡化代碼,而且還能幫助調度器對很快就要執行的任務采取正確的操作。下一章學習並發集合。

 

閱讀書籍:《C#並行編程高級教程 鏈接: 下載鏈: http://pan.baidu.com/s/1bn1BdBx  密碼: fn2d

 C#並行編程高級教程

喜歡看書,也喜歡分享書籍(不限技術書籍)的朋友,誠邀加入書山有路群q:452450927 。

第三期書山有路,大家正在讀《女人的起源》。 鏈接: http://pan.baidu.com/s/1ntEhMHz 密碼: 84d8

 在喜歡你的人那里,去熱愛生活;在不喜歡你的人那里,去看清世界。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM