3.1 簡介
線程池主要用在需要大量短暫的開銷大的資源的情形。我們預先分配一些資源在線程池當中,當我們需要使用的時候,直接從池中取出,代替了重新創建,不用時候就送回到池當中。
.NET當中的線程池是受CLR來管理的。
.NET線程池有一個QueueUserWorkItem()的靜態方法,這個方法接收一個委托,每當該方法被調用后,委托進入內部的隊列中,如果線程池當中沒有任何線程,此時創建一個新的工作線程,並將隊列的第一個委托放入到工作線程當中。
注意點:
①線程池內的操作,盡量放入短時間運行的工作
②ASP.NET應用程序使用線程池不要把工作線程全部使用掉,否則web服務器將不能處理新的請求。
ASP.NET只推薦使用輸入/輸出密集型異步操作,因為其使用了一個不同的方式,叫做I/O線程。
③線程池當中的線程全部是后台線程,因此要注意前台線程執行完成后,后台線程也將結束工作。
3.2線程池中調用委托
首先要了解一個什么是【異步編程模型(Asynchronous Programming Model簡稱APM)】
.NET 1.0 異步編程模型(APM),
.NET 2.0 基於事件的異步編程模型(EAP),
.NET 4.0 基於任務的異步編程模型(TAP)。
本章主要了解什么是APM和EAP。
下面這篇文章介紹了異步編程模型,感覺挺好的,這里mark一下。
http://blog.csdn.net/xinke453/article/details/37810823
結合我這本書上的Demo,感覺理解起來無鴨梨,哈哈~
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 int threadId = 0; 6 7 RunOnThreadPool poolDelegate = Test; 8 //用創建線程的方法先創建了一個線程 9 var t = new Thread(() => Test(out threadId)); 10 t.Start(); 11 t.Join(); 12 13 Console.WriteLine("Thread id: {0}", threadId); 14 15 /* 16 使用BeginInvoke來運行委托,Callback是一個回調函數, 17 "a delegate asynchronous call" 代表你希望轉發給回調方法的一個對象的引用, 18 在回調方法中,可以查詢IAsyncResult接口的AsyncState屬性來訪問該對象 19 */ 20 IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); 21 //這個例子當中使用AsyncWaitHandle屬性來等待直到操作完成★ 22 r.AsyncWaitHandle.WaitOne(); 23 //操作完成后,會得到一個結果,可以通過委托調用EndInvoke方法,將IAsyncResult對象傳遞給委托參數。 24 string result = poolDelegate.EndInvoke(out threadId, r); 25 26 Console.WriteLine("Thread pool worker thread id: {0}", threadId); 27 Console.WriteLine(result); 28 29 Thread.Sleep(TimeSpan.FromSeconds(2)); 30 } 31 32 private delegate string RunOnThreadPool(out int threadId); 33 34 private static void Callback(IAsyncResult ar) 35 { 36 Console.WriteLine("Starting a callback..."); 37 Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); 38 Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); 39 Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); 40 } 41 42 43 private static string Test(out int threadId) 44 { 45 Console.WriteLine("Starting..."); 46 Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); 47 Thread.Sleep(TimeSpan.FromSeconds(2)); 48 threadId = Thread.CurrentThread.ManagedThreadId; 49 return string.Format("Thread pool worker thread id was: {0}", threadId);
注意:在這個例子當中,主線程調用Thread.Sleep(TimeSpan.FromSeconds(2));如果沒這句話,回調函數就不會被執行了,
以為線程池是后台線程,此時主線程結束,那么后台線程也跟着結束了,所以可能不會執行。
對於訪問異步操作的結果,APM提供了四種方式供開發人員選擇:
①在調用BeginXxx方法的線程上調用EndXxx方法來得到異步操作的結果,但是這種方式會阻塞調用線程,知道操作完成之后調用線程才繼續運行。
②查詢IAsyncResult的AsyncWaitHandle屬性,從而得到WaitHandle,然后再調用它的WaitOne方法來使一個線程阻塞並等待操作完成再調用EndXxx方法來獲得操作的結果。
(本例子當中使用了這個方法)
③循環查詢IAsyncResult的IsComplete屬性,操作完成后再調用EndXxx方法來獲得操作返回的結果。
④使用 AsyncCallback委托來指定操作完成時要調用的方法,在操作完成后調用的方法中調用EndXxx操作來獲得異步操作的結果。
★★★推薦使用第④種方法,因為此時不會阻塞執行BeginXxx方法的線程,然而其他三種都會阻塞調用線程,相當於效果和使用同步方法是一樣,個人感覺根本失去了異步編程的特點,所以其他三種方式可以簡單了解下,在實際異步編程中都是使用委托的方式。
3.3向線程池中加入異步操作
QueueUserWorkItem方法的定義!
1 [SecuritySafeCritical] 2 public static bool QueueUserWorkItem(WaitCallback callBack);
1 [SecuritySafeCritical] 2 public static bool QueueUserWorkItem(WaitCallback callBack, object state);
實例: 1 class Program 2 { 3 static void Main(string[] args) 4 { 5 const int x = 1; 6 const int y = 2; 7 const string lambdaState = "lambda state 2"; 8 //方法一,直接調用QueueUserWorkItem傳入單個參數,作為回調函數 9 ThreadPool.QueueUserWorkItem(AsyncOperation); 10 Thread.Sleep(TimeSpan.FromSeconds(1)); 11 12 //方法二,傳入回調函數以及狀態參數 13 ThreadPool.QueueUserWorkItem(AsyncOperation, "async state"); 14 Thread.Sleep(TimeSpan.FromSeconds(1)); 15 16 //方法三,使用labmbda表達式 17 ThreadPool.QueueUserWorkItem( state => { 18 Console.WriteLine("Operation state: {0}", state); 19 Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); 20 Thread.Sleep(TimeSpan.FromSeconds(2)); 21 }, "lambda state"); 22 23 //方法四,使用閉包機制 24 ThreadPool.QueueUserWorkItem( _ => 25 { 26 Console.WriteLine("Operation state: {0}, {1}", x+y, lambdaState); 27 Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); 28 Thread.Sleep(TimeSpan.FromSeconds(2)); 29 }, "lambda state"); 30 31 Thread.Sleep(TimeSpan.FromSeconds(2)); 32 } 33 34 private static void AsyncOperation(object state) 35 { 36 Console.WriteLine("Operation state: {0}", state ?? "(null)"); 37 Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); 38 Thread.Sleep(TimeSpan.FromSeconds(2)); 39 }
擴展:C#閉包(Closure)機制是什么?
3.4線程池與並行度
下面這個實例展示線程池如何工作於大量異步操作,以及他和創建的大量單獨線程方式的區別。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 const int numberOfOperations = 500; 6 var sw = new Stopwatch(); 7 sw.Start(); 8 UseThreads(numberOfOperations); 9 sw.Stop(); 10 Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); 11 12 sw.Reset(); 13 sw.Start(); 14 UseThreadPool(numberOfOperations); 15 sw.Stop(); 16 Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); 17 } 18 19 static void UseThreads(int numberOfOperations) 20 { 21 using (var countdown = new CountdownEvent(numberOfOperations)) 22 { 23 Console.WriteLine("Scheduling work by creating threads"); 24 for (int i = 0; i < numberOfOperations; i++) 25 { 26 var thread = new Thread(() => { 27 Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); 28 Thread.Sleep(TimeSpan.FromSeconds(0.1)); 29 countdown.Signal(); 30 }); 31 thread.Start(); 32 } 33 countdown.Wait(); 34 Console.WriteLine(); 35 } 36 } 37 38 static void UseThreadPool(int numberOfOperations) 39 { 40 using (var countdown = new CountdownEvent(numberOfOperations)) 41 { 42 Console.WriteLine("Starting work on a threadpool"); 43 for (int i = 0; i < numberOfOperations; i++) 44 { 45 ThreadPool.QueueUserWorkItem( _ => { 46 Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); 47 Thread.Sleep(TimeSpan.FromSeconds(0.1)); 48 countdown.Signal(); 49 }); 50 } 51 countdown.Wait(); 52 Console.WriteLine(); 53 } 54 } 55 }
分別用創建大量線程的方式和線程池的方式執行500個Thread.Sleep(TimeSpan.FromSeconds(0.1))操作,
我們發現線程池花費了更多的時間,但是占用的資源數目很少(通過ThreadId來看)。
3.5實現一個取消選項
使用CancellationTokenSource和CancellationToken兩個類來實現工作線程工作的取消操作。
實例: 1 class Program 2 { 3 static void Main(string[] args) 4 { 5 using (var cts = new CancellationTokenSource()) 6 { 7 CancellationToken token = cts.Token; 8 ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token)); 9 Thread.Sleep(TimeSpan.FromSeconds(2)); 10 cts.Cancel(); 11 } 12 13 using (var cts = new CancellationTokenSource()) 14 { 15 CancellationToken token = cts.Token; 16 ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token)); 17 Thread.Sleep(TimeSpan.FromSeconds(2)); 18 cts.Cancel(); 19 } 20 21 using (var cts = new CancellationTokenSource()) 22 { 23 CancellationToken token = cts.Token; 24 ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token)); 25 Thread.Sleep(TimeSpan.FromSeconds(2)); 26 cts.Cancel(); 27 } 28 29 Thread.Sleep(TimeSpan.FromSeconds(2)); 30 } 31 32 /// <summary> 33 /// 第一種采用輪詢IsCancellationRequested屬性的方式,如果為true,那么操作被取消 34 /// </summary> 35 /// <param name="token"></param> 36 static void AsyncOperation1(CancellationToken token) 37 { 38 Console.WriteLine("Starting the first task"); 39 for (int i = 0; i < 5; i++) 40 { 41 if (token.IsCancellationRequested) 42 { 43 Console.WriteLine("The first task has been canceled."); 44 return; 45 } 46 Thread.Sleep(TimeSpan.FromSeconds(1)); 47 } 48 Console.WriteLine("The first task has completed succesfully"); 49 } 50 /// <summary> 51 /// 拋出一個OperationCancelledException異常 52 /// 這個允許操作之外控制取消過程,即需要取消操作的時候,通過操作之外的代碼來處理 53 /// </summary> 54 /// <param name="token"></param> 55 static void AsyncOperation2(CancellationToken token) 56 { 57 try 58 { 59 Console.WriteLine("Starting the second task"); 60 61 for (int i = 0; i < 5; i++) 62 { 63 token.ThrowIfCancellationRequested(); 64 Thread.Sleep(TimeSpan.FromSeconds(1)); 65 } 66 Console.WriteLine("The second task has completed succesfully"); 67 } 68 catch (OperationCanceledException) 69 { 70 Console.WriteLine("The second task has been canceled."); 71 } 72 } 73 /// <summary> 74 /// 第三種注冊一個回調函數,當操作被取消時候,調用回調函數 75 /// </summary> 76 /// <param name="token"></param> 77 private static void AsyncOperation3(CancellationToken token) 78 { 79 bool cancellationFlag = false; 80 token.Register(() => cancellationFlag = true); 81 Console.WriteLine("Starting the third task"); 82 for (int i = 0; i < 5; i++) 83 { 84 if (cancellationFlag) 85 { 86 Console.WriteLine("The third task has been canceled."); 87 return; 88 } 89 Thread.Sleep(TimeSpan.FromSeconds(1)); 90 } 91 Console.WriteLine("The third task has completed succesfully"); 92
CancellationTokenSource和CancellationToken兩個類是.net4.0一會引入的,目前是實現異步操作取消事實標准。
3.6在線程池中使用等待事件處理器和超時
使用線程池當中的Threadpool.RegisterWaitSingleObject類來進行事件案處理。
RegisterWaitSingleObject的原型如下:
1 public static RegisteredWaitHandle RegisterWaitForSingleObject( 2 WaitHandle waitObject, 3 WaitOrTimerCallback callBack, 4 Object state, 5 int millisecondsTimeOutInterval, 6 bool executeOnlyOnce 7 )
參數
waitObject
要注冊的 WaitHandle。使用 WaitHandle 而非 Mutex。
callBack
waitObject 參數終止時調用的 WaitOrTimerCallback 委托。
state
傳遞給委托的對象。
timeout
TimeSpan 表示的超時時間。如果 timeout 為零,則函數測試對象的狀態並立即返回。如果 timeout 為 -1,則函數的超時間隔永遠不過期。
executeOnlyOnce
如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。
返回值
封裝本機句柄的 RegisteredWaitHandle。
相信看了這些之后大家還是一頭霧水,這個方法的做用是向線程池添加一個可以定時執行的方法,第四個參數millisecondsTimeOutInterval 就是用來設置間隔執行的時間,但是這里第五個參數executeOnlyOnce 會對第四個參數起作用,當它為true時,表示任務僅會執行一次,就是說它不會,像Timer一樣,每隔一定時間執行一次,這個功能的話用Timer控件也可以實現
該方法還在此基礎上提供了基於信號量來觸發執行任務。
信號量也叫開關量,故名思議,它只有兩種狀態,不是true就是false,
WaitHandle就是這類開關量的基礎類,繼承它的類有Mutex,ManualResetEvent,AutoResetEvent,一般我們使用后兩個
寫法:
static ManualResetEvent wait2=new ManualResetEvent(false);
static AutoResetEvent wait=new AutoResetEvent(false);
我們可以在將其實例化時指定開關量的初始值。(true為有信號,false為沒信號)
ManualResetEvent和AutoResetEvent的區別在於:
前者調用Set方法后將自動將開關量值將一直保持為true,后者調用Set方法將變為true隨后立即變為false,可以將它理解為一個脈沖。
例子 1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //執行兩次RunOperations操作,第一次會超時,第二次不會超時 6 RunOperations(TimeSpan.FromSeconds(5)); 7 RunOperations(TimeSpan.FromSeconds(7)); 8 } 9 10 static void RunOperations(TimeSpan workerOperationTimeout) 11 { 12 //定義一個ManualResetEvent信號量,初始為false 13 using (var evt = new ManualResetEvent(false)) 14 //實例化一個CancellationTokenSource實例,用於取消操作 15 using (var cts = new CancellationTokenSource()) 16 { 17 Console.WriteLine("Registering timeout operations..."); 18 //注冊超時的被調用的回調函數。 19 var worker = ThreadPool.RegisterWaitForSingleObject( 20 evt, 21 (state, isTimedOut) => WorkerOperationWait(cts, isTimedOut), 22 null, 23 workerOperationTimeout, 24 true ); 25 26 Console.WriteLine("Starting long running operation..."); 27 //線程池執行WorkerOperation操作 28 ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt)); 29 30 Thread.Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2))); 31 worker.Unregister(evt); 32 } 33 } 34 35 /// <summary> 36 /// 線程池內需要被調用的操作 37 /// </summary> 38 /// <param name="token"></param> 39 /// <param name="evt"></param> 40 static void WorkerOperation(CancellationToken token, ManualResetEvent evt) 41 { 42 for(int i = 0; i < 6; i++) 43 { 44 if (token.IsCancellationRequested) 45 { 46 return; 47 } 48 Thread.Sleep(TimeSpan.FromSeconds(1)); 49 } 50 //設置信號量,此時evt為true。 51 evt.Set(); 52 } 53 54 /// <summary> 55 /// 超時時候執行的回調函數 56 /// </summary> 57 /// <param name="cts"></param> 58 /// <param name="isTimedOut"></param> 59 static void WorkerOperationWait(CancellationTokenSource cts, bool isTimedOut) 60 { 61 if (isTimedOut) 62 { 63 cts.Cancel(); 64 Console.WriteLine("Worker operation timed out and was canceled."); 65 } 66 else 67 { 68 Console.WriteLine("Worker operation succeded."); 69 } 70
3.7在線程池中使用計時器
使用system.Threading.Timer對象來在線程池中創建周期性調用的異步操作。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 Console.WriteLine("Press 'Enter' to stop the timer..."); 6 DateTime start = DateTime.Now; 7 //實例化這個timer類 8 //一秒后執行TimerOperation這個操作,然后每隔2秒執行一次 9 _timer = new Timer( 10 _ => TimerOperation(start), 11 null, 12 TimeSpan.FromSeconds(1), 13 TimeSpan.FromSeconds(2)); 14 15 Thread.Sleep(TimeSpan.FromSeconds(6)); 16 17 //改變計時器的運行時間,一秒后執行TimerOperation,然后每隔4秒執行一次 18 _timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4)); 19 20 Console.ReadLine(); 21 22 _timer.Dispose(); 23 } 24 25 static Timer _timer; 26 27 static void TimerOperation(DateTime start) 28 { 29 TimeSpan elapsed = DateTime.Now - start; 30 Console.WriteLine("{0} seconds from {1}. Timer thread pool thread id: {2}", elapsed.Seconds, start, 31 Thread.CurrentThread.ManagedThreadId); 32 } 33
3.8使用BackgroudWorker組件
本小節將介紹一個異步編程模式的另一種方式,叫基於事件的異步模式(EAP)
先看一個例子吧:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //實例化一個BackgroundWorker類 6 var bw = new BackgroundWorker(); 7 //獲取或設置一個值,該值指示 BackgroundWorker 能否報告進度更新。 8 bw.WorkerReportsProgress = true; 9 //設置后台工作線程是否支持取消操作 10 bw.WorkerSupportsCancellation = true; 11 12 //給DoWork、ProgressChanged、RunWorkerCompleted事件綁定處理函數 13 bw.DoWork += Worker_DoWork; 14 bw.ProgressChanged += Worker_ProgressChanged; 15 bw.RunWorkerCompleted += Worker_Completed; 16 17 //啟動異步操作 18 bw.RunWorkerAsync(); 19 20 Console.WriteLine("Press C to cancel work"); 21 do 22 { 23 if (Console.ReadKey(true).KeyChar == 'C') 24 { 25 //取消操作 26 bw.CancelAsync(); 27 } 28 29 } 30 while(bw.IsBusy); 31 } 32 33 static void Worker_DoWork(object sender, DoWorkEventArgs e) 34 { 35 Console.WriteLine("DoWork thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); 36 var bw = (BackgroundWorker) sender; 37 for (int i = 1; i <= 100; i++) 38 { 39 40 if (bw.CancellationPending) 41 { 42 e.Cancel = true; 43 return; 44 } 45 46 if (i%10 == 0) 47 { 48 bw.ReportProgress(i); 49 } 50 51 Thread.Sleep(TimeSpan.FromSeconds(0.1)); 52 } 53 e.Result = 42; 54 } 55 56 static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e) 57 { 58 Console.WriteLine("{0}% completed. Progress thread pool thread id: {1}", e.ProgressPercentage, 59 Thread.CurrentThread.ManagedThreadId); 60 } 61 62 static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e) 63 { 64 Console.WriteLine("Completed threadpool thread id:{0}",Thread.CurrentThread.ManagedThreadId); 65 if (e.Error != null) 66 { 67 Console.WriteLine("Exception {0} has occured.", e.Error.Message); 68 } 69 else if (e.Cancelled) 70 { 71 Console.WriteLine("Operation has been canceled."); 72 } 73 else 74 { 75 Console.WriteLine("The answer is: {0}", e.Result); 76 } 77 }
|
名稱 |
說明 |
|
||
|
調用 RunWorkerAsync 時發生。 RunWorkerAsync 方法提交一個啟動以異步方式運行的操作的請求。發出該請求后,將引發 DoWork 事件,該事件隨后開始執行后台操作。 如果后台操作已在運行,則再次調用 RunWorkerAsync 將引發 InvalidOperationException。 |
|
|
調用 ReportProgress 時發生。 public void ReportProgress ( percentProgress 已完成的后台操作所占的百分比,范圍從 0% 到 100%。 如果您需要后台操作報告其進度,則可以調用 ReportProgress 方法來引發 ProgressChanged 事件。 WorkerReportsProgress 屬性值必須是 true,否則 ReportProgress 將引發 InvalidOperationException。 您需要實現一個有意義的方法,以便按照占已完成的總任務的百分比來度量后台操作的進度。 對 ReportProgress 方法的調用為異步且立即返回。The ProgressChanged 事件處理程序在創建 BackgroundWorker 的線程上執行。 |
|
|
當后台操作已完成、被取消或引發異常時發生。 |
|
|
|
在該方法中可以知道操作是成功完成還是發生錯誤,亦或被取消。 |
貼り付け元 <https://msdn.microsoft.com/zh-cn/library/system.componentmodel.backgroundworker.aspx>
成功完成的時候
任務取消的時候