目錄結構:
在這篇文章中,筆者將會討論如何執行異步的IO操作。上面一篇文章,筆者介紹了如何執行異步計算操作。在讀完本文后,將異步應用到IO操作中,能夠提高讀取的效率。
1.為什么需要異步IO操作
關於異步操作,想必讀者已知道異步IO操作,筆者在這里展示FileStream類讀取本地文件的過程。首先展示FileStream類同步讀取IO的流程圖。
上面的執行流程中,在第4步Windows將IRP數據包傳送給恰當的設備驅動的IRP隊列(每個設備驅動程序都維護着自己的IRP隊列,其中包含了機器上運行的所有進程發出的I/O請求)。在IRP數據包到達時,設備驅動程序將IRP信息傳給物理硬件設備上的安裝電路板,然后由硬件驅動設備執行請求的I/O操作,也就是第5個步驟。
當硬件驅動設備執行I/O操作期間,發出了I/O請求的線程將無事可做,所以Windows將線程變成睡眠線程,防止它浪費CPU的時間(步驟6)。這當然好,雖然線程不浪費時間,但其仍然浪費空間(內存),這當然就不好了。
當硬件設備執行完I/0操作。然后Windows會喚醒你的線程,把它調度給一個CPU,使其從內核模式返回至用戶模式,然后返回至托管代碼(步驟7、8、9)。
上面的步驟看起來很不錯,但是依舊存在兩個問題:1.請求的數量越來越多,創建的線程就越來越多,那么被阻塞的線程就會越來越多,這樣會更浪費內存。2.用執行結果來響應請求,如果請求的數量非常多,那么解鎖的阻塞線程也就很多,而且機器上的線程數都會遠遠大於CPU數,所以在阻塞線程被集中解鎖期間CPU很有可能會頻繁地發生上下文切換,損害性能。
下面展示Windows如何異步讀取I/O流,仍然使用FileStream來構建對象,但是需要傳遞FileOptions.Asynchronous標志,告訴Windows希望文件的讀/寫以異步的方式進行。
在使用FileOptions.Asynchronous創建FileStream對象后,就應該使用ReadAsync(...)來讀取文件,而不是Read(...)。在ReadAsync內部分配一個Task<Int32>對象來代表用於完成的讀取操作的代碼。然后ReadAsync調用Win32ReadFile函數(步驟1),ReadFile分配IRP數據包(步驟2),然后將其傳遞給Windows內核(步驟3)。Windows內核把IRP數據包添加到IRP隊列中(步驟4)。此時線程不會再阻塞,而是可以直接運行返回至你的代碼。所以線程能夠立即從ReadAsync調用中返回(步驟5、6、7)。
在調用ReadAsync后返回一個Task<Int32>對象,可以在該對象上調用ContinueWith來登記任務完成時執行的回調方法,然后在回調方法中處理數據。當硬件設備處理好IRP后(步驟a)。硬件設備會把IRP放到CLR的線程池中隊列中(步驟b)。將來某個時候,一個線程池會提取完成的IRP並執行任務的代碼,最終要么設置異常(如果發生異常),要么返回結果(步驟c)。在知道這些之后,就知道使用異步I/O可以盡量的減少同步I/O訪問存在的那些問題。
2.C#的異步函數
之前的一篇文章中,我們討論了《計算限制的異步操作》,其中絕大部分代碼都是使用Task來完成的。C#還為我們提供了另一種異步糖語法—異步函數,使用異步函數時可以以順序的步驟寫異步的代碼,感覺像是在進行同步操作。Task和異步函數的功能類似,但他們之前還是有本質的差別。
2.1 async和await的使用
async和await是C#異步函數編程的核心,async和await是從.NET Framwork 4.5提供的新關鍵字,被async標記的方法表明該方法應該以異步的方式運行;await操作符用於標記異步函數執行完成后狀態機恢復的位置(注意,這里不是等待),同時指示包含該await操作符的方法以異步的方式運行。如果async中不包含await,那么async會以同步的方式運行。
下面展示異步訪問網絡的步驟:
static void Main(string[] args) { Task<int> task= AccessTheWebAsync(); task.ContinueWith((t) => { Console.WriteLine(t.Result); }); Console.ReadLine(); } static async Task<int> AccessTheWebAsync() { //需要引入System.Net.Http程序集 HttpClient httpClient = new HttpClient(); Task<string> getStringTask = httpClient.GetStringAsync("https://www.baidu.com/"); // await操作符掛起AccessTheWebAsync方法 // AccessTheWebAsync不能夠繼續執行,直到getStringTask任務完成。 // AccessTheWebAsync可以從這里直接異步返回給AccessTheWebAsync的調用者。 // 當getStringTask任務完成后,狀態機可以直接從這里恢復,並且await操作符會返回任務的Result值。 String urlContents = await getStringTask; //返回長度 return urlContents.Length; }
使用async和await有以下幾點需要注意:
1.方法名應該以Async結尾(比如:AccessTheWebAsync)。
2.方法應該包含有async修飾符。
3.方法的返回類型應該是Task<TResult>或Task或void或其他類型(從C#7.0,.NET Core開始,其他類型的返回值應該提供GetAwaiter方法)。
4.方法中至少應該包含一個await表達式。
2.2 Async和Task的區別
Async和Task是非常相似的,但是都可以用於異步執行。但是他們之間也是有本質區別的,相信讀者看完文章開篇的“為什么需要進行異步I/O操作”的過程已經有所了解了。接下來,筆者想再延伸一下,之所以要使用異步函數來進行異步I/O操作,而不推薦使用Task重新創建一個線程池線程來訪問異步I/O操作,就是因為阻塞。
當異步函數的線程遇到阻塞時,並有線程被真正阻塞在哪里,當阻塞被完成后,再從線程池中喚醒一個線程用來執行之后的任務。當線程池遇到阻塞時,那么那個線程是被真正阻塞了的。
例如:
static async void Test1() { Console.WriteLine("thread id before await:" + Thread.CurrentThread.ManagedThreadId); HttpClient hc = new HttpClient(); HttpResponseMessage hrm = await hc.GetAsync("http://www.baidu.com"); Console.WriteLine(hrm.StatusCode); Console.WriteLine("thread id after await:" + Thread.CurrentThread.ManagedThreadId); }
通過這個異步函數,你一般都會看到await前后是被不同的線程執行的。
static void Test2(){ Task task = new Task(() => { Console.WriteLine("thread id:" + Thread.CurrentThread.ManagedThreadId); //做一些阻塞動作 }); task.ContinueWith((tk) => { Console.WriteLine("thread id:" + Thread.CurrentThread.ManagedThreadId); }); task.Start(); }
通過這個,你會看到兩個Thread id是相同的。
最后總結一下,
async和Task非常相似,都可以用於執行異步操作。但是異步函數(async)遇到阻塞后,其線程會被回收,用於執行線程池中的其他任務,當阻塞完成后,其會從線程池中喚醒另一個線程(這個線程和之前的線程也有可能是同一個線程,幾率很小),用於執行await后面的動作。Task創建的額外線程遇到阻塞時,其創建的線程是會被阻塞的,直到阻塞完成后,線程才能繼續執行。
3.異步函數的狀態機
3.1 異步函數如何轉化為狀態機
通常情況下,觀察編譯器給我們編譯好的代碼,可以幫助我們更好的理解我們的代碼。像async和await操作符,編譯器其實是把這些操作符轉化成了一種狀態機的機制。將含有async和await的代碼編譯為IL代碼,再將IL代碼反編譯為C#代碼,就可以得到狀態機。
比如:
class Type { } class Program { private static async Task<Type> Method1() { /*執行一些異步操作,最后返回一個Type類型的數據*/ HttpClient httpClient = new HttpClient(); String result= await httpClient.GetStringAsync("http://www.baidu.com"); return new Type(); } private static async Task<String> MyMethodAsync() { Type result1 = await Method1(); return result1.ToString(); } static void Main(string[] args) { } }
編譯為IL代碼后,再利用ILSPY把IL代碼反編譯為C#代碼,在返編譯IL代碼的時候,需要注意,不能勾選“decompile async methods(async/await)”
然后就可以看到async和await轉化成的狀態機
通過查看反編譯后的C#代碼,C#中的異步函數的運行過程,可以用下圖進行簡單的概括:
但任務未完成時,isCompleted返回false,所以會在onCompleted登記任務完成時會調用的action動作,action動作執行完成后,會再一次調用MoveNext,然后isCompleted就返回true,此時就可以通過GetResult獲得結果。
3.2 如何擴展異步函數
在擴展性方面,能用Task對象包裝一個即將完成的操作,就可以使用await操作符來等待該操作。
下面是一個TaskLogger類,可用它顯示未完成的異步操作。
static class TaskLogger { public enum TaskLogLevel { None,Pending} public static TaskLogLevel LogLevel { get; set; } public sealed class TaskLogEntry { public Task Task { get; internal set; } public String Tag { get; internal set; } public DateTime LogTime { get; internal set; } public String CallerMemberName { get; internal set; } public String CallerFilePath { get; internal set; } public Int32 CallerLineNumber { get; internal set; } public override string ToString() { return String.Format("LogTime={0},Tag={1},Member={2},File={3}({4})", LogTime,Tag??"(none)",CallerMemberName,CallerFilePath,CallerLineNumber); } } private static readonly ConcurrentDictionary<Task, TaskLogEntry> s_log = new ConcurrentDictionary<Task, TaskLogEntry>(); public static IEnumerable<TaskLogEntry> GetLogEntries() { return s_log.Values;} public static Task<TResult> Log<TResult>(this Task<TResult> task, String tag = null, [CallerMemberName] String callerMemberName=null, [CallerFilePath] String callerFilePath=null, [CallerLineNumber] Int32 callerLineNumber=-1) { return (Task<TResult>)Log(task, tag, callerMemberName, callerFilePath, callerLineNumber); } public static Task Log(this Task task, String tag = null, [CallerMemberName] String callerMemberName = null, [CallerFilePath] String callerFilePath = null, [CallerLineNumber] Int32 callerLineNumber = -1) { if (LogLevel == TaskLogLevel.None) { return task; } var logEntry = new TaskLogEntry { Task=task, LogTime=DateTime.Now, Tag=tag, CallerMemberName=callerMemberName, CallerFilePath=callerFilePath, CallerLineNumber=callerLineNumber }; s_log[task] = logEntry; //附加一個異步任務,當一個任務執行完成后,應該將其從清單中移除 task.ContinueWith(t => { TaskLogEntry entry; s_log.TryRemove(t,out entry); },TaskContinuationOptions.ExecuteSynchronously); return task; } }
Callation類,用於取消正在執行的異步操作
static class Cancellation { public struct Void { } public static async Task WithCancellation(this Task originalTask, CancellationToken ct) { //創建在Cancellation被取消時完成的一個Task var cancelTask = new TaskCompletionSource<Void>(); using (ct.Register(t => ((TaskCompletionSource<Void>)t).TrySetResult(new Void()), cancelTask)) { //創建在原始Task或CancellationToken Task完成時都完成的一個Task Task any = await Task.WhenAny(originalTask,cancelTask.Task); //任務Task因為CancellationToken而完成,就拋出OperationCanceledException if (any == cancelTask.Task) ct.ThrowIfCancellationRequested(); }; //等待原始任務;若任務失敗,它將拋出一個異常 await originalTask; } }
最后,展示如何使用
class Program { static void Main(string[] args) { Go(); Console.ReadLine(); } public static async Task Go() { #if DEBUG //使用TaskLogger會影響內存和性能,所以只在調試生成中啟用它 TaskLogger.LogLevel=TaskLogger.TaskLogLevel.Pending; #endif //初始化3個任務;為了測試TaskLogger,我們顯示控制持續時間 var tasks = new List<Task>{ Task.Delay(2000).Log("2s op"), Task.Delay(5000).Log("5s op"), Task<String>.Delay(8000).Log("8s op"), }; try { //等待全部任務,但在3秒后取消;只有一個任務能夠按時完成 await Task.WhenAll(tasks).WithCancellation(new CancellationTokenSource(3000).Token); } catch (OperationCanceledException) { //查詢logger哪些任務尚未完成,按照從等待時間從最長到最短的順序排序 foreach (var op in TaskLogger.GetLogEntries().OrderBy(tle => tle.LogTime)) { Console.WriteLine(op); } } } }
我的得到如下的輸出結果:
LogTime=2018/11/7 1:30:41,Tag=8s op,Member=Go,File=e:\MyLearn\ConsoleApplication1\Program.cs(28)
LogTime=2018/11/7 1:30:41,Tag=5s op,Member=Go,File=e:\MyLearn\ConsoleApplication1\Program.cs(27)
除了增強使用Task的靈活性,異步函數對另一個擴展性有力的地方在於編譯器可以在await的任何操作數上調用GetAwaiter。所以操作數不一定是Task對象。可以是任何任意類型,只要提供一個調用GetAwaiter的方法就可以了。
例如:
public sealed class EventAwaiter<TEventArgs> : INotifyCompletion { private ConcurrentQueue<TEventArgs> m_events = new ConcurrentQueue<TEventArgs>(); private Action m_continuation; //狀態機調用GetAwaiter獲得Awaiter,這里返回自己 public EventAwaiter<TEventArgs> GetAwaiter() { return this; } //告訴狀態機是否發生了任何事件 public Boolean IsCompleted { get { return m_events.Count > 0; } } //狀態機告訴我們以后要調用什么方法,continuation中包含有恢復狀態機的操作 public void OnCompleted(Action continuation) { Volatile.Write(ref m_continuation,continuation); } //狀態機查詢結果,這是awaiter操作符的結果 public TEventArgs GetResult() { TEventArgs e; m_events.TryDequeue(out e); return e; } public void EventRaised(Object sender, TEventArgs eventArgs) { m_events.Enqueue(eventArgs); //如果有一個等待運行的延續任務,該線程會運行它 Action continuation = Interlocked.Exchange(ref m_continuation, null); if (continuation != null) { continuation();//恢復狀態機 } } }
在EventAwaiter類在事件發生的時候從await操作符返回。在本例中,一旦AppDomain中的任何線程拋出異常,狀態機就會繼續。
private static async void ShowException() { var eventAwaiter = new EventAwaiter<FirstChanceExceptionEventArgs>(); AppDomain.CurrentDomain.FirstChanceException += eventAwaiter.EventRaised; while (true) { Console.WriteLine((await eventAwaiter).Exception.GetType()); } }
筆者自定義的EventAwaiter<TEventArgs>提供了GetAwaiter()、isCompleted()、onCompleted(Action continuation)、GetResult()幾個重要的方法,其實這幾個方法恰好對應了第3.1中“異步函數如何轉化為狀態機”中狀態機需要操作的各個方法,在3.1中筆者給出一張狀態機執行的流程圖,這里就不再貼那張圖片了。
筆者接下來結合這個案例,說一說本例的流程:
a.當執行到await eventAwaiter時,會去調用eventAwaiter的GetAwaiter()方法,然后得到Awaiter對象。
b.查詢Awaiter對象和IsCompleted()方法,判斷當前Awaiter是否發生了事件。
c.若Awaiter還沒有發生事件,就調用OnCompleted(Action)方法,並且傳遞一個Action委托給OnCompleted()方法,其中的Action委托里就包含了恢復狀態機的邏輯。
d.此時還沒有線程執行恢復狀態機的代碼,await eventWaiter 的線程將會被阻塞。
e.當結合本例的程序邏輯,當出現異常時EventRaised會被調用,然后在EventRaised中會恢復狀態機,喚醒await eventWaiter阻塞的線程。
f.狀態機然后會再次調用IsCompleted方法判斷是否有事件,這時m_events 已經有一個事件了,所以IsCompleted會返回true。
g.狀態機接着調用GetResult,並且將結果值賦值給await關鍵字的表達式。
最后演示這一切是如何工作的:
static void Main(string[] args) { ShowException(); for (int i = 0; i < 3; i++) { try { switch (i) { case 0: throw new InvalidCastException(); case 1: throw new InvalidOperationException(); case 2: throw new ArgumentException(); } } catch (Exception) { } } Console.ReadLine(); }
4.FCL中的異步IO操作
FCL中的異步函數非常容易辨認,因為命名規范要求異步函數必須加上Async的后綴。在FCL中,支持I/O操作的許多類型都提供了XxxAsync方法
例如:
a.System.IO.Stream的所有派生類都提供了ReadAsync,WriteAsync,FlushAsync和CopyToAsync方法
b.System.IO.TextReader的所有派生類都提供了ReadAsync,ReadLineAsync,ReadToEndAsync和ReadBlockAsync方法。System.IO.TextWriter的派生類提供了WriteAsync,WriteLineAsync和FlushAsync.
c.System.Net.Http.HttpClient 類提供了GetAsync,GetStreamAsync,GetByteArrayAsync,PostAsync,PutAsync,DeleteAsync和其他許多方法。
d.System.Net.WebRequest的所派生類(包括FileWebRequest,FtpWebRequest和HttpWebRequest)都提供了GetRequestStreamAsync和GetResponseAsync方法。
e.System.Data.SqlClient.SqlCommand類提供了ExecuteDbDataReaderAsync,ExecuteNonQueryAsync,ExecuteReaderAsync,ExecuteScalarAsync和ExecuteXmlReaderAsync方法。
f.生成Web服務代理工具(比如SvcUtil.exe)也生成了XxxAsync方法。
這里筆者以System.Net.Http.HttpClient來舉例:
static async void Go() { HttpClient httpClient = new HttpClient(); Stream stm = await httpClient.GetStreamAsync("http://www.baidu.com"); StreamReader sr = new StreamReader(stm); String line= ""; while ((line = await sr.ReadLineAsync()) != null) { Console.WriteLine(line); } }
FCL中有許多編程都使用了BeginXxx/EndXxx方法模型和IAsyncResult接口,還有基於事件的編程模型,它也提供了XxxAsync方法(不返回Task對象),能在異步操作完成時調用事件處理程序。這兩種編程模型都已經過時,使用Task的新模型才是你的首要選擇。
在FCL中,有一些類缺少XxxAsync方法,只提供了BeginXxx和EndXxx方法。可以通過TaskFactory將其轉化為基於Task的模型。
BeginExecuteXXX 和EndExecuteXXX 使用TaskFactory來轉化的步驟,例如:
返回值= Task.Factory.FromAsync(BeginEexcuteXXX,EndExecuteXXX,...);
返回值是EndExecute的返回值。
例如:NamedPipeServerStream類定義了BeginWaitForConnection和EndWaitForConnection,但是沒有定義WaitForConnectionAsync方法,可以按照如下代碼來完成轉化。
static async void StartServer() { while (true) { //循環不停的接受來自客戶端的鏈接 var pipe = new NamedPipeServerStream(c_pipeName,PipeDirection.InOut,-1,PipeTransmissionMode.Message,PipeOptions.Asynchronous|PipeOptions.WriteThrough); //異步的接受來自客戶端的連接 //用TaskFactory的FromAsync將舊的異步編程模型轉化為新的Task模型 //當沒有客戶端連接時,線程將會掛起,並且允許方法已異步的方式返回調用者(本例中未有返回) //當有客戶端連接后,立即喚醒狀態機,線程繼續執行。 await Task.Factory.FromAsync(pipe.BeginWaitForConnection,pipe.EndWaitForConnection,null); //為客戶端提供服務 //startServiceConnectionAsync 也是異步方法,所以能夠立即返回 startServiceConnectionAsync(pipe); } }
FCL沒有提供任何的輔助方法將舊的、基於事件的編程模型轉化為新的、基於Task的編程模型。所有只能使用硬編碼的方式。例如下面演示了使用TaskCompletionSource包裝使用了“基於事件的編程模型”的WebClient,以便在異步函數中等待它。
static async Task<String> AwaitWebClient(Uri uri) { //System.Net.WebClient var wc = new System.Net.WebClient(); //創建TaskCompletionSource及其基礎Task對象 var tcs = new TaskCompletionSource<String>(); //字符串下載完成后,WebClient對象引發DownloadStringCompleted事件 wc.DownloadStringCompleted += (s, e) => { if (e.Cancelled) tcs.SetCanceled(); else if (e.Error != null) tcs.SetException(e.Error); else tcs.SetResult(e.Result); }; //啟動異步操作 wc.DownloadStringAsync(uri); //現在可以等待TaskCompletion String result = await tcs.Task; return result; }
4.1 FileStream類
創建FileStream對象時,可通過FileOptions.AsyncChronous標志指定以同步方式還是異步方式進行通信。如果不指定該標志,Windows將以同步方式執行所有文件操作。當然,仍然可以調用FileStream的ReadAsync方法,對於你的應用程序,表面上是異步執行,但FileStream類在內部用另一個線程模擬異步行為。這個額外的線程純屬是浪費。
如果創建FileStream對象時指定FileOptions.AsyncChronous標志。然后,可以調用FileStream的Read方法執行一個同步操作。在內部,FileStream類會開始一個異步操作,然后立即調用線程進入睡眠狀態,直到操作完成才喚醒,從而模擬同步行為,這樣依然效率低下。
總之,使用FileStream時應該想好是以同步方式還是以異步方式執行I/O操作,並指定FileOptions.Asynchronous標志來指明自己的選擇。如果指定了該標志,就總是調用ReadAsync。如果沒有使用這個標志,就總是調用Read。這樣能夠獲得最佳性能。如果想先對FileStream執行一些同步操作,再執行一些異步操作,那么更高效的做法是使用FileOptions.Asynchronous標志來構造它。另外也可針對同一個文件,創建兩個FileStream對象,一個FileStream進行同步操作,另一個FileStream執行異步操作。
FileStream的輔助方法(Create,Open和OpenWrite)創建並返回FileStream對象,這些方法都沒有指定FileOptions.Asynchronous標志,所以為了實現響應靈敏的、可伸縮性的應用程序,應避免使用這些方法。
5.異步實現服務器
FCL內建了對伸縮性很好的一些異步服務器的支持。下面列舉中MSDN文檔中值的參考的地方。
1.要構建異步ASP.NET Web窗體,在.aspx文件中添加Async="true"的網頁指令,並參考System.Web.UI.Page的RegisterAsyncTask方法。
2.要構建異步ASP.NET MVC控制器,使你的控制器類從System.Web.Mvc.AsyncController派生,讓操作方法返回一個Task<ActionResult>即可。
3.要構建異步ASP.NET 處理程序,使你的類從System.Web.HttpTaskAsyncHandler派生,重寫其ProcessRequestAsync方法。
4.要構建異步WCF服務,將服務作為異步函數來實現,讓它返回Task或Task<TResult>。
這里筆者講解一下如何構建異步的ASP.NET MVC控制器,如果是.NET 4.5(支持await和async關鍵詞)以上以及4.5的版本,那么構建異步ASP.NET非常方便,例如:
public class TestController extends AsyncController{ public async Task<ActionResult> Get() { Task<TResult> task = ...; return await task; } }
上面的模型方法不會阻塞任何線程。在.NET 4.5 以下的話,並不支持async和await,我們仍然可以通過AsyncController來實現不阻塞任何線程的異步服務器響應:
public class TestController : AsyncController { //開始異步會調用該方法 public void getAsync() { //聲明異步操作 AsyncManager.OutstandingOperations.Increment(); Task.Factory.StartNew(() => {//開始異步操作 //在這里可以進行耗時的操作,並不會有線程等待 //將結果賦值給AsyncManager.Parameters AsyncManager.Parameters["response"] = "abc"; //異步結束 AsyncManager.OutstandingOperations.Decrement(); }); } //異步結束時,會調用該方法 //參數必需和AsyncManager.Parameters賦值key一樣 public ActionResult getCompleted(String response) { return Content(response.ToString(), "text/json"); } }
上面雖然聲明了兩個控制器方法getAsync和getCompleted,但實際上只有get,訪問也只能通過get。
在使用AsyncManager進行數據傳遞的時候,AsyncManager是和控制器相關聯的,也就是說如果有Simple控制器和Test控制器,那么Simple控制器中的AsyncManager是不能干擾Test控制器中的AsyncManager的數的。
更詳細的內容,可以看這篇文章:https://msdn.microsoft.com/cs-cz/library/ee728598(v=vs.100).aspx
6.如何取消異步IO操作
Windows一般沒有提供取消未完成I/O操作的途徑,這是許多開發人員都想要的功能,實現起來卻很困難。畢竟,如果向服務器請求了1000個字節,然后決定不再需要這些字節,那么其實沒有辦法告訴服務器忘掉你的請求。在這種情況下,只能讓字節照常返回,再將他們丟棄。此外,這里還發生競態條件-取消請求的請求可能正在服務器發送響應的時候到來,要在代碼中處理這種潛在的競態條件,決定是丟棄還是使用數據。
建議實現一個WithCancellation擴展方法Task<TResult>(需要重載版本來擴展Task)上面的案例中,我們已經使用過Task的擴展版本了,下面是Task<TResult>版本:
static class CancelleationClass { private struct Void { }//沒有泛型的TaskCompletionSource類 public static async Task<TResult> WithCancellation<TResult>(this Task<TResult> originalTask, CancellationToken ct) { //創建在CancellationToken被取消時完成的一個Task var cancelTask = new TaskCompletionSource<Void>(); //一旦CancellationToken被取消,就完成Task CancellationTokenRegistration cancellationTokenRegistration = ct.Register(t => { ((TaskCompletionSource<Void>)t).TrySetResult(new Void()); }, cancelTask); //創建在原始task或cancel task完成時都完成的Task Task any = await Task.WhenAny(originalTask, cancelTask.Task); //只要是cancel task先完成,就拋出OperationCanceledException if (any == cancelTask.Task) { ct.ThrowIfCancellationRequested(); } //釋放資源 cancellationTokenRegistration.Dispose(); //返回原始任務 return originalTask.Result; } }
按照如下的代碼來使用它:
public static async Task<Int32> go() { var cts = new CancellationTokenSource(); var ct = cts.Token; try { Int32 max = 10; Task<Int32> task = new Task<Int32>(() => { Int32 result = 0; for (int i = 0; i < max; i++) { result += i; Thread.Sleep(1000); } return result; }); task.Start(); //在指定的時間后取消操作 Task.Delay(500).ContinueWith((obj) => { cts.Cancel(); }); Int32 res=await task.WithCancellation<Int32>(ct); return res; } catch (OperationCanceledException e) { Console.WriteLine(e.Message); } return -1; }