菜鳥學習並行編程,參考《C#並行編程高級教程.PDF》,如有錯誤,歡迎指正。
目錄
任務簡介
TPL引入新的基於任務的編程模型,通過這種編程模型可以發揮多核的功效,提升應用程序的性能,不需要編寫底層復雜且重量級的線程代碼。
但需要注意:任務並不是線程(任務運行的時候需要使用線程,但並不是說任務取代了線程,任務代碼是使用底層的線程(軟件線程,調度在特定的硬件線程或邏輯內核上)運行的,任務與線程之間並沒有一對一的關系。)
創建一個新的任務時,調度器(調度器依賴於底層的線程池引擎)會使用工作竊取隊列找到一個最合適的線程,然后將任務加入隊列,任務所包含的代碼會在一個線程中運行。如圖:
System.Threading.Tasks.Task
一個Task表示一個異步操作,Task提供了很多方法和屬性,通過這些方法和屬性能夠對Task的執行進行控制,並且能夠獲得其狀態信息。
Task的創建和執行都是獨立的,因此可以對關聯操作的執行擁有完全的控制權。
使用Parallel.For、Parallel.ForEach的循環迭代的並行執行,TPL會在后台創建System.Threading.Tasks.Task的實例。
使用Parallel.Invoke時,TPL也會創建與調用的委托數目一致的System.Threading.Tasks.Task的實例。
注意項
程序中添加很多異步的操作作為Task實例加載的時候,為了充分利用運行時所有可用的邏輯內核,任務調度器會嘗試的並行的運行這些任務,也會嘗試在所有的可用內核上對工作進行負載均衡。
但在實際的編碼過程當中,並不是所有的代碼片段都能夠方便的用任務來運行,因為任務會帶來額外的開銷,盡管這種開銷比添加線程所帶來的開銷要小,但是仍然需要將這個開銷考慮在內。
Task狀態與生命周期
一個Task實例只會完成其生命周期一次,當Task到達它的3種肯呢過的最終狀態之一是,就無法回到之前的任何狀態
下面貼代碼,詳解見注釋,方便大家理解Task的狀態:

class Program { /* coder:釋迦苦僧 */ static void Main(string[] args) { /* 創建一個任務 不調用 不執行 狀態為Created */ Task tk = new Task(() => { }); Console.WriteLine(tk.Status.ToString()); /* 創建一個任務 執行 狀態為 WaitingToRun */ Task tk1 = new Task(() => { }); tk1.Start();/*對於安排好的任務,就算調用Start方法也不會立馬啟動 此時任務的狀態為WaitingToRun*/ Console.WriteLine(tk1.Status.ToString()); /* 創建一個主任務 */ Task mainTask = new Task(() => { SpinWait.SpinUntil(() => { return false; }, 30000); }); /* 將子任務加入到主任務完成之后執行 */ Task subTask = mainTask.ContinueWith((t1) => { }); /* 啟動主任務 */ mainTask.Start(); /* 此時子任務狀態為 WaitingForActivation */ Console.WriteLine(subTask.Status.ToString()); /* 創建一個任務 執行 后 等待一段時間 並行未結束的情況下 狀態為 Running */ Task tk2 = new Task(() => { SpinWait.SpinUntil(() => false, 30000); }); tk2.Start();/*對於安排好的任務,就算調用Start方法也不會立馬啟動*/ SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk2.Status.ToString()); /* 創建一個任務 然后取消該任務 狀態為Canceled */ CancellationTokenSource cts = new CancellationTokenSource(); Task tk3 = new Task(() => { for (int i = 0; i < int.MaxValue; i++) { if (!cts.Token.IsCancellationRequested) { cts.Token.ThrowIfCancellationRequested(); } } }, cts.Token); tk3.Start();/*啟動任務*/ SpinWait.SpinUntil(() => false, 100); cts.Cancel();/*取消該任務執行 但並非立馬取消 所以對於Canceled狀態也不會立馬生效*/ SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); /*創建一個任務 讓它成功的運行完成 會得到 RanToCompletion 狀態*/ Task tk4 = new Task(() => { SpinWait.SpinUntil(() => false, 10); }); tk4.Start(); SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk4.Status.ToString()); /*創建一個任務 讓它運行失敗 會得到 Faulted 狀態*/ Task tk5 = new Task(() => { throw new Exception(); }); tk5.Start(); SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk5.Status.ToString()); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
使用任務來對代碼進行並行化
使用Parallel.Invoke可以並行加載多個方法,使用Task實例也能完成同樣的工作,下面貼代碼:

class Program { private static ConcurrentQueue<Product> queue = null; /* coder:釋迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);}); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); Console.ReadLine(); } static void SetProduct(int index) { Parallel.For(0, 10000, (i) => { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); }); Console.WriteLine("SetProduct {0} 執行完成", index); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
等待任務完成Task.WaitAll
Task.WaitAll 方法,這個方法是同步執行的,在Task作為參數被接受,所有Task結束其執行前,主線程不會繼續執行下一條指令,下面貼代碼

class Program { private static ConcurrentQueue<Product> queue = null; /* coder:釋迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); }); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); /*等待任務執行完成后再輸出 ====== */ Task.WaitAll(tk1, tk2); Console.WriteLine("等待任務執行完成后再輸出 ======"); Task tk3 = new Task(() => { SetProduct(1); SetProduct(3); }); Task tk4 = new Task(() => SetProduct(2)); tk3.Start(); tk4.Start(); /*等待任務執行前輸出 ====== */ Console.WriteLine("等待任務執行前輸出 ======"); Task.WaitAll(tk3, tk4); Console.ReadLine(); } static void SetProduct(int index) { Parallel.For(0, 10000, (i) => { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); }); Console.WriteLine("SetProduct {0} 執行完成", index); } }
Task.WaitAll 限定等待時長

queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);}); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); /*如果tk1 tk2 沒能在10毫秒內完成 則輸出 ***** */ if (!Task.WaitAll(new Task[] { tk1, tk2 }, 10)) { Console.WriteLine("******"); } Console.ReadLine();
如圖10毫秒沒有完成任務,則輸出了****
通過取消標記取消任務
通過取消標記來中斷Task實例的執行。 CancellationTokenSource,CancellationToken下的IsCanceled屬性標志當前是否已經被取消,取消任務,任務也不一定會馬上取消,下面貼代碼:

class Program { private static ConcurrentQueue<Product> queue = null; /* coder:釋迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); System.Threading.CancellationTokenSource token = new CancellationTokenSource(); Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token)); Task tk2 = Task.Factory.StartNew(() => SetProduct(token.Token)); Thread.Sleep(10); /*取消任務操作*/ token.Cancel(); try { /*等待完成*/ Task.WaitAll(new Task[] { tk1, tk2 }); } catch (AggregateException ex) { /*如果當前的任務正在被取消,那么還會拋出一個TaskCanceledException異常,這個異常包含在AggregateException異常中*/ Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled); Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled); } Thread.Sleep(2000); Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled); Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled); Console.ReadLine(); } static void SetProduct(System.Threading.CancellationToken ct) { /* 每一次循環迭代,都會有新的代碼調用 ThrowIfCancellationRequested * 這行代碼能夠對 OpreationCanceledException 異常進行觀察 * 並且這個異常的標記與Task實例關聯的那個標記進行比較,如果兩者相同 ,而且IsCancelled屬性為True,那么Task實例就知道存在一個要求取消的請求,並且會將狀態轉變為Canceled狀態,中斷任務執行。 * 如果當前的任務正在被取消,那么還會拋出一個TaskCanceledException異常,這個異常包含在AggregateException異常中 /*檢查取消標記*/ ct.ThrowIfCancellationRequested(); for (int i = 0; i < 50000; i++) { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); ct.ThrowIfCancellationRequested(); } Console.WriteLine("SetProduct 執行完成"); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
Task異常處理 當很多任務並行運行的時候,可能會並行發生很多異常。Task實例能夠處理一組一組的異常,這些異常有System.AggregateException類處理

class Program { private static ConcurrentQueue<Product> queue = null; /* coder:釋迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); System.Threading.CancellationTokenSource token = new CancellationTokenSource(); Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token)); Thread.Sleep(2000); if (tk1.IsFaulted) { /* 循環輸出異常 */ foreach (Exception ex in tk1.Exception.InnerExceptions) { Console.WriteLine("tk1 Exception:{0}", ex.Message); } } Console.ReadLine(); } static void SetProduct(System.Threading.CancellationToken ct) { for (int i = 0; i < 5; i++) { throw new Exception(string.Format("Exception Index {0}", i)); } Console.WriteLine("SetProduct 執行完成"); } }
Task返回值 Task<TResult>

class Program { /* coder:釋迦苦僧 */ static void Main(string[] args) { Task<List<Product>> tk1 = Task<List<Product>>.Factory.StartNew(() => SetProduct()); Task.WaitAll(tk1); Console.WriteLine(tk1.Result.Count); Console.WriteLine(tk1.Result[0].Name); Console.ReadLine(); } static List<Product> SetProduct() { List<Product> result = new List<Product>(); for (int i = 0; i < 500; i++) { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; result.Add(model); } Console.WriteLine("SetProduct 執行完成"); return result; } }
通過延續串聯多個任務
ContinueWith:創建一個目標Task完成時,異步執行的延續程序,await,如代碼所示:

class Program { /* coder:釋迦苦僧 */ static void Main(string[] args) { /*創建任務t1*/ Task t1 = Task.Factory.StartNew(() => { Console.WriteLine("執行 t1 任務"); SpinWait.SpinUntil(() => { return false; }, 2000); }); /*創建任務t2 t2任務的執行 依賴與t1任務的執行完成*/ Task t2 = t1.ContinueWith((t) => { Console.WriteLine("執行 t2 任務"); SpinWait.SpinUntil(() => { return false; }, 2000); }); /*創建任務t3 t3任務的執行 依賴與t2任務的執行完成*/ Task t3 = t2.ContinueWith((t) => { Console.WriteLine("執行 t3 任務"); }); Console.ReadLine(); } }
TaskContinuationOptions
TaskContinuationOptions參數,可以控制延續另一個任的任務調度和執行的可選行為。下面看代碼:

class Program { /* coder:釋迦苦僧 */ static void Main(string[] args) { /*創建任務t1*/ Task t1 = Task.Factory.StartNew(() => { Console.WriteLine("執行 t1 任務"); SpinWait.SpinUntil(() => { return false; }, 2000); throw new Exception("異常"); }); /*創建任務t2 t2任務的執行 依賴與t1任務的執行完成*/ Task t2 = t1.ContinueWith((t) => { Console.WriteLine(t.Status); Console.WriteLine("執行 t2 任務"); SpinWait.SpinUntil(() => { return false; }, 2000); /*定義 TaskContinuationOptions 行為為 NotOnFaulted 在 t1 任務拋出異常后,t1 的任務狀態為 Faulted , 則t2 不會執行里面的方法 但是需要注意的是t3任務*/ /*t2在不符合條件時 返回Canceled狀態狀態讓t3任務執行*/ }, TaskContinuationOptions.NotOnFaulted); /*創建任務t3 t3任務的執行 依賴與t2任務的執行完成*/ /*t2在不符合條件時 返回Canceled狀態狀態讓t3任務執行*/ Task t3 = t2.ContinueWith((t) => { Console.WriteLine(t.Status); Console.WriteLine("執行 t3 任務"); }); Console.ReadLine(); } }
TaskContinuationOptions 屬性有很多,如下所示
關於並行編程中的Task就寫到這,如有問題,請指正。
作者:釋迦苦僧 出處:http://www.cnblogs.com/woxpp/p/3928788.html
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接。