C#當中的多線程_線程池


3.1 簡介

線程池主要用在需要大量短暫的開銷大的資源的情形。我們預先分配一些資源在線程池當中,當我們需要使用的時候,直接從池中取出,代替了重新創建,不用時候就送回到池當中。

.NET當中的線程池是受CLR來管理的。

.NET線程池有一個QueueUserWorkItem()的靜態方法,這個方法接收一個委托,每當該方法被調用后,委托進入內部的隊列中,如果線程池當中沒有任何線程,此時創建一個新的工作線程,並將隊列的第一個委托放入到工作線程當中。

    

 

注意點:

①線程池內的操作,盡量放入短時間運行的工作

ASP.NET應用程序使用線程池不要把工作線程全部使用掉,否則web服務器將不能處理新的請求。

ASP.NET只推薦使用輸入/輸出密集型異步操作,因為其使用了一個不同的方式,叫做I/O線程。

③線程池當中的線程全部是后台線程,因此要注意前台線程執行完成后,后台線程也將結束工作。

 

3.2線程池中調用委托

首先要了解一個什么是【異步編程模型(Asynchronous Programming Model簡稱APM)】

.NET 1.0 異步編程模型(APM),

.NET 2.0 基於事件的異步編程模型(EAP),

.NET 4.0 基於任務的異步編程模型(TAP)。

本章主要了解什么是APMEAP

下面這篇文章介紹了異步編程模型,感覺挺好的,這里mark一下。

http://blog.csdn.net/xinke453/article/details/37810823

結合我這本書上的Demo,感覺理解起來無鴨梨,哈哈~

1     class Program

2     {

3         static void Main(string[] args)

4         {

5             int threadId = 0;

6

7             RunOnThreadPool poolDelegate = Test;

8                         //用創建線程的方法先創建了一個線程

9             var t = new Thread(() => Test(out threadId));

10             t.Start();

11             t.Join();

12

13             Console.WriteLine("Thread id: {0}", threadId);

14

15             /*

16              使用BeginInvoke來運行委托,Callback是一個回調函數,

17               "a delegate asynchronous call" 代表你希望轉發給回調方法的一個對象的引用,

18              在回調方法中,可以查詢IAsyncResult接口的AsyncState屬性來訪問該對象

19              */

20             IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delegate asynchronous call");

21              //這個例子當中使用AsyncWaitHandle屬性來等待直到操作完成★

22             r.AsyncWaitHandle.WaitOne();

23             //操作完成后,會得到一個結果,可以通過委托調用EndInvoke方法,將IAsyncResult對象傳遞給委托參數。

24             string result = poolDelegate.EndInvoke(out threadId, r);

25             

26             Console.WriteLine("Thread pool worker thread id: {0}", threadId);

27             Console.WriteLine(result);

28

29             Thread.Sleep(TimeSpan.FromSeconds(2));

30         }

31

32         private delegate string RunOnThreadPool(out int threadId);

33

34         private static void Callback(IAsyncResult ar)

35         {

36             Console.WriteLine("Starting a callback...");

37             Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState);

38             Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread);

39             Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId);

40         }

41

42

43         private static string Test(out int threadId)

44         {

45             Console.WriteLine("Starting...");

46             Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread);

47             Thread.Sleep(TimeSpan.FromSeconds(2));

48             threadId = Thread.CurrentThread.ManagedThreadId;

49             return string.Format("Thread pool worker thread id was: {0}", threadId);

 

注意:在這個例子當中,主線程調用Thread.Sleep(TimeSpan.FromSeconds(2));如果沒這句話,回調函數就不會被執行了,

以為線程池是后台線程,此時主線程結束,那么后台線程也跟着結束了,所以可能不會執行。

對於訪問異步操作的結果,APM提供了四種方式供開發人員選擇:

①在調用BeginXxx方法的線程上調用EndXxx方法來得到異步操作的結果,但是這種方式會阻塞調用線程,知道操作完成之后調用線程才繼續運行。

②查詢IAsyncResultAsyncWaitHandle屬性,從而得到WaitHandle,然后再調用它的WaitOne方法來使一個線程阻塞並等待操作完成再調用EndXxx方法來獲得操作的結果。

(本例子當中使用了這個方法)

③循環查詢IAsyncResultIsComplete屬性,操作完成后再調用EndXxx方法來獲得操作返回的結果。

④使用 AsyncCallback委托來指定操作完成時要調用的方法,在操作完成后調用的方法中調用EndXxx操作來獲得異步操作的結果。

★★★推薦使用第④種方法,因為此時不會阻塞執行BeginXxx方法的線程,然而其他三種都會阻塞調用線程,相當於效果和使用同步方法是一樣,個人感覺根本失去了異步編程的特點,所以其他三種方式可以簡單了解下,在實際異步編程中都是使用委托的方式。

3.3向線程池中加入異步操作

QueueUserWorkItem方法的定義!
1     [SecuritySafeCritical] 2     public static bool QueueUserWorkItem(WaitCallback callBack);
1 [SecuritySafeCritical] 2     public static bool QueueUserWorkItem(WaitCallback callBack, object state);
實例:
1     class Program
2     {
3         static void Main(string[] args)
4         {
5             const int x = 1;
6             const int y = 2;
7             const string lambdaState = "lambda state 2";
8            //方法一,直接調用QueueUserWorkItem傳入單個參數,作為回調函數
9            ThreadPool.QueueUserWorkItem(AsyncOperation);
10           Thread.Sleep(TimeSpan.FromSeconds(1));
11 
12           //方法二,傳入回調函數以及狀態參數
13           ThreadPool.QueueUserWorkItem(AsyncOperation, "async state");
14           Thread.Sleep(TimeSpan.FromSeconds(1));
15 
16           //方法三,使用labmbda表達式
17           ThreadPool.QueueUserWorkItem( state => {
18                     Console.WriteLine("Operation state: {0}", state);
19                     Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId);
20                     Thread.Sleep(TimeSpan.FromSeconds(2));
21                 }, "lambda state");
22 
23           //方法四,使用閉包機制
24           ThreadPool.QueueUserWorkItem( _ =>
25             {
26                 Console.WriteLine("Operation state: {0}, {1}", x+y, lambdaState);
27                 Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId);
28                 Thread.Sleep(TimeSpan.FromSeconds(2));
29             }, "lambda state");
30 
31             Thread.Sleep(TimeSpan.FromSeconds(2));
32         }
33 
34         private static void AsyncOperation(object state)
35         {
36             Console.WriteLine("Operation state: {0}", state ?? "(null)");
37             Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId);
38             Thread.Sleep(TimeSpan.FromSeconds(2));
39         }

擴展:C#閉包(Closure)機制是什么?

 

3.4線程池與並行度

下面這個實例展示線程池如何工作於大量異步操作,以及他和創建的大量單獨線程方式的區別。

1     class Program
2     {
3         static void Main(string[] args)
4         {
5             const int numberOfOperations = 500;
6             var sw = new Stopwatch();
7             sw.Start();
8             UseThreads(numberOfOperations);
9             sw.Stop();
10             Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds);
11 
12             sw.Reset();
13             sw.Start();
14             UseThreadPool(numberOfOperations);
15             sw.Stop();
16             Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds);
17         }
18 
19         static void UseThreads(int numberOfOperations)
20         {
21             using (var countdown = new CountdownEvent(numberOfOperations))
22             {
23                 Console.WriteLine("Scheduling work by creating threads");
24                 for (int i = 0; i < numberOfOperations; i++)
25                 {
26                     var thread = new Thread(() => {
27                         Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);
28                         Thread.Sleep(TimeSpan.FromSeconds(0.1));
29                         countdown.Signal();
30                     });
31                     thread.Start();
32                 }
33                 countdown.Wait();
34                 Console.WriteLine();
35             }
36         }
37 
38         static void UseThreadPool(int numberOfOperations)
39         {
40             using (var countdown = new CountdownEvent(numberOfOperations))
41             {
42                 Console.WriteLine("Starting work on a threadpool");
43                 for (int i = 0; i < numberOfOperations; i++)
44                 {
45                     ThreadPool.QueueUserWorkItem( _ => {
46                         Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);
47                         Thread.Sleep(TimeSpan.FromSeconds(0.1));
48                         countdown.Signal();
49                     });
50                 }
51                 countdown.Wait();
52                 Console.WriteLine();
53             }
54         }
55     }

分別用創建大量線程的方式和線程池的方式執行500個Thread.Sleep(TimeSpan.FromSeconds(0.1))操作,

我們發現線程池花費了更多的時間,但是占用的資源數目很少(通過ThreadId來看)。

 

3.5實現一個取消選項

使用CancellationTokenSource和CancellationToken兩個類來實現工作線程工作的取消操作。

實例:
1     class Program
2     {
3         static void Main(string[] args)
4         {
5              using (var cts = new CancellationTokenSource())
6              {
7                   CancellationToken token = cts.Token;
8                   ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token));
9                   Thread.Sleep(TimeSpan.FromSeconds(2));
10                  cts.Cancel();
11             }
12 
13             using (var cts = new CancellationTokenSource())
14             {
15                  CancellationToken token = cts.Token;
16                  ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token));
17                  Thread.Sleep(TimeSpan.FromSeconds(2));
18                  cts.Cancel();
19              }
20 
21              using (var cts = new CancellationTokenSource())
22              {
23                   CancellationToken token = cts.Token;
24                   ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token));
25                   Thread.Sleep(TimeSpan.FromSeconds(2));
26                   cts.Cancel();
27              }
28 
29             Thread.Sleep(TimeSpan.FromSeconds(2));
30         }
31 
32        /// <summary>
33        /// 第一種采用輪詢IsCancellationRequested屬性的方式,如果為true,那么操作被取消
34        /// </summary>
35        /// <param name="token"></param>
36         static void AsyncOperation1(CancellationToken token)
37         {
38             Console.WriteLine("Starting the first task");
39             for (int i = 0; i < 5; i++)
40             {
41                 if (token.IsCancellationRequested)
42                 {
43                     Console.WriteLine("The first task has been canceled.");
44                     return;
45                 }
46                 Thread.Sleep(TimeSpan.FromSeconds(1));
47             }
48             Console.WriteLine("The first task has completed succesfully");
49         }
50        /// <summary>
51        /// 拋出一個OperationCancelledException異常
52        /// 這個允許操作之外控制取消過程,即需要取消操作的時候,通過操作之外的代碼來處理
53        /// </summary>
54        /// <param name="token"></param>
55         static void AsyncOperation2(CancellationToken token)
56         {
57             try
58             {
59                 Console.WriteLine("Starting the second task");
60 
61                 for (int i = 0; i < 5; i++)
62                 {
63                     token.ThrowIfCancellationRequested();
64                     Thread.Sleep(TimeSpan.FromSeconds(1));
65                 }
66                 Console.WriteLine("The second task has completed succesfully");
67             }
68             catch (OperationCanceledException)
69             {
70                 Console.WriteLine("The second task has been canceled.");
71             }
72         }
73        /// <summary>
74        /// 第三種注冊一個回調函數,當操作被取消時候,調用回調函數
75        /// </summary>
76        /// <param name="token"></param>
77         private static void AsyncOperation3(CancellationToken token)
78         {
79             bool cancellationFlag = false;
80             token.Register(() => cancellationFlag = true);
81             Console.WriteLine("Starting the third task");
82             for (int i = 0; i < 5; i++)
83             {
84                 if (cancellationFlag)
85                 {
86                     Console.WriteLine("The third task has been canceled.");
87                     return;
88                 }
89                 Thread.Sleep(TimeSpan.FromSeconds(1));
90             }
91             Console.WriteLine("The third task has completed succesfully");
92       

CancellationTokenSourceCancellationToken兩個類是.net4.0一會引入的,目前是實現異步操作取消事實標准。

 

3.6在線程池中使用等待事件處理器和超時

使用線程池當中的Threadpool.RegisterWaitSingleObject類來進行事件案處理。

RegisterWaitSingleObject的原型如下:

1  public static RegisteredWaitHandle RegisterWaitForSingleObject(
2           WaitHandle waitObject,
3           WaitOrTimerCallback callBack,
4           Object state,
5           int millisecondsTimeOutInterval,
6           bool executeOnlyOnce
7           )

參數

waitObject

要注冊的 WaitHandle。使用 WaitHandle 而非 Mutex

callBack

waitObject 參數終止時調用的 WaitOrTimerCallback 委托。

state

傳遞給委托的對象。

timeout

TimeSpan 表示的超時時間。如果 timeout 為零,則函數測試對象的狀態並立即返回。如果 timeout  -1,則函數的超時間隔永遠不過期。

executeOnlyOnce

如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。

 

返回值

封裝本機句柄的 RegisteredWaitHandle

相信看了這些之后大家還是一頭霧水,這個方法的做用是向線程池添加一個可以定時執行的方法,第四個參數millisecondsTimeOutInterval 就是用來設置間隔執行的時間,但是這里第五個參數executeOnlyOnce 會對第四個參數起作用,當它為true時,表示任務僅會執行一次,就是說它不會,像Timer一樣,每隔一定時間執行一次,這個功能的話用Timer控件也可以實現

該方法還在此基礎上提供了基於信號量來觸發執行任務。

信號量也叫開關量,故名思議,它只有兩種狀態,不是true就是false,

WaitHandle就是這類開關量的基礎類,繼承它的類有Mutex,ManualResetEvent,AutoResetEvent,一般我們使用后兩個

寫法: 

        static ManualResetEvent wait2=new ManualResetEvent(false);

        static AutoResetEvent wait=new AutoResetEvent(false); 

我們可以在將其實例化時指定開關量的初始值。(true為有信號,false為沒信號)

ManualResetEvent和AutoResetEvent的區別在於:

前者調用Set方法后將自動將開關量值將一直保持為true,后者調用Set方法將變為true隨后立即變為false,可以將它理解為一個脈沖。

例子
1     class Program
2     {
3         static void Main(string[] args)
4         {
5             //執行兩次RunOperations操作,第一次會超時,第二次不會超時
6             RunOperations(TimeSpan.FromSeconds(5));
7             RunOperations(TimeSpan.FromSeconds(7));
8         }
9 
10         static void RunOperations(TimeSpan workerOperationTimeout)
11         {
12             //定義一個ManualResetEvent信號量,初始為false
13             using (var evt = new ManualResetEvent(false))
14             //實例化一個CancellationTokenSource實例,用於取消操作
15             using (var cts = new CancellationTokenSource())
16             {
17                 Console.WriteLine("Registering timeout operations...");
18                 //注冊超時的被調用的回調函數。
19                 var worker = ThreadPool.RegisterWaitForSingleObject(
20                                         evt,
21                     (state, isTimedOut) => WorkerOperationWait(cts, isTimedOut), 
22                                         null, 
23                                         workerOperationTimeout, 
24                                         true );
25 
26                 Console.WriteLine("Starting long running operation...");
27                 //線程池執行WorkerOperation操作
28                 ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt));
29 
30                 Thread.Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2)));
31                 worker.Unregister(evt);
32             }
33         }
34 
35        /// <summary>
36        /// 線程池內需要被調用的操作
37        /// </summary>
38        /// <param name="token"></param>
39        /// <param name="evt"></param>
40         static void WorkerOperation(CancellationToken token, ManualResetEvent evt)
41         {
42             for(int i = 0; i < 6; i++)
43             {
44                 if (token.IsCancellationRequested)
45                 {
46                     return;
47                 }
48                 Thread.Sleep(TimeSpan.FromSeconds(1));
49             }
50             //設置信號量,此時evt為true。
51             evt.Set();
52         }
53 
54         /// <summary>
55         /// 超時時候執行的回調函數
56         /// </summary>
57         /// <param name="cts"></param>
58         /// <param name="isTimedOut"></param>
59         static void WorkerOperationWait(CancellationTokenSource cts, bool isTimedOut)
60         {
61             if (isTimedOut)
62             {
63                 cts.Cancel();
64                 Console.WriteLine("Worker operation timed out and was canceled.");
65             }
66             else
67             {
68                 Console.WriteLine("Worker operation succeded.");
69             }
70      

 

3.7在線程池中使用計時器

使用system.Threading.Timer對象來在線程池中創建周期性調用的異步操作。

1     class Program
2     {
3         static void Main(string[] args)
4         {
5             Console.WriteLine("Press 'Enter' to stop the timer...");
6             DateTime start = DateTime.Now;
7             //實例化這個timer類
8             //一秒后執行TimerOperation這個操作,然后每隔2秒執行一次
9             _timer = new Timer(
10                                 _ => TimerOperation(start), 
11                                 null, 
12                                 TimeSpan.FromSeconds(1), 
13                                 TimeSpan.FromSeconds(2));
14 
15             Thread.Sleep(TimeSpan.FromSeconds(6));
16 
17             //改變計時器的運行時間,一秒后執行TimerOperation,然后每隔4秒執行一次
18             _timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4));
19 
20             Console.ReadLine();
21 
22             _timer.Dispose();
23         }
24 
25         static Timer _timer;
26 
27         static void TimerOperation(DateTime start)
28         {
29             TimeSpan elapsed = DateTime.Now - start;
30             Console.WriteLine("{0} seconds from {1}. Timer thread pool thread id: {2}", elapsed.Seconds, start,
31             Thread.CurrentThread.ManagedThreadId);
32         }
33 

 

3.8使用BackgroudWorker組件

本小節將介紹一個異步編程模式的另一種方式,叫基於事件的異步模式(EAP

先看一個例子吧:

1     class Program
2     {
3         static void Main(string[] args)
4         {
5               //實例化一個BackgroundWorker類
6               var bw = new BackgroundWorker();
7               //獲取或設置一個值,該值指示 BackgroundWorker 能否報告進度更新。
8               bw.WorkerReportsProgress = true;
9               //設置后台工作線程是否支持取消操作
10             bw.WorkerSupportsCancellation = true;
11 
12             //給DoWork、ProgressChanged、RunWorkerCompleted事件綁定處理函數
13             bw.DoWork += Worker_DoWork;
14             bw.ProgressChanged += Worker_ProgressChanged;
15             bw.RunWorkerCompleted += Worker_Completed;
16 
17             //啟動異步操作
18             bw.RunWorkerAsync();
19 
20             Console.WriteLine("Press C to cancel work");
21             do
22             {
23                 if (Console.ReadKey(true).KeyChar == 'C')
24                 {
25                     //取消操作
26                     bw.CancelAsync();
27                 }
28                 
29             }
30             while(bw.IsBusy);
31         }
32 
33         static void Worker_DoWork(object sender, DoWorkEventArgs e)
34         {
35             Console.WriteLine("DoWork thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId);
36             var bw = (BackgroundWorker) sender;
37             for (int i = 1; i <= 100; i++)
38             {
39 
40                 if (bw.CancellationPending)
41                 {
42                     e.Cancel = true;
43                     return;
44                 }
45 
46                 if (i%10 == 0)
47                 {
48                     bw.ReportProgress(i);
49                 }
50 
51                 Thread.Sleep(TimeSpan.FromSeconds(0.1));
52             }
53             e.Result = 42;
54         }
55 
56         static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e)
57         {
58             Console.WriteLine("{0}% completed. Progress thread pool thread id: {1}", e.ProgressPercentage,
59                 Thread.CurrentThread.ManagedThreadId);
60         }
61 
62         static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e)
63         {
64             Console.WriteLine("Completed threadpool thread id:{0}",Thread.CurrentThread.ManagedThreadId);
65             if (e.Error != null)
66             {
67                 Console.WriteLine("Exception {0} has occured.", e.Error.Message);
68             }
69             else if (e.Cancelled)
70             {
71                 Console.WriteLine("Operation has been canceled.");
72             }
73             else
74             {
75                 Console.WriteLine("The answer is: {0}", e.Result);
76             }
77         }

事件

 

名稱

說明

 

Disposed

當通過調用 Dispose 方法釋放組件時發生。(從 Component 繼承。)

 

DoWork

調用 RunWorkerAsync 時發生。

RunWorkerAsync 方法提交一個啟動以異步方式運行的操作的請求。發出該請求后,將引發 DoWork 事件,該事件隨后開始執行后台操作。

如果后台操作已在運行,則再次調用 RunWorkerAsync 將引發 InvalidOperationException

 

ProgressChanged

調用 ReportProgress 時發生。

public void ReportProgress

(
    int percentProgress
)

percentProgress

已完成的后台操作所占的百分比,范圍從 0% 100%

如果您需要后台操作報告其進度,則可以調用 ReportProgress 方法來引發 ProgressChanged 事件。 WorkerReportsProgress 屬性值必須是 true,否則 ReportProgress 將引發 InvalidOperationException

您需要實現一個有意義的方法,以便按照占已完成的總任務的百分比來度量后台操作的進度。

ReportProgress 方法的調用為異步且立即返回。The ProgressChanged 事件處理程序在創建 BackgroundWorker 的線程上執行。

 

RunWorkerCompleted

當后台操作已完成、被取消或引發異常時發生。

 

 

在該方法中可以知道操作是成功完成還是發生錯誤,亦或被取消。

貼り付け元  <https://msdn.microsoft.com/zh-cn/library/system.componentmodel.backgroundworker.aspx

成功完成的時候

任務取消的時候

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM