一直覺得自己對並發了解不夠深入,特別是看了《代碼整潔之道》覺得自己有必要好好學學並發編程,因為性能也是衡量代碼整潔的一大標准。而且在《失控》這本書中也多次提到並發,不管是計算機還是生物都並發處理着各種事物。人真是奇怪,當你關注一個事情的時候,你會發現周圍的事物中就常出現那個事情。所以好奇心驅使下學習並發。便有了此文。
一、理解硬件線程和軟件線程
多核處理器帶有一個以上的物理內核--物理內核是真正的獨立處理單元,多個物理內核使得多條指令能夠同時並行運行。硬件線程也稱為邏輯內核,一個物理內核可以使用超線程技術提供多個硬件線程。所以一個硬件線程並不代表一個物理內核;Windows中每個運行的程序都是一個進程,每一個進程都會創建並運行一個或多個線程,這些線程稱為軟件線程。硬件線程就像是一條泳道,而軟件線程就是在其中游泳的人。
二、並行場合
.Net Framework4 引入了新的Task Parallel Library(任務並行庫,TPL),它支持數據並行、任務並行和流水線。讓開發人員應付不同的並行場合。
- 數據並行:有大量數據需要處理,並且必須對每一份數據執行同樣的操作。比如通過256bit的密鑰對100個Unicode字符串進行AES算法加密。
- 任務並行:通過任務並發運行不同的操作。例如生成文件散列碼,加密字符串,創建縮略圖。
- 流水線:這是任務並行和數據並行的結合體。
TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的並發的操作,然而我們不一定要使用Task類的實例,可以使用Parallel靜態類。它提供了Parallel.Invoke, Parallel.For Parallel.Forecah 三個方法。
三、Parallel.Invoke
試圖讓很多方法並行運行的最簡單的方法就是使用Parallel類的Invoke方法。例如有四個方法:
- WatchMovie
- HaveDinner
- ReadBook
- WriteBlog
通過下面的代碼就可以使用並行。
System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);
這段代碼會創建指向每一個方法的委托。Invoke方法接受一個Action的參數組。
public static void Invoke(params Action[] actions);
用lambda表達式或匿名委托可以達到同樣的效果。
System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => ReadBook(), delegate() { WriteBlog(); });
1.沒有特定的執行順序。
Parallel.Invoke方法只有在4個方法全部完成之后才會返回。它至少需要4個硬件線程才足以讓這4個方法並發運行。但並不保證這4個方法能夠同時啟動運行,如果一個或者多個內核處於繁忙狀態,那么底層的調度邏輯可能會延遲某些方法的初始化執行。

給方法加上延時,就可以看到必須等待最長的方法執行完成才回到主方法。
static void Main(string[] args) { System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog); Console.WriteLine("執行完成"); Console.ReadKey(); } static void WatchMovie() { Thread.Sleep(5000); Console.WriteLine("看電影"); } static void HaveDinner() { Thread.Sleep(1000); Console.WriteLine("吃晚飯"); } static void ReadBook() { Thread.Sleep(2000); Console.WriteLine("讀書"); } static void WriteBlog() { Thread.Sleep(3000); Console.WriteLine("寫博客"); }

這樣會造成很多邏輯內核處於長時間閑置狀態。
四、Parallel.For
Parallel.For為固定數目的獨立For循環迭代提供了負載均衡 (即將工作分發到不同的任務中執行,這樣所有的任務在大部分時間都可以保持繁忙) 的並行執行。從而能盡可能地充分利用所有的可用的內核。
我們比較下下面兩個方法,一個使用For循環,一個使用Parallel.For 都是生成密鑰在轉換為十六進制字符串。
private static void GenerateAESKeys() { var sw = Stopwatch.StartNew(); for (int i = 0; i < NUM_AES_KEYS; i++) { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); } Console.WriteLine("AES:"+sw.Elapsed.ToString()); } private static void ParallelGenerateAESKeys() { var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) => { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); }); Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString()); }
private static int NUM_AES_KEYS = 100000; static void Main(string[] args) { Console.WriteLine("執行"+NUM_AES_KEYS+"次:"); GenerateAESKeys(); ParallelGenerateAESKeys(); Console.ReadKey(); }

執行1000000次

這里並行的時間是串行的一半。
五、Parallel.ForEach
在Parallel.For中,有時候對既有循環進行優化可能會是一個非常復雜的任務。Parallel.ForEach為固定數目的獨立For Each循環迭代提供了負載均衡的並行執行,且支持自定義分區器,讓使用者可以完全掌握數據分發。實質就是將所有要處理的數據區分為多個部分,然后並行運行這些串行循環。
修改上面的代碼:
System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range => { var aesM = new AesManaged(); Console.WriteLine("AES Range({0},{1} 循環開始時間:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay); for (int i = range.Item1; i < range.Item2; i++) { aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); } Console.WriteLine("AES:"+sw.Elapsed.ToString()); });
從執行結果可以看出,分了13個段執行的。
第二次執行還是13個段。速度上稍微有差異。開始沒有指定分區數,Partitioner.Create使用的是內置默認值。

而且我們發現這些分區並不是同時執行的,大致是分了三個時間段執行。而且執行順序是不同的。總的時間和Parallel.For的方法差不多。
public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)
Parallel.ForEach方法定義了source和Body兩個參數。source是指分區器。提供了分解為多個分區的數據源。body是要調用的委托。它接受每一個已定義的分區作為參數。一共有20多個重載,在上面的例子中,分區的類型為Tuple<int,int>,是一個二元組類型。此外,返回一個ParallelLoopResult的值。
Partitioner.Create 創建分區是根據邏輯內核數及其他因素決定。
public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive) { int num = 3; if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num); if (rangeSize == 0) rangeSize = 1; return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); }
因此我們可以修改分區數目,rangesize大致為250000左右。也就是說我的邏輯內核是4.
var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1; System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1,rangesize), range =>
再次執行:

分區變成了四個,時間上沒有多大差別(第一個時間是串行時間)。我們看見這四個分區幾乎是同時執行的。大部分情況下,TPL在幕后使用的負載均衡機制都是非常高效的,然而對分區的控制便於使用者對自己的工作負載進行分析,來改進整體的性能。
Parallel.ForEach也能對IEnumerable<int>集合進行重構。Enumerable.Range生產了序列化的數目。但這樣就沒有上面的分區效果。
private static void ParallelForEachGenerateMD5HasHes() { var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); }); Console.WriteLine("MD5:"+sw.Elapsed.ToString()); }
六、從循環中退出
和串行運行中的break不同,ParallelLoopState 提供了兩個方法用於停止Parallel.For 和 Parallel.ForEach的執行。
- Break:讓循環在執行了當前迭代后盡快停止執行。比如執行到100了,那么循環會處理掉所有小於100的迭代。
- Stop:讓循環盡快停止執行。如果執行到了100的迭代,那不能保證處理完所有小於100的迭代。
修改上面的方法:執行3秒后退出。
private static void ParallelLoopResult(ParallelLoopResult loopResult) { string text; if (loopResult.IsCompleted) { text = "循環完成"; } else { if (loopResult.LowestBreakIteration.HasValue) { text = "Break終止"; } else { text = "Stop 終止"; } } Console.WriteLine(text); } private static void ParallelForEachGenerateMD5HasHesBreak() { var sw = Stopwatch.StartNew(); var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (int number,ParallelLoopState loopState) => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); if (sw.Elapsed.Seconds > 3) { loopState.Stop(); } }); ParallelLoopResult(loopresult); Console.WriteLine("MD5:" + sw.Elapsed); }

七、捕捉並行循環中發生的異常。
當並行迭代中調用的委托拋出異常,這個異常沒有在委托中被捕獲到時,就會變成一組異常,新的System.AggregateException負責處理這一組異常。
private static void ParallelForEachGenerateMD5HasHesException() { var sw = Stopwatch.StartNew(); var loopresult = new ParallelLoopResult(); try { loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); if (sw.Elapsed.Seconds > 3) { throw new TimeoutException("執行超過三秒"); } }); } catch (AggregateException ex) { foreach (var innerEx in ex.InnerExceptions) { Console.WriteLine(innerEx.ToString()); } } ParallelLoopResult(loopresult); Console.WriteLine("MD5:" + sw.Elapsed); }
結果:

異常出現了好幾次。
八、指定並行度。
TPL的方法總會試圖利用所有可用的邏輯內核來實現最好的結果,但有時候你並不希望在並行循環中使用所有的內核。比如你需要留出一個不參與並行計算的內核,來創建能夠響應用戶的應用程序,而且這個內核需要幫助你運行代碼中的其他部分。這個時候一種好的解決方法就是指定最大並行度。
這需要創建一個ParallelOptions的實例,設置MaxDegreeOfParallelism的值。
private static void ParallelMaxDegree(int maxDegree) { var parallelOptions = new ParallelOptions(); parallelOptions.MaxDegreeOfParallelism = maxDegree; var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int i) => { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); }); Console.WriteLine("AES:" + sw.Elapsed.ToString()); }
調用:如果在四核微處理器上運行,那么將使用3個內核。
ParallelMaxDegree(Environment.ProcessorCount - 1);

時間上大致慢了點(第一次Parallel.For 3.18s),但可以騰出一個內核來處理其他的事情。
小結:這次學習了Parallel相關方法以及如何退出並行循環和捕獲異常、設置並行度,還有並行相關的知識。園子里也有類似的博客。但作為自己知識的管理,在這里梳理一遍。
園友的博客:8天玩轉並發
閱讀書籍:《C#並行編程高級教程》

喜歡看書,也喜歡分享書籍(不限技術書籍)的朋友, 誠邀加入書山有路群q:452450927 。大家推薦的書籍太多,喊你來讀。
人的核心競爭力超過一半來自不緊不慢的事——讀書、鍛煉身體、與智者交友,以及業余愛好。
