异步编程:(TAP)基于任务的异步编程模型详解


   最近我为大家陆续介绍了“ IAsyncResult 异步编程模型 (APM) ”和“ 基于事件的异步编程模式 (EAP) ”两种异步编程模型。在 .NET4.0 中 Microsoft 又为我们引入了新的异步编程模型“基于任务的异步编程模型 (TAP) ”,并且推荐我们在开发新的多线程应用程序中首选 TAP 。那现在我先介绍下 TAP 具有哪些优势:

1.         目前版本 (.NET4.X) 的任务调度器 (TaskScheduler) 依赖于底层的线程池引擎。通过局部队列的任务内联化 (task inlining) 和工作窃取机制可以为我们提升程序性能。

2.         轻松实现任务等待、任务取消、延续任务、异常处理( System.AggregateException )、 GUI 线程操作。

3.         在任务启动后,可以随时以任务延续的形式注册回调。

4.         充分利用现有的线程,避免创建不必要的额外线程。

5.         结合 C#5.0 引入 async 和 await 关键字轻松实现“异步方法”。

 

示例源码: 异步编程: (TAP) 基于任务的异步编程模型详解 .rar

 

术语:

APM               异步编程模型, Asynchronous Programming Model

EAP                基于事件的异步编程模式, Event-based Asynchronous Pattern

TAP                基于任务的异步编程模式, Task-based Asynchronous Pattern

 

理解 CLR 线程池引擎、理解全局队列、理解线程的局部队列及性能优势

1.         CLR 线程池引擎

CLR 线程池引擎维护了一定数量的空闲工作线程以支持工作项的执行,并且能够重用已有的线程以避免创建新的不必要的线程所花费的昂贵的处理过程。并且使用爬山算法( hill-climbing algorithm )检测吞吐量,判断是否能够通过更多的线程来完成更多的工作项。这个算法的判断依据是工作项所需某些类型资源的可用情况,例如: CPU、网络带宽或其他。此外这个算法还会考虑一个饱和点,即达到饱和点的时候,创建更多地线程反而会降低吞吐量。(线程池的详细介绍请看 《异步编程:使用线程池管理线程》 

目前版本的 TAP 的任务调度器( TaskScheduler )基于 CLR 线程池引擎实现。当 任务调度器( TaskScheduler ) 开始 分派任务 时:

1)         在主线程或其他并没有分配给某个特定任务的线程的上下文中创建并启动的任务,这些任务将会在 全局队列 中竞争工作线程。这些任务被称为 顶层任务 。

2)         然而,如果是在其他任务的上下文中创建的任务(子任务或嵌套任务),这些任务将被分配在线程的局部队列中。

嵌套任务:

是在另一个任务的用户委托中创建并启动的任务。

子任务:

是使用 TaskCreationOptions.AttachedToParent 选项创建顶层任务的嵌套任务或延续任务;或使用 TaskContinuationOptions.AttachedToParent 选项创建的延续任务的嵌套任务或延续任务。(应用程序使用 TaskCreationOptions.DenyChildAttach 选项创建父任务。此选项指示运行时会取消子任务的 AttachedToParent 规范)

如果你不想特定的任务放入线程的局部队列,那么可以指定 TaskCreationOptions.PreferFairness 或 TaskContinuationOptions.PreferFairness 枚举参数。(使 Task 与 ThreadPool.QueueUserWorkItem 行为相同)

2.         线程池的全局队列

       当调用 ThreadPool.QueueUserWorkItem() 添加工作项时,该工作项会被添加到线程池的全局队列中。 线程池中的空闲线程以 FIFO 的顺序将工作项从全局队列中取出并执行,但并不能保证按某个指定的顺序完成。

       线程的全局队列是共享资源,所以内部会实现一个锁机制。 当一个任务内部会创建很多子任务时 ,并且这些子任务完成得非常快,就会造成频繁的进入全局队列和移出全局队列,从而降低应用程序的性能。基于此原因,线程池引擎为每个线程引入了局部队列。

3.         线程的局部队列为我们带来两个性能优势:任务内联化 (task inlining) 和工作窃取机制。

1)         任务内联化 (task inlining)---- 活用顶层任务工作线程

我们用一个示例来说明:

static void Main(string[] args) { Task headTask= new Task(() => { DoSomeWork(null); }); headTask.Start(); Console.Read(); } private static void DoSomeWork(object obj) { Console.WriteLine("任务headTask运行在线程“{0}”上", Thread.CurrentThread.ManagedThreadId); var taskTop = new Task(() => { Thread.Sleep(500); Console.WriteLine("任务taskTop运行在线程“{0}”上", Thread.CurrentThread.ManagedThreadId); }); var taskCenter = new Task(() => { Thread.Sleep(500); Console.WriteLine("任务taskCenter运行在线程“{0}”上", Thread.CurrentThread.ManagedThreadId); }); var taskBottom = new Task(() => { Thread.Sleep(500); Console.WriteLine("任务taskBottom运行在线程“{0}”上", Thread.CurrentThread.ManagedThreadId); }); taskTop.Start(); taskCenter.Start(); taskBottom.Start(); Task.WaitAll(new Task[] { taskTop, taskCenter, taskBottom }); }

结果:

image

分析:

       这个示例,我们从 Main 方法主线程中创建了一个 headTask 顶层任务并开启。在headTask 任务中又创建了三个嵌套任务并最后 WaitAll() 这三个嵌套任务执行完成。此时出现的情况就是 headTask 任务的线程被阻塞,而“任务内联化”技术会使用阻塞的headTask 的线程去执行局部队列中的任务。因为减少了对额外线程需求,从而提升了程序性能。

       局部队列“ 通常 ”以 LIFO 的顺序抽取任务并执行,而不是像全局队列那样使用 FIFO 顺序 。 LIFO 顺序通常用有利于数据局部性,能够在牺牲一些公平性的情况下提升性能。

数据局部性的意思是:运行最后一个到达的任务所需的数据都还在任何一个级别的CPU 高速缓存中可用。由于数据在高速缓存中任然是“热的”,因此立即执行最后一个任务可能会获得性能提升。

2)         工作窃取机制 ---- 活用空闲工作线程

当一个工作线程的局部队列中有很多工作项正在等待时,而存在一些线程却保持空闲,这样会导致 CPU 资源的浪费。此时任务调度器( TaskScheduler )会让空闲的工作线程进入忙碌线程的局部队列中窃取一个等待的任务,并且执行这个任务。

 

由于局部队列为我们带来了性能提升,所以,我们应尽可能地使用 TAP 提供的服务(任务调度器( TaskScheduler )),而不是直接使用 ThreadPool 的方法。

 

任务并行 Task

一个任务表示一个异步操作。任务运行的时候需要使用线程,但并不是说任务取代了线程,理解这点很重要。事实上,在 《异步编程: .NET4.X 数据并行》 中介绍的 System.Threading.Tasks.Parallel 类构造的并行逻辑内部都会创建 Task ,而它们的并行和并发执行都是由底层线程支持的。任务和线程之间也没有一对一的限制关系,通用语言运行时( CLR )会创建必要的线程来支持任务执行的需求。

1.         Task 简单的实例成员

public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable { public Task(Action<object> action, object state , CancellationToken cancellationToken,TaskCreationOptions creationOptions); // 获取此 Task 实例的唯一 ID。 public int Id { get; } // 获取用于创建此任务的TaskCreationOptions。 public TaskCreationOptions CreationOptions { get; } // 获取此任务的TaskStatus。 public TaskStatus Status { get; } // 获取此 Task 实例是否由于被取消的原因而已完成执行。 public bool IsCanceled { get; } // 获取 Task 是否由于未经处理异常的原因而完成。 public bool IsFaulted { get; } // 获取导致 Task 提前结束的System.AggregateException。 public AggregateException Exception { get; } #region IAsyncResult接口成员 private bool IAsyncResult.CompletedSynchronously { get;} private WaitHandleIAsyncResult.AsyncWaitHandle { get; } // 获取在创建 Task 时提供的状态对象,如果未提供,则为 null。 public object AsyncState { get; } // 获取此 Task 是否已完成。 public bool IsCompleted { get; } #endregion // 释放由 Task 类的当前实例占用的所有资源。 public void Dispose(); …… }

         分析:

1)         CancellationToken 、 IsCancel

对于长时间运行的计算限制操作来说,支持取消是一件很“棒”的事情。 .NET 4.0提供了一个标准的取消操作模式。即通过使用 CancellationTokenSource 创建一个或多个取消标记 CancellationToken ( cancellationToken 可在线程池中线程或 Task 对象之间实现协作取消),然后将此取消标记传递给应接收取消通知的任意数量的线程或 Task 对象。当调用 CancellationToken 关联的 CancellationTokenSource 对象的 Cancle() 时,每个取消标记 (CancellationToken) 上的 IsCancellationRequested 属性将返回 true 。异步操作中可以通过检查此属性做出任何适当响应。也可调用取消标记的 ThrowIfCancellationRequested() 方法来抛出 OperationCanceledException 异常。

       更多关于 CancellationToken 与 CancellationTokenSource 的介绍及示例请看 《协作式取消》 ….

       在 Task 任务中实现取消,可以使用以下几种选项之一终止操作:

                                      i.               简单地从委托中返回。在许多情况下,这样已足够;但是,采用这种方式“取消”的任务实例会转换为 RanToCompletion 状态,而不是 Canceled 状态。

                                    ii.               创建 Task 时 传入 CancellationToken 标识参数 ,并调用关联 CancellationTokenSource 对象的 Cancel() 方法:

a)         如果 Task 还未开始,那么 Task 实例直接转为 Canceled 状态。(注意,因为已经 Canceled 状态了,所以不能再在后面调用 Start() )

b)         (见示例: TaskOperations.Test_Cancel(); )如果 Task 已经开始,在 Task 内部必须抛出 OperationCanceledException 异常(注意,只能存在 OperationCanceledException 异常,可优先考虑使用 CancellationToken 的 ThrowIfCancellationRequested() 方法), Task 实例转为 Canceled 状态。

若对抛出 OperationCanceledException 异常且状态为 Canceled 的 Task 进行等待操作(如: Wait/WaitAll ),则会在 Catch 块中捕获到 OperationCanceledException 异常,但是此异常指示 Task 成功取消,而不是有错误的情况。因此 IsCancel 为 true ; IsFaulted 为 false 且 Exception 属性为 null 。

                                  iii.               对于使用 TaskContinuationOptions 枚举值为 NotOn 或OnlyOn 创建的延续任务 A ,在其前面的任务结束状态不匹配时,延续任务 A 将转换为Canceled 状态,并且不会运行。

2)         TaskCreationOptions 枚举

定义任务创建、调度和执行的一些可选行为。

None

指定应使用默认行为。

PreferFairness

 

较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。 (Prefer :更喜欢 ; Fair :公平的 )

LongRunning

该任务需要很长时间运行,因此,调度器可以对这个任务使用粗粒度的操作( 默认 TaskScheduler 为任务创建一个专用线程,而不是排队让一个线程池线程来处理 ,可通过在延续任务中访问: Thread.CurrentThread.IsThreadPoolThread 属性判别)。比如:如果任务可能需要好几秒的时间运行,那么就使用这个参数。相反,如果任务只需要不到 1 秒钟的时间运行,那么就不应该使用这个参数。

AttachedToParent

指定此枚举值的 Task ,其内部创建的 Task 或通过 ContinueWith() 创建的延续任务都为子任务。 (父级是顶层任务)

DenyChildAttach

如果尝试附加子任务到创建的任务,指定 System.InvalidOperationException 将被引发。

HideScheduler

创建任务的执行操作将被视为 TaskScheduler.Default 默认计划程序。

3)         IsCompleted

Task 实现了 IAsyncResult 接口。在任务处于以下三个最终状态之一时 IsCompleted 返回 true : RanToCompletion 、 Faulted 或 Canceled 。

4)         TaskStatus 枚举

表示 Task 的生命周期中的当前阶段。一个 Task 实例只会完成其生命周期一次,即当 Task 到达它的三种可能的最终状态之一时, Task 就结束并释放。

可能的初始状态

Created

该任务已初始化,但尚未被计划。

WaitingForActivation

只有在其它依赖的任务完成之后才会得到调度的任务的初始状态。这种任务是使用定义延续的方法创建的。

WaitingToRun

该任务已被计划执行,但尚未开始执行。

中间状态

Running

该任务正在运行,但尚未完成。

WaitingForChildrenToComplete

该任务已完成执行,正在隐式等待附加的子任务完成。

可能的最终状态

RanToCompletion

已成功完成执行的任务。

Canceled

该任务已通过对其自身的 CancellationToken 引发 OperationCanceledException 异常

Faulted

由于未处理异常的原因而完成的任务。

       状态图如下:

                image

5)         Dispose()

尽管 Task 为我们实现了 IDisposable 接口,但依然不推荐你主动调用 Dispose() 方法,而是由系统终结器进行清理。原因:

a)         Task 调用 Dispose() 主要释放的资源是 WaitHandle 对象。

b)         .NET4.5 对 .NET4.0 中提出的 Task 进行过大量的优化,让其尽量不再依赖 WaitHandle 对象( eg : .NET4.0 种 Task 的 WaitAll()/WaitAny() 的实现依赖于 WaitHandle)。

c)         在使用 Task 时,大多数情况下找不到一个好的释放点,保证该 Task 已经完成并且没有被其他地方在使用。

d)         Task.Dispose() 方法在“ .NET Metro 风格应用程序”框架所引用的程序集中甚至并不存在(即此框架中 Task 没有实现 IDisposable 接口)。

更详细更专业的 Dispose() 讨论请看 《 .NET4.X 并行任务 Task 需要释放吗?》 

2.         Task 的实例方法

// 获取用于等待此 Task 的等待者。
        public TaskAwaiter GetAwaiter(); // 配置用于等待此System.Threading.Tasks.Task的awaiter。 // 参数:continueOnCapturedContext: // 试图在await返回时夺取原始上下文,则为 true;否则为 false。 public ConfiguredTaskAwaitable ConfigureAwait(bool continueOnCapturedContext); // 对提供的TaskScheduler同步运行 Task。 public void RunSynchronously(TaskScheduler scheduler); // 启动 Task,并将它安排到指定的TaskScheduler中执行。 public void Start(TaskScheduler scheduler); // 等待 Task 完成执行过程。 public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken); // 创建一个在目标 Task 完成时执行的延续任务。 public Task ContinueWith(Action<Task, object> continuationAction, object state , CancellationToken cancellationToken , TaskContinuationOptions continuationOptions, TaskScheduler scheduler); public Task<TResult>ContinueWith<TResult>( Func<Task, object, TResult> continuationFunction , object state,CancellationToken cancellationToken , TaskContinuationOptions continuationOptions, TaskScheduler scheduler); ……

         分析:

1)         TaskContinuationOptions

在创建一个 Task 作为另一个 Task 的延续时,你可以指定一个 TaskContinuationOptions 参数,这个参数可以控制延续另一个任务的任务调度和执行的可选行为。

None

默认情况下,完成前面的任务之后“都”将安排运行延续任务,而不考虑前面任务的最终 TaskStatus 。

AttachedToParent

对延续任务指定此枚举值,表示该延续任务内部创建的新 Task 或通过 ContinueWith() 创建的延续任务都为子任务。 (父级是延续任务)

PreferFairness

LongRunning

DenyChildAttach            

HideScheduler

 

 

参考: TaskCreationOptions 枚举 

LazyCancellation

在延续取消的情况下,防止延续的完成直到完成先前的任务。

NotOnRanToCompletion

NotOnFaulted

NotOnCanceled

指定不应在延续任务 前面的任务 “已完成运行、引发了未处理异常、已取消”的情况下安排延续任务。

 

此选项对多任务延续无效。

OnlyOnCanceled

OnlyOnFaulted

OnlyOnRanToCompletion

指定只应在延续任务 前面的任务“ 已取消、引发了未处理异常、已完成运行”的情况下才安排延续任务。

ExecuteSynchronously

指定应同步执行延续任务。指定此选项后,延续任务将在导致前面的任务转换为其最终状态的相同线程上运行。

注意:

a)         如果使用默认选项 TaskContinuationOptions.None ,并且之前的任务被取消了,那么延续任务任然会被调度并启动执行。

b)         如果该条件在前面的任务准备调用延续时未得到满足,则延续将直接转换为Canceled 状态,之后将无法启动。

c)         如果调用 多任务延续(即: 调用 TaskFactory 或 TaskFactory<TResult> 的静态 ContinueWhenAll 和 ContinueWhenAny 方法)时, NotOn 和 OnlyOn 六个标识或标识的组合都是无效的。也就是说,无论先驱任务是如何完成的, ContinueWhenAll 和 ContinueWhenAny 都会执行延续任务。

d)         TaskContinuationOptions.ExecuteSynchronously ,指定同步执行延续任务。延续任务会使用前一个任务的数据,而保持在相同线程上执行就能快速访问高速缓存中的数据,从而 提升性能 。此外,也可避免调度这个延续任务产生不必要的额外线程开销。

如果在创建延续任务时已经完成前面的任务,则延续任务将在创建此延续任务的线程上运行。只应同步执行运行时间非常短的延续任务。

2)         开启任务

只有 Task 处于 TaskStatus.Created 状态时才能使用实例方法 Start() 。并且, 只有在使用 Task 的公共构造函数构造的 Task 实例才能处于 TaskStatus.Created 状态。

当然我们还知道有其他方式可以创建 Task 并开启任务,比如 Task.Run()/Task.ContinueWith()/Task.Factory.StartNew()/TaskCompletionSource/ 异步方法 ( 即使用 async 与 await关键字的方法 ) ,但是这些方法返回的 Task 已经处于开启状态,即不能再调用 Start()。更丰富更专业的讨论请看 《 .NET4.X 并行任务中 Task.Start() 的 FAQ 》 

3)         延续任务 ContinueWith

a)         ContinueWith() 方法可创建一个根据 TaskContinuationOptions 参数限制的延续任务。 可以为同一个 Task 定义多个延续任务让它们并行执行。

比如,为 t1 定义两个 并行 延续任务 t2 、 t3.

Task<int> t1 = new Task<int>(() => { return 1; }); Task<int> t2 = t1.ContinueWith<int>(Work1,……); Task<int> t3 = t1.ContinueWith<int>(Work1,……);

b)         调用 Wait() 方法和 Result 属性会导致线程阻塞,极有可能造成线程池创建一个新线程,这增大了资源的消耗,并损害了伸缩性。可以在延续任务中访问这些成员,并做相应操作。

c)         对前面任务的引用将以参数形式传递给延续任务的用户委托,以将前面任务的数据传递到延续任务中。

4)         Wait()

一个线程调用 Wait() 方法时,系统会检查线程要等待的 Task 是否已开始执行。

a)         如果是,调用 Wait() 的线程会阻塞,直到 Task 运行结束为止。

b)         如果 Task 还没有开始执行,系统可能(取决于 TaskScheduler )使用调用 Wait() 的线程来执行 Task 。如果发生这种情况,那么调用 Wait() 的线程不会阻塞;它会执行 Task 并立刻返回。

                                       i.               这样做的好处在于,没有线程会被阻塞,所以减少了资源的使用(因为不需要创建一个线程来替代被阻塞的线程),并提升了性能(因为不需要花时间创建一个线程,也没有上下文切换)。

                                     ii.               但不好的地方在于,假如线程在调用 Wait() 前已经获得一个不可重入的线程同步锁 (eg : SpinLock) ,而 Task 试图获取同一个锁,就会造成一个死锁的线程!

5)         RunSynchronously

可在指定的 TaskScheduler 或 TaskScheduler.Current 中同步运行 Task 。即 RunSynchronously() 之后的代码会阻塞到 Task 委托执行完毕。

示例如下:

Task task1 = new Task(() =>
            {
                Thread.Sleep(5000); Console.WriteLine("task1执行完毕。"); }); task1.RunSynchronously(); Console.WriteLine("执行RunSynchronously()之后的代码。"); // 输出============================== // task1执行完毕。 // 执行RunSynchronously()之后的代码。

3.         Task 的静态方法

// 返回当前正在执行的 Task 的唯一 ID。
        public static int? CurrentId{ get; }
        // 提供对用于创建 Task 和 Task<TResult>实例的工厂方法的访问。 public static TaskFactory Factory { get; } // 创建指定结果的、成功完成的Task<TResult>。 public static Task<TResult> FromResult<TResult>(TResult result); // 创建将在指定延迟后完成的任务。 public static Task Delay(int millisecondsDelay, CancellationToken cancellationToken); // 将在线程池上运行的指定工作排队,并返回该工作的任务句柄。 public static Task Run(Action action, CancellationToken cancellationToken); // 将在线程池上运行的指定工作排队,并返回该工作的 Task(TResult) 句柄。 public static Task<TResult> Run<TResult>(Func<TResult> function, CancellationToken cancellationToken); // 将在线程池上运行的指定工作排队,并返回 function 返回的任务的代理项。 public static Task Run(Func<Task> function, CancellationToken cancellationToken); // 将在线程池上运行的指定工作排队,并返回 function 返回的 Task(TResult) 的代理项。 public static Task<TResult> Run<TResult>(Func<Task<TResult>> function, CancellationToken cancellationToken); // 等待提供的所有 Task 对象完成执行过程。 public static bool WaitAll(Task[] tasks, intmillisecondsTimeout, CancellationToken cancellationToken); // 等待提供的任何一个 Task 对象完成执行过程。 // 返回结果: // 已完成的任务在 tasks 数组参数中的索引,如果发生超时,则为 -1。 public static int WaitAny(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken); // 所有提供的任务已完成时,创建将完成的任务。 public static Task WhenAll(IEnumerable<Task> tasks); public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks); // 任何一个提供的任务已完成时,创建将完成的任务。 public static Task<Task> WhenAny(IEnumerable<Task> tasks); public static Task<Task<TResult>> WhenAny<TResult>(IEnumerable<Task<TResult>> tasks); // 创建awaitable,等待时,它异步产生当前上下文。 // 返回结果:等待时,上下文将异步转换回等待时的当前上下文。 // 如果当前SynchronizationContext不为 null,则将其视为当前上下文。 // 否则,与当前执行任务关联的任务计划程序将视为当前上下文。 public static YieldAwaitable Yield();

         分析:

1)         FromResult<TResult>(TResult result);

创建指定结果的、成功完成的 Task<TResult> 。我们可以使用此方法创建包含预先计算结果 / 缓存结果的 Task<TResult> 对象, 示例代码 或 CachedDownloads.cs 示例文件。

2)         Delay

创建将在指定延迟后完成的任务,返回 Task 。可以通过 await 或 Task.Wait() 来达到Thread.Sleep() 的效果。尽管, Task.Delay() 比 Thread.Sleep() 消耗更多的资源,但是 Task.Delay() 可用于为方法返回 Task 类型;或者根据 CancellationToken 取消标记动态取消等待。

Task.Delay() 等待完成返回的 Task 状态为 RanToCompletion ;若被取消,返回的 Task状态为 Canceled 。

var tokenSource = new CancellationTokenSource(); var token = tokenSource.Token; Task.Factory.StartNew(() => { Thread.Sleep(1000); tokenSource.Cancel(); }); Console.WriteLine("Begin taskDelay1"); Task taskDelay1 = Task.Delay(100000, token); try { taskDelay1.Wait(); } catch (AggregateException ae) { foreach (var v in ae.InnerExceptions) Console.WriteLine(ae.Message + " " + v.Message); } taskDelay1.ContinueWith((t) =>Console.WriteLine(t.Status.ToString())); Thread.Sleep(100); Console.WriteLine(); Console.WriteLine("Begin taskDelay2"); Task taskDelay2 = Task.Delay(1000); taskDelay2.ContinueWith((t) =>Console.WriteLine(t.Status.ToString())); // 输出====================================== // Begin taskDelay1 // 发生一个或多个错误。已取消一个任务。 // Canceled // // Begin taskDelay2 // Completed

4.         ask<TResult>:Task

Task<TResult> 继承自 Task ,表示一个可以返回值的异步操作,提供 Result 只读属性用于访问异步操作的返回值。该属性会阻塞线程,直到 Task 执行完毕并返回值。

 

System.Threading.Tasks.TaskFactory          

1.         设置共用 \ 默认的参数

通过 TaskFactory 对象提供的 Scheduler 、 CancellationToken 、 CreationOption 和 ContinuationOptions 属性可以为 Task 设置共用 \ 默认的参数,以便快捷的创建 Task 或延续任务。影响 StartNew() 、 ContinueWhenAll()|ContinueWhenAny() 、 FromAsync() 方法的默认参数设置。

2.         StartNew()

Task.Factory.StartNew() 可快速创建一个 Task 并且开启任务。代码如下:

var t = Task.Factory.StartNew(someDelegate);

这等效于:

var t = new Task(someDelegate); t.Start();

表现方面,前者更高效。 Start() 采用同步方式运行以确保任务对象保持一致的状态即使是同时调用多次 Start() ,也可能只有一个调用会成功。相比之下, StartNew() 知道没有其他代码能同时启动任务,因为在 StartNew() 返回之前它不会将创建的 Task 引用给任何人,所以 StartNew() 不需要采用同步方式执行。更丰富更专业的讨论请看 《.NET4.X 并行任务中 Task.Start() 的 FAQ 》 

3.         ContinueWhenAll()

public Task ContinueWhenAll(Task[] tasks, Action<Task[]> continuationAction , CancellationToken cancellationToken , TaskContinuationOptions continuationOptions, TaskScheduler scheduler);

创建一个延续 Task 或延续 Task<TResult> ,它将在提供的一组任务完成后马上开始。延续任务操作委托接受一个 Task[] 数组做参数。

4.         ContinueWhenAny()

public Task ContinueWhenAny(Task[] tasks, Action<Task> continuationAction , CancellationToken cancellationToken , TaskContinuationOptions continuationOptions, TaskScheduler scheduler);

创建一个延续 Task 或延续 Task<TResult> ,它将在提供的组中的任何一个任务完成后马上开始。延续任务操作委托接受一个 Task 做参数。

5.         通过 Task.TaskFactory.FromAsync() 实例方法,我们可以将 APM 转化为 TAP。示例见此文的后面小节  AMP 转化为 TAP 和 EAP 转化为 TAP  。

 

System.Threading.Tasks.TaskScheduler         

       TaskScheduler 表示一个处理将任务排队到线程中的底层工作对象。 TaskScheduler通常有哪些应用呢?

1.         TaskScheduler 是抽象类,可以继承它实现自己的任务调度计划。如:默认调度程序 ThreadPoolTaskScheduler 、与 SynchronizationContext.Current 关联的 SynchronizationContextTaskScheduler 。

2.         由 TaskScheduler.Default 获取默认调度程序 ThreadPoolTaskScheduler 。

3.         由 TaskScheduler.Current 获取当前任务的执行的 TaskScheduler 。

4.         由 TaskScheduler.TaskSchedulerFromCurrentSynchronizationContext() 方法获取与 SynchronizationContext.Current 关联的 SynchronizationContextTaskScheduler , SynchronizationContextTaskScheduler 上的任务都会通过 SynchronizationContext.Post() 在同步上下文中进行调度。通常用于实现跨线程更新控件。

5.         通过 MaximumConcurrencyLevel 设置任务调度计划能支持的最大并发级别。

6.         通过 UnobservedTaskException 事件捕获未被观察到的异常。

 

System.Threading.Tasks.TaskExtensions

提供一组用于处理特定类型的 Task 实例的静态方法。将特定 Task 实例进行解包操作。

public static class TaskExtensions
    {
        public static Task<TResult> Unwrap<TResult>(this Task<Task<TResult>> task); public static Task Unwrap(this Task<Task> task); }

 

转化为 TAP 和 EAP 转化为 TAP

1.         AMP 转化为 TAP

通过 Task.TaskFactory.FromAsync() 实例方法,我们可以将 APM 转化为 TAP 。

注意点:

1)         FromAsync 方法返回的任务具有 WaitingForActivation 状态,并将在创建该任务后的某一时间由系统启动。如果尝试在这样的任务上调用 Start ,将引发异常。

2)         转化的 APM 异步模型必须符合两个模式:

a)         接受 Begin*** 和 End*** 方法。此时要求 Begin*** 方法签名的委托必须是 AsyncCallback 以及 End*** 方法只接受 IAsyncResult 一个参数。此模式 AsyncCallback 回调由系统自动生成,主要工作是调用 End*** 方法。

public Task<TResult> FromAsync<TArg1, TResult>( Func<TArg1, AsyncCallback, object, IAsyncResult> beginMethod , Func<IAsyncResult, TResult> endMethod, TArg1 arg1 , object state, TaskCreationOptions creationOptions);

b)         接受 IAsyncResult 对象以及 End*** 方法。此时 Begin*** 方法的签名已经无关紧要只要能返回 IAsyncResult 的参数以及 End*** 方法只接受 IAsyncResult 一个参数。此模式支持自定义回调委托。

public Task<TResult> FromAsync<TResult>(IAsyncResult asyncResult , Func<IAsyncResult, TResult> endMethod);

3)         当然,我们有时需要给客户提供统一的 Begin***() 和 End***() 调用方式,我们可以直接使用 Task 从零开始构造 APM 。即:在 Begin***() 创建并开启任务,并返回Task 。因为 Task 是继承自 IAsyncResult 接口的,所以我们可以将其传递给 End***() 方法,并在此方法里面调用 Result 属性来等待任务完成。

4)         对于返回的 Task ,可以随时以任务延续的形式注册回调。

现在将在 《 APM 异步编程模型 》 博文中展现的示例转化为 TAP 模式。关键代码如下:

public Task<int> CalculateAsync<TArg1, TArg2>( Func<TArg1, TArg2, AsyncCallback, object, IAsyncResult> beginMethod , AsyncCallback userCallback, TArg1 num1, TArg2 num2, object asyncState) { IAsyncResult result = beginMethod(num1, num2, userCallback, asyncState); return Task.Factory.FromAsync<int>(result , EndCalculate, TaskCreationOptions.None); } public Task<int> CalculateAsync(int num1, int num2, object asyncState) { return Task.Factory.FromAsync<int, int, int>(BeginCalculate, EndCalculate , num1, num2, asyncState, TaskCreationOptions.None); }

2.         EAP 转化为 TAP

我们可以使用 TaskCompletionSource<TResult> 实例将 EAP 操作表示为一个 Task<TResult> 。

TaskCompletionSource<TResult> 表示未绑定委托的 Task<TResult> 的制造者方,并通过 TaskCompletionSource<TResult>.Task 属性获取由此 Tasks.TaskCompletionSource<TResult> 创建的 Task<TResult> 。

注意, TaskCompletionSource<TResult> 创建的任何任务将由 TaskCompletionSource启动,因此,用户代码不应在该任务上调用 Start() 方法。

public class TaskCompletionSource<TResult> { public TaskCompletionSource(); // 使用指定的状态和选项创建一个TaskCompletionSource<TResult>。 // state: 要用作基础 Task<TResult>的AsyncState的状态。 public TaskCompletionSource(object state, TaskCreationOptions creationOptions); // 获取由此Tasks.TaskCompletionSource<TResult>创建的Tasks.Task<TResult>。 public Task<TResult> Task { get; } // 将基础Tasks.Task<TResult>转换为Tasks.TaskStatus.Canceled状态。 public void SetCanceled();   public bool TrySetCanceled(); // 将基础Tasks.Task<TResult>转换为Tasks.TaskStatus.Faulted状态。 public void SetException(Exception exception); public void SetException(IEnumerable<Exception> exceptions); public bool TrySetException(Exception exception); public bool TrySetException(IEnumerable<Exception> exceptions); // 尝试将基础Tasks.Task<TResult>转换为TaskStatus.RanToCompletion状态。 public bool TrySetResult(TResult result); …… }

现在我将在 《基于事件的异步编程模 式 (EAP) 》 博文中展现的 BackgroundWorker2 组件示例转化为 TAP 模式。

我们需要修改地方有:

1)         创建一个 TaskCompletionSource<int> 实例 tcs ;

2)         为 tcs.Task 返回的任务创建延续任务,延续任务中根据前面任务的 IsCanceled、 IsFaulted 、 Result 等成员做逻辑;

3)         Completed 事件,在这里面我们将设置返回任务的状态。

关键代码如下:

// 1、创建 TaskCompletionSource<TResult>
 tcs = new TaskCompletionSource<int>();  worker2.RunWorkerCompleted += RunWorkerCompleted; // 2、注册延续  tcs.Task.ContinueWith(t =>  { if (t.IsCanceled) MessageBox.Show("操作已被取消"); else if (t.IsFaulted) MessageBox.Show(t.Exception.GetBaseException().Message); else MessageBox.Show(String.Format("操作已完成,结果为:{0}", t.Result)); }, TaskContinuationOptions.ExecuteSynchronously); // 3、运行异步任务 worker2.RunWorkerAsync(); // 4、Completed事件 private void RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e) { if (e.Error != null) tcs.SetException(e.Error); else if (e.Cancelled) tcs.SetCanceled(); else tcs.SetResult((int)e.Result); // 注销事件,避免多次挂接事件 worker2.RunWorkerCompleted -= RunWorkerCompleted; }

 

当然,这两部分的代码都不能直接运行,只是部分关键代码,完整的示例请在我提供的示例源码中查看。

 

使用关键字 async 和 await 实现异步方法

       在 C#5.0 中引入了 async 和 await 关键字,可以方便我们使用顺序结构流 ( 即不用回调 ) 来实现异步编程,大大降低了异步编程的复杂程度。( vs2010 打 Visual Studio Async CTP for VS2010 补丁 可以引入关键字” async ”和” await ”的支持,但是得不到 .net4.5 新增 API 的支持)

1.         我们可通过下图来明白异步方法的构建和异步方法的执行流程。(代码详见我提供的示例程序)

      image

 

2.         编译器转换

使用 async 关键字标记方法,会导致 C# 或 Visual Basic 编译器使用状态机重新编写该方法的实施。借助此状态机,编译器可以在该方法中插入多个中断点,以便该方法可以在不阻止线程的情况下,挂起和恢复其执行。这些中断点不会随意地插入。它们只会在您明确使用 await 关键字的位置插入:

private async void btnDoWork_Click(object sender, EventArgs e) { ... await someObject; // <-- potential method suspension point ... }

当您等待未完成的异步操作时,编译器生成的代码可确保与该方法相关的所有状态(例如,局部变量)封装并保留在堆中。然后,该函数将返回到调用程序,允许在其运行的线程中执行其他任务。当所等待的异步操作在稍后完成时,该方法将使用保留的状态恢复执行。

任何公开 await 模式的类型都可以进行等待。该模式主要由一个公开的 GetAwaiter()方法组成,该方法会返回一个提供 IsCompleted 、 OnCompleted 和 GetResult 成员的类型。当您编写以下代码时:

await someObject;

编译器会生成一个包含 MoveNext 方法的状态机类:

private class FooAsyncStateMachine : IAsyncStateMachine { // Member fields for preserving “locals” and other necessary state int $state; TaskAwaiter $awaiter; … public void MoveNext() { // Jump table to get back to the right statement upon resumption switch (this.$state) { … case 2: goto Label2; … } … // Expansion of “await someObject;” this.$awaiter = someObject.GetAwaiter(); if (!this.$awaiter.IsCompleted) { this.$state = 2; this.$awaiter.OnCompleted(MoveNext); return; Label2: } this.$awaiter.GetResult(); … } }

在实例 someObject 上使用这些成员来检查该对象是否已完成(通过 IsCompleted),如果未完成,则挂接一个续体(通过 OnCompleted ),当所等待实例最终完成时,系统将再次调用 MoveNext 方法,完成后,来自该操作的任何异常将得到传播或作为结果返回(通过 GetResult ),并跳转至上次执行中断的位置。

3.         自定义类型支持等待

如果希望某种自定义类型支持等待,我们可以选择两种主要的方法。

1)         一种方法是针对自定义的可等待类型手动实施完整的 await 模式,提供一个返回自定义等待程序类型的 GetAwaiter 方法,该等待程序类型知道如何处理续体和异常传播等等。

2)         第二种实施该功能的方法是将自定义类型转换为任务,然后只需依靠对等待任务的内置支持来等待特殊类型。前文所展示的“ EAP 转化为 TAP ”正属于这一类,关键代码如下:

private async void btn_Start_Click(object sender, EventArgs e) { this.progressBar1.Value = 0; tcs = new TaskCompletionSource<int>(); worker2.RunWorkerCompleted += RunWorkerCompleted; tcs.Task.ContinueWith(t => { if (t.IsCanceled) MessageBox.Show("操作已被取消"); else if (t.IsFaulted) MessageBox.Show(t.Exception.GetBaseException().Message); else MessageBox.Show(String.Format("操作已完成,结果为:{0}", t.Result)); }, TaskContinuationOptions.ExecuteSynchronously); worker2.RunWorkerAsync(); await tcs.Task; }

 

处理 TAP 中的异常

       在任务抛出的未处理异常都封装在 System.AggregateException 对象中。这个对象会存储在方法返回的 Task 或 Task<TResult> 对象中,需要通过访问 Wait() 、 Result 、 Exception 成员才能观察到异常。(所以,在访问 Result 之前,应先观察 IsCanceled 和 IsFaulted 属性)

1.         AggregateException 对象的三个重要成员

1)         InnerExceptions 属性

获取导致当前异常的 System.Exception 实例的只读集合(即, ReadOnlyCollection<Exception> )。不要将其与基类 Exception 提供的 InnerException 属性混淆。

2)         Flatten() 方法

遍历 InnerExceptions 异常列表,若列表中包含类型为 AggregateException 的异常,就移除 所有嵌套 的 AggregateException ,直接返回其真真的异常信息(效果如下图)。

                image

1)         Handle(Func<Exception, bool> predicate) 方法

它为 AggregateException 中包含的每个异常都调用一个回调方法。然后,回调方法可以为每个异常决定如何对其进行处理,回调返回 true 表示异常已经处理,返回 false表示没有。在调用 Handle 之后,如果至少有一个异常没有处理,就创建一个新的 AggregateException 对象,其中只包含未处理的异常, 并抛出这个新的 AggregateException对象 。

比如:将任何 OperationCanceledException 对象都视为已处理。其他任何异常都造成抛出一个新的 AggregateException ,其中只包含未处理的异常。

try{……}
        catch (AggregateException ae) { ae.Handle(e => e is OperationCanceledException); }

1.         父任务生成了多个子任务,而多个子任务都抛出了异常

1)         嵌套子任务

Task t4 = Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() => { throw new Exception("子任务Exception_1"); } , TaskCreationOptions.AttachedToParent); Task.Factory.StartNew(() => { throw new Exception("子任务Exception_2"); } , TaskCreationOptions.AttachedToParent); throw new Exception("父任务Exception"); });

对于“嵌套子任务”中子任务的异常都会包装在父任务返回的 Task 或 Task<TResult>对象中。如此例子中 t4.Exception.InnerExceptions 的 Count 为 3 。

       对于子任务返回的异常类型为包装过的 AggregateException 对象,为了避免循环访问子任务异常对象的 InnerExceptions 才能获取真真的异常信息,可以使用上面提到的 Flatten() 方法移除所有嵌套的 AggregateExceprion 。

2)         Continue 子任务

Task t1 = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(500);   // 确保已注册好延续任务 throw new Exception("父任务Exception"); }, TaskCreationOptions.AttachedToParent); Task t2 = t1.ContinueWith((t) => { throw new Exception("子任务Exception_1"); }); Task t3 = t1.ContinueWith((t) => { throw new Exception("子任务Exception_2"); });

       对于“ Continue 子任务”中的子任务其异常与父任务是分离的,各自包装在自己返回的 Task 或 Task<TResult> 对象中。如此示例 t1 、 t2 、 t3 的 Exception.InnerExceptions 的 Count 都为 1 。     

2.         TaskScheduler 的 UnobservedTaskException 事件

假如你一直不访问 Task 的 Wait() 、 Result 、 Exception 成员,那么你将永远注意不到这些异常的发生。为了帮助你检测到这些未处理的异常,可以向 TaskScheduler 对象的 UnobservedTaskException 事件注册回调函数。每当一个 Task 被垃圾回收时,如果存在一个没有注意到的异常, CLR 的终结器线程会引发这个事件。

可在事件回调函数中调用 UnobservedTaskExceptionEventArgs 对象的 SetObserved()方法来指出已经处理好了异常,从而阻止 CLR 终止线程。然而并不推荐这么做,宁愿终止进程也不要带着已经损坏的状态继续运行。

示例代码:(要监控此代码必须在 GC.Collect(); 和事件里两个地方进行断点)

TaskScheduler.UnobservedTaskException += (s, e) =>
            {
                //设置所有未觉察异常被觉察
                e.SetObserved();
            };
            Task.Factory.StartNew(() =>
            {
                throw new Exception(); }); //确保任务完成 Thread.Sleep(100); //强制垃圾会受到,在GC回收时才会触发UnobservedTaskException事件 GC.Collect(); //等待终结器处理 GC.WaitForPendingFinalizers();

3.         返回 void 的 async “异步方法”中的异常

我们已经知道返回 Task 或 Task<TResult> 对象的任务中抛出的异常会随着返回对象一起返回,可通过 Exception 属性获取。那么对于返回 Task 或 Task<TResult> 对象的“异步方法”情况也是一样。

然而对于返回 void 的“异步方法”,方法中抛出的异常会直接导致程序奔溃。

public static async void Test_void_async_Exception() { throw new Exception(); }

另外,我们还要特别注意 lambda 表达式构成的“异步方法”,如:

Enumerable.Range(0, 3).ToList().ForEach(async (i) => { throw new Exception(); });

       本博文到此结束,我相信你看累了,其实我也写了很久…很久… ,写完此文,我的 “ 异步编程系列 ” 也算有头有尾了(还会继续扩充)。本博文主要介绍了 Task 的重要 API 、任务的 CLR 线程池引擎、 TaskFactory 对象、 TaskScheduler 对象、 TaskExtensions 对象、 AMP 转化为 TAP 和 EAP 转化为 TAP 、使用关键字 async 和await 实现异步方法以及自定义类型支持等待、处理 TAP 中的异常。

感谢你的观看,如果对你有帮助, 还请多多推荐 ……

 

 

===================================================================

抱歉,我知道你已经累了,但我还是有了这么一段:(其实博文也是写着写着就长起来了!!!)

此小段是博主的求助,能帮的就帮 。

       求职: web 中高级工程师 ( 本人已工作 3 年 )

       我打算 5 月中旬换一家公司,争取能再端午节 6 月 10 之前入职新工作,地点广州 ( 其次,深圳 ) 吧,毕竟还有些朋友在广州。希望园友能推荐一些不错的广州互联网公司。

       PS :我个人有关注下广州 3g 门户网、广州多益互联网、广州 39 健康网公司,如你是内部员工我很希望能和你交流下。

       如果你能帮助我 :请在博文回复给我,或我的 QQ : 369220123      邮箱: 369220123@qq.com

       如果你只是想与我后续方便讨论问题,请加群 : 69594961 ( .NET 开源交流), 185718116 (广深莞· NET 技术)

===================================================================

 

 

推荐阅读:

关于 async 与 await 的 FAQ        ----- 详细讲解了 await 和 async 的作用和意义,以及什么是可等待对象、等待者……(此文可帮助你解决 80% 关于 await 和 async 关键字的疑惑)

               深入探究 WinRT 和 await        ----- 基于 WinRT 平板 win8 系统,讲解了异步功能,以及 TPL 、编译器转换……

 

 

参考资料: MSDN

                    书籍:《 CLR via C#( 第三版 ) 》

书籍:《 C# 并行编程高级教程:精通 .NET 4 Parallel Extensions 》


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM