【讀書筆記】.Net並行編程高級教程--Parallel


一直覺得自己對並發了解不夠深入,特別是看了《代碼整潔之道》覺得自己有必要好好學學並發編程,因為性能也是衡量代碼整潔的一大標准。而且在《失控》這本書中也多次提到並發,不管是計算機還是生物都並發處理着各種事物。人真是奇怪,當你關注一個事情的時候,你會發現周圍的事物中就常出現那個事情。所以好奇心驅使下學習並發。便有了此文。

一、理解硬件線程和軟件線程

     多核處理器帶有一個以上的物理內核--物理內核是真正的獨立處理單元,多個物理內核使得多條指令能夠同時並行運行。硬件線程也稱為邏輯內核,一個物理內核可以使用超線程技術提供多個硬件線程。所以一個硬件線程並不代表一個物理內核;Windows中每個運行的程序都是一個進程,每一個進程都會創建並運行一個或多個線程,這些線程稱為軟件線程。硬件線程就像是一條泳道,而軟件線程就是在其中游泳的人。

二、並行場合

    .Net Framework4 引入了新的Task Parallel Library(任務並行庫,TPL),它支持數據並行、任務並行和流水線。讓開發人員應付不同的並行場合。

  • 數據並行:有大量數據需要處理,並且必須對每一份數據執行同樣的操作。比如通過256bit的密鑰對100個Unicode字符串進行AES算法加密。
  • 任務並行:通過任務並發運行不同的操作。例如生成文件散列碼,加密字符串,創建縮略圖。
  • 流水線:這是任務並行和數據並行的結合體。

    TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的並發的操作,然而我們不一定要使用Task類的實例,可以使用Parallel靜態類。它提供了Parallel.Invoke, Parallel.For Parallel.Forecah 三個方法。

三、Parallel.Invoke

     試圖讓很多方法並行運行的最簡單的方法就是使用Parallel類的Invoke方法。例如有四個方法:

  • WatchMovie
  • HaveDinner
  • ReadBook
  • WriteBlog

    通過下面的代碼就可以使用並行。

 System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);

  這段代碼會創建指向每一個方法的委托。Invoke方法接受一個Action的參數組。

public static void Invoke(params Action[] actions);

  用lambda表達式或匿名委托可以達到同樣的效果。

System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => ReadBook(), delegate() { WriteBlog(); });

 1.沒有特定的執行順序。

   Parallel.Invoke方法只有在4個方法全部完成之后才會返回。它至少需要4個硬件線程才足以讓這4個方法並發運行。但並不保證這4個方法能夠同時啟動運行,如果一個或者多個內核處於繁忙狀態,那么底層的調度邏輯可能會延遲某些方法的初始化執行。

    

給方法加上延時,就可以看到必須等待最長的方法執行完成才回到主方法。

 static void Main(string[] args)
        {
            System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook,
                WriteBlog);
            Console.WriteLine("執行完成");
            Console.ReadKey();
        }

        static void WatchMovie()
        {
            Thread.Sleep(5000);
            Console.WriteLine("看電影");
        }
        static void HaveDinner()
        {
            Thread.Sleep(1000);
            Console.WriteLine("吃晚飯");
        }
        static void ReadBook()
        {
            Thread.Sleep(2000);
            Console.WriteLine("讀書");
        }
        static void WriteBlog()
        {
            Thread.Sleep(3000);
            Console.WriteLine("寫博客");
        }
View Code

這樣會造成很多邏輯內核處於長時間閑置狀態。

四、Parallel.For

Parallel.For為固定數目的獨立For循環迭代提供了負載均衡 (即將工作分發到不同的任務中執行,這樣所有的任務在大部分時間都可以保持繁忙) 的並行執行。從而能盡可能地充分利用所有的可用的內核。

我們比較下下面兩個方法,一個使用For循環,一個使用Parallel.For  都是生成密鑰在轉換為十六進制字符串。

 private static void GenerateAESKeys()
        {
            var sw = Stopwatch.StartNew();
            for (int i = 0; i < NUM_AES_KEYS; i++)
            {
                var aesM = new AesManaged();
                aesM.GenerateKey();
                byte[] result = aesM.Key;
                string hexStr = ConverToHexString(result);
            }
            Console.WriteLine("AES:"+sw.Elapsed.ToString());
        }

 private static void ParallelGenerateAESKeys()
        {
            var sw = Stopwatch.StartNew();
            System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) =>
            {
                var aesM = new AesManaged();
                aesM.GenerateKey();
                byte[] result = aesM.Key;
                string hexStr = ConverToHexString(result);
            });

            Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString());
        }
  private static int NUM_AES_KEYS = 100000;
        static void Main(string[] args)
        {
            Console.WriteLine("執行"+NUM_AES_KEYS+"次:");
            GenerateAESKeys(); ParallelGenerateAESKeys();
            Console.ReadKey();
        }

執行1000000次

這里並行的時間是串行的一半。

 

五、Parallel.ForEach

在Parallel.For中,有時候對既有循環進行優化可能會是一個非常復雜的任務。Parallel.ForEach為固定數目的獨立For Each循環迭代提供了負載均衡的並行執行,且支持自定義分區器,讓使用者可以完全掌握數據分發。實質就是將所有要處理的數據區分為多個部分,然后並行運行這些串行循環。

修改上面的代碼:

  System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range =>
            {
                var aesM = new AesManaged();
                Console.WriteLine("AES Range({0},{1} 循環開始時間:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay);

                for (int i = range.Item1; i < range.Item2; i++)
                {
                    aesM.GenerateKey();
                    byte[] result = aesM.Key;
                    string hexStr = ConverToHexString(result);
                }
                Console.WriteLine("AES:"+sw.Elapsed.ToString());
            });

從執行結果可以看出,分了13個段執行的。

 

第二次執行還是13個段。速度上稍微有差異。開始沒有指定分區數,Partitioner.Create使用的是內置默認值。

而且我們發現這些分區並不是同時執行的,大致是分了三個時間段執行。而且執行順序是不同的。總的時間和Parallel.For的方法差不多。

 public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)

Parallel.ForEach方法定義了source和Body兩個參數。source是指分區器。提供了分解為多個分區的數據源。body是要調用的委托。它接受每一個已定義的分區作為參數。一共有20多個重載,在上面的例子中,分區的類型為Tuple<int,int>,是一個二元組類型。此外,返回一個ParallelLoopResult的值。

Partitioner.Create 創建分區是根據邏輯內核數及其他因素決定。

 public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive)
    {
      int num = 3;
      if (toExclusive <= fromInclusive)
        throw new ArgumentOutOfRangeException("toExclusive");
      int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num); if (rangeSize == 0)
        rangeSize = 1;
      return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering);
    }

因此我們可以修改分區數目,rangesize大致為250000左右。也就是說我的邏輯內核是4.

   var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1;
   System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1,rangesize), range =>

再次執行:

分區變成了四個,時間上沒有多大差別(第一個時間是串行時間)。我們看見這四個分區幾乎是同時執行的。大部分情況下,TPL在幕后使用的負載均衡機制都是非常高效的,然而對分區的控制便於使用者對自己的工作負載進行分析,來改進整體的性能。

Parallel.ForEach也能對IEnumerable<int>集合進行重構。Enumerable.Range生產了序列化的數目。但這樣就沒有上面的分區效果。

 private static void ParallelForEachGenerateMD5HasHes()
        {
            var sw = Stopwatch.StartNew();
            System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number =>
            {
                var md5M = MD5.Create();
                byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                byte[] result = md5M.ComputeHash(data);
                string hexString = ConverToHexString(result);
            });
            Console.WriteLine("MD5:"+sw.Elapsed.ToString());
        }

 

六、從循環中退出

和串行運行中的break不同,ParallelLoopState 提供了兩個方法用於停止Parallel.For 和 Parallel.ForEach的執行。

  • Break:讓循環在執行了當前迭代后盡快停止執行。比如執行到100了,那么循環會處理掉所有小於100的迭代。
  • Stop:讓循環盡快停止執行。如果執行到了100的迭代,那不能保證處理完所有小於100的迭代。

修改上面的方法:執行3秒后退出。

  private static void ParallelLoopResult(ParallelLoopResult loopResult)
        {
            string text;
            if (loopResult.IsCompleted)
            {
                text = "循環完成";
            }
            else
            {
                if (loopResult.LowestBreakIteration.HasValue)
                {
                    text = "Break終止";
                }
                else
                {
                    text = "Stop 終止";
                }
            }
            Console.WriteLine(text);
        }


        private static void ParallelForEachGenerateMD5HasHesBreak()
        {
            var sw = Stopwatch.StartNew();
            var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (int number,ParallelLoopState loopState) =>
            {
                var md5M = MD5.Create();
                byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                byte[] result = md5M.ComputeHash(data);
                string hexString = ConverToHexString(result);
                if (sw.Elapsed.Seconds > 3)
                {
                   loopState.Stop();
                }
            });
            ParallelLoopResult(loopresult);
            Console.WriteLine("MD5:" + sw.Elapsed);
        }

 

七、捕捉並行循環中發生的異常。

  當並行迭代中調用的委托拋出異常,這個異常沒有在委托中被捕獲到時,就會變成一組異常,新的System.AggregateException負責處理這一組異常。

 private static void ParallelForEachGenerateMD5HasHesException()
        {
            var sw = Stopwatch.StartNew();
            var loopresult = new ParallelLoopResult();
            try
            {
                loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) =>
                {
                    var md5M = MD5.Create();
                    byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                    byte[] result = md5M.ComputeHash(data);
                    string hexString = ConverToHexString(result);
                    if (sw.Elapsed.Seconds > 3)
                    {
                        throw new TimeoutException("執行超過三秒");
                    }
                });
            }
            catch (AggregateException ex)
            {
                foreach (var innerEx in  ex.InnerExceptions)
                {
                    Console.WriteLine(innerEx.ToString());
                }
            }
           
            ParallelLoopResult(loopresult);
            Console.WriteLine("MD5:" + sw.Elapsed);
        }

結果:

 異常出現了好幾次。

 八、指定並行度。

TPL的方法總會試圖利用所有可用的邏輯內核來實現最好的結果,但有時候你並不希望在並行循環中使用所有的內核。比如你需要留出一個不參與並行計算的內核,來創建能夠響應用戶的應用程序,而且這個內核需要幫助你運行代碼中的其他部分。這個時候一種好的解決方法就是指定最大並行度。

這需要創建一個ParallelOptions的實例,設置MaxDegreeOfParallelism的值。

 private static void ParallelMaxDegree(int maxDegree)
        {
            var parallelOptions = new ParallelOptions();
            parallelOptions.MaxDegreeOfParallelism = maxDegree;

            var sw = Stopwatch.StartNew();
            System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int i) =>
            {
                var aesM = new AesManaged();
                aesM.GenerateKey();
                byte[] result = aesM.Key;
                string hexStr = ConverToHexString(result);
            });
            Console.WriteLine("AES:" + sw.Elapsed.ToString());
        }

調用:如果在四核微處理器上運行,那么將使用3個內核。

 ParallelMaxDegree(Environment.ProcessorCount - 1);

時間上大致慢了點(第一次Parallel.For 3.18s),但可以騰出一個內核來處理其他的事情。

 

小結:這次學習了Parallel相關方法以及如何退出並行循環和捕獲異常、設置並行度,還有並行相關的知識。園子里也有類似的博客。但作為自己知識的管理,在這里梳理一遍。

園友的博客:8天玩轉並發 

閱讀書籍:《C#並行編程高級教程 

C#並行編程高級教程

 

喜歡看書,也喜歡分享書籍(不限技術書籍)的朋友,  誠邀加入書山有路群q:452450927 。大家推薦的書籍太多,喊你來讀。

        

     人的核心競爭力超過一半來自不緊不慢的事——讀書、鍛煉身體、與智者交友,以及業余愛好。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM