前面一篇提到例子都是數據並行,但這並不是並行化的唯一形式,在.Net4之前,必須要創建多個線程或者線程池來利用多核技術。現在只需要使用新的Task實例就可以通過更簡單的代碼解決命令式任務並行問題。
1.Task及它的生命周期
一個Task表示一個異步操作,它的創建和執行都是獨立的,因此可以對相關操作的執行擁有完全的控制權;當有很多異步操作作為Task實例加載的時候,為了充分利用運行時的邏輯內核,任務調度器會嘗試並行的運行這些任務,當然任務都是有額外的開銷,雖然要小於添加線程的開銷;
對Task實例的生命周期的理解非常重要。一個Task的執行,取決於底層硬件和運行時可用的資源。因此Task實例的狀態會不斷的發生改變,而一個Task實例只會完成其生命周期一次,當Task到達它三種可能的最終狀態只后,它就回不去之前的任何狀態了。
Task實例有三種可能的初始狀態,Created是Task構造函數創建實例的初始狀態,WaitForActivation是子任務依賴其他任務完成后等待調度的初始狀態,WaitingToRun是通過TaskFactory.StartNew所創建任務的初始狀態。表示正在等待調度器挑選自己並運行。
任務開始執行,狀態就變為TaskStatus.Runing。如果還有子任務,主任務的狀態會轉變到TaskStatus.WaitingForChildrenToComplete狀態。並最終到達,Canceled,Faulted和RunToCompletion 三種狀態。從字面理解就是任務取消,出錯和完成。
2.任務並行。
前面我們通過Parallel.Invoke來並行加載方法。
Parallel.Invoke(GenerateAESKeys,GenerateMD5Has);
通過Task實例也能完成同樣的工作。
var t1 = new Task(GenerateAESKeys); var t2 = new Task(GenerateMD5Has); t1.Start(); t2.Start(); Task.WaitAll(t1, t2);
Start方法對委托進行初始化。 WaitAll方法會等待兩個任務的執行完成之后再往下走。
可以看見,執行過程中,任務的狀態不斷的發生變化。可以給WaitFor方法加上毫秒數。看任務是否會在指定時間內完成。
if(!Task.WaitAll(new[]{t1,t2},3000)) { Console.WriteLine("任務執行超過3秒"); Console.WriteLine(t1.Status.ToString()); Console.WriteLine(t2.Status.ToString()); }
即使到達了指定時間,任務還是繼續執行。
同樣任務本身也是可以等待
if (t1.Wait(3000)) { Console.WriteLine("任務t1執行超過3秒"); Console.WriteLine(t1.Status.ToString()); }
3.通過取消標記取消任務。
可以通過CancellationToken 來中斷任務的執行。這需要再委托中添加一些代碼,創建可以取消的任務。
private static void GenerateMD5HasCancel(CancellationToken ct) { ct.ThrowIfCancellationRequested(); var sw = Stopwatch.StartNew(); for (int i = 0; i < NUM_AES_KEYS; i++) { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); ct.ThrowIfCancellationRequested(); } Console.WriteLine("MD5:" + sw.Elapsed.ToString()); }
Console.WriteLine("任務開始..."); var cts = new CancellationTokenSource(); var ct = cts.Token; var sw = Stopwatch.StartNew(); var t1 = Task.Factory.StartNew(() => GenerateMD5HasCancel(ct), ct); var t2 = Task.Factory.StartNew(() => GenerateAESKeysCancel(ct), ct); //1秒后取消任務 Thread.Sleep(1000); cts.Cancel(); try { if (!Task.WaitAll(new[] { t1,t2}, 1000)) { Console.WriteLine("任務執行超過1秒"); Console.WriteLine(t1.Status.ToString()); } } catch (AggregateException ex) { foreach (var exc in ex.InnerExceptions) { Console.WriteLine(exc.ToString()); } if (t1.IsCanceled) { Console.WriteLine("任務1取消了..."); } Console.WriteLine(sw.Elapsed.ToString()); Console.WriteLine("結束"); }
CancellationTokenSource能夠初始化取消的請求,而CancellationToken能將這些請求傳遞給異步操作;上面的方法通過Task類的Factory方法得到一個TaskFactory實例,相比Task直接創建任務,這個實例可以使用更多的功能。而StartNew 等價於用Task構造函數創建一個Task並調用Start方法執行。
直接在Debug下面運行,程序會在異常的地方中斷。直接運行exe得到上面的結果。
ThrowIfCancellationRequested在每一次循環迭代都會執行,內部是判斷任務取消后拋出一個OperationCanceledException的異常,來避免運行不必要的循環和其他命令。
public void ThrowIfCancellationRequested() { if (IsCancellationRequested) ThrowOperationCanceledException(); } private void ThrowOperationCanceledException() { throw new OperationCanceledException(Environment.GetResourceString("OperationCanceled"), this); }
如果有代碼正在等待取消,還會自動拋出一個TaskCanceledException異常。會包含在AggregateException中。
4.處理異常。
修改上面的方法拋出一個異常。
private static void GenerateMD5HasCancel(CancellationToken ct) { ct.ThrowIfCancellationRequested(); //....if (sw.Elapsed.TotalSeconds > 0.5) { throw new TimeoutException("超時異常0.5秒"); } ct.ThrowIfCancellationRequested(); } Console.WriteLine("MD5:" + sw.Elapsed.ToString()); }
修改Main方法的Catch。
if (t1.IsFaulted) { foreach (var exc in ex.InnerExceptions) { Console.WriteLine(exc.ToString()); } Console.WriteLine(t1.Status.ToString()); }
執行結果:
當出現異常時,任務的狀態就會轉換為Faulted。並不會影響另外一個任務的執行。
5.從任務返回值。
前面的方法都是沒有返回值,得到任務的返回值需要使用Task<TResult>實例,TResult要替換為返回的類型。修改AES方法。返回一個指定前綴的List<String>
GenerateMD5HasList:

private static List<string> GenerateMD5HasList(CancellationToken ct, char prefix) { ct.ThrowIfCancellationRequested(); var sw = Stopwatch.StartNew(); var list = new List<string>(); for (int i = 0; i < NUM_AES_KEYS; i++) { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); if (hexString[0] == prefix) { list.Add(hexString); } ct.ThrowIfCancellationRequested(); } Console.WriteLine("MD5:" + sw.Elapsed); return list; }
Console.WriteLine("任務開始..."); var cts = new CancellationTokenSource(); var ct = cts.Token; var t1 = Task.Factory.StartNew(() => GenerateMD5HasList(ct,'A'), ct); //等待執行完成 t1.Wait(); var res = t1.Result; for (int i = 0; i < res.Count; i++) { Console.WriteLine(res[i]); }
而這時的StartNew創建的類型是Task<List<String>>.StartNew源碼如下:
public Task<TResult> StartNew<TResult>(Func<TResult> function) { StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; Task currTask = Task.InternalCurrent; return Task<TResult>.StartNew(currTask, function, m_defaultCancellationToken, m_defaultCreationOptions, InternalTaskOptions.None, GetDefaultScheduler(currTask), ref stackMark); }
我們還可以將任務串聯起來。比如上面的代碼。避免寫太多代碼來檢查前面一個任務是否完成。而ContinueWith這個方法可以用來串聯多個任務。
var t1 = Task.Factory.StartNew(() => GenerateMD5HasList(ct,'A'), ct); var t2 = t1.ContinueWith((t) => { for (int i = 0; i < t.Result.Count; i++) { Console.WriteLine(t.Result[i]); } }); //可以等待t2執行完成 t2.Wait();
如果需要設置繼續的條件,就要用到TaskContinuationOptions,它是一個枚舉類型,用來控制另一個任務執行和調度的可選行為
var t2 = t1.ContinueWith((t) => OtherMethod(t), TaskContinuationOptions.NotOnCanceled);
NotOnCanceled,就是表示上個任務不取消的情況下執行。例如還有NotOnFaulted.如果上一個任務拋出了異常,那么就不會執行。這里就不一一例舉了。
小結:這一章主要是將了基於任務的編程模型,學習了任務的創建、狀態,以及如何取消、捕獲異常和獲得返回值,並能串行任務,任務的延續不僅能簡化代碼,而且還能幫助調度器對很快就要執行的任務采取正確的操作。下一章學習並發集合。
閱讀書籍:《C#並行編程高級教程》 鏈接: 下載鏈: http://pan.baidu.com/s/1bn1BdBx 密碼: fn2d
喜歡看書,也喜歡分享書籍(不限技術書籍)的朋友,誠邀加入書山有路群q:452450927 。
第三期書山有路,大家正在讀《女人的起源》。 鏈接: http://pan.baidu.com/s/1ntEhMHz 密碼: 84d8
在喜歡你的人那里,去熱愛生活;在不喜歡你的人那里,去看清世界。