異步編程:(TAP)基於任務的異步編程模型詳解


   最近我為大家陸續介紹了“ IAsyncResult 異步編程模型 (APM) ”和“ 基於事件的異步編程模式 (EAP) ”兩種異步編程模型。在 .NET4.0 中 Microsoft 又為我們引入了新的異步編程模型“基於任務的異步編程模型 (TAP) ”,並且推薦我們在開發新的多線程應用程序中首選 TAP 。那現在我先介紹下 TAP 具有哪些優勢:

1.         目前版本 (.NET4.X) 的任務調度器 (TaskScheduler) 依賴於底層的線程池引擎。通過局部隊列的任務內聯化 (task inlining) 和工作竊取機制可以為我們提升程序性能。

2.         輕松實現任務等待、任務取消、延續任務、異常處理( System.AggregateException )、 GUI 線程操作。

3.         在任務啟動后,可以隨時以任務延續的形式注冊回調。

4.         充分利用現有的線程,避免創建不必要的額外線程。

5.         結合 C#5.0 引入 async 和 await 關鍵字輕松實現“異步方法”。

 

示例源碼: 異步編程: (TAP) 基於任務的異步編程模型詳解 .rar

 

術語:

APM               異步編程模型, Asynchronous Programming Model

EAP                基於事件的異步編程模式, Event-based Asynchronous Pattern

TAP                基於任務的異步編程模式, Task-based Asynchronous Pattern

 

理解 CLR 線程池引擎、理解全局隊列、理解線程的局部隊列及性能優勢

1.         CLR 線程池引擎

CLR 線程池引擎維護了一定數量的空閑工作線程以支持工作項的執行,並且能夠重用已有的線程以避免創建新的不必要的線程所花費的昂貴的處理過程。並且使用爬山算法( hill-climbing algorithm )檢測吞吐量,判斷是否能夠通過更多的線程來完成更多的工作項。這個算法的判斷依據是工作項所需某些類型資源的可用情況,例如: CPU、網絡帶寬或其他。此外這個算法還會考慮一個飽和點,即達到飽和點的時候,創建更多地線程反而會降低吞吐量。(線程池的詳細介紹請看 《異步編程:使用線程池管理線程》 

目前版本的 TAP 的任務調度器( TaskScheduler )基於 CLR 線程池引擎實現。當 任務調度器( TaskScheduler ) 開始 分派任務 時:

1)         在主線程或其他並沒有分配給某個特定任務的線程的上下文中創建並啟動的任務,這些任務將會在 全局隊列 中競爭工作線程。這些任務被稱為 頂層任務 。

2)         然而,如果是在其他任務的上下文中創建的任務(子任務或嵌套任務),這些任務將被分配在線程的局部隊列中。

嵌套任務:

是在另一個任務的用戶委托中創建並啟動的任務。

子任務:

是使用 TaskCreationOptions.AttachedToParent 選項創建頂層任務的嵌套任務或延續任務;或使用 TaskContinuationOptions.AttachedToParent 選項創建的延續任務的嵌套任務或延續任務。(應用程序使用 TaskCreationOptions.DenyChildAttach 選項創建父任務。此選項指示運行時會取消子任務的 AttachedToParent 規范)

如果你不想特定的任務放入線程的局部隊列,那么可以指定 TaskCreationOptions.PreferFairness 或 TaskContinuationOptions.PreferFairness 枚舉參數。(使 Task 與 ThreadPool.QueueUserWorkItem 行為相同)

2.         線程池的全局隊列

       當調用 ThreadPool.QueueUserWorkItem() 添加工作項時,該工作項會被添加到線程池的全局隊列中。 線程池中的空閑線程以 FIFO 的順序將工作項從全局隊列中取出並執行,但並不能保證按某個指定的順序完成。

       線程的全局隊列是共享資源,所以內部會實現一個鎖機制。 當一個任務內部會創建很多子任務時 ,並且這些子任務完成得非常快,就會造成頻繁的進入全局隊列和移出全局隊列,從而降低應用程序的性能。基於此原因,線程池引擎為每個線程引入了局部隊列。

3.         線程的局部隊列為我們帶來兩個性能優勢:任務內聯化 (task inlining) 和工作竊取機制。

1)         任務內聯化 (task inlining)---- 活用頂層任務工作線程

我們用一個示例來說明:

static void Main(string[] args) { Task headTask= new Task(() => { DoSomeWork(null); }); headTask.Start(); Console.Read(); } private static void DoSomeWork(object obj) { Console.WriteLine("任務headTask運行在線程“{0}”上", Thread.CurrentThread.ManagedThreadId); var taskTop = new Task(() => { Thread.Sleep(500); Console.WriteLine("任務taskTop運行在線程“{0}”上", Thread.CurrentThread.ManagedThreadId); }); var taskCenter = new Task(() => { Thread.Sleep(500); Console.WriteLine("任務taskCenter運行在線程“{0}”上", Thread.CurrentThread.ManagedThreadId); }); var taskBottom = new Task(() => { Thread.Sleep(500); Console.WriteLine("任務taskBottom運行在線程“{0}”上", Thread.CurrentThread.ManagedThreadId); }); taskTop.Start(); taskCenter.Start(); taskBottom.Start(); Task.WaitAll(new Task[] { taskTop, taskCenter, taskBottom }); }

結果:

image

分析:

       這個示例,我們從 Main 方法主線程中創建了一個 headTask 頂層任務並開啟。在headTask 任務中又創建了三個嵌套任務並最后 WaitAll() 這三個嵌套任務執行完成。此時出現的情況就是 headTask 任務的線程被阻塞,而“任務內聯化”技術會使用阻塞的headTask 的線程去執行局部隊列中的任務。因為減少了對額外線程需求,從而提升了程序性能。

       局部隊列“ 通常 ”以 LIFO 的順序抽取任務並執行,而不是像全局隊列那樣使用 FIFO 順序 。 LIFO 順序通常用有利於數據局部性,能夠在犧牲一些公平性的情況下提升性能。

數據局部性的意思是:運行最后一個到達的任務所需的數據都還在任何一個級別的CPU 高速緩存中可用。由於數據在高速緩存中任然是“熱的”,因此立即執行最后一個任務可能會獲得性能提升。

2)         工作竊取機制 ---- 活用空閑工作線程

當一個工作線程的局部隊列中有很多工作項正在等待時,而存在一些線程卻保持空閑,這樣會導致 CPU 資源的浪費。此時任務調度器( TaskScheduler )會讓空閑的工作線程進入忙碌線程的局部隊列中竊取一個等待的任務,並且執行這個任務。

 

由於局部隊列為我們帶來了性能提升,所以,我們應盡可能地使用 TAP 提供的服務(任務調度器( TaskScheduler )),而不是直接使用 ThreadPool 的方法。

 

任務並行 Task

一個任務表示一個異步操作。任務運行的時候需要使用線程,但並不是說任務取代了線程,理解這點很重要。事實上,在 《異步編程: .NET4.X 數據並行》 中介紹的 System.Threading.Tasks.Parallel 類構造的並行邏輯內部都會創建 Task ,而它們的並行和並發執行都是由底層線程支持的。任務和線程之間也沒有一對一的限制關系,通用語言運行時( CLR )會創建必要的線程來支持任務執行的需求。

1.         Task 簡單的實例成員

public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable { public Task(Action<object> action, object state , CancellationToken cancellationToken,TaskCreationOptions creationOptions); // 獲取此 Task 實例的唯一 ID。 public int Id { get; } // 獲取用於創建此任務的TaskCreationOptions。 public TaskCreationOptions CreationOptions { get; } // 獲取此任務的TaskStatus。 public TaskStatus Status { get; } // 獲取此 Task 實例是否由於被取消的原因而已完成執行。 public bool IsCanceled { get; } // 獲取 Task 是否由於未經處理異常的原因而完成。 public bool IsFaulted { get; } // 獲取導致 Task 提前結束的System.AggregateException。 public AggregateException Exception { get; } #region IAsyncResult接口成員 private bool IAsyncResult.CompletedSynchronously { get;} private WaitHandleIAsyncResult.AsyncWaitHandle { get; } // 獲取在創建 Task 時提供的狀態對象,如果未提供,則為 null。 public object AsyncState { get; } // 獲取此 Task 是否已完成。 public bool IsCompleted { get; } #endregion // 釋放由 Task 類的當前實例占用的所有資源。 public void Dispose(); …… }

         分析:

1)         CancellationToken 、 IsCancel

對於長時間運行的計算限制操作來說,支持取消是一件很“棒”的事情。 .NET 4.0提供了一個標准的取消操作模式。即通過使用 CancellationTokenSource 創建一個或多個取消標記 CancellationToken ( cancellationToken 可在線程池中線程或 Task 對象之間實現協作取消),然后將此取消標記傳遞給應接收取消通知的任意數量的線程或 Task 對象。當調用 CancellationToken 關聯的 CancellationTokenSource 對象的 Cancle() 時,每個取消標記 (CancellationToken) 上的 IsCancellationRequested 屬性將返回 true 。異步操作中可以通過檢查此屬性做出任何適當響應。也可調用取消標記的 ThrowIfCancellationRequested() 方法來拋出 OperationCanceledException 異常。

       更多關於 CancellationToken 與 CancellationTokenSource 的介紹及示例請看 《協作式取消》 ….

       在 Task 任務中實現取消,可以使用以下幾種選項之一終止操作:

                                      i.               簡單地從委托中返回。在許多情況下,這樣已足夠;但是,采用這種方式“取消”的任務實例會轉換為 RanToCompletion 狀態,而不是 Canceled 狀態。

                                    ii.               創建 Task 時 傳入 CancellationToken 標識參數 ,並調用關聯 CancellationTokenSource 對象的 Cancel() 方法:

a)         如果 Task 還未開始,那么 Task 實例直接轉為 Canceled 狀態。(注意,因為已經 Canceled 狀態了,所以不能再在后面調用 Start() )

b)         (見示例: TaskOperations.Test_Cancel(); )如果 Task 已經開始,在 Task 內部必須拋出 OperationCanceledException 異常(注意,只能存在 OperationCanceledException 異常,可優先考慮使用 CancellationToken 的 ThrowIfCancellationRequested() 方法), Task 實例轉為 Canceled 狀態。

若對拋出 OperationCanceledException 異常且狀態為 Canceled 的 Task 進行等待操作(如: Wait/WaitAll ),則會在 Catch 塊中捕獲到 OperationCanceledException 異常,但是此異常指示 Task 成功取消,而不是有錯誤的情況。因此 IsCancel 為 true ; IsFaulted 為 false 且 Exception 屬性為 null 。

                                  iii.               對於使用 TaskContinuationOptions 枚舉值為 NotOn 或OnlyOn 創建的延續任務 A ,在其前面的任務結束狀態不匹配時,延續任務 A 將轉換為Canceled 狀態,並且不會運行。

2)         TaskCreationOptions 枚舉

定義任務創建、調度和執行的一些可選行為。

None

指定應使用默認行為。

PreferFairness

 

較早安排的任務將更可能較早運行,而較晚安排運行的任務將更可能較晚運行。 (Prefer :更喜歡 ; Fair :公平的 )

LongRunning

該任務需要很長時間運行,因此,調度器可以對這個任務使用粗粒度的操作( 默認 TaskScheduler 為任務創建一個專用線程,而不是排隊讓一個線程池線程來處理 ,可通過在延續任務中訪問: Thread.CurrentThread.IsThreadPoolThread 屬性判別)。比如:如果任務可能需要好幾秒的時間運行,那么就使用這個參數。相反,如果任務只需要不到 1 秒鍾的時間運行,那么就不應該使用這個參數。

AttachedToParent

指定此枚舉值的 Task ,其內部創建的 Task 或通過 ContinueWith() 創建的延續任務都為子任務。 (父級是頂層任務)

DenyChildAttach

如果嘗試附加子任務到創建的任務,指定 System.InvalidOperationException 將被引發。

HideScheduler

創建任務的執行操作將被視為 TaskScheduler.Default 默認計划程序。

3)         IsCompleted

Task 實現了 IAsyncResult 接口。在任務處於以下三個最終狀態之一時 IsCompleted 返回 true : RanToCompletion 、 Faulted 或 Canceled 。

4)         TaskStatus 枚舉

表示 Task 的生命周期中的當前階段。一個 Task 實例只會完成其生命周期一次,即當 Task 到達它的三種可能的最終狀態之一時, Task 就結束並釋放。

可能的初始狀態

Created

該任務已初始化,但尚未被計划。

WaitingForActivation

只有在其它依賴的任務完成之后才會得到調度的任務的初始狀態。這種任務是使用定義延續的方法創建的。

WaitingToRun

該任務已被計划執行,但尚未開始執行。

中間狀態

Running

該任務正在運行,但尚未完成。

WaitingForChildrenToComplete

該任務已完成執行,正在隱式等待附加的子任務完成。

可能的最終狀態

RanToCompletion

已成功完成執行的任務。

Canceled

該任務已通過對其自身的 CancellationToken 引發 OperationCanceledException 異常

Faulted

由於未處理異常的原因而完成的任務。

       狀態圖如下:

                image

5)         Dispose()

盡管 Task 為我們實現了 IDisposable 接口,但依然不推薦你主動調用 Dispose() 方法,而是由系統終結器進行清理。原因:

a)         Task 調用 Dispose() 主要釋放的資源是 WaitHandle 對象。

b)         .NET4.5 對 .NET4.0 中提出的 Task 進行過大量的優化,讓其盡量不再依賴 WaitHandle 對象( eg : .NET4.0 種 Task 的 WaitAll()/WaitAny() 的實現依賴於 WaitHandle)。

c)         在使用 Task 時,大多數情況下找不到一個好的釋放點,保證該 Task 已經完成並且沒有被其他地方在使用。

d)         Task.Dispose() 方法在“ .NET Metro 風格應用程序”框架所引用的程序集中甚至並不存在(即此框架中 Task 沒有實現 IDisposable 接口)。

更詳細更專業的 Dispose() 討論請看 《 .NET4.X 並行任務 Task 需要釋放嗎?》 

2.         Task 的實例方法

// 獲取用於等待此 Task 的等待者。
        public TaskAwaiter GetAwaiter(); // 配置用於等待此System.Threading.Tasks.Task的awaiter。 // 參數:continueOnCapturedContext: // 試圖在await返回時奪取原始上下文,則為 true;否則為 false。 public ConfiguredTaskAwaitable ConfigureAwait(bool continueOnCapturedContext); // 對提供的TaskScheduler同步運行 Task。 public void RunSynchronously(TaskScheduler scheduler); // 啟動 Task,並將它安排到指定的TaskScheduler中執行。 public void Start(TaskScheduler scheduler); // 等待 Task 完成執行過程。 public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken); // 創建一個在目標 Task 完成時執行的延續任務。 public Task ContinueWith(Action<Task, object> continuationAction, object state , CancellationToken cancellationToken , TaskContinuationOptions continuationOptions, TaskScheduler scheduler); public Task<TResult>ContinueWith<TResult>( Func<Task, object, TResult> continuationFunction , object state,CancellationToken cancellationToken , TaskContinuationOptions continuationOptions, TaskScheduler scheduler); ……

         分析:

1)         TaskContinuationOptions

在創建一個 Task 作為另一個 Task 的延續時,你可以指定一個 TaskContinuationOptions 參數,這個參數可以控制延續另一個任務的任務調度和執行的可選行為。

None

默認情況下,完成前面的任務之后“都”將安排運行延續任務,而不考慮前面任務的最終 TaskStatus 。

AttachedToParent

對延續任務指定此枚舉值,表示該延續任務內部創建的新 Task 或通過 ContinueWith() 創建的延續任務都為子任務。 (父級是延續任務)

PreferFairness

LongRunning

DenyChildAttach            

HideScheduler

 

 

參考: TaskCreationOptions 枚舉 

LazyCancellation

在延續取消的情況下,防止延續的完成直到完成先前的任務。

NotOnRanToCompletion

NotOnFaulted

NotOnCanceled

指定不應在延續任務 前面的任務 “已完成運行、引發了未處理異常、已取消”的情況下安排延續任務。

 

此選項對多任務延續無效。

OnlyOnCanceled

OnlyOnFaulted

OnlyOnRanToCompletion

指定只應在延續任務 前面的任務“ 已取消、引發了未處理異常、已完成運行”的情況下才安排延續任務。

ExecuteSynchronously

指定應同步執行延續任務。指定此選項后,延續任務將在導致前面的任務轉換為其最終狀態的相同線程上運行。

注意:

a)         如果使用默認選項 TaskContinuationOptions.None ,並且之前的任務被取消了,那么延續任務任然會被調度並啟動執行。

b)         如果該條件在前面的任務准備調用延續時未得到滿足,則延續將直接轉換為Canceled 狀態,之后將無法啟動。

c)         如果調用 多任務延續(即: 調用 TaskFactory 或 TaskFactory<TResult> 的靜態 ContinueWhenAll 和 ContinueWhenAny 方法)時, NotOn 和 OnlyOn 六個標識或標識的組合都是無效的。也就是說,無論先驅任務是如何完成的, ContinueWhenAll 和 ContinueWhenAny 都會執行延續任務。

d)         TaskContinuationOptions.ExecuteSynchronously ,指定同步執行延續任務。延續任務會使用前一個任務的數據,而保持在相同線程上執行就能快速訪問高速緩存中的數據,從而 提升性能 。此外,也可避免調度這個延續任務產生不必要的額外線程開銷。

如果在創建延續任務時已經完成前面的任務,則延續任務將在創建此延續任務的線程上運行。只應同步執行運行時間非常短的延續任務。

2)         開啟任務

只有 Task 處於 TaskStatus.Created 狀態時才能使用實例方法 Start() 。並且, 只有在使用 Task 的公共構造函數構造的 Task 實例才能處於 TaskStatus.Created 狀態。

當然我們還知道有其他方式可以創建 Task 並開啟任務,比如 Task.Run()/Task.ContinueWith()/Task.Factory.StartNew()/TaskCompletionSource/ 異步方法 ( 即使用 async 與 await關鍵字的方法 ) ,但是這些方法返回的 Task 已經處於開啟狀態,即不能再調用 Start()。更豐富更專業的討論請看 《 .NET4.X 並行任務中 Task.Start() 的 FAQ 》 

3)         延續任務 ContinueWith

a)         ContinueWith() 方法可創建一個根據 TaskContinuationOptions 參數限制的延續任務。 可以為同一個 Task 定義多個延續任務讓它們並行執行。

比如,為 t1 定義兩個 並行 延續任務 t2 、 t3.

Task<int> t1 = new Task<int>(() => { return 1; }); Task<int> t2 = t1.ContinueWith<int>(Work1,……); Task<int> t3 = t1.ContinueWith<int>(Work1,……);

b)         調用 Wait() 方法和 Result 屬性會導致線程阻塞,極有可能造成線程池創建一個新線程,這增大了資源的消耗,並損害了伸縮性。可以在延續任務中訪問這些成員,並做相應操作。

c)         對前面任務的引用將以參數形式傳遞給延續任務的用戶委托,以將前面任務的數據傳遞到延續任務中。

4)         Wait()

一個線程調用 Wait() 方法時,系統會檢查線程要等待的 Task 是否已開始執行。

a)         如果是,調用 Wait() 的線程會阻塞,直到 Task 運行結束為止。

b)         如果 Task 還沒有開始執行,系統可能(取決於 TaskScheduler )使用調用 Wait() 的線程來執行 Task 。如果發生這種情況,那么調用 Wait() 的線程不會阻塞;它會執行 Task 並立刻返回。

                                       i.               這樣做的好處在於,沒有線程會被阻塞,所以減少了資源的使用(因為不需要創建一個線程來替代被阻塞的線程),並提升了性能(因為不需要花時間創建一個線程,也沒有上下文切換)。

                                     ii.               但不好的地方在於,假如線程在調用 Wait() 前已經獲得一個不可重入的線程同步鎖 (eg : SpinLock) ,而 Task 試圖獲取同一個鎖,就會造成一個死鎖的線程!

5)         RunSynchronously

可在指定的 TaskScheduler 或 TaskScheduler.Current 中同步運行 Task 。即 RunSynchronously() 之后的代碼會阻塞到 Task 委托執行完畢。

示例如下:

Task task1 = new Task(() =>
            {
                Thread.Sleep(5000); Console.WriteLine("task1執行完畢。"); }); task1.RunSynchronously(); Console.WriteLine("執行RunSynchronously()之后的代碼。"); // 輸出============================== // task1執行完畢。 // 執行RunSynchronously()之后的代碼。

3.         Task 的靜態方法

// 返回當前正在執行的 Task 的唯一 ID。
        public static int? CurrentId{ get; }
        // 提供對用於創建 Task 和 Task<TResult>實例的工廠方法的訪問。 public static TaskFactory Factory { get; } // 創建指定結果的、成功完成的Task<TResult>。 public static Task<TResult> FromResult<TResult>(TResult result); // 創建將在指定延遲后完成的任務。 public static Task Delay(int millisecondsDelay, CancellationToken cancellationToken); // 將在線程池上運行的指定工作排隊,並返回該工作的任務句柄。 public static Task Run(Action action, CancellationToken cancellationToken); // 將在線程池上運行的指定工作排隊,並返回該工作的 Task(TResult) 句柄。 public static Task<TResult> Run<TResult>(Func<TResult> function, CancellationToken cancellationToken); // 將在線程池上運行的指定工作排隊,並返回 function 返回的任務的代理項。 public static Task Run(Func<Task> function, CancellationToken cancellationToken); // 將在線程池上運行的指定工作排隊,並返回 function 返回的 Task(TResult) 的代理項。 public static Task<TResult> Run<TResult>(Func<Task<TResult>> function, CancellationToken cancellationToken); // 等待提供的所有 Task 對象完成執行過程。 public static bool WaitAll(Task[] tasks, intmillisecondsTimeout, CancellationToken cancellationToken); // 等待提供的任何一個 Task 對象完成執行過程。 // 返回結果: // 已完成的任務在 tasks 數組參數中的索引,如果發生超時,則為 -1。 public static int WaitAny(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken); // 所有提供的任務已完成時,創建將完成的任務。 public static Task WhenAll(IEnumerable<Task> tasks); public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks); // 任何一個提供的任務已完成時,創建將完成的任務。 public static Task<Task> WhenAny(IEnumerable<Task> tasks); public static Task<Task<TResult>> WhenAny<TResult>(IEnumerable<Task<TResult>> tasks); // 創建awaitable,等待時,它異步產生當前上下文。 // 返回結果:等待時,上下文將異步轉換回等待時的當前上下文。 // 如果當前SynchronizationContext不為 null,則將其視為當前上下文。 // 否則,與當前執行任務關聯的任務計划程序將視為當前上下文。 public static YieldAwaitable Yield();

         分析:

1)         FromResult<TResult>(TResult result);

創建指定結果的、成功完成的 Task<TResult> 。我們可以使用此方法創建包含預先計算結果 / 緩存結果的 Task<TResult> 對象, 示例代碼 或 CachedDownloads.cs 示例文件。

2)         Delay

創建將在指定延遲后完成的任務,返回 Task 。可以通過 await 或 Task.Wait() 來達到Thread.Sleep() 的效果。盡管, Task.Delay() 比 Thread.Sleep() 消耗更多的資源,但是 Task.Delay() 可用於為方法返回 Task 類型;或者根據 CancellationToken 取消標記動態取消等待。

Task.Delay() 等待完成返回的 Task 狀態為 RanToCompletion ;若被取消,返回的 Task狀態為 Canceled 。

var tokenSource = new CancellationTokenSource(); var token = tokenSource.Token; Task.Factory.StartNew(() => { Thread.Sleep(1000); tokenSource.Cancel(); }); Console.WriteLine("Begin taskDelay1"); Task taskDelay1 = Task.Delay(100000, token); try { taskDelay1.Wait(); } catch (AggregateException ae) { foreach (var v in ae.InnerExceptions) Console.WriteLine(ae.Message + " " + v.Message); } taskDelay1.ContinueWith((t) =>Console.WriteLine(t.Status.ToString())); Thread.Sleep(100); Console.WriteLine(); Console.WriteLine("Begin taskDelay2"); Task taskDelay2 = Task.Delay(1000); taskDelay2.ContinueWith((t) =>Console.WriteLine(t.Status.ToString())); // 輸出====================================== // Begin taskDelay1 // 發生一個或多個錯誤。已取消一個任務。 // Canceled // // Begin taskDelay2 // Completed

4.         ask<TResult>:Task

Task<TResult> 繼承自 Task ,表示一個可以返回值的異步操作,提供 Result 只讀屬性用於訪問異步操作的返回值。該屬性會阻塞線程,直到 Task 執行完畢並返回值。

 

System.Threading.Tasks.TaskFactory          

1.         設置共用 \ 默認的參數

通過 TaskFactory 對象提供的 Scheduler 、 CancellationToken 、 CreationOption 和 ContinuationOptions 屬性可以為 Task 設置共用 \ 默認的參數,以便快捷的創建 Task 或延續任務。影響 StartNew() 、 ContinueWhenAll()|ContinueWhenAny() 、 FromAsync() 方法的默認參數設置。

2.         StartNew()

Task.Factory.StartNew() 可快速創建一個 Task 並且開啟任務。代碼如下:

var t = Task.Factory.StartNew(someDelegate);

這等效於:

var t = new Task(someDelegate); t.Start();

表現方面,前者更高效。 Start() 采用同步方式運行以確保任務對象保持一致的狀態即使是同時調用多次 Start() ,也可能只有一個調用會成功。相比之下, StartNew() 知道沒有其他代碼能同時啟動任務,因為在 StartNew() 返回之前它不會將創建的 Task 引用給任何人,所以 StartNew() 不需要采用同步方式執行。更豐富更專業的討論請看 《.NET4.X 並行任務中 Task.Start() 的 FAQ 》 

3.         ContinueWhenAll()

public Task ContinueWhenAll(Task[] tasks, Action<Task[]> continuationAction , CancellationToken cancellationToken , TaskContinuationOptions continuationOptions, TaskScheduler scheduler);

創建一個延續 Task 或延續 Task<TResult> ,它將在提供的一組任務完成后馬上開始。延續任務操作委托接受一個 Task[] 數組做參數。

4.         ContinueWhenAny()

public Task ContinueWhenAny(Task[] tasks, Action<Task> continuationAction , CancellationToken cancellationToken , TaskContinuationOptions continuationOptions, TaskScheduler scheduler);

創建一個延續 Task 或延續 Task<TResult> ,它將在提供的組中的任何一個任務完成后馬上開始。延續任務操作委托接受一個 Task 做參數。

5.         通過 Task.TaskFactory.FromAsync() 實例方法,我們可以將 APM 轉化為 TAP。示例見此文的后面小節  AMP 轉化為 TAP 和 EAP 轉化為 TAP  。

 

System.Threading.Tasks.TaskScheduler         

       TaskScheduler 表示一個處理將任務排隊到線程中的底層工作對象。 TaskScheduler通常有哪些應用呢?

1.         TaskScheduler 是抽象類,可以繼承它實現自己的任務調度計划。如:默認調度程序 ThreadPoolTaskScheduler 、與 SynchronizationContext.Current 關聯的 SynchronizationContextTaskScheduler 。

2.         由 TaskScheduler.Default 獲取默認調度程序 ThreadPoolTaskScheduler 。

3.         由 TaskScheduler.Current 獲取當前任務的執行的 TaskScheduler 。

4.         由 TaskScheduler.TaskSchedulerFromCurrentSynchronizationContext() 方法獲取與 SynchronizationContext.Current 關聯的 SynchronizationContextTaskScheduler , SynchronizationContextTaskScheduler 上的任務都會通過 SynchronizationContext.Post() 在同步上下文中進行調度。通常用於實現跨線程更新控件。

5.         通過 MaximumConcurrencyLevel 設置任務調度計划能支持的最大並發級別。

6.         通過 UnobservedTaskException 事件捕獲未被觀察到的異常。

 

System.Threading.Tasks.TaskExtensions

提供一組用於處理特定類型的 Task 實例的靜態方法。將特定 Task 實例進行解包操作。

public static class TaskExtensions
    {
        public static Task<TResult> Unwrap<TResult>(this Task<Task<TResult>> task); public static Task Unwrap(this Task<Task> task); }

 

轉化為 TAP 和 EAP 轉化為 TAP

1.         AMP 轉化為 TAP

通過 Task.TaskFactory.FromAsync() 實例方法,我們可以將 APM 轉化為 TAP 。

注意點:

1)         FromAsync 方法返回的任務具有 WaitingForActivation 狀態,並將在創建該任務后的某一時間由系統啟動。如果嘗試在這樣的任務上調用 Start ,將引發異常。

2)         轉化的 APM 異步模型必須符合兩個模式:

a)         接受 Begin*** 和 End*** 方法。此時要求 Begin*** 方法簽名的委托必須是 AsyncCallback 以及 End*** 方法只接受 IAsyncResult 一個參數。此模式 AsyncCallback 回調由系統自動生成,主要工作是調用 End*** 方法。

public Task<TResult> FromAsync<TArg1, TResult>( Func<TArg1, AsyncCallback, object, IAsyncResult> beginMethod , Func<IAsyncResult, TResult> endMethod, TArg1 arg1 , object state, TaskCreationOptions creationOptions);

b)         接受 IAsyncResult 對象以及 End*** 方法。此時 Begin*** 方法的簽名已經無關緊要只要能返回 IAsyncResult 的參數以及 End*** 方法只接受 IAsyncResult 一個參數。此模式支持自定義回調委托。

public Task<TResult> FromAsync<TResult>(IAsyncResult asyncResult , Func<IAsyncResult, TResult> endMethod);

3)         當然,我們有時需要給客戶提供統一的 Begin***() 和 End***() 調用方式,我們可以直接使用 Task 從零開始構造 APM 。即:在 Begin***() 創建並開啟任務,並返回Task 。因為 Task 是繼承自 IAsyncResult 接口的,所以我們可以將其傳遞給 End***() 方法,並在此方法里面調用 Result 屬性來等待任務完成。

4)         對於返回的 Task ,可以隨時以任務延續的形式注冊回調。

現在將在 《 APM 異步編程模型 》 博文中展現的示例轉化為 TAP 模式。關鍵代碼如下:

public Task<int> CalculateAsync<TArg1, TArg2>( Func<TArg1, TArg2, AsyncCallback, object, IAsyncResult> beginMethod , AsyncCallback userCallback, TArg1 num1, TArg2 num2, object asyncState) { IAsyncResult result = beginMethod(num1, num2, userCallback, asyncState); return Task.Factory.FromAsync<int>(result , EndCalculate, TaskCreationOptions.None); } public Task<int> CalculateAsync(int num1, int num2, object asyncState) { return Task.Factory.FromAsync<int, int, int>(BeginCalculate, EndCalculate , num1, num2, asyncState, TaskCreationOptions.None); }

2.         EAP 轉化為 TAP

我們可以使用 TaskCompletionSource<TResult> 實例將 EAP 操作表示為一個 Task<TResult> 。

TaskCompletionSource<TResult> 表示未綁定委托的 Task<TResult> 的制造者方,並通過 TaskCompletionSource<TResult>.Task 屬性獲取由此 Tasks.TaskCompletionSource<TResult> 創建的 Task<TResult> 。

注意, TaskCompletionSource<TResult> 創建的任何任務將由 TaskCompletionSource啟動,因此,用戶代碼不應在該任務上調用 Start() 方法。

public class TaskCompletionSource<TResult> { public TaskCompletionSource(); // 使用指定的狀態和選項創建一個TaskCompletionSource<TResult>。 // state: 要用作基礎 Task<TResult>的AsyncState的狀態。 public TaskCompletionSource(object state, TaskCreationOptions creationOptions); // 獲取由此Tasks.TaskCompletionSource<TResult>創建的Tasks.Task<TResult>。 public Task<TResult> Task { get; } // 將基礎Tasks.Task<TResult>轉換為Tasks.TaskStatus.Canceled狀態。 public void SetCanceled();   public bool TrySetCanceled(); // 將基礎Tasks.Task<TResult>轉換為Tasks.TaskStatus.Faulted狀態。 public void SetException(Exception exception); public void SetException(IEnumerable<Exception> exceptions); public bool TrySetException(Exception exception); public bool TrySetException(IEnumerable<Exception> exceptions); // 嘗試將基礎Tasks.Task<TResult>轉換為TaskStatus.RanToCompletion狀態。 public bool TrySetResult(TResult result); …… }

現在我將在 《基於事件的異步編程模 式 (EAP) 》 博文中展現的 BackgroundWorker2 組件示例轉化為 TAP 模式。

我們需要修改地方有:

1)         創建一個 TaskCompletionSource<int> 實例 tcs ;

2)         為 tcs.Task 返回的任務創建延續任務,延續任務中根據前面任務的 IsCanceled、 IsFaulted 、 Result 等成員做邏輯;

3)         Completed 事件,在這里面我們將設置返回任務的狀態。

關鍵代碼如下:

// 1、創建 TaskCompletionSource<TResult>
 tcs = new TaskCompletionSource<int>();  worker2.RunWorkerCompleted += RunWorkerCompleted; // 2、注冊延續  tcs.Task.ContinueWith(t =>  { if (t.IsCanceled) MessageBox.Show("操作已被取消"); else if (t.IsFaulted) MessageBox.Show(t.Exception.GetBaseException().Message); else MessageBox.Show(String.Format("操作已完成,結果為:{0}", t.Result)); }, TaskContinuationOptions.ExecuteSynchronously); // 3、運行異步任務 worker2.RunWorkerAsync(); // 4、Completed事件 private void RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e) { if (e.Error != null) tcs.SetException(e.Error); else if (e.Cancelled) tcs.SetCanceled(); else tcs.SetResult((int)e.Result); // 注銷事件,避免多次掛接事件 worker2.RunWorkerCompleted -= RunWorkerCompleted; }

 

當然,這兩部分的代碼都不能直接運行,只是部分關鍵代碼,完整的示例請在我提供的示例源碼中查看。

 

使用關鍵字 async 和 await 實現異步方法

       在 C#5.0 中引入了 async 和 await 關鍵字,可以方便我們使用順序結構流 ( 即不用回調 ) 來實現異步編程,大大降低了異步編程的復雜程度。( vs2010 打 Visual Studio Async CTP for VS2010 補丁 可以引入關鍵字” async ”和” await ”的支持,但是得不到 .net4.5 新增 API 的支持)

1.         我們可通過下圖來明白異步方法的構建和異步方法的執行流程。(代碼詳見我提供的示例程序)

      image

 

2.         編譯器轉換

使用 async 關鍵字標記方法,會導致 C# 或 Visual Basic 編譯器使用狀態機重新編寫該方法的實施。借助此狀態機,編譯器可以在該方法中插入多個中斷點,以便該方法可以在不阻止線程的情況下,掛起和恢復其執行。這些中斷點不會隨意地插入。它們只會在您明確使用 await 關鍵字的位置插入:

private async void btnDoWork_Click(object sender, EventArgs e) { ... await someObject; // <-- potential method suspension point ... }

當您等待未完成的異步操作時,編譯器生成的代碼可確保與該方法相關的所有狀態(例如,局部變量)封裝並保留在堆中。然后,該函數將返回到調用程序,允許在其運行的線程中執行其他任務。當所等待的異步操作在稍后完成時,該方法將使用保留的狀態恢復執行。

任何公開 await 模式的類型都可以進行等待。該模式主要由一個公開的 GetAwaiter()方法組成,該方法會返回一個提供 IsCompleted 、 OnCompleted 和 GetResult 成員的類型。當您編寫以下代碼時:

await someObject;

編譯器會生成一個包含 MoveNext 方法的狀態機類:

private class FooAsyncStateMachine : IAsyncStateMachine { // Member fields for preserving “locals” and other necessary state int $state; TaskAwaiter $awaiter; … public void MoveNext() { // Jump table to get back to the right statement upon resumption switch (this.$state) { … case 2: goto Label2; … } … // Expansion of “await someObject;” this.$awaiter = someObject.GetAwaiter(); if (!this.$awaiter.IsCompleted) { this.$state = 2; this.$awaiter.OnCompleted(MoveNext); return; Label2: } this.$awaiter.GetResult(); … } }

在實例 someObject 上使用這些成員來檢查該對象是否已完成(通過 IsCompleted),如果未完成,則掛接一個續體(通過 OnCompleted ),當所等待實例最終完成時,系統將再次調用 MoveNext 方法,完成后,來自該操作的任何異常將得到傳播或作為結果返回(通過 GetResult ),並跳轉至上次執行中斷的位置。

3.         自定義類型支持等待

如果希望某種自定義類型支持等待,我們可以選擇兩種主要的方法。

1)         一種方法是針對自定義的可等待類型手動實施完整的 await 模式,提供一個返回自定義等待程序類型的 GetAwaiter 方法,該等待程序類型知道如何處理續體和異常傳播等等。

2)         第二種實施該功能的方法是將自定義類型轉換為任務,然后只需依靠對等待任務的內置支持來等待特殊類型。前文所展示的“ EAP 轉化為 TAP ”正屬於這一類,關鍵代碼如下:

private async void btn_Start_Click(object sender, EventArgs e) { this.progressBar1.Value = 0; tcs = new TaskCompletionSource<int>(); worker2.RunWorkerCompleted += RunWorkerCompleted; tcs.Task.ContinueWith(t => { if (t.IsCanceled) MessageBox.Show("操作已被取消"); else if (t.IsFaulted) MessageBox.Show(t.Exception.GetBaseException().Message); else MessageBox.Show(String.Format("操作已完成,結果為:{0}", t.Result)); }, TaskContinuationOptions.ExecuteSynchronously); worker2.RunWorkerAsync(); await tcs.Task; }

 

處理 TAP 中的異常

       在任務拋出的未處理異常都封裝在 System.AggregateException 對象中。這個對象會存儲在方法返回的 Task 或 Task<TResult> 對象中,需要通過訪問 Wait() 、 Result 、 Exception 成員才能觀察到異常。(所以,在訪問 Result 之前,應先觀察 IsCanceled 和 IsFaulted 屬性)

1.         AggregateException 對象的三個重要成員

1)         InnerExceptions 屬性

獲取導致當前異常的 System.Exception 實例的只讀集合(即, ReadOnlyCollection<Exception> )。不要將其與基類 Exception 提供的 InnerException 屬性混淆。

2)         Flatten() 方法

遍歷 InnerExceptions 異常列表,若列表中包含類型為 AggregateException 的異常,就移除 所有嵌套 的 AggregateException ,直接返回其真真的異常信息(效果如下圖)。

                image

1)         Handle(Func<Exception, bool> predicate) 方法

它為 AggregateException 中包含的每個異常都調用一個回調方法。然后,回調方法可以為每個異常決定如何對其進行處理,回調返回 true 表示異常已經處理,返回 false表示沒有。在調用 Handle 之后,如果至少有一個異常沒有處理,就創建一個新的 AggregateException 對象,其中只包含未處理的異常, 並拋出這個新的 AggregateException對象 。

比如:將任何 OperationCanceledException 對象都視為已處理。其他任何異常都造成拋出一個新的 AggregateException ,其中只包含未處理的異常。

try{……}
        catch (AggregateException ae) { ae.Handle(e => e is OperationCanceledException); }

1.         父任務生成了多個子任務,而多個子任務都拋出了異常

1)         嵌套子任務

Task t4 = Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() => { throw new Exception("子任務Exception_1"); } , TaskCreationOptions.AttachedToParent); Task.Factory.StartNew(() => { throw new Exception("子任務Exception_2"); } , TaskCreationOptions.AttachedToParent); throw new Exception("父任務Exception"); });

對於“嵌套子任務”中子任務的異常都會包裝在父任務返回的 Task 或 Task<TResult>對象中。如此例子中 t4.Exception.InnerExceptions 的 Count 為 3 。

       對於子任務返回的異常類型為包裝過的 AggregateException 對象,為了避免循環訪問子任務異常對象的 InnerExceptions 才能獲取真真的異常信息,可以使用上面提到的 Flatten() 方法移除所有嵌套的 AggregateExceprion 。

2)         Continue 子任務

Task t1 = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(500);   // 確保已注冊好延續任務 throw new Exception("父任務Exception"); }, TaskCreationOptions.AttachedToParent); Task t2 = t1.ContinueWith((t) => { throw new Exception("子任務Exception_1"); }); Task t3 = t1.ContinueWith((t) => { throw new Exception("子任務Exception_2"); });

       對於“ Continue 子任務”中的子任務其異常與父任務是分離的,各自包裝在自己返回的 Task 或 Task<TResult> 對象中。如此示例 t1 、 t2 、 t3 的 Exception.InnerExceptions 的 Count 都為 1 。     

2.         TaskScheduler 的 UnobservedTaskException 事件

假如你一直不訪問 Task 的 Wait() 、 Result 、 Exception 成員,那么你將永遠注意不到這些異常的發生。為了幫助你檢測到這些未處理的異常,可以向 TaskScheduler 對象的 UnobservedTaskException 事件注冊回調函數。每當一個 Task 被垃圾回收時,如果存在一個沒有注意到的異常, CLR 的終結器線程會引發這個事件。

可在事件回調函數中調用 UnobservedTaskExceptionEventArgs 對象的 SetObserved()方法來指出已經處理好了異常,從而阻止 CLR 終止線程。然而並不推薦這么做,寧願終止進程也不要帶着已經損壞的狀態繼續運行。

示例代碼:(要監控此代碼必須在 GC.Collect(); 和事件里兩個地方進行斷點)

TaskScheduler.UnobservedTaskException += (s, e) =>
            {
                //設置所有未覺察異常被覺察
                e.SetObserved();
            };
            Task.Factory.StartNew(() =>
            {
                throw new Exception(); }); //確保任務完成 Thread.Sleep(100); //強制垃圾會受到,在GC回收時才會觸發UnobservedTaskException事件 GC.Collect(); //等待終結器處理 GC.WaitForPendingFinalizers();

3.         返回 void 的 async “異步方法”中的異常

我們已經知道返回 Task 或 Task<TResult> 對象的任務中拋出的異常會隨着返回對象一起返回,可通過 Exception 屬性獲取。那么對於返回 Task 或 Task<TResult> 對象的“異步方法”情況也是一樣。

然而對於返回 void 的“異步方法”,方法中拋出的異常會直接導致程序奔潰。

public static async void Test_void_async_Exception() { throw new Exception(); }

另外,我們還要特別注意 lambda 表達式構成的“異步方法”,如:

Enumerable.Range(0, 3).ToList().ForEach(async (i) => { throw new Exception(); });

       本博文到此結束,我相信你看累了,其實我也寫了很久…很久… ,寫完此文,我的 “ 異步編程系列 ” 也算有頭有尾了(還會繼續擴充)。本博文主要介紹了 Task 的重要 API 、任務的 CLR 線程池引擎、 TaskFactory 對象、 TaskScheduler 對象、 TaskExtensions 對象、 AMP 轉化為 TAP 和 EAP 轉化為 TAP 、使用關鍵字 async 和await 實現異步方法以及自定義類型支持等待、處理 TAP 中的異常。

感謝你的觀看,如果對你有幫助, 還請多多推薦 ……

 

 

===================================================================

抱歉,我知道你已經累了,但我還是有了這么一段:(其實博文也是寫着寫着就長起來了!!!)

此小段是博主的求助,能幫的就幫 。

       求職: web 中高級工程師 ( 本人已工作 3 年 )

       我打算 5 月中旬換一家公司,爭取能再端午節 6 月 10 之前入職新工作,地點廣州 ( 其次,深圳 ) 吧,畢竟還有些朋友在廣州。希望園友能推薦一些不錯的廣州互聯網公司。

       PS :我個人有關注下廣州 3g 門戶網、廣州多益互聯網、廣州 39 健康網公司,如你是內部員工我很希望能和你交流下。

       如果你能幫助我 :請在博文回復給我,或我的 QQ : 369220123      郵箱: 369220123@qq.com

       如果你只是想與我后續方便討論問題,請加群 : 69594961 ( .NET 開源交流), 185718116 (廣深莞· NET 技術)

===================================================================

 

 

推薦閱讀:

關於 async 與 await 的 FAQ        ----- 詳細講解了 await 和 async 的作用和意義,以及什么是可等待對象、等待者……(此文可幫助你解決 80% 關於 await 和 async 關鍵字的疑惑)

               深入探究 WinRT 和 await        ----- 基於 WinRT 平板 win8 系統,講解了異步功能,以及 TPL 、編譯器轉換……

 

 

參考資料: MSDN

                    書籍:《 CLR via C#( 第三版 ) 》

書籍:《 C# 並行編程高級教程:精通 .NET 4 Parallel Extensions 》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM