在上篇最后一個例子之后,我們發現了怎么去使用線程池,調用ThreadPool的QueueUserWorkItem方法來發起一次異步的、計算限制的操作,例子很簡單,不是嗎?
然而,在今天這篇博客中,我們要知道的是,QueueUserWorkItem這個技術存在許多限制。其中最大的問題是沒有一個內建的機制讓你知道操作在什么時候完成,也沒有一個機制在操作完成是獲得一個返回值,這些問題使得我們都不敢啟用這個技術。
Microsoft為了克服這些限制(同時解決其他一些問題),引入了任務(tasks)的概念。順帶說一下我們得通過System.Threading.Tasks命名空間來使用它們。
現在我要說的是,用線程池不是調用ThreadPool的QueueUserWorkItem方法,而是用任務來做相同的事:
1 static void Main(string[] args) 2 { 3 Console.WriteLine("主線程啟動"); 4 //ThreadPool.QueueUserWorkItem(StartCode,5); 5 new Task(StartCode, 5).Start(); 6 Console.WriteLine("主線程運行到此!"); 7 Thread.Sleep(1000); 8 } 9 10 private static void StartCode(object i) 11 { 12 Console.WriteLine("開始執行子線程...{0}",i); 13 Thread.Sleep(1000);//模擬代碼操作 14 } 15 }
嘿,你會發現結果是一樣的。
再來看看這個是什么:
TaskCreationOptions這個類型是一個枚舉類型,傳遞一些標志來控制Task的執行方式。TaskCreationOptions定義如下:
慢點,注釋很詳細,看看這些有好處,TaskScheduler(任務調度器)不懂沒關系,請繼續往下看,我會介紹的,但請注意,這些標識都只是一些提議而已,在調度一個Task時,可能會、也可能不會采納這些提議,不過有一條要注意:AttachedToParent標志,它總會得到Task采納,因為它和TaskScheduler本身無關。
來看下這段代碼:
1 static void Main(string[] args) 2 { 3 4 //1000000000這個數字會拋出System.AggregateException 5 6 Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 1000000000); 7 8 //可以現在開始,也可以以后開始 9 10 t.Start(); 11 12 //Wait顯式的等待一個線程完成 13 14 t.Wait(); 15 16 Console.WriteLine("The Sum is:"+t.Result); 17 } 18 19 private static Int32 Sum(Int32 i) 20 { 21 Int32 sum = 0; 22 for (; i > 0; i--) 23 checked { sum += i; } 24 return sum; 25 } 26 }
這段代碼大家應該猜得出是什么意思吧,人人都會寫。
但是,我的結果為什么是t.Result而不直接是返回的Sum呢? 有沒有多此一舉的感覺?
下面我來說說這段代碼我想表達的意思:
在一個線程調用Wait方法時,系統會檢查線程要等待的Task是否已經開始執行,如果任務正在執行,那么這個Wait方法會使線程阻塞,知道Task運行結束為止。
就說上面的程序執行,因為累加數字太大,它拋出算術運算溢出錯誤,在一個計算限制任務拋出一個未處理的異常時,這個異常會被“包含”不並存儲到一個集合中,而線程池線程是允許返回到線程池中的,在調用Wait方法或者Result屬性時,這個成員會拋出一個System.AggregateException對象。
現在你會問,為什么要調用Wait或者Result?或者一直不查詢Task的Exception屬性?你的代碼就永遠注意不到這個異常的發生,如果不能捕捉到這個異常,垃圾回收時,拋出AggregateException,進程就會立即終止,這就是“牽一發動全身”,莫名其妙程序就自己關掉了,誰也不知道這是什么情況。所以,必須調用前面提到的某個成員,確保代碼注意到異常,並從異常中恢復。悄悄告訴你,其實在用Result的時候,內部會調用Wait。
怎么恢復?
為了幫助你檢測沒有注意到的異常,可以向TaskScheduler的靜態UnobservedTaskException時間等級一個回調方法,當Task被垃圾回收時,如果出現一個沒有被注意到的異常,CLR終結器會引發這個事件。一旦引發,就會向你的時間處理器方法傳遞一個UnobservedTaskExceptionEvenArgs對象,其中包含了你沒有注意的AggregateException。然后再調用UnobservedTasExceptionEvenArgs的SetObserved方法來指出你的異常已經處理好了,從而阻止CLR終止進程。這是個圖省事的做法,要少做這些,寧願終止進程,也不要呆着已經損壞的狀態而繼續運行。做人也一樣,病了寧肯休息,也不要帶病堅持上班,你沒那么偉大,公司也不需要你的這一點偉大,命是自己的。(─.─|||扯遠了。
除了單個等待任務,Task 還提供了兩個靜態方法:WaitAny和WaitAll,他們允許線程等待一個Task對象數組。
WaitAny方法會阻塞調用線程,知道數組中的任何一個Task對象完成,這個方法會返回一個索引值,指明完成的是哪一個Task對象。如果發生超時,方法將返回-1。它可以通過一個CancellationToken取消,會拋出一個OperationCanceledException。
WaitAll方法也會阻塞調用線程,知道數組中的所有Task對象都完成,如果全部完成就返回true,如果超時就返回false。當然它也能取消,同樣會拋出OperationCanceledException。
說了這么兩個取消任務的方法,現在來試試這個方法,加深下印象,修改先前例子代碼,完整代碼如下:
1 static void Main(string[] args) 2 { 3 CancellationTokenSource cts = new CancellationTokenSource(); 4 5
6 7 Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token); 8 9 //可以現在開始,也可以以后開始 10 11 t.Start(); 12 13 //在之后的某個時間,取消CancellationTokenSource 以取消Task 14 15 cts.Cancel();//這是個異步請求,Task可能已經完成了。我是雙核機器,Task沒有完成過 16 17 18 //注釋這個為了測試拋出的異常 19 //Console.WriteLine("This sum is:" + t.Result); 20 try 21 { 22 //如果任務已經取消了,Result會拋出AggregateException 23 24 Console.WriteLine("This sum is:" + t.Result); 25 } 26 catch (AggregateException x) 27 { 28 //將任何OperationCanceledException對象都視為已處理。 29 //其他任何異常都造成拋出一個AggregateException,其中 30 //只包含未處理的異常 31 32 x.Handle(e => e is OperationCanceledException); 33 Console.WriteLine("Sum was Canceled"); 34 } 35 36 } 37 38 private static Int32 Sum(CancellationToken ct ,Int32 i) 39 { 40 Int32 sum = 0; 41 for (; i > 0; i--) 42 { 43 //在取消標志引用的CancellationTokenSource上如果調用 44 //Cancel,下面這一行就會拋出OperationCanceledException 45 46 ct.ThrowIfCancellationRequested(); 47 48 checked { sum += i; } 49 } 50 51 return sum; 52 } 53 }
這個例子展示了一個任務在進行的時候中途取消的操作,我覺得它很有趣,你試試也會發現。
Lamada表達式寫這個,是個亮點,得學學,將CancellationToken閉包變量“傳遞”。
如果不用Lamada表達式,這問題還真不好解決:
Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token);
Sum(cts.Token,10000) 內的Token需要和cts.Token關聯起來,你還能想出怎么關聯起來么?
好,任務取消也講玩了,來看個更好用的技術:
1 static void Main(string[] args) 2 { 3 4 Task<Int32> t = new Task<Int32>(i => Sum((Int32)i),10000); 5 6 //可以現在開始,也可以以后開始 7 8 t.Start(); 9 10 Task cwt = t.ContinueWith(task=>Console.WriteLine("The sum is:{0}",task.Result)); 11 cwt.Wait(); 12 13 } 14 15 private static Int32 Sum(Int32 i) 16 { 17 Int32 sum = 0; 18 for (; i > 0; i--) 19 { 20 checked { sum += i; } 21 } 22 23 return sum; 24 } 25 }
ContinueWith? 啥東西~~??
要寫可伸縮的軟件,一定不能使你的線程阻塞。這意味着如果調用Wait或者在任務未完成時查詢Result屬性,極有可能造成線程池創建一個新線程,這增大了資源的消耗,並損害了伸縮性。
ContinueWith便是一個更好的方式,一個任務完成時它可以啟動另一個任務。上面的例子不會阻塞任何線程。
當Sum的任務完成時,這個任務會啟動另一個任務以顯示結果。ContinueWith會返回對新的Task對象的一個引用,所以為了看到結果,我需要調用一下Wait方法,當然你也可以查詢下Result,或者繼續ContinueWith,返回的這個對象可以忽略,它僅僅是一個變量。
還要指出的是,Task對象內部包含了ContinueWith任務的一個集合。所以,實際上可以用一個Task對象來多次調用ContinueWith。任務完成時,所有ContinueWith任務都會進入線程池隊列中,在構造ContinueWith的時候我們可以看到一個TaskContinuationOptions枚舉值,不能忽視,看看它的定義:
PrefereFairness是盡量公平的意思,就是較早調度的任務可能較早的運行,先來后到,將線程放到全局隊列,便可以實現這個效果。
ExecuteSynchronously指同步執行,強制兩個任務用同一個線程一前一后運行,然后就同步運行了。
看得是不是暈乎乎 ?有這么多枚舉例子,怎么掌握啊?多看幾次,知道任務的使用情況,以后用起來得心應手~想學新技術,就要能耐住,才能基礎牢固。來看個例子,用用這些枚舉。
1 static void Main(string[] args) 2 { 3 Task<Int32> t = new Task<Int32>(i => Sum((Int32)i),10000); 4 5 t.Start(); 6 7 t.ContinueWith(task=>Console.WriteLine("The sum is:{0}",task.Result), 8 TaskContinuationOptions.OnlyOnRanToCompletion); 9 10 t.ContinueWith(task=>Console.WriteLine("Sum throw:"+task.Exception), 11 TaskContinuationOptions.OnlyOnFaulted); 12 13 t.ContinueWith(task=>Console.WriteLine("Sum was cancel:"+task.IsCanceled), 14 TaskContinuationOptions.OnlyOnCanceled); 15 try 16 { 17 t.Wait(); // 測試用 18 } 19 catch (AggregateException) 20 { 21 Console.WriteLine("出錯"); 22 } 23 24 25 } 26 27 private static Int32 Sum(Int32 i) 28 { 29 Int32 sum = 0; 30 for (; i > 0; i--) 31 { 32 checked { sum += i; } 33 } 34 35 return sum; 36 } 37 }
ContinueWith講完了。可是還沒有結束哦。
AttachedToParnt枚舉類型(父任務)也不能放過!看看怎么用,寫法有點新奇,看看:
1 static void Main(string[] args) 2 { 3 Task<Int32[]> parent = new Task<Int32[]>(() => { 4 var results = new Int32[3]; 5 // 6 new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start(); 7 new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start(); 8 new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start(); 9 return results; 10 }); 11 12 var cwt = parent.ContinueWith( parentTask=>Array.ForEach(parentTask.Result,Console.WriteLine)); 13 14 15 parent.Start(); 16 cwt.Wait(); 17 } 18 19 private static Int32 Sum(Int32 i) 20 { 21 Int32 sum = 0; 22 for (; i > 0; i--) 23 { 24 checked { sum += i; } 25 } 26 return sum; 27 } 28 }
Oh,我都寫暈了。。。(+﹏+)~
例子中,父任務創建兵啟動3個Task對象。默認情況下,一個任務創建的Task對象是頂級任務,這些任務跟創建它們的那個任務沒有關系。
TaskCreationOptions.AttachedToParent標志將一個Task和創建它的那個Task關聯起來,除非所有子任務(子任務的子任務)結束運行,否則創建任務(父任務)不會認為已經結束。調用ContinueWith方法創建一個Task時,可以指定TaskContinuationOptions.AttachedToParent標志將延續任務置頂為一個子任務。
看了這么多任務的方法操作示例了,現在來挖挖任務內部構造:
每個Task對象都有一組構成任務狀態的字段。
- 一個Int32 ID(只讀屬性)
- 代表Task執行狀態的一個Int32
- 對父任務的一個引用
- 對Task創建時置頂TaskSchedule的一個引用
- 對回調方法的一個引用
- 對要傳給回調方法的對象的一個引用(通過Task只讀AsyncState屬性查詢)
- 對一個ExceptionContext的引用
- 對一個ManualResetEventSlim對象的引用
還有沒個Task對象都有對根據需要創建的一些補充狀態的一個引用,補充狀態包含這些:
- 一個CancellationToken
- 一個ContinueWithTask對象集合
- 為拋出未處理異常的子任務,所准備的一個Task對象集合
說了這么多,只想要大家知道:
雖然任務提供了大量功能,但並不是沒有代價的。因為必須為所有的這些狀態分配內存。
如果不需要任務提供的附加功能,使用ThreadPool.QueueUserWorkItem,資源的使用效率會更高一些。
Task類還實現了IDispose接口,允許你在用完Task對象后調用Dispose,不過大多數不管,讓垃圾回收器回收就好。
創建一個Task對象時,代表Task唯一的一個Int32字段初始化為零,TaskID從1開始,每分配一個ID都遞增1。順帶說一下,在你調試中查看一個Task對象的時候,會造成調試器顯示Task的ID,從而造成為Task分配一個ID。
這個ID的意義在於,每個Task都可以用一個唯一的值來標識。Visual Studio會在它的“並行任務”和並行堆棧“窗口中顯示這些任務ID。要知道的是,這是Visual Studio自己分配的ID,不是在自己代碼中分配的ID,幾乎不可能將Visual Studio分配的ID和代碼正在做的事情聯系起來。要查看自己正在運行的任務,可以在調試的時候查看Task的靜態CurrentId屬性,如果沒有任務在執行,CurrentId返回null。
再看看TaskStatus的值,這個可以查詢Task對象的生存期:
這些在任務運行的時候都是可以一一查到的,還有~判斷要像這樣:
1 if(task.Status==TaskStatus.RantoCompletion)...
為了簡化編碼,Task只提供幾個只讀Boolean屬性:IsCanceled,IsFaulted,IsCompleted,它們能返回最終狀態true/false。
如果Task是通過調用某個函數來創建的,這個Task對象就會出於WaitingForActivation狀態,它會自動運行。
最后我們要來了解一下TaskFactory(任務工廠):
1.需要創建一組Task對象來共享相同的狀態
2.為了避免機械的將相同的參數傳給每一個Task的構造器。
滿足這些條件就可以創建一個任務工廠來封裝通用的狀態。TaskFactory類型和TaskFactory<TResult>類型,它們都派生System.Object。
你會學到不一樣的編碼方式:
1 static void Main(string[] args) 2 { 3 Task parent = new Task(() => 4 { 5 var cts = new CancellationTokenSource(); 6 var tf = new TaskFactory<Int32>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); 7 8 //創建並啟動3個子任務 9 var childTasks = new[] { 10 tf.StartNew(() => Sum(cts.Token, 10000)), 11 tf.StartNew(() => Sum(cts.Token, 20000)), 12 tf.StartNew(() => Sum(cts.Token, Int32.MaxValue)) // 這個會拋異常 13 }; 14 15 // 任何子任務拋出異常就取消其余子任務 16 for (Int32 task = 0; task < childTasks.Length; task++) 17 childTasks[task].ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted); 18 19 // 所有子任務完成后,從未出錯/未取消的任務獲取返回的最大值 20 // 然后將最大值傳給另一個任務來顯示最大結果 21 tf.ContinueWhenAll(childTasks, 22 completedTasks => completedTasks.Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result), 23 CancellationToken.None) 24 .ContinueWith(t => Console.WriteLine("The maxinum is: " + t.Result), 25 TaskContinuationOptions.ExecuteSynchronously).Wait(); // Wait用於測試 26 }); 27 28 // 子任務完成后,也顯示任何未處理的異常 29 parent.ContinueWith(p => 30 { 31 // 用StringBuilder輸出所有 32 33 StringBuilder sb = new StringBuilder("The following exception(s) occurred:" + Environment.NewLine); 34 foreach (var e in p.Exception.Flatten().InnerExceptions) 35 sb.AppendLine(" " + e.GetType().ToString()); 36 Console.WriteLine(sb.ToString()); 37 }, TaskContinuationOptions.OnlyOnFaulted); 38 39 // 啟動父任務 40 parent.Start(); 41 42 try 43 { 44 parent.Wait(); //顯示結果 45 } 46 catch (AggregateException) 47 { 48 } 49 } 50 51 private static Int32 Sum(CancellationToken ct, Int32 n) 52 { 53 Int32 sum = 0; 54 for (; n > 0; n--) 55 { 56 ct.ThrowIfCancellationRequested(); 57 checked { sum += n; } 58 } 59 return sum; 60 } 61 }
任務工廠就這么用,就是一個任務的集合。
現在看看TaskScheduler(任務調度)
任務基礎結構是很靈活的,TaskScheduler對象功不可沒。
TaskScheduler對象負責執行調度的任務,同時向Visual Studio調試器公開任務信息,就像一座橋梁,讓我們能夠掌控自己的任務線程。
TaskScheduler有兩個派生類:thread pool task scheduler(線程池任務調度),和synchronization context task scheduler(同步上下文任務調度器)。默認情況下,所以應用程序使用的都是線程池任務調度器,這個任務調度器將任務調度給線程池的工作者線程。可以查詢TaskScheduler的靜態Default屬性來獲得對默認任務調度器的一個引用。
同步上下文任務調度器通常用於桌面應用程序,Winfrom,WPF及Silverlight。這個任務調度器將多有任務都調度給應用程序的GUI線程,使所有任務代碼都能成功更新UI組建,比如按鈕、菜單項等。同步上下文任務調度器根本不使用線程池。同樣,可以查詢TaskScheduler的靜態FromCurrentSynchronizationContext方法來獲得對一個同步上下文任務調度器的引用。
就像這樣創建類型:
1 //同步上下文任務調度 2 TaskScheduler m_syncContextTaskScheduler = 3 TaskScheduler.FromCurrentSynchronizationContext();
任務調度有很多的,下面列舉一部分,供參考,更多的請參看http://code.msdn.microsoft.com/ParExtSamples 它包括了大量的示例代碼。
它內容實在有點多。寫了我很久了。好不容易把任務這塊一次寫完,希望大家有更多的收獲。
--------------------下篇預告,線程池如何管理線程。把基礎介紹完結,也算是一個新的起點了。^_^