Task開啟線程
有兩種啟動方式:
1.構造創建線程,然后啟動
var taskForAction = new Task(() => { //do something });
taskForAction.Start();
注:構造所有的重載並沒有傳入Func函數的,而且我們這個時候看線程池中活動線程數會發現改變
//打印線程池中線程活動數 PrintAvailabeWorkThreadNum(); var taskForAction = new Task(() => { //do something });
taskForAction.Start(); PrintAvailabeWorkThreadNum();
輸出結果:
2.直接使用靜態方法
//打印線程池中線程活動數 PrintAvailableWorkThreadNum(); var taskForAction = Task.Run(() => Console.WriteLine("print string for Action")); var taskForFunc = Task.Run(() => "return string for Func<string>"); PrintAvailableWorkThreadNum(); //Result內部會調用Wait,所以這里不需要調 Console.WriteLine(taskForFunc.Result);
同樣的,直接調用靜態方法來創建一個線程,並返回當前正在執行的線程副本以供我們調用,Result只有傳遞進去的是Func函數才會在返回的Task中存在,如果傳入的是Action函數,Result是不存在的, 這個時候線程活動數量也會改變。
取消任務
已經在 【C#】線程協作式取消 這章里面好好討論過如何去取消了,如何注冊回調函數等技術了.
任務完成時重新開啟一個任務(ConintueWith)
我們有時候想在執行完一個任務以后,再開始做一個其他的任務,這個時候如果我們用Wait就會堵塞線程,如果我們用線程嵌套的方式去做,會浪費資源並損害的伸縮性。
//這樣會堵塞我們的線程 Task.Run(() => { //do something }).Wait(); Task.Run(() => { //do another thing }); //雖然不會堵塞線程了,但這樣會浪費資源 Task.Run(() => { Task.Run(() => { //do something }).Wait(); Task.Run(() => { //do another thing }); });
CLR給我們提供了另一個方法:ContinueWith.
這個方法會不會堵塞當前的線程,並且會等第一個任務做好了以后再做第二個任務(當然可以開啟多個)
var t = Task.Run(() => { int index = 0; int count = 0; while (index != 5) { count += index; Console.WriteLine("Task:" + index++); Thread.Sleep(1 * 1000); } return count; }); t.ContinueWith(task => { //這里的參數Task,就是我們上面那個線程對象(t),可以用於獲取結果集,狀態等數據 Console.WriteLine("First continue task:" + task.Status); Console.WriteLine("First continue task:" + (task.Result + 100)+"\n"); }); t.ContinueWith(task => { Console.WriteLine("Second continue task:" + task.Status); Console.WriteLine("Second continue task:" + (task.Result - 100)); }); t.ContinueWith(task => { //Do another thing });
ContinueWith方法延伸
需求肯定是很復雜的,比如我們希望在各種狀態(取消,完成,失敗等)情況下執行各種ContinueWith的方法,這個時候我們需要關注一個枚舉類型:TaskContinuationOptions, 以下給出官方的定義:

namespace System.Threading.Tasks { // Summary: // Specifies the behavior for a task that is created by using the System.Threading.Tasks.Task.ContinueWith(System.Action<System.Threading.Tasks.Task>,System.Threading.CancellationToken,System.Threading.Tasks.TaskContinuationOptions,System.Threading.Tasks.TaskScheduler) // or System.Threading.Tasks.Task<TResult>.ContinueWith(System.Action<System.Threading.Tasks.Task<TResult>>,System.Threading.Tasks.TaskContinuationOptions) // method. [Serializable] [Flags] public enum TaskContinuationOptions { // Summary: // Default = "Continue on any, no task options, run asynchronously" Specifies // that the default behavior should be used. Continuations, by default, will // be scheduled when the antecedent task completes, regardless of the task's // final System.Threading.Tasks.TaskStatus. None = 0, // // Summary: // A hint to a System.Threading.Tasks.TaskScheduler to schedule a task in as // fair a manner as possible, meaning that tasks scheduled sooner will be more // likely to be run sooner, and tasks scheduled later will be more likely to // be run later. PreferFairness = 1, // // Summary: // Specifies that a task will be a long-running, course-grained operation. It // provides a hint to the System.Threading.Tasks.TaskScheduler that oversubscription // may be warranted. LongRunning = 2, // // Summary: // Specifies that a task is attached to a parent in the task hierarchy. AttachedToParent = 4, // // Summary: // Specifies that an System.InvalidOperationException will be thrown if an attempt // is made to attach a child task to the created task. DenyChildAttach = 8, // // Summary: // Prevents the ambient scheduler from being seen as the current scheduler in // the created task. This means that operations like StartNew or ContinueWith // that are performed in the created task will see System.Threading.Tasks.TaskScheduler.Default // as the current scheduler. HideScheduler = 16, // // Summary: // In the case of continuation cancellation, prevents completion of the continuation // until the antecedent has completed. LazyCancellation = 32, // // Summary: // Specifies that the continuation task should not be scheduled if its antecedent // ran to completion. This option is not valid for multi-task continuations. NotOnRanToCompletion = 65536, // // Summary: // Specifies that the continuation task should not be scheduled if its antecedent // threw an unhandled exception. This option is not valid for multi-task continuations. NotOnFaulted = 131072, // // Summary: // Specifies that the continuation task should be scheduled only if its antecedent // was canceled. This option is not valid for multi-task continuations. OnlyOnCanceled = 196608, // // Summary: // Specifies that the continuation task should not be scheduled if its antecedent // was canceled. This option is not valid for multi-task continuations. NotOnCanceled = 262144, // // Summary: // Specifies that the continuation task should be scheduled only if its antecedent // threw an unhandled exception. This option is not valid for multi-task continuations. OnlyOnFaulted = 327680, // // Summary: // Specifies that the continuation task should be scheduled only if its antecedent // ran to completion. This option is not valid for multi-task continuations. OnlyOnRanToCompletion = 393216, // // Summary: // Specifies that the continuation task should be executed synchronously. With // this option specified, the continuation will be run on the same thread that // causes the antecedent task to transition into its final state. If the antecedent // is already complete when the continuation is created, the continuation will // run on the thread creating the continuation. Only very short-running continuations // should be executed synchronously. ExecuteSynchronously = 524288, } }
這里就不一一解釋了,這里面有一些參數只是建議,會不會執行兩說,這里我只介紹幾個常用的,直接附上代碼:
var t = Task.Run(() => { int index = 0; int count = 0; while (index != 5) { count += index; Console.WriteLine("Task:" + index++); Thread.Sleep(1 * 1000); } return count; }); t.ContinueWith(task => { //只有執行成功以后才會繼續做 }, TaskContinuationOptions.OnlyOnRanToCompletion); t.ContinueWith(task => { //只有取消的時候才做操作 }, TaskContinuationOptions.OnlyOnCanceled); t.ContinueWith(task => { //只有失敗的時候才會運行,拋出未知異常什么的. AggregateException ex = task.Exception; Console.WriteLine(ex.Message); }, TaskContinuationOptions.OnlyOnFaulted);
個人對這個機制是十分喜歡的,即不堵塞我們的線程,又可以按照狀態來分別做操作。
任務啟動子任務
var t = new Task<Int32[]>(() => { var results = new int[3]; new Task(() => { Thread.Sleep(3 * 1000); results[0] = 1; }, TaskCreationOptions.AttachedToParent).Start(); new Task(() => { results[1] = 2; }, TaskCreationOptions.AttachedToParent).Start(); new Task(() => { results[2] = 3; }, TaskCreationOptions.AttachedToParent).Start(); return results; }); t.ContinueWith(task => Array.ForEach(task.Result, Console.WriteLine), TaskContinuationOptions.AttachedToParent); t.Start();
這里主要是用到了TaskCreationOptions.AttachedToParent枚舉標志,用到了這個標志,父線程會等待子線程所有線程都執行完畢以后才會繼續往下走(注:這里也不能當前主線程).這里我嘗試過用Task.Run這個去建立這樣的機制,可惜的是這個沒有辦法完成(Task.Run沒有參數包含TaskCreationOptions的重載),具體的原因還在探索中,如果有朋友知道,請告知,謝謝:)!
任務工廠
關於這個方法,我在網上查到的也都是CLR那本書上的東西,但是關於這個,如果用起來不注意的話,會出現很多的問題,先給出代碼:
private static Int32 Sum(CancellationToken ct, Int32 n) { Int32 sum = 0; for (; n > 0; n--) { ct.ThrowIfCancellationRequested(); checked { sum += n; } } return sum; } public static void TaskFactory() { var parent = new Task(() => { var cts = new CancellationTokenSource(); var tf = new TaskFactory<Int32>(cts.Token,
TaskCreationOptions.AttachedToParent,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default); // This tasks creates and starts 3 child tasks var childTasks = new[] { tf.StartNew(() => Sum(cts.Token, 10000)), tf.StartNew(() => Sum(cts.Token, 20000)), tf.StartNew(() => Sum(cts.Token, Int32.MaxValue)), // Too big, throws OverflowException }; //如果有一個線程錯誤了就暫停所有的任務 Array.ForEach(childTasks, task => task.ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted)); tf.ContinueWhenAll( childTasks, completedTasks => completedTasks.Where(t => t.Status == TaskStatus.RanToCompletion).Max(t => t.Result), CancellationToken.None) .ContinueWith(t => Console.WriteLine("The maximum is: " + t.Result), TaskContinuationOptions.ExecuteSynchronously).Wait(); // Wait is for testing only }); parent.ContinueWith(p => { var sb = new StringBuilder("The following exception(s) occurred:" + Environment.NewLine); foreach (var e in p.Exception.Flatten().InnerExceptions) sb.AppendLine(" " + e.GetType().ToString()); Console.WriteLine(sb.ToString()); }, TaskContinuationOptions.OnlyOnFaulted); parent.Start(); try { parent.Wait(); // For testing purposes } catch (AggregateException) { } }
首先我們看一下
var childTasks = new[] {
tf.StartNew(() => Sum(cts.Token, 10000)),
tf.StartNew(() => Sum(cts.Token, 20000)),
tf.StartNew(() => Sum(cts.Token, Int32.MaxValue)), // Too big, throws OverflowException
};
//如果有一個線程錯誤了就暫停所有的任務
Array.ForEach(childTasks,
task => task.ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted));
這段代碼是創建了三個線程放入工廠中,並建立一個會拋出異常的線程,下面那段代碼會取消線程的操作(3個線程都取消操作,因為注冊了同一個TOKEN),但是這里需要注意的是:如果我其他線程跑的比拋出異常的線程塊,這會導致取消不了,因為結束了(這個的確很難做,因為無法控制線程的執行速度和優先級)。
tf.ContinueWhenAll(
childTasks,
completedTasks => completedTasks.Where(t => t.Status == TaskStatus.RanToCompletion).Max(t => t.Result),
CancellationToken.None)
.ContinueWith(t => Console.WriteLine("The maximum is: " + t.Result),
TaskContinuationOptions.ExecuteSynchronously).Wait(); // Wait is for testing only
這段代碼才是用TaskFactory的核心,這個會等待所有工廠中的線程執行完畢(包括被取消)才會執行,還有一個方法叫ContinueWhenAny:當有一個線程結束操作就會執行。這里要注意的是:
兩個方法都有帶TaskContinuationOptions參數的重載,但是有那么幾個是不能用的:
- NotOnRanToCompletion
- NotOnFaulted
- OnlyOnCanceled
- NotOnCanceled
- OnlyOnFaulted
- OnlyOnRanToCompletion
也就是說無論前面任務是什么狀態,這個方法都會執行,所以我們必須要自己去判斷:Where(t => t.Status == TaskStatus.RanToCompletion).