異步編程:.NET 4.5 基於任務的異步編程模型(TAP)


傳送門:異步編程系列目錄……

       最近我為大家陸續介紹了“IAsyncResult異步編程模型 (APM)”和“基於事件的異步編程模式(EAP)”兩種異步編程模型。在.NET4.0 Microsoft又為我們引入了新的異步編程模型“基於任務的異步編程模型(TAP)”,並且推薦我們在開發新的多線程應用程序中首選TAP,在.NET4.5中更是對TPL庫進行了大量的優化與改進。那現在我先介紹下TAP具有哪些優勢:

1.        目前版本(.NET4.X)的任務調度器(TaskScheduler)依賴於底層的線程池引擎。通過局部隊列的任務內聯化(task inlining)和工作竊取機制可以為我們提升程序性能。

2.        輕松實現任務等待、任務取消、延續任務、異常處理(System.AggregateException)、GUI線程操作。

3.        在任務啟動后,可以隨時以任務延續的形式注冊回調。

4.        充分利用現有的線程,避免創建不必要的額外線程。

5.        結合C#5.0引入asyncawait關鍵字輕松實現“異步方法”。

 

示例源碼:異步編程:.NET 4.5 基於任務的異步編程模型(TAP).rar

 

術語:

APM              異步編程模型,Asynchronous Programming Model

EAP               基於事件的異步編程模式,Event-based Asynchronous Pattern

TAP               基於任務的異步編程模式,Task-based Asynchronous Pattern

TPL                任務並行庫,Task Parallel Library

 

理解CLR線程池引擎、理解全局隊列、理解線程的局部隊列及性能優勢

1.        CLR線程池引擎

CLR線程池引擎維護了一定數量的空閑工作線程以支持工作項的執行,並且能夠重用已有的線程以避免創建新的不必要的線程所花費的昂貴的處理過程。並且使用爬山算法(hill-climbing algorithm)檢測吞吐量,判斷是否能夠通過更多的線程來完成更多的工作項。這個算法的判斷依據是工作項所需某些類型資源的可用情況,例如:CPU、網絡帶寬或其他。此外這個算法還會考慮一個飽和點,即達到飽和點的時候,創建更多地線程反而會降低吞吐量。(線程池的詳細介紹請看《異步編程:使用線程池管理線程》

目前版本的TAP的任務調度器(TaskScheduler)基於CLR線程池引擎實現。當任務調度器(TaskScheduler開始分派任務時:

1)        在主線程或其他並沒有分配給某個特定任務的線程的上下文中創建並啟動的任務,這些任務將會在全局隊列中競爭工作線程。這些任務被稱為頂層任務

2)        然而,如果是在其他任務的上下文中創建的任務(子任務或嵌套任務),這些任務將被分配在線程的局部隊列中。

嵌套任務:

是在另一個任務的用戶委托中創建並啟動的任務。

子任務:

是使用TaskCreationOptions.AttachedToParent選項創建頂層任務的嵌套任務或延續任務;或使用TaskContinuationOptions.AttachedToParent選項創建的延續任務的嵌套任務或延續任務。(應用程序使用TaskCreationOptions.DenyChildAttach選項創建父任務。此選項指示運行時會取消子任務的AttachedToParent規范)

如果你不想特定的任務放入線程的局部隊列,那么可以指定TaskCreationOptions.PreferFairnessTaskContinuationOptions.PreferFairness枚舉參數。(使TaskThreadPool.QueueUserWorkItem行為相同)

2.        線程池的全局隊列

       當調用ThreadPool.QueueUserWorkItem()添加工作項時,該工作項會被添加到線程池的全局隊列中。線程池中的空閑線程以FIFO的順序將工作項從全局隊列中取出並執行,但並不能保證按某個指定的順序完成。

       線程的全局隊列是共享資源,所以內部會實現一個鎖機制。當一個任務內部會創建很多子任務時,並且這些子任務完成得非常快,就會造成頻繁的進入全局隊列和移出全局隊列,從而降低應用程序的性能。基於此原因,線程池引擎為每個線程引入了局部隊列。

3.        線程的局部隊列為我們帶來兩個性能優勢:任務內聯化(task inlining)和工作竊取機制。

1)        任務內聯化(task inlining)----活用頂層任務工作線程

我們用一個示例來說明:

        static void Main(string[] args)
        {
            Task headTask= new Task(() =>
            {
                DoSomeWork(null);
            });
            headTask.Start();
            Console.Read();
        }
        private static void DoSomeWork(object obj)
        {
            Console.WriteLine("任務headTask運行在線程“{0}”上",
                Thread.CurrentThread.ManagedThreadId);

            var taskTop = new Task(() =>
            {
                Thread.Sleep(500);
                Console.WriteLine("任務taskTop運行在線程“{0}”上",
                    Thread.CurrentThread.ManagedThreadId);
            });
            var taskCenter = new Task(() =>
            {
                Thread.Sleep(500);
                Console.WriteLine("任務taskCenter運行在線程“{0}”上",
                    Thread.CurrentThread.ManagedThreadId);
            });
            var taskBottom = new Task(() =>
            {
                Thread.Sleep(500);
                Console.WriteLine("任務taskBottom運行在線程“{0}”上",
                    Thread.CurrentThread.ManagedThreadId);
            });
            taskTop.Start();
            taskCenter.Start();
            taskBottom.Start();
            Task.WaitAll(new Task[] { taskTop, taskCenter, taskBottom });
        }

結果:

image

分析:(目前內聯機制只有出現在等待任務場景)

       這個示例,我們從Main方法主線程中創建了一個headTask頂層任務並開啟。在headTask任務中又創建了三個嵌套任務並最后WaitAll() 這三個嵌套任務執行完成(嵌套任務安排在局部隊列)。此時出現的情況就是headTask任務的線程被阻塞,而“任務內聯化”技術會使用阻塞的headTask的線程去執行局部隊列中的任務。因為減少了對額外線程需求,從而提升了程序性能。

       局部隊列“通常”以LIFO的順序抽取任務並執行,而不是像全局隊列那樣使用FIFO順序LIFO順序通常用有利於數據局部性,能夠在犧牲一些公平性的情況下提升性能。

數據局部性的意思是:運行最后一個到達的任務所需的數據都還在任何一個級別的CPU高速緩存中可用。由於數據在高速緩存中任然是“熱的”,因此立即執行最后一個任務可能會獲得性能提升。

2)        工作竊取機制----活用空閑工作線程

當一個工作線程的局部隊列中有很多工作項正在等待時,而存在一些線程卻保持空閑,這樣會導致CPU資源的浪費。此時任務調度器(TaskScheduler)會讓空閑的工作線程進入忙碌線程的局部隊列中竊取一個等待的任務,並且執行這個任務。

 

由於局部隊列為我們帶來了性能提升,所以,我們應盡可能地使用TPL提供的服務(任務調度器(TaskScheduler)),而不是直接使用ThreadPool的方法。

 

任務並行Task

一個任務表示一個異步操作。任務運行的時候需要使用線程,但並不是說任務取代了線程,理解這點很重要。事實上,在《異步編程:.NET4.X 數據並行》中介紹的System.Threading.Tasks.Parallel類構造的並行邏輯內部都會創建Task,而它們的並行和並發執行都是由底層線程支持的。任務和線程之間也沒有一對一的限制關系,通用語言運行時(CLR)會創建必要的線程來支持任務執行的需求。

1.        Task簡單的實例成員

    public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
    {
        public Task(Action<object> action, object state
              , CancellationToken cancellationToken,TaskCreationOptions creationOptions);

        // 獲取此 Task 實例的唯一 ID。
        public int Id { get; }
        // 獲取用於創建此任務的TaskCreationOptions。
        public TaskCreationOptions CreationOptions { get; }
        // 獲取此任務的TaskStatus。
        public TaskStatus Status { get; }
        // 獲取此 Task 實例是否由於被取消的原因而已完成執行。
        public bool IsCanceled { get; }
        // 獲取 Task 是否由於未經處理異常的原因而完成。
        public bool IsFaulted { get; }
        // 獲取導致 Task 提前結束的System.AggregateException。
        public AggregateException Exception { get; }

        #region IAsyncResult接口成員
        private bool IAsyncResult.CompletedSynchronously { get;}
        private WaitHandleIAsyncResult.AsyncWaitHandle { get; }

        // 獲取在創建 Task 時提供的狀態對象,如果未提供,則為 null。
        public object AsyncState { get; }
        // 獲取此 Task 是否已完成。
        public bool IsCompleted { get; }
        #endregion

        // 釋放由 Task 類的當前實例占用的所有資源。
        public void Dispose();
        ……
    }

         分析:

1)        CancellationTokenIsCancel

對於長時間運行的計算限制操作來說,支持取消是一件很“棒”的事情。.NET 4.0提供了一個標准的取消操作模式。即通過使用CancellationTokenSource創建一個或多個取消標記CancellationTokencancellationToken可在線程池中線程或 Task 對象之間實現協作取消),然后將此取消標記傳遞給應接收取消通知的任意數量的線程或Task對象。當調用CancellationToken關聯的CancellationTokenSource對象的Cancle()時,每個取消標記(CancellationToken)上的IsCancellationRequested屬性將返回true。異步操作中可以通過檢查此屬性做出任何適當響應。也可調用取消標記的ThrowIfCancellationRequested()方法來拋出OperationCanceledException異常。

       更多關於CancellationTokenCancellationTokenSource的介紹及示例請看《協作式取消》….

       Task任務中實現取消,可以使用以下幾種選項之一終止操作:

                                      i.              簡單地從委托中返回。在許多情況下,這樣已足夠;但是,采用這種方式“取消”的任務實例會轉換為RanToCompletion狀態,而不是 Canceled 狀態。

                                    ii.              創建Task傳入CancellationToken標識參數,並調用關聯CancellationTokenSource對象的Cancel()方法:

a)        如果Task還未開始,那么Task實例直接轉為Canceled狀態。(注意,因為已經Canceled狀態了,所以不能再在后面調用Start()

b)        (見示例:TaskOperations.Test_Cancel();)如果Task已經開始,在Task內部必須拋出OperationCanceledException異常(注意,只能存在OperationCanceledException異常,可優先考慮使用CancellationTokenThrowIfCancellationRequested()方法),Task實例轉為Canceled狀態。

若對拋出OperationCanceledException異常且狀態為CanceledTask進行等待操作(如:Wait/WaitAll),則會在Catch塊中捕獲到OperationCanceledException異常,但是此異常指示Task成功取消,而不是有錯誤的情況。因此IsCanceltrueIsFaultedfalseException屬性為null

                                  iii.              對於使用TaskContinuationOptions枚舉值為NotOnOnlyOn創建的延續任務A,在其前面的任務結束狀態不匹配時,延續任務A將轉換為Canceled狀態,並且不會運行。

2)        TaskCreationOptions枚舉

定義任務創建、調度和執行的一些可選行為。

None

指定應使用默認行為。

PreferFairness

 

較早安排的任務將更可能較早運行,而較晚安排運行的任務將更可能較晚運行。(Prefer:更喜歡 ; Fair:公平的)

LongRunning

該任務需要很長時間運行,因此,調度器可以對這個任務使用粗粒度的操作(默認TaskScheduler為任務創建一個專用線程,而不是排隊讓一個線程池線程來處理,可通過在延續任務中訪問:Thread.CurrentThread.IsThreadPoolThread屬性判別)。比如:如果任務可能需要好幾秒的時間運行,那么就使用這個參數。相反,如果任務只需要不到1秒鍾的時間運行,那么就不應該使用這個參數。

AttachedToParent

指定此枚舉值的Task,其內部創建的Task或通過ContinueWith()創建的延續任務都為子任務。(父級是頂層任務)

DenyChildAttach

如果嘗試附加子任務到創建的任務,指定System.InvalidOperationException將被引發。

HideScheduler

創建任務的執行操作將被視為TaskScheduler.Default默認計划程序。

3)        IsCompleted

Task實現了IAsyncResult接口。在任務處於以下三個最終狀態之一時IsCompleted返回 trueRanToCompletion Faulted Canceled

4)        TaskStatus枚舉

表示 Task 的生命周期中的當前階段。一個Task實例只會完成其生命周期一次,即當Task到達它的三種可能的最終狀態之一時,Task就結束並釋放。

可能的初始狀態

Created

該任務已初始化,但尚未被計划。

WaitingForActivation

只有在其它依賴的任務完成之后才會得到調度的任務的初始狀態。這種任務是使用定義延續的方法創建的。

WaitingToRun

該任務已被計划執行,但尚未開始執行。

中間狀態

Running

該任務正在運行,但尚未完成。

WaitingForChildrenToComplete

該任務已完成執行,正在隱式等待附加的子任務完成。

可能的最終狀態

RanToCompletion

已成功完成執行的任務。

Canceled

該任務已通過對其自身的CancellationToken引發OperationCanceledException異常

Faulted

由於未處理異常的原因而完成的任務。

       狀態圖如下:

                 image

5)        Dispose()

盡管Task為我們實現了IDisposable接口,但依然不推薦你主動調用Dispose()方法,而是由系統終結器進行清理。原因:

a)        Task調用Dispose()主要釋放的資源是WaitHandle對象。

b)        .NET4.5 .NET4.0 中提出的Task進行過大量的優化,讓其盡量不再依賴WaitHandle對象(eg.NET4.0TaskWaitAll()/WaitAny()的實現依賴於WaitHandle)。

c)        在使用Task時,大多數情況下找不到一個好的釋放點,保證該Task已經完成並且沒有被其他地方在使用。

d)        Task.Dispose()方法在“.NET Metro風格應用程序”框架所引用的程序集中甚至並不存在(即此框架中Task沒有實現IDisposable接口)。

更詳細更專業的Dispose()討論請看.NET4.X並行任務Task需要釋放嗎?》

2.        Task的實例方法

        // 獲取用於等待此 Task 的等待者。
        public TaskAwaiter GetAwaiter();
        // 配置用於等待此System.Threading.Tasks.Task的awaiter。
        // 參數:continueOnCapturedContext:
        //     試圖在await返回時奪取原始上下文,則為 true;否則為 false。
        public ConfiguredTaskAwaitable ConfigureAwait(bool continueOnCapturedContext);

        // 對提供的TaskScheduler同步運行 Task。
        public void RunSynchronously(TaskScheduler scheduler);
        // 啟動 Task,並將它安排到指定的TaskScheduler中執行。
        public void Start(TaskScheduler scheduler);
        // 等待 Task 完成執行過程。
        public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken);

        // 創建一個在目標 Task 完成時執行的延續任務。
        public Task ContinueWith(Action<Task, object> continuationAction, object state
                , CancellationToken cancellationToken
                , TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
        public Task<TResult>ContinueWith<TResult>(
                Func<Task, object, TResult> continuationFunction
                , object state,CancellationToken cancellationToken
                , TaskContinuationOptions continuationOptions, TaskScheduler scheduler);
        ……

         分析:

1)        TaskContinuationOptions

在創建一個Task作為另一個Task的延續時,你可以指定一個TaskContinuationOptions參數,這個參數可以控制延續另一個任務的任務調度和執行的可選行為。

None

默認情況下,完成前面的任務之后“都”將安排運行延續任務,而不考慮前面任務的最終TaskStatus

AttachedToParent

對延續任務指定此枚舉值,表示該延續任務內部創建的新Task或通過ContinueWith()創建的延續任務都為子任務。(父級是延續任務)

PreferFairness

LongRunning

DenyChildAttach           

HideScheduler

 

 

參考:TaskCreationOptions枚舉

LazyCancellation

在延續取消的情況下,防止延續的完成直到完成先前的任務。

NotOnRanToCompletion

NotOnFaulted

NotOnCanceled

指定不應在延續任務前面的任務“已完成運行、引發了未處理異常、已取消”的情況下安排延續任務。

 

此選項對多任務延續無效。

OnlyOnCanceled

OnlyOnFaulted

OnlyOnRanToCompletion

指定只應在延續任務前面的任務“已取消、引發了未處理異常、已完成運行”的情況下才安排延續任務。

ExecuteSynchronously

指定應同步執行延續任務。指定此選項后,延續任務將在導致前面的任務轉換為其最終狀態的相同線程上運行。

注意:

a)        如果使用默認選項TaskContinuationOptions.None,並且之前的任務被取消了,那么延續任務任然會被調度並啟動執行。

b)        如果該條件在前面的任務准備調用延續時未得到滿足,則延續將直接轉換為 Canceled 狀態,之后將無法啟動。

c)        如果調用多任務延續(即:調用TaskFactoryTaskFactory<TResult>的靜態ContinueWhenAllContinueWhenAny方法)時,NotOnOnlyOn六個標識或標識的組合都是無效的。也就是說,無論先驅任務是如何完成的,ContinueWhenAllContinueWhenAny都會執行延續任務。

d)        TaskContinuationOptions.ExecuteSynchronously,指定同步執行延續任務。延續任務會使用前一個任務的數據,而保持在相同線程上執行就能快速訪問高速緩存中的數據,從而提升性能。此外,也可避免調度這個延續任務產生不必要的額外線程開銷。

如果在創建延續任務時已經完成前面的任務,則延續任務將在創建此延續任務的線程上運行。只應同步執行運行時間非常短的延續任務。

2)        開啟任務

只有Task處於TaskStatus.Created狀態時才能使用實例方法Start()。並且,只有在使用Task的公共構造函數構造的Task實例才能處於TaskStatus.Created狀態。

當然我們還知道有其他方式可以創建Task並開啟任務,比如Task.Run()/Task.ContinueWith()/Task.Factory.StartNew()/TaskCompletionSource/異步方法(即使用asyncawait關鍵字的方法),但是這些方法返回的Task已經處於開啟狀態,即不能再調用Start()。更豐富更專業的討論請看.NET4.X 並行任務中Task.Start()FAQ

3)        延續任務ContinueWith

a)        ContinueWith() 方法可創建一個根據TaskContinuationOptions參數限制的延續任務。可以為同一個Task定義多個延續任務讓它們並行執行。

比如,為t1定義兩個並行延續任務t2t3.

        Task<int> t1 = new Task<int>(() => { return 1; });
        Task<int> t2 = t1.ContinueWith<int>(Work1,……);
        Task<int> t3 = t1.ContinueWith<int>(Work1,……);

b)        調用Wait()方法和Result屬性會導致線程阻塞,極有可能造成線程池創建一個新線程,這增大了資源的消耗,並損害了伸縮性。可以在延續任務中訪問這些成員,並做相應操作。

c)        對前面任務的引用將以參數形式傳遞給延續任務的用戶委托,以將前面任務的數據傳遞到延續任務中。

4)        Wait()

一個線程調用Wait()方法時,系統會檢查線程要等待的Task是否已開始執行。

a)        如果是,調用Wait()的線程會阻塞,直到Task運行結束為止。

b)        如果Task還沒有開始執行,系統可能(取決於局部隊列的內聯機制)使用調用Wait()的線程來執行Task。如果發生這種情況,那么調用Wait()的線程不會阻塞;它會執行Task並立刻返回。

                                       i.              這樣做的好處在於,沒有線程會被阻塞,所以減少了資源的使用(因為不需要創建一個線程來替代被阻塞的線程),並提升了性能(因為不需要花時間創建一個線程,也沒有上下文切換)。

                                     ii.              但不好的地方在於,假如線程在調用Wait()前已經獲得一個不可重入的線程同步鎖(egSpinLock),而Task試圖獲取同一個鎖,就會造成一個死鎖的線程!

5)        RunSynchronously

可在指定的TaskSchedulerTaskScheduler.Current中同步運行 Task。即RunSynchronously()之后的代碼會阻塞到Task委托執行完畢。

示例如下:

            Task task1 = new Task(() =>
            {
                Thread.Sleep(5000);
                Console.WriteLine("task1執行完畢。");
            });
            task1.RunSynchronously();
            Console.WriteLine("執行RunSynchronously()之后的代碼。");

            // 輸出==============================
            // task1執行完畢。
            // 執行RunSynchronously()之后的代碼。

3.        Task的靜態方法

        // 返回當前正在執行的 Task 的唯一 ID。
        public static int? CurrentId{ get; }
        // 提供對用於創建 Task 和 Task<TResult>實例的工廠方法的訪問。
        public static TaskFactory Factory { get; }
        // 創建指定結果的、成功完成的Task<TResult>。
        public static Task<TResult> FromResult<TResult>(TResult result);

        // 創建將在指定延遲后完成的任務。
        public static Task Delay(int millisecondsDelay, CancellationToken cancellationToken);

        // 將在線程池上運行的指定工作排隊,並返回該工作的任務句柄。
        public static Task Run(Action action, CancellationToken cancellationToken);
        // 將在線程池上運行的指定工作排隊,並返回該工作的 Task(TResult) 句柄。
        public static Task<TResult>  Run<TResult>(Func<TResult> function, CancellationToken  cancellationToken);
        // 將在線程池上運行的指定工作排隊,並返回 function 返回的任務的代理項。
        public static Task Run(Func<Task> function, CancellationToken cancellationToken);
        // 將在線程池上運行的指定工作排隊,並返回 function 返回的 Task(TResult) 的代理項。
        public static Task<TResult> Run<TResult>(Func<Task<TResult>> function, CancellationToken cancellationToken);

        // 等待提供的所有 Task 對象完成執行過程。
        public static bool WaitAll(Task[] tasks, intmillisecondsTimeout, CancellationToken cancellationToken);
        // 等待提供的任何一個 Task 對象完成執行過程。
        // 返回結果:
        //     已完成的任務在 tasks 數組參數中的索引,如果發生超時,則為 -1。
        public static int WaitAny(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken);

        // 所有提供的任務已完成時,創建將完成的任務。
        public static Task WhenAll(IEnumerable<Task> tasks);
        public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks);
        // 任何一個提供的任務已完成時,創建將完成的任務。
        public static Task<Task> WhenAny(IEnumerable<Task> tasks);
        public static Task<Task<TResult>> WhenAny<TResult>(IEnumerable<Task<TResult>> tasks);

        // 創建awaitable,等待時,它異步產生當前上下文。
        // 返回結果:等待時,上下文將異步轉換回等待時的當前上下文。
        // 如果當前SynchronizationContext不為 null,則將其視為當前上下文。
        // 否則,與當前執行任務關聯的任務計划程序將視為當前上下文。
        public static YieldAwaitable Yield();

         分析:

1)        FromResult<TResult>(TResult result);

創建指定結果的、成功完成的Task<TResult>。我們可以使用此方法創建包含預先計算結果/緩存結果的 Task<TResult>對象,示例代碼CachedDownloads.cs示例文件。

2)        Delay

創建將在指定延遲后完成的任務,返回Task。可以通過awaitTask.Wait()來達到Thread.Sleep()的效果。盡管,Task.Delay() Thread.Sleep()消耗更多的資源,但是Task.Delay()可用於為方法返回Task類型;或者根據CancellationToken取消標記動態取消等待。

Task.Delay()等待完成返回的Task狀態為RanToCompletion;若被取消,返回的Task狀態為Canceled

            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;
            Task.Factory.StartNew(() => { Thread.Sleep(1000); tokenSource.Cancel(); });
            Console.WriteLine("Begin taskDelay1");
            Task taskDelay1 = Task.Delay(100000, token);
            try
            {
                taskDelay1.Wait();
            }
            catch (AggregateException ae)
            {
                foreach (var v in ae.InnerExceptions)
                    Console.WriteLine(ae.Message + " " + v.Message);
            }
            taskDelay1.ContinueWith((t) =>Console.WriteLine(t.Status.ToString()));

            Thread.Sleep(100);
            Console.WriteLine();

            Console.WriteLine("Begin taskDelay2");
            Task taskDelay2 = Task.Delay(1000);
            taskDelay2.ContinueWith((t) =>Console.WriteLine(t.Status.ToString()));
            // 輸出======================================
            // Begin taskDelay1
            // 發生一個或多個錯誤。已取消一個任務。
            // Canceled
            // 
            // Begin taskDelay2
            // Completed

4.        Task<TResult>:Task

Task<TResult>繼承自Task,表示一個可以返回值的異步操作,提供Result只讀屬性用於訪問異步操作的返回值。該屬性會阻塞線程,直到Task執行完畢並返回值。

 

System.Threading.Tasks.TaskFactory         

1.        設置共用\默認的參數

通過TaskFactory對象提供的SchedulerCancellationTokenCreationOptionContinuationOptions屬性可以為Task設置共用\默認的參數,以便快捷的創建Task或延續任務。影響StartNew()ContinueWhenAll()|ContinueWhenAny()FromAsync()方法的默認參數設置。

2.        StartNew()

Task.Factory.StartNew()可快速創建一個Task並且開啟任務。代碼如下:

        var t = Task.Factory.StartNew(someDelegate);

這等效於:

        var t = new Task(someDelegate); 
        t.Start();

表現方面,前者更高效。Start()采用同步方式運行以確保任務對象保持一致的狀態即使是同時調用多次Start(),也可能只有一個調用會成功。相比之下,StartNew()知道沒有其他代碼能同時啟動任務,因為在StartNew()返回之前它不會將創建的Task引用給任何人,所以StartNew()不需要采用同步方式執行。更豐富更專業的討論請看.NET4.X 並行任務中Task.Start()FAQ

3.        ContinueWhenAll()

        public Task ContinueWhenAll(Task[] tasks, Action<Task[]> continuationAction
            , CancellationToken cancellationToken
            , TaskContinuationOptions continuationOptions, TaskScheduler scheduler);

創建一個延續 Task 或延續 Task<TResult>,它將在提供的一組任務完成后馬上開始。延續任務操作委托接受一個Task[]數組做參數。

4.        ContinueWhenAny()

        public Task ContinueWhenAny(Task[] tasks, Action<Task> continuationAction
            , CancellationToken cancellationToken
            , TaskContinuationOptions continuationOptions, TaskScheduler scheduler);

創建一個延續 Task 或延續 Task<TResult>,它將在提供的組中的任何一個任務完成后馬上開始。延續任務操作委托接受一個 Task 做參數。

5.        通過Task.TaskFactory.FromAsync() 實例方法,我們可以將APM轉化為TAP。示例見此文的后面小節AMP轉化為TAPEAP轉化為TAP

 

System.Threading.Tasks.TaskScheduler        

       TaskScheduler表示一個處理將任務排隊到線程中的底層工作對象。TaskScheduler通常有哪些應用呢?

1.        TaskScheduler是抽象類,可以繼承它實現自己的任務調度計划。如:默認調度程序ThreadPoolTaskScheduler、與SynchronizationContext.Current關聯的SynchronizationContextTaskScheduler

2.        TaskScheduler.Default獲取默認調度程序ThreadPoolTaskScheduler

3.        TaskScheduler.Current獲取當前任務執行的TaskScheduler

4.        TaskScheduler.TaskSchedulerFromCurrentSynchronizationContext() 方法獲取與SynchronizationContext.Current關聯的SynchronizationContextTaskSchedulerSynchronizationContextTaskScheduler上的任務都會通過SynchronizationContext.Post()在同步上下文中進行調度。通常用於實現跨線程更新控件。

5.        通過MaximumConcurrencyLevel設置任務調度計划能支持的最大並發級別。

6.        通過UnobservedTaskException事件捕獲未被觀察到的異常。

 

System.Threading.Tasks.TaskExtensions

提供一組用於處理特定類型的 Task 實例的靜態方法。將特定Task實例進行解包操作。

    public static class TaskExtensions
    {
        public static Task<TResult> Unwrap<TResult>(this Task<Task<TResult>> task);
        public static Task Unwrap(this Task<Task> task);
    }

 

AMP轉化為TAPEAP轉化為TAP

1.        AMP轉化為TAP

通過Task.TaskFactory.FromAsync() 實例方法,我們可以將APM轉化為TAP

注意點:

1)        FromAsync方法返回的任務具有WaitingForActivation狀態,並將在創建該任務后的某一時間由系統啟動。如果嘗試在這樣的任務上調用 Start,將引發異常。

2)        轉化的APM異步模型必須符合兩個模式:

a)        接受Begin***End***方法。此時要求Begin***方法簽名的委托必須是AsyncCallback以及 End***方法只接受IAsyncResult一個參數。此模式AsyncCallback回調由系統自動生成,主要工作是調用End***方法。

        public Task<TResult> FromAsync<TArg1, TResult>(
              Func<TArg1, AsyncCallback, object, IAsyncResult> beginMethod
            , Func<IAsyncResult, TResult> endMethod, TArg1 arg1
            , object state, TaskCreationOptions creationOptions);

b)        接受IAsyncResult對象以及End***方法。此時Begin***方法的簽名已經無關緊要只要(即:此模式支持傳入自定義回調委托)能返回IAsyncResult的參數以及 End***方法只接受IAsyncResult一個參數。

        public Task<TResult> FromAsync<TResult>(IAsyncResult asyncResult
            , Func<IAsyncResult, TResult> endMethod);

3)        當然,我們有時需要給客戶提供統一的 Begin***() End***() 調用方式,我們可以直接使用Task從零開始構造APM。即:在 Begin***() 創建並開啟任務,並返回Task。因為Task是繼承自IAsyncResult接口的,所以我們可以將其傳遞給 End***() 方法,並在此方法里面調用Result屬性來等待任務完成。

4)        對於返回的Task,可以隨時以任務延續的形式注冊回調。

現在將在APM異步編程模型博文中展現的示例轉化為TAP模式。關鍵代碼如下:

        public Task<int> CalculateAsync<TArg1, TArg2>(
              Func<TArg1, TArg2, AsyncCallback, object, IAsyncResult> beginMethod
            , AsyncCallback userCallback, TArg1 num1, TArg2 num2, object asyncState)
        {
            IAsyncResult result = beginMethod(num1, num2, userCallback, asyncState);
            return Task.Factory.FromAsync<int>(result
                    , EndCalculate, TaskCreationOptions.None);
        }

        public Task<int> CalculateAsync(int num1, int num2, object asyncState)
        {
            return Task.Factory.FromAsync<int, int, int>(BeginCalculate, EndCalculate
                    , num1, num2, asyncState, TaskCreationOptions.None);
        }

2.        EAP轉化為TAP

我們可以使用TaskCompletionSource<TResult>實例將EAP操作表示為一個Task<TResult>

TaskCompletionSource<TResult>表示未綁定委托的Task<TResult>的制造者方,並通過TaskCompletionSource<TResult>.Task屬性獲取由此Tasks.TaskCompletionSource<TResult>創建的Task<TResult>

注意,TaskCompletionSource<TResult>創建的任何任務將由TaskCompletionSource啟動,因此,用戶代碼不應在該任務上調用 Start()方法。

    public class TaskCompletionSource<TResult>
    {
        public TaskCompletionSource();
        // 使用指定的狀態和選項創建一個TaskCompletionSource<TResult>。
        //   state: 要用作基礎 Task<TResult>的AsyncState的狀態。
        public TaskCompletionSource(object state, TaskCreationOptions creationOptions);

        // 獲取由此Tasks.TaskCompletionSource<TResult>創建的Tasks.Task<TResult>。
        public Task<TResult> Task { get; }

        // 將基礎Tasks.Task<TResult>轉換為Tasks.TaskStatus.Canceled狀態。
        public void SetCanceled();
		public bool TrySetCanceled();

        // 將基礎Tasks.Task<TResult>轉換為Tasks.TaskStatus.Faulted狀態。
        public void SetException(Exception exception);
        public void SetException(IEnumerable<Exception> exceptions);
        public bool TrySetException(Exception exception);
        public bool TrySetException(IEnumerable<Exception> exceptions);

        // 嘗試將基礎Tasks.Task<TResult>轉換為TaskStatus.RanToCompletion狀態。
        public bool TrySetResult(TResult result);
        ……        
    }

現在我將在《基於事件的異步編程模(EAP)博文中展現的BackgroundWorker2組件示例轉化為TAP模式。

我們需要修改地方有:

1)        創建一個TaskCompletionSource<int>實例tcs

2)        tcs.Task返回的任務創建延續任務,延續任務中根據前面任務的IsCanceledIsFaultedResult等成員做邏輯;

3)        Completed事件,在這里面我們將設置返回任務的狀態。

關鍵代碼如下:

        // 1、創建 TaskCompletionSource<TResult>
	tcs = new TaskCompletionSource<int>();
	worker2.RunWorkerCompleted += RunWorkerCompleted;
        // 2、注冊延續
	tcs.Task.ContinueWith(t =>
	{
            if (t.IsCanceled)
                MessageBox.Show("操作已被取消");
            else if (t.IsFaulted)
                MessageBox.Show(t.Exception.GetBaseException().Message);
            else
                MessageBox.Show(String.Format("操作已完成,結果為:{0}", t.Result));
        }, TaskContinuationOptions.ExecuteSynchronously);
        // 3、運行異步任務
        worker2.RunWorkerAsync();
        // 4、Completed事件
        private void RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e)
        {
            if (e.Error != null)
                tcs.SetException(e.Error);
            else if (e.Cancelled)
                tcs.SetCanceled();
            else
                tcs.SetResult((int)e.Result);
            // 注銷事件,避免多次掛接事件
            worker2.RunWorkerCompleted -= RunWorkerCompleted;
        }

 

使用關鍵字asyncawait實現異步方法

       C#5.0中引入了asyncawait關鍵字,可以方便我們使用順序結構流(即不用回調)來實現異步編程,大大降低了異步編程的復雜程度。(vs2010 Visual Studio Async CTP for VS2010補丁可以引入關鍵字”async”和”await”的支持,但是得不到.net4.5新增API的支持)

異步方法的實現原理

異步方法不需要多線程,因為一個異步方法並不是運行在一個獨立的線程中的。

異步方法運行在當前同步上下文中,只有激活的時候才占用當前線程的時間。

異步模型采用時間片輪轉來實現。

異步方法的參數和返回值

異步方法的參數:

不能使用“ref”參數和“out”參數,但是在異步方法內部可以調用含有這些參數的方法

異步方法的返回類型:

Task<TResult>:Tresult為異步方法的返回值類型。

Task:異步方法沒有返回值。

void:主要用於事件處理程序(不能被等待,無法捕獲異常)。異步事件通常被認為是一系列異步操作的開始。使用void返回類型不需要await,而且調用void異步方法的函數不會捕獲方法拋出的異常。(異步事件中使用await,倘若等待的任務由有異常會導致拋出“調用的目標發生了異常”。當然你可以在異步事件中調用另一個有返回值的異步方法)

異步方法的命名規范

異步方法的方法名應該以Async作為后綴

事件處理程序,基類方法和接口方法,可以忽略此命名規范:例如: startButton_Click不應重命名為startButton_ClickAsync

    async和await關鍵字不會導致其他線程的創建,執行異步方法的線程為其調用線程。而異步方法旨在成為非阻塞操作,即當await等待任務運行時,異步方法會將控制權轉移給異步方法外部,讓其不受阻塞的繼續執行,待await等待的任務執行完畢再將控制權轉移給await處,繼續執行異步方法后續的代碼。

1.        我們可通過下圖來明白異步方法的構建和異步方法的執行流程。(代碼詳見我提供的示例程序async_await_method項目)

       image

需要注意的一個問題:被“async”關鍵字標記的方法的調用都會強制轉變為異步方式嗎?

不會,當你調用一個標記了”async”關鍵字的方法,它會在當前線程以同步的方式開始運行。所以,如果你有一個同步方法,它返回void並且你做的所有改變只是將其標記的“async”,這個方法調用依然是同步的。返回值為TaskTask<TResult>也一樣。

方法用“async”關鍵字標記不會影響方法是同步還是異步運行並完成,而是,它使方法可被分割成多個片段,其中一些片段可能異步運行,這樣這個方法可能異步完成。這些片段界限就出現在方法內部顯示使用“await”關鍵字的位置處。所以,如果在標記了“async”的方法中沒有顯示使用“await”,那么該方法只有一個片段,並且將以同步方式運行並完成。

2.        編譯器轉換

使用 async 關鍵字標記方法,會導致 C# Visual Basic 編譯器使用狀態機重新編寫該方法的實現。借助此狀態機,編譯器可以在該方法中插入多個中斷點,以便該方法可以在不阻止線程的情況下,掛起和恢復其執行。這些中斷點不會隨意地插入。它們只會在您明確使用 await 關鍵字的位置插入:

    private async void btnDoWork_Click(object sender, EventArgs e)
    {
        ...
        await someObject; // <-- potential method suspension point
        ...
    }

當您等待未完成的異步操作時,編譯器生成的代碼可確保與該方法相關的所有狀態(例如,局部變量)封裝並保留在堆中。然后,該函數將返回到調用程序,允許在其運行的線程中執行其他任務。當所等待的異步操作在稍后完成時,該方法將使用保留的狀態恢復執行。

任何公開 await 模式的類型都可以進行等待。該模式主要由一個公開的 GetAwaiter()方法組成,該方法會返回一個提供 IsCompletedOnCompleted GetResult 成員的類型。當您編寫以下代碼時:

        await someObject;

編譯器會生成一個包含 MoveNext 方法的狀態機類:

private class FooAsyncStateMachine : IAsyncStateMachine
{ 
    // Member fields for preserving “locals” and other necessary     state 
    int $state; 
    TaskAwaiter $awaiter; 
    … 
    public void MoveNext() 
    { 
        // Jump table to get back to the right statement upon         resumption 
        switch (this.$state) 
        { 
            … 
        case 2: goto Label2; 
            … 
        } 
        … 
        // Expansion of “await someObject;” 
        this.$awaiter = someObject.GetAwaiter(); 
        if (!this.$awaiter.IsCompleted) 
        { 
            this.$state = 2; 
            this.$awaiter.OnCompleted(MoveNext); 
            return; 
            Label2: 
        } 
        this.$awaiter.GetResult(); 
        … 
    } 
}

在實例someObject上使用這些成員來檢查該對象是否已完成(通過 IsCompleted),如果未完成,則掛接一個續體(通過 OnCompleted),當所等待實例最終完成時,系統將再次調用 MoveNext 方法,完成后,來自該操作的任何異常將得到傳播或作為結果返回(通過 GetResult),並跳轉至上次執行中斷的位置。

3.        自定義類型支持等待

如果希望某種自定義類型支持等待,我們可以選擇兩種主要的方法。

1)        一種方法是針對自定義的可等待類型手動實現完整的 await 模式,提供一個返回自定義等待程序類型的 GetAwaiter 方法,該等待程序類型知道如何處理續體和異常傳播等等。

2)        第二種實施該功能的方法是將自定義類型轉換為Task,然后只需依靠對等待任務的內置支持來等待特殊類型。前文所展示的EAP轉化為TAP正屬於這一類,關鍵代碼如下:

        private async void btn_Start_Click(object sender, EventArgs e)
        {
            this.progressBar1.Value = 0;

            tcs = new TaskCompletionSource&lt;int&gt;();
            worker2.RunWorkerCompleted += RunWorkerCompleted;
            tcs.Task.ContinueWith(t =&gt;
            {
                if (t.IsCanceled)
                    MessageBox.Show("操作已被取消");
                else if (t.IsFaulted)
                    MessageBox.Show(t.Exception.GetBaseException().Message);
                else
                    MessageBox.Show(String.Format("操作已完成,結果為:{0}", t.Result));
            }, TaskContinuationOptions.ExecuteSynchronously);

            worker2.RunWorkerAsync();
            // void的異步方法:主要用於事件處理程序(不能被等待,無法捕獲異常)。異步事件通常被認為
            // 是一系列異步操作的開始。使用void返回類型不需要await,而且調用void異步方法的函數不
            // 會捕獲方法拋出的異常。(異步事件中使用await,倘若等待的任務由有異常會導致
            // 拋出“調用的目標發生了異常”。當然你可以在異步事件中調用另一個有返回值的異步方法)

            // 所以不需要下面的await,因為會出現在執行取消后拖動界面會因異常被觀察到並且終止整個進程
            // await tcs.Task;
        }

 

處理TAP中的異常

       在任務拋出的未處理異常都封裝在System.AggregateException對象中。這個對象會存儲在方法返回的TaskTask<TResult>對象中,需要通過訪問Wait()ResultException成員才能觀察到異常。(所以,在訪問Result之前,應先觀察IsCanceledIsFaulted屬性)

1.        AggregateException對象的三個重要成員

1)        InnerExceptions屬性

獲取導致當前異常的System.Exception實例的只讀集合(即,ReadOnlyCollection<Exception>)。不要將其與基類Exception提供的InnerException屬性混淆。

2)        Flatten() 方法

遍歷InnerExceptions異常列表,若列表中包含類型為AggregateException的異常,就移除所有嵌套AggregateException,直接返回其真真的異常信息(效果如下圖)。

                 image

1)        Handle(Func<Exception, bool> predicate)方法

它為AggregateException中包含的每個異常都調用一個回調方法。然后,回調方法可以為每個異常決定如何對其進行處理,回調返回true表示異常已經處理,返回false表示沒有。在調用Handle之后,如果至少有一個異常沒有處理,就創建一個新的AggregateException對象,其中只包含未處理的異常,並拋出這個新的AggregateException對象

比如:將任何OperationCanceledException對象都視為已處理。其他任何異常都造成拋出一個新的AggregateException,其中只包含未處理的異常。

        try{……}
        catch (AggregateException ae)
        {
            ae.Handle(e => e is OperationCanceledException);
        }

1.        父任務生成了多個子任務,而多個子任務都拋出了異常

1)        嵌套子任務

        Task t4 = Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() => { throw new Exception("子任務Exception_1"); }
                    , TaskCreationOptions.AttachedToParent);

            Task.Factory.StartNew(() => { throw new Exception("子任務Exception_2"); }
                    , TaskCreationOptions.AttachedToParent);

            throw new Exception("父任務Exception");
        });

對於“嵌套子任務”中子任務的異常都會包裝在父任務返回的TaskTask<TResult>對象中。如此例子中 t4.Exception.InnerExceptionsCount3

       對於子任務返回的異常類型為包裝過的AggregateException對象,為了避免循環訪問子任務異常對象的InnerExceptions才能獲取真真的異常信息,可以使用上面提到的Flatten() 方法移除所有嵌套的AggregateExceprion

2)        Continue子任務

            Task t1 = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(500);   // 確保已注冊好延續任務
                throw new Exception("父任務Exception");
            }, TaskCreationOptions.AttachedToParent);
            Task t2 = t1.ContinueWith((t) =>
            {
                throw new Exception("子任務Exception_1");
            });
            Task t3 = t1.ContinueWith((t) =>
            {
                throw new Exception("子任務Exception_2");
            });

       對於“Continue子任務”中的子任務其異常與父任務是分離的,各自包裝在自己返回的Task Task<TResult>對象中。如此示例 t1t2t3 Exception.InnerExceptionsCount都為1    

2.        TaskSchedulerUnobservedTaskException事件

假如你一直不訪問TaskWait()ResultException成員,那么你將永遠注意不到這些異常的發生。為了幫助你檢測到這些未處理的異常,可以向TaskScheduler對象的UnobservedTaskException事件注冊回調函數。每當一個Task被垃圾回收時,如果存在一個沒有注意到的異常,CLR的終結器線程會引發這個事件。

可在事件回調函數中調用UnobservedTaskExceptionEventArgs對象的SetObserved() 方法來指出已經處理好了異常,從而阻止CLR終止線程。然而並不推薦這么做,寧願終止進程也不要帶着已經損壞的狀態繼續運行。

示例代碼:(要監控此代碼必須在GC.Collect();和事件里兩個地方進行斷點)

            TaskScheduler.UnobservedTaskException += (s, e) =>
            {
                //設置所有未覺察異常已被處理
                e.SetObserved();
            };
            Task.Factory.StartNew(() =>
            {
                throw new Exception();
            });
            //確保任務完成
            Thread.Sleep(100);
            //強制垃圾會受到,在GC回收時才會觸發UnobservedTaskException事件
            GC.Collect();
            //等待終結器處理
            GC.WaitForPendingFinalizers();

3.        返回voidasync“異步方法”中的異常

我們已經知道返回TaskTask<TResult>對象的任務中拋出的異常會隨着返回對象一起返回,可通過Exception屬性獲取。這對於返回TaskTask<TResult>對象的“異步方法”情況也是一樣。

然而對於返回void的“異步方法”,方法中拋出的異常會直接導致程序奔潰。

        public static async void Test_void_async_Exception()
        {
            throw new Exception();
        }

另外,我們還要特別注意lambda表達式構成的“異步方法”,如:

        Enumerable.Range(0, 3).ToList().ForEach(async (i) => { throw new Exception(); });

 

 

       本博文到此結束,我相信你看累了,其實我也寫了很久…很久…,寫完此文,我的異步編程系列也算有頭有尾了(還會繼續擴充)。本博文主要介紹了Task的重要API、任務的CLR線程池引擎、TaskFactory對象、TaskScheduler對象、TaskExtensions對象、AMP轉化為TAPEAP轉化為TAP、使用關鍵字asyncawait實現異步方法以及自定義類型支持等待、處理TAP中的異常。

感謝你的觀看,如果對你有幫助,還請多多推薦……

 

 

===================================================================

本篇博文基於.NET4.5中TPL所寫。對於.NET4.0中TPL會有些差異,若有園友知道差異還請告知,我這邊做個記錄方便大家也方便自己。

1、.NET4.0中TPL未觀察到的異常會在GC回收時終止進程。(園友:YamatAmain,討論見21-26樓)

 

===================================================================

 

 

推薦閱讀:

         異步性能:了解 Async 和 Await 的成本-----有講解到使用Task.ConfigureAwait(false)來避免捕獲原上下文來提升性能。

關於asyncawaitFAQ      -----詳細講解了awaitasync的作用和意義,以及什么是可等待對象、等待者……(此文可幫助你解決80%關於awaitasync關鍵字的疑惑)

               深入探究 WinRT await      -----基於WinRT平板win8系統,講解了異步功能API、TAP、編譯器轉換……

 

 

參考資料:MSDN

                    書籍:《CLR via C#(第三版)

書籍:《C# 並行編程高級教程:精通.NET 4 Parallel Extensions


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM