問題
正在 await 一批任務,希望在每個任務完成時對它做一些處理。另外,希望在任務一完成就立即進行處理,而不需要等待其他任務。
問題的重點在於希望任務完成之后立即進行處理,而不去等待其他任務。
這里還沿用文中的例子。
等待幾秒鍾之后返回等待的秒數,之后立即打印任務等待的秒數。
等待的函數如下
static async Task<int> DelayAndReturnAsync(int val)
{
await Task.Delay(TimeSpan.FromSeconds(val));
return val;
}
以下方法執行之后的打印結果是“2”, “3”, “1”。想得到結果“1”, “2”, “3”應該如何實現。
static async Task ProcessTasksAsync()
{
// 創建任務隊列。
Task<int> taskA = DelayAndReturnAsync(2);
Task<int> taskB = DelayAndReturnAsync(3);
Task<int> taskC = DelayAndReturnAsync(1);
var tasks = new[] { taskA, taskB, taskC };
// 按順序 await 每個任務。
foreach (var task in tasks)
{
var result = await task;
Trace.WriteLine(result);
}
}
文中給了兩種解決方案。一種是抽出更高級的async方法,一種是借助作者的nuget拓展。作者還推薦了另外兩個博客文章。
Processing tasks as they complete
ORDERING BY COMPLETION, AHEAD OF TIME
這兩篇文章介紹了更多處理方法。
抽象方法,並發執行
static async Task AwaitAndProcessAsync(Task<int> task)
{
var result = await task;
Trace.WriteLine(result);
}
將執行和處理抽象出來,借助Task.WhenAll和LINQ並發執行。
var processingTasks = (from t in tasks
select AwaitAndProcessAsync(t)).ToArray();
// 等待全部處理過程的完成。
await Task.WhenAll(processingTasks);
或者
var processingTasks = tasks.Select(async t =>
{
var result = await t;
Trace.WriteLine(result);
}).ToArray();
// 等待全部處理過程的完成。
await Task.WhenAll(processingTasks);
借助nuget拓展:Nito.AsyncEx
推薦預發布版本:https://www.nuget.org/packages/Nito.AsyncEx/5.0.0-pre-06
需要添加引用using Nito.AsyncEx;
static async Task UseOrderByCompletionAsync()
{
// 創建任務隊列。
Task<int> taskA = DelayAndReturnAsync(2);
Task<int> taskB = DelayAndReturnAsync(3);
Task<int> taskC = DelayAndReturnAsync(1);
var tasks = new[] { taskA, taskB, taskC };
// 等待每一個任務完成。
foreach (var task in tasks.OrderByCompletion())
{
var result = await task;
Trace.WriteLine(result);
}
}
串行執行
使用ConcurrentExclusiveSchedulerPair,使任務串行執行,結果是“2”, “3”, “1”。
var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
foreach (var t in tasks)
{
await t.ContinueWith(completed =>
{
switch (completed.Status)
{
case TaskStatus.RanToCompletion:
Trace.WriteLine(completed.Result);
//Process(completed.Result);
break;
case TaskStatus.Faulted:
//Handle(completed.Exception.InnerException);
break;
}
}, scheduler);
}
上篇文章中提到了使用Task.WhenAny處理已完成的任務:https://www.cnblogs.com/AlienXu/p/10609253.html#idx_2
文中的例子從算法層面是不推薦使用的,作者推薦了他自己的拓展Nito.AsyncEx,源碼地址:https://github.com/StephenCleary/AsyncEx/blob/master/src/Nito.AsyncEx.Tasks/TaskExtensions.cs。
另外兩種實現的實現方法差不多,都是借助TaskCompletionSource<T>和Interlocked.Incrementa處理Task。
這里只列出ORDERING BY COMPLETION, AHEAD OF TIME的解決方案。
/// <summary>
/// 返回一系列任務,這些任務的輸入類型相同和返回結果類型一致
/// 返回的任務將以完成順序返回
/// </summary>
private static IEnumerable<Task<T>> OrderByCompletion<T>(IEnumerable<Task<T>> inputTasks)
{
// 復制輸入,以下的處理將不需要考慮是否會對輸入有影響
var inputTaskList = inputTasks.ToList();
var completionSourceList = new List<TaskCompletionSource<T>>(inputTaskList.Count);
for (int i = 0; i < inputTaskList.Count; i++)
{
completionSourceList.Add(new TaskCompletionSource<T>());
}
// 索引
// 索引最好是從0開始,但是 Interlocked.Increment 返回的是遞增之后的值,所以這里應該賦值-1
int prevIndex = -1;
// 可以不用再循環之外處理Action,這樣會讓代碼更清晰。現在有C#7.0的新特性本地方法可以使用
/* //本地方法
void continuation(Task<T> completedTask)
{
int index = Interlocked.Increment(ref prevIndex);
var source = completionSourceList[index];
PropagateResult(completedTask, source);
}*/
Action<Task<T>> continuation = completedTask =>
{
int index = Interlocked.Increment(ref prevIndex);
var source = completionSourceList[index];
PropagateResult(completedTask, source);
};
foreach (var inputTask in inputTaskList)
{
inputTask.ContinueWith(continuation,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
return completionSourceList.Select(source => source.Task);
}
/// <summary>
/// 對 TaskCompletionSource 進行處理
/// </summary>
private static void PropagateResult<T>(Task<T> completedTask,
TaskCompletionSource<T> completionSource)
{
switch (completedTask.Status)
{
case TaskStatus.Canceled:
completionSource.TrySetCanceled();
break;
case TaskStatus.Faulted:
completionSource.TrySetException(completedTask.Exception.InnerExceptions);
break;
case TaskStatus.RanToCompletion:
completionSource.TrySetResult(completedTask.Result);
break;
default:
throw new ArgumentException("Task was not completed");
}
}
