線程是創建並發的底層工具,對於開發者而言,想實現細粒度並發具有一定的局限性,比如將小的並發組合成大的並發,還有性能方面的影響。
Task可以很好的解決這些問題,Task是一個更高級的抽象概念,代表一個並發操作,但不一定依賴線程完成。
Task從Framework4.0開始引入,Framework4.5又添加了一些功能,比如Task.Run(),async/await關鍵字等,
在.NET Framework4.5之后,基於任務的異步處理已經成為主流模式, (Task-based Asynchronous Pattern,TAP)基於任務的異步模式。
在使用異步函數之前,先看下Task的基本操作。
一. Task 基本操作
1.1 Task 啟動方式
Task.Run(()=>Console.WriteLine("Hello Task"));
Task.Factory.StartNew(()=>Console.WriteLine("Hello Task"));
Task.Run是Task.Factory.StartNew的快捷方式。
啟動的都是后台線程,並且默認都是線程池的線程
Task.Run(() => { Console.WriteLine( $"TaskRun IsBackGround:{CurrentThread.IsBackground}, IsThreadPool:{CurrentThread.IsThreadPoolThread}"); }); Task.Factory.StartNew(() => { Console.WriteLine( $"TaskFactoryStartNew IsBackGround:{CurrentThread.IsBackground}, IsThreadPool:{CurrentThread.IsThreadPoolThread}"); });
如果Task是長任務,可以添加TaskCreationOptions.LongRunning參數,使任務不運行在線程池上,有利於提升性能。
Task.Factory.StartNew(() => { Console.WriteLine( $"TaskFactoryStartNew IsBackGround:{CurrentThread.IsBackground}, IsThreadPool:{CurrentThread.IsThreadPoolThread}"); }, TaskCreationOptions.LongRunning);
1.2 Task 返回值/帶參數
Task 有一個泛型子類Task<TResult>,允許返回一個值。
Task<string> task =Task.Run(()=>SayHello("Jack")); string SayHello(string name) { return "Hello " + name; } Console.WriteLine(task.Result);
通過任務的Result屬性獲取返回值,這是會堵塞線程,尤其是在桌面客戶端程序中,謹慎使用Task.Result,容易導致死鎖!
同時帶參數的方式也不是很合理,后面可以被async/await方式直接替代。
1.3 Task 異常/異常處理
當任務中的代碼拋出一個未處理異常時,調用任務的Wait()或者Result屬性時,異常會被重新拋出。
var task = Task.Run(ThrowError); try { task.Wait(); } catch(AggregateException ex) { Console.WriteLine(ex.InnerException is NullReferenceException ? "Null Error!" : "Other Error"); } void ThrowError() { throw new NullReferenceException(); }
對於自治任務(沒有wait()和Result或者是延續的任務),使用靜態事件TaskScheduler.UnobservedTaskException可以在全局范圍訂閱未觀測的異常。
以便記錄錯誤日志
1.4 Task 延續
延續通常由一個回調方法實現,該方法會在任務完成之后執行,延續方法有兩種
(1)調用任務的GetAwaiter方法,將返回一個awaiter對象。這個對象的OnCompleted方法告知任務當執行完畢或者出錯時調用一個委托。
Task<string> learnTask = Task.Run(Learn); var awaiter = learnTask.GetAwaiter(); awaiter.OnCompleted(() => { var result = awaiter.GetResult(); Console.WriteLine(result); }); string Learn() { Console.WriteLine("Learn Method Executing"); Thread.Sleep(1000); return "Learn End"; }
如果learnTask任務出現錯誤,延續代碼awaiter.GetResult()將重新拋出異常,其中GetResult可以直接得到原始的異常,如果使用Result屬性,只能解析AggergateException.
這種延續方法更適用於富客戶端程序,延續可以提交到同步上下文,延續回到UI線程中。
當編寫庫文件,可以使用ConfigureAwait方法,延續代碼會運行在任務運行的線程上,從而避免不必要的切換開銷。
var awaiter =learnTask.ConfigureAwait(false).GetAwaiter();
(2)另一種方法使用ContiuneWith
Task<string> learnTask = Task.Run(Learn); learnTask.ContinueWith(antecedent => { var result = learnTask.Result; Console.WriteLine(result); }); string Learn() { Console.WriteLine("Learn Method Executing"); Thread.Sleep(1000); return "Learn End"; }
當任務出現錯誤時,必須處理AggregateException, ContiuneWith更適合並行編程場景。
1.5 TaskCompletionSource類使用
從如下源碼中可以看出當實例化TaskCompletionSource時,構造函數會新建一個Task任務。
public class TaskCompletionSource { private readonly Task _task; /// <summary>Creates a <see cref="TaskCompletionSource"/>.</summary> public TaskCompletionSource() => _task = new Task(); /// <summary> /// Gets the <see cref="Tasks.Task"/> created /// by this <see cref="TaskCompletionSource"/>. /// </summary> /// <remarks> /// This property enables a consumer access to the <see cref="Task"/> that is controlled by this instance. /// The <see cref="SetResult"/>, <see cref="SetException(Exception)"/>, <see cref="SetException(IEnumerable{Exception})"/>, /// and <see cref="SetCanceled"/> methods (and their "Try" variants) on this instance all result in the relevant state /// transitions on this underlying Task. /// </remarks> public Task Task => _task; }
它的真正的作用是創建一個不綁定線程的任務。
eg: 可以使用Timer類,CLR在定時之后觸發一個事件,而無需使用線程。
實現通用Delay方法:
Delay(5000).GetAwaiter().OnCompleted(()=>{ Console.WriteLine("Delay End"); }); Task Delay(int millisecond) { var tcs = new TaskCompletionSource<object>(); var timer = new System.Timers.Timer(millisecond) { AutoReset = false }; timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult(null); }; timer.Start(); return tcs.Task; }
這個方法類似Task.Delay()方法。
二. 異步原則(補充)
同步操作:先完成其工作再返回調用者
異步操作:大部分工作則是在返回調用者之后才完成的,也稱非阻塞方法。
異步編程的原則:
(1)以異步的方式編寫運行時間很長(或者可能很長)的函數,會在一個新的線程或者任務上調用這些函數,從而實現需要的並發性。
(2)異步方法的並發性是在長時間運行的方法內啟動的,而不是從這個方法外啟動的。
- I/O密集的並發性的實現不需要綁定線程(如1.5節的例子所示),因此可以提高可伸縮性和效率。
- 富客戶端應用程序可以減少工作線程的代碼,因此可以簡化線程安全性的實現。
Task支持延續,因此非常適合進行異步編程的,如1.5節的Delay方法。
在計算密集的方法中,我們使用Task.Run創建線程相關的異步性。但是異步編程的不同點在於,更希望將異步放在底層調用圖上,
因此富客戶端應用程序的高層方法就可以一直在UI線程上運行,訪問控件、共享狀態而不用擔心會出現線程安全問題。
看Task.Run的例子:
//粗粒度並發 Task.Run(() => DisplayPrimeCounts()); /// <summary> /// 顯示素數個數 /// </summary> void DisplayPrimeCounts() { for (int i = 0; i < 10; i++) Console.WriteLine(GetPrimesCount(i * 1000000 + 2, 1000000) + " primes between " + (i * 1000000) + " and " + ((i + 1) * 1000000 - 1)); Console.WriteLine("Done!"); } /// <summary> /// 獲取素數個數 /// </summary> int GetPrimesCount(int start, int count) { return ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)); }
這是一種粗粒度並發,如果想實現細粒度並發,需要編寫異步的方法。
看異步版本:
DisplayPrimeCountsAsync(); Task DisplayPrimeCountsAsync() { var machine = new PrimesStateMachine(); machine.DisplayPrimeCountsFrom(0); return machine.Task; } class PrimesStateMachine { TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>(); public Task Task { get { return _tcs.Task; } } /// <summary> /// 異步顯示素數個數 /// </summary> /// <param name="i"></param> public void DisplayPrimeCountsFrom(int i) { var awaiter = GetPrimesCountAsync(i * 1000000 + 2, 1000000).GetAwaiter(); awaiter.OnCompleted(() => { Console.WriteLine(awaiter.GetResult()+" primes between " + (i * 1000000) + " and " + ((i + 1) * 1000000 - 1)); if (i++ < 10) DisplayPrimeCountsFrom(i); else { Console.WriteLine("Done"); _tcs.SetResult(null); } }); } /// <summary> /// 異步獲取素數個數 /// </summary> /// <param name="start"></param> /// <param name="count"></param> /// <returns></returns> Task<int> GetPrimesCountAsync(int start, int count) { return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); } }
可以看到改造異步后的實現方式,很復雜。 GetPrimesCountAsync改為方法內部啟動異步,DisplayPrimeCountsFrom通過TaskCompletionSource實現異步。
這時async和await登場!
async和await關鍵字極大的簡化了程序的復雜度。
async/await版本:
DisplayPrimeCountsAsync(); /// <summary> /// 異步顯示素數個數 /// </summary> async Task DisplayPrimeCountsAsync() { for (int i = 0; i < 10; i++) Console.WriteLine(await GetPrimesCountAsync(i * 1000000 + 2, 1000000) + " primes between " + (i * 1000000) + " and " + ((i + 1) * 1000000 - 1)); Console.WriteLine("Done!"); } /// <summary> /// 異步獲取素數個數 /// </summary> Task<int> GetPrimesCountAsync(int start, int count) { return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0))); }
從編程形式上,有點類似同步方法一樣直觀簡潔。其實async/await編譯器也是將其轉換為一個狀態機。通常我們稱之為C#語法糖。
編譯器背后的原理可以參考這篇文章:https://www.cnblogs.com/zh7791/p/9951478.html
三. 異步函數
這章開始進入異步函數的使用,由上面一章已經引出async/await關鍵字。可以使用同步的代碼風格編寫異步代碼,極大地降低了異步編程的復雜度。
簡單捋下async/await
如下語句中使用了await附加了延續,statement(s)是expression的延續。
這個“等待”被編譯器轉化為如下同等功能的代碼。
這是如果想要成功編譯就必須添加async修飾符,如下圖提示。
async修飾符會指示編譯器將await作為一個關鍵字而非標識符,來避免二義性(C#5之前有可能作為標識符使用),添加async修飾符的方法稱為異步函數。
3.1 富客戶端異步函數Demo
通過WPF的例子展示異步函數在富客戶端應用程序中的作用:在執行計算密集的方法時,仍然保持UI的響應,不堵塞UI線程。
先看同步調用的情況:
private void ExecuteTaskOnClick(object sender, RoutedEventArgs e) { TextBoxMessage.Text = "Call Worker" + Environment.NewLine; DoSomething();//同步調用 } private void DoSomething() { Thread.Sleep(3000);//模擬計算密集耗時 TextBoxMessage.Text += "Calculate Done" + Environment.NewLine; }
上圖可以清楚的看到,當使用同步調用耗時方法時,UI線程無法響應用戶事件請求,TextBox的信息顯示也是等耗時方法結束后才更新。
原因是在耗時方法執行期間,UI線程已經被阻塞,UI線程接收的處理請求都會進入請求隊列,無法及時響應(包括鼠標鍵盤的事件請求,控件更新),很影響用戶體驗。
下面看異步版本:
btnExecuteTaskAsync.Click += (sender, args) => ExecuteTaskAsync(); private async void ExecuteTaskAsync() { btnExecuteTaskAsync.IsEnabled = false; TextBoxMessage.Text = "Call Worker Async" + Environment.NewLine; await DoSomethingAsync();//異步調用 TextBoxMessage.Text += "Calculate Async Done" + Environment.NewLine; btnExecuteTaskAsync.IsEnabled = true; } private async Task DoSomethingAsync() { await Task.Run(() => { Thread.Sleep(3000); //模擬計算密集耗時 }); }
更改為異步版本后,在執行耗時任務時,UI線程沒有被堵塞,可以正常響應用戶事件和控件更新,提高了用戶體驗。
3.2 異步調用執行過程
根據3.1節的例子,整個調用過程如下:
當用戶點擊按鈕時觸發事件,事件調用ExecuteTaskAsync 方法,ExecuteTaskAsync 方法調用DoSomethingAsync方法,而后調用await,而await會使執行點返回給調用者,
當DoSomethingAsync方法完成(或者出現錯誤)時,執行點會從停止之處恢復執行DoSomethingAsync后面的代碼。
ExecuteTaskAsync 方法則會'租用'UI線程的時間,即ExecuteTaskAsync 方法在消息循環注1中是以偽並發的方式執行的(執行會在UI線程的其他事件處理中穿插進行)。
在整個偽並發的過程中,只有await的過程中才會進行搶占,這就簡化了線程的安全性。DoSomethingAsync會運行在工作線程上,正真的並發發生在DoSomethingAsync方法的Task.Run部分,在Task.Run部分盡量避免訪問共享狀態和UI組件。
本小節結尾完善一下上面的例子代碼:
btnExecuteTaskAsync.Click += (sender, args) => ExecuteTaskAsync(); private async void ExecuteTaskAsync() { try { btnExecuteTaskAsync.IsEnabled = false; TextBoxMessage.Text = "Calculate Async Start" + Environment.NewLine; TextBoxMessage.Text += await DoSomethingAsync(); //異步調用 btnExecuteTaskAsync.IsEnabled = true; } catch (Exception e) { TextBoxMessage.Text += $"Error: {e.Message}" + Environment.NewLine; } finally { btnExecuteTaskAsync.IsEnabled = true; } } private async Task<string> DoSomethingAsync() { await Task.Delay(3000); //模擬計算密集耗時 return "Calculate Async Done"; }
增加了ExecuteTaskAsync方法的異常處理,給DoSomethingAsync方法添加了返回值Task<TResult>
還有一些關於優化方面的內容,簡單提一下:
同步完成:執行過程在await之前就返回給調用者,同時這個方法會返回一個已經結束的任務。編譯器會在同步完成的情況下跳過延續代碼,會awaiter的IsCompleted屬性來實現這種優化。
避免大量回彈: 對於一個在循環中多次調用的異步方法,通過調用ConfigureAwait方法可以避免該方法重復回彈到UI消息循環中。
它會阻止任務將延續提交到同步上下文中,將開銷降低到了上下文切換的級別,該優化比較適合編寫程序庫。
四. 異步模式
4.1 取消操作
在並發操作啟動之后,需要能夠取消任務,看如下示例:
private CancellationTokenSource? cts; btnExecuteTaskAsync.Click += (sender, args) => ExecuteTaskAsync(); btnCancel.Click += (sender, args) => ExecuteCancelTask(); private async void ExecuteTaskAsync() { cts = new CancellationTokenSource(); try { btnExecuteTaskAsync.IsEnabled = false; TextBoxMessage.Text = "Calculate Async Start" + Environment.NewLine; TextBoxMessage.Text += await DoSomethingAsync(cts.Token); //異步調用 btnExecuteTaskAsync.IsEnabled = true; } catch (OperationCanceledException) { TextBoxMessage.Text += "任務已經取消!" + Environment.NewLine; } catch (Exception e) { TextBoxMessage.Text += $"Error: {e.Message}" + Environment.NewLine; } finally { btnExecuteTaskAsync.IsEnabled = true; } } private async Task<string> DoSomethingAsync(CancellationToken cancellationToken) { for (int i = 0; i < 3; i++) { await Task.Delay(1000); //模擬計算密集耗時 cancellationToken.ThrowIfCancellationRequested(); } return "Calculate Async Done"; } private void ExecuteCancelTask() { cts?.Cancel(); }
在第3章結尾示例的基礎上,添加了異步函數可取消功能。
通過實例化CancellationTokenSource類,可以得到取消令牌Token,當取消令牌調用Cancel()方法時,就會將IsCancellationRequested屬性設置為True,同時任務會拋出OperationCanceledException。
在設計上將檢查方法取消操作和啟動取消操作分離開來,具有一定的安全性。
檢查取消在CancellationTaken類上,取消動作在CancellationTokenSource類上。
看實際效果:
4.2 進度報告
一些異步操作需要在運行時報告其執行進度。一種簡單的方案時向異步方法傳入一個Action委托,在進度發生變化時就觸發方法,在上面例子上添加了進度報告,如下:
private async void ExecuteTaskAsync() { cts = new CancellationTokenSource(); try { btnExecuteTaskAsync.IsEnabled = false; TextBoxMessage.Text = "Calculate Async Start" + Environment.NewLine; var result = await DoSomethingAsync( (percent) => { TextBoxMessage.Text += "Current progress is " + percent + Environment.NewLine; }, cts.Token); //異步調用 TextBoxMessage.Text += result; btnExecuteTaskAsync.IsEnabled = true; } catch (OperationCanceledException) { TextBoxMessage.Text += "任務已經取消!" + Environment.NewLine; } catch (Exception e) { TextBoxMessage.Text += $"Error: {e.Message}" + Environment.NewLine; } finally { btnExecuteTaskAsync.IsEnabled = true; } } private async Task<string> DoSomethingAsync(Action<string> progressReport, CancellationToken cancellationToken) { for (int i = 1; i <= 10; i++) { await Task.Delay(500); //模擬計算密集耗時 progressReport($"{i * 10}%".ToString()); cancellationToken.ThrowIfCancellationRequested(); } return "Calculate Async Done"; }
實現是簡單,但是在富客戶端應用程序中,有潛在的線程安全問題,由並發性對外暴露所產生的風險。
CLR擁有一對專門針對進度報告的類型:IProgress<T>接口和Progress<T>類 ,它們的作用包裝一個委托,以便是UI應用程序可以通過同步上下文安全地報告進度。
private async void ExecuteTaskAsync() { cts = new CancellationTokenSource(); try { btnExecuteTaskAsync.IsEnabled = false; TextBoxMessage.Text = "Calculate Async Start" + Environment.NewLine; //通過Progress<T>構造函數接受一個Action<T>委托並對其進行包裝 var result = await DoSomethingAsync(new Progress<string>((percent) => { TextBoxMessage.Text += "Current progress is " + percent + Environment.NewLine; }) , cts.Token); //異步調用 TextBoxMessage.Text += result; btnExecuteTaskAsync.IsEnabled = true; } catch (OperationCanceledException) { TextBoxMessage.Text += "任務已經取消!" + Environment.NewLine; } catch (Exception e) { TextBoxMessage.Text += $"Error: {e.Message}" + Environment.NewLine; } finally { btnExecuteTaskAsync.IsEnabled = true; } } private async Task<string> DoSomethingAsync(IProgress<string> progressReport, CancellationToken cancellationToken) { for (int i = 1; i <= 10; i++) { await Task.Delay(500); //模擬計算密集耗時 progressReport.Report($"{i * 10}%".ToString()); cancellationToken.ThrowIfCancellationRequested(); } return "Calculate Async Done"; }
對上面的例子稍作改造,就實現使用IProgress<T>和Progress<T>來完成進度報告。
4.3 基於任務的異步模式TAP
一個TAP方法:
- 返回一個“熱”Task或者Task<TResult>
- 擁有Async后綴,除一些特殊情況或者是任務組合器
- 若支持取消和進度報告,則需要擁有接受CancellationTaken或者IProgress<T>的重載。
- 快速返回調用者
- 對於I/O密集型任務不綁定線程
本文主要參考書籍: C#7.0核心技術指南
注1:UI線程上的消息循環的偽代碼如下: