線程池ThreadPool的常用方法介紹
如果您理解了線程池目的及優點后,讓我們溫故下線程池的常用的幾個方法:
1. public static Boolean QueueUserWorkItem(WaitCallback wc, Object state);
WaitCallback回調函數就是前文所闡述的應用程序,通過將一些回調函數放入線程池中讓其形成隊列,然后線程池會自動創建或者復用線程
去執行處理這些回調函數,
State: 這個參數也是非常重要的,當執行帶有參數的回調函數時,該參數會將引用傳入,回調方法中,供其使用
3. public static bool SetMaxThreads(int workerThreads,int completionPortThreads);
4. public static bool SetMinThreads(int workerThreads,int completionPortThreads);
3和4方法 CLR線程池類中預留的兩個能夠更改,線程池中的工作線程和I/O線程數量的方法。
使用該方法時有兩點必須注意:
1.不能將輔助線程的數目或 I/O 完成線程的數目設置為小於計算機的處理器數目。
2.微軟不建議程序員使用這兩個方法的原因是可能會影響到線程池中的性能
我們通過一個簡單的例子來溫故下
using System; using System.Threading; namespace ThreadPoolApplication { class Program { //設定任務數量 static int count = 5; static void Main(string[] args) { //關於ManualResetEvent大伙不必深究,后續章將會詳細闡述,這里由於假設 //讓線程池執行5個任務所以也為每個任務加上這個對象保持同步 ManualResetEvent[] events=new ManualResetEvent[count]; Console.WriteLine("當前主線程id:{0}",Thread.CurrentThread.ManagedThreadId); //循環每個任務 for (int i = 0; i < count; i++) { //實例化同步工具 events[i]=new ManualResetEvent(false); //Test在這里就是任務類,將同步工具的引用傳入能保證共享區內每次只有一個線程進入 Test tst = new Test(events[i]); Thread.Sleep(1000); //將任務放入線程池中,讓線程池中的線程執行該任務 ThreadPool.QueueUserWorkItem(tst.DisplayNumber, new { num1=2}); } //注意這里,設定WaitAll是為了阻塞調用線程(主線程),讓其余線程先執行完畢, //其中每個任務完成后調用其set()方法(收到信號),當所有 //的任務都收到信號后,執行完畢,將控制權再次交回調用線程(這里的主線程) ManualResetEvent.WaitAll(events); Console.ReadKey(); } } public class Test { ManualResetEvent manualEvent; public Test(ManualResetEvent manualEvent) { this.manualEvent = manualEvent; } public void DisplayNumber(object a) { Console.WriteLine("當前運算結果:{0}",((dynamic)a).num1); Console.WriteLine("當前子線程id:{0} 的狀態:{1}", Thread.CurrentThread.ManagedThreadId,Thread.CurrentThread.ThreadState); //這里是方法執行時間的模擬,如果注釋該行代碼,就能看出線程池的功能了 //Thread.Sleep(30000); //這里是釋放共享鎖,讓其他線程進入 manualEvent.Set(); } } }
執行結果:
從顯示結果能夠看出線程池只創建了id為9,10,11這3個線程來處理這5個任務,因為每個任務的執行時間非常短,所以線程池
的優勢被展現出來了
如果我們去掉DisplayNumber方法中的Thread.Sleep(30000) 的注釋的話,會發現由於任務的執行時間遠遠超於任務在隊列中的
排隊時間,所以線程池開啟了5個線程來執行任務
在很多時候例如UI或者IO操作時我們希望將這些很復雜且耗時比較長的邏輯交給后台線程去處理,而不想影響頁面的正常運行,而且
我們希望后台線程能夠觸發一個回調事件來提示該任務已經完成,所以基於這種需求越來越多而且在復雜的邏輯下也難以避免一些多線
程的死鎖,所以微軟為我們提供了一個屬於微軟自己的異步線程的概念,上一章提到了多線程和異步的基本概念和區別大家可以去溫故下,
線程異步指的是一個調用請求發送給被調用者,而調用者不用等待其結果的返回,一般異步執行的任務都需要比較長的時間, |
相信大家理解的異步的概念后都能對異步的根源有個初步的認識,和線程一樣,異步也是針對執行方法而設計的,也就是說當我們執行一個
方法時,使用異步方式可以不阻礙主線程的運行而獨立運行,直到執行完畢后觸發回調事件,注意,.net異步線程也是通過內部線程池建立
的,雖然微軟將其封裝了起來,但是我們也必須了解下
由於委托是方法的抽象,那么如果委托上能設定異步調用的話,方法也能實現異步,所以本節用異步委托來解釋下異步線程的工作過程
前文和前一章節中提到了多線程和異步的區別,對於異步線程來說,這正是體現了其工作方式:
調用者發送一個請求 -> 調用者去做自己的事情 -> 請求會異步執行 -> 執行完畢可以利用回調函數告訴調用者(也可以不用) |
在詳細說明這幾個過程之前,讓我們來了解下下面的幾個重要的元素
AsyncCallback 委托
其實這個委托是微軟給我們提供的用於異步執行方法體后通知該異步方法已經完成。AsyncCallBack抽象了所有異步方法執行后回調函數(方法)
,它規定了回調函數(方法)必須擁有一個IAsyncResult的參數並且沒有返回值,
IAsyncResult 接口
讓我們先來看下msdn上關於它的解釋
- IAsyncResult 接口由包含可異步操作的方法的類實現。它是啟動異步操作的方法的返回類型,也是結束異步操作的方法的第三個參數的類型
- 當異步操作完成時,IAsyncResult 對象也將傳遞給由 AsyncCallback 委托調用的方法
對於第一條的解釋,以下兩條代碼能夠直觀的理解:
有時候主線程需要等待異步執行后才能執行,雖然這違背的異步的初衷但是還是可以納入可能的需求行列,所以如果我們在beginInoke 后立刻使用EndInvoke的話,主線程(調用者)會被阻塞,直到異步線程執行完畢后在啟動執行 |
對於第二條的解釋:
結束異步操作時需要使用的回調方法,這里IAsyncResult作為參數被傳遞進了個這方法,這時IAsyncResult起到了向回調方
法傳遞信息的作用,關於這點會在后文的異步線程的工作過程中詳細解釋下
我們最后再來看下IAsyncResult的幾個重要屬性
在這里再次強調下IAsyncResult第一個屬性AsyncState的作用,就像前面所說,有時我們需要將回調函數的參數傳入到回調方法體中,
當然傳入入口在BeginInvoke的第二個參數中,在回調函數體中我們可以通過將這個屬性類型轉換成和BeginInvoke第二個參數一摸
一樣的類型后加以使用
關於IAsyncResult最后還有一點補充:
如果IAsyncResult本身的功能還不能滿足你的需要的話,可以自定義實現自己的AsyncResult類,但必須實現這個接口 |
理解了以上兩個關於異步至關重要的2個元素后,讓我們進入一段段代碼,在來詳細看下異步線程的執行過程
//定義一個委托 public delegate void DoSomething(); static void Main(string[] args) { //1.實例化一個委托,調用者發送一個請求,請求執行該方法體(還未執行) DoSomething doSomething = new DoSomething( () => { Console.WriteLine("如果委托使用beginInvoke的話,這里便是異步方法體"); //4,實現完這個方法體后自動觸發下面的回調函數方法體 }); //3 。調用者(主線程)去觸發異步調用,采用異步的方式請求上面的方法體 IAsyncResult result= doSomething.BeginInvoke( //2.自定義上面方法體執行后的回調函數 new AsyncCallback ( //5.以下是回調函數方法體 //asyncResult.AsyncState其實就是AsyncCallback委托中的第二個參數 asyncResult => { doSomething.EndInvoke(asyncResult); Console.WriteLine(asyncResult.AsyncState.ToString()); } ) , "BeginInvoke方法的第二個參數就是傳入AsyncCallback中的AsyncResult.AsyncState,我們使用時可以強轉成相關類型加以使用"); //DoSomething......調用者(主線程)會去做自己的事情 Console.ReadKey(); }
大家仔細看這面這段非常簡單的代碼,為了大家理解方便我特意為異步執行過程加上了特有的注釋和序列號,這樣的話,大伙能直觀初步的理解了異步的執行過程。
讓我們根據序列號來說明下:
1. 實例化一個委托,調用者發送一個請求,請求執行該方法體(還未執行) 首先將委實例化並且定義好委托所請求的方法體,但是這個時候方法體是不會運行的 2. 這時候和第一步所相似的是,這里可以將定義好的回調函數AsyncCallback 方法體寫入BeginInvoke的第一個參數,將需要傳入回調方法體的參數放入第二個參數 3.調用者(主線程)去觸發異步調用(執行BeginInvoke方法),采用異步的方式執行委托中的方法體 4.實現完這個方法體后自動觸發下面的AsyncCallback中的方法體回調函數(可以設定回調函數為空來表示不需要回調) 5 . 執行回調函數方法體,注意使用委托的 EndInvoke方法結束異步操作,並且輸出顯示傳入異步回調函數的參數 再次強調第五點: (1) 由於使用了回調函數,所以必然異步方法體已經執行過了,所以在回調函數中使用EndInvoke方法是不會阻塞的, (2) 能通過EndInvoke方法獲得一些返回結果,例如FileStream.EndRead()能夠返回讀取的字節數等等 |
6 有必要簡單介紹下Classic Async Pattern 和Event-based Async Pattern
首先介紹下Classic Async Pattern:
其實Classic Async Pattern指的就是我們常見的BeginXXX和EndXXX
IAsyncResult 異步設計模式通過名為 BeginOperationName 和 EndOperationName 的兩個方法來實現原同步方法的異步調用
讓我們再來回顧下.net中的幾個的BeginXXX 和EndXXX
Stream中的BeginRead,EndRead,BeginWrite,EndWrite Socket中的BeginReceive,EndReceive HttpWebRequest的BeginGetRequestStream和EndGetRequestStream.... |
再來介紹下Event-based Async Pattern
Event-based Async Pattern 值的是類似於 xxxxxxxAsync() 和 類似於event xxxxxCompleteHander
通過一個方法和一個完成事件來處理異步操作
.net中的例子:
WebClient.DownloadStringAsync(string uri)和 event DownloadStringCompleteEventHandler |
其實Classic Async Pattern和Event-based Async Pattern都是一種異步的設計思路,我們也可以根據這一系列的
思路去實現自己的異步方法
微軟貌似現在把精力放在win8或WinPhone的metro上,而且記得在win 8開發者培訓的會議上,着重闡述了微軟對於異步的支持將越來越強,而且為了快
速響應諸如移動設備的應用程序,微軟也在爭取為每個方法都實現一個異步版本…..可見異步的重要性,相信異步的發展趨勢是個不錯的
上升曲線,還沒反應過來.net4.5的異步新特性便誕生了。首先經歷過異步摧殘的我們,都會有這樣一個感受,往往回調方法和普通方法
會搞錯,在復雜的項目面前,有時候簡直無法維護,到處都是回調函數,眼花繚亂 所以微軟為了簡化異步的實現過程,甚至大刀闊斧將
回調函數做成看起來像同步方法,雖然覺得很詭異,還是讓我們初步了解下這種異步的新特性
先看代碼
/// <summary> /// .net 4.5 中 async 和 await 全新的關鍵字 一起實現異步的簡化 /// </summary> void async ShowUriContent(string uri) { using (FileStream fs = File.OpenRead("你的文件地址")) { using (FileStream fs2 = new FileStream()) { byte[] buffer = new byte[4096]; //FileStream的ReadAsync方法也是net4.5版本出現的,它返回一個Task<int>對象 //而且作用於await后的異步代碼會等待阻塞直到異步方法完成后返回 int fileBytesLength = await fs.ReadAsync(buffer,0,buffer.Length).ConfigureAwait(false); while(fileBytesLength>0) { //FileStream的WriteAsync方法也是net4.5版本出現的 await fs2.WriteAsync(buffer,0,buffer.Length).ConfigureAwait(false); } } } }
相信看完代碼后大家有耳目一新的感覺,不錯,原本異步調用的回調函數不見了,取而代之的是await和方法聲明上的async關鍵字,新特性允許
我們實現這倆個關鍵字后便能在方法中實現“同步方式”的異步方法,其實這解決了一些棘手的問題,諸如原本需要在回調事件里才能釋放的文件句
柄在這里和同步方法一樣,使用using便搞定了,還有截獲異常等等,都不用像之前那樣痛苦了,這里還有一些東東需要關注下,大家先不用去深
究ConfigureAwait這個方法,由於ReadAsync和 WriteAsync方法是.net 4.5新加的屬於返回Task<int>類型的方法所以使用ConfigureAwait
方法能夠將數值取到,關於Task泛型類我會在今后的章節中詳細闡述
自定義一個簡單的線程池
static void Main(string[] args) { ThreadStart[] startArray = { new ThreadStart(()=>{ Console.WriteLine("第一個任務"); }), new ThreadStart(()=>{Console.WriteLine("第二個任務");}), new ThreadStart(()=>{Console.WriteLine("第三個任務");}), new ThreadStart(()=>{Console.WriteLine("第四個任務");}), }; MyThreadPool.SetMaxWorkThreadCount(2); MyThreadPool.MyQueueUserWorkItem(startArray); Console.ReadKey(); } /// <summary> /// 自定義一個簡單的線程池,該線程池實現了默認開啟線程數 /// 當最大線程數全部在繁忙時,循環等待,只到至少一個線程空閑為止 /// 本示例使用BackgroundWorker模擬后台線程,任務將自動進入隊列和離開 /// 隊列 /// </summary> sealed class MyThreadPool { //線程鎖對象 private static object lockObj = new object(); //任務隊列 private static Queue<ThreadStart> threadStartQueue = new Queue<ThreadStart>(); //記錄當前工作的任務集合,從中可以判斷當前工作線程使用數,如果使用int判斷的話可能會有問題, //用集合的話還能取得對象的引用,比較好 private static HashSet<ThreadStart> threadsWorker = new HashSet<ThreadStart>(); //當前允許最大工作線程數 private static int maxThreadWorkerCount = 1; //當前允許最小工作線程數 private static int minThreadWorkerCount = 0; /// <summary> /// 設定最大工作線程數 /// </summary> /// <param name="maxThreadCount">數量</param> public static void SetMaxWorkThreadCount(int maxThreadCount) { maxThreadWorkerCount =minThreadWorkerCount>maxThreadCount? minThreadWorkerCount : maxThreadCount; } /// <summary> /// 設定最小工作線程數 /// </summary> /// <param name="maxThreadCount">數量</param> public static void SetMinWorkThreadCount(int minThreadCount) { minThreadWorkerCount = minThreadCount > maxThreadWorkerCount ? maxThreadWorkerCount : minThreadCount; } /// <summary> /// 啟動線程池工作 /// </summary> /// <param name="threadStartArray">任務數組</param> public static void MyQueueUserWorkItem(ThreadStart[] threadStartArray) { //將任務集合都放入到線程池中 AddAllThreadsToPool(threadStartArray); //線程池執行任務 ExcuteTask(); } /// <summary> /// 將單一任務加入隊列中 /// </summary> /// <param name="ts">單一任務對象</param> private static void AddThreadToQueue(ThreadStart ts) { lock (lockObj) { threadStartQueue.Enqueue(ts); } } /// <summary> /// 將多個任務加入到線程池的任務隊列中 /// </summary> /// <param name="threadStartArray">多個任務</param> private static void AddAllThreadsToPool(ThreadStart[] threadStartArray) { foreach (var threadStart in threadStartArray) AddThreadToQueue(threadStart); } /// <summary> /// 執行任務,判斷隊列中的任務數量是否大於0,如果是則判斷當前正在使用的工作線程的 /// 數量是否大於等於允許的最大工作線程數,如果一旦有線程空閑的話 /// 就會執行ExcuteTaskInQueen方法處理任務 /// </summary> private static void ExcuteTask() { while (threadStartQueue.Count > 0) { if (threadsWorker.Count < maxThreadWorkerCount) { ExcuteTaskInQueen(); } } } /// <summary> /// 執行出對列的任務,加鎖保護 /// </summary> private static void ExcuteTaskInQueen() { lock (lockObj) { ExcuteTaskByThread( threadStartQueue.Dequeue()); } } /// <summary> /// 實現細節,這里使用BackGroudWork來實現后台線程 /// 注冊doWork和Completed事件,當執行一個任務前,前將任務加入到 /// 工作任務集合(表示工作線程少了一個空閑),一旦RunWorkerCompleted事件被觸發則將任務從工作 /// 任務集合中移除(表示工作線程也空閑了一個) /// </summary> /// <param name="threadStart"></param> private static void ExcuteTaskByThread(ThreadStart threadStart) { threadsWorker.Add(threadStart); BackgroundWorker worker = new BackgroundWorker(); worker.DoWork += (o, e) => { threadStart.Invoke(); }; worker.RunWorkerCompleted += (o, e) => { threadsWorker.Remove(threadStart); }; worker.RunWorkerAsync(); } }
顯示結果:
有時我們需要使用IHttpAsyncHandler來異步實現一些特定的功能,讓我用很簡單的示例來闡述這個過程
1:首先編寫Handler1的邏輯,注意要繼承IHttpAsyncHandler接口
/// <summary> /// 異步IHttpHandler,實現了一個簡單的統計流量的功能, /// 由於是示例代碼,所以沒有判斷IP或者MAC /// </summary> public class Handler1 : IHttpAsyncHandler { //默認訪問量是0 static int visitCount = 0; /// <summary> /// 這個HttpHandler的同步方法 /// </summary> /// <param name="context"></param> public void ProcessRequest(HttpContext context) { } public bool IsReusable { get { return false; } } /// <summary> /// 實現IHttpAsyncHandler 接口方法 /// </summary> /// <param name="context">當前HttpContext</param> /// <param name="cb">回調函數</param> /// <param name="extraData"></param> /// <returns></returns> public IAsyncResult BeginProcessRequest(HttpContext context, AsyncCallback cb, object extraData) { //這里可以加上判斷IP或mac的方法 visitCount++; //實例化AsyncUserVisiteCounterResult對象 AsyncUserVisiteCounterResult result = new AsyncUserVisiteCounterResult(cb, visitCount, context); result.Display(); return result; } /// <summary> /// 結束本次IHttpAsyncHandler工作時觸發的request方法 /// </summary> /// <param name="result"></param> public void EndProcessRequest(IAsyncResult result) { } } /// <summary> /// 自定義IAsyncResult 實現我們額外的Display方法 /// </summary> public class AsyncUserVisiteCounterResult : IAsyncResult { //回調參數 private object _param; //是否異步執行完成 private bool _asyncIsComplete; //回調方法 private AsyncCallback _callBack; //當前上下文 private HttpContext _context; public AsyncUserVisiteCounterResult(AsyncCallback callBack, object stateParam, HttpContext context) { this._callBack = callBack; this._param = stateParam; _asyncIsComplete = false; this._context = context; } public object AsyncState { get { return this._param; } } /// <summary> /// 等待句柄用於同步功能,關於等待句柄會在后續章節陳述 /// </summary> public System.Threading.WaitHandle AsyncWaitHandle { get { return null; } } /// <summary> /// 該屬性表示不需要異步任務同步完成 /// </summary> public bool CompletedSynchronously { get { return false; } } /// <summary> /// 該屬性表示異步任務是否已經執行完成 /// </summary> public bool IsCompleted { get { return this._asyncIsComplete; } } /// <summary> /// 自定義的額外功能,需要注意的是,執行完異步功能后 /// 要將_asyncIsComplete設置為true表示任務執行完畢而且 /// 執行回調方法,否則異步工作無法結束頁面會卡死 /// </summary> public void Display() { //這里先不用waitHandle句柄來實現同步 lock (this) { this._context.Response.Write("你是第" + (int)this._param + "位訪問者,訪問時間:"+DateTime.Now.ToString()); this._asyncIsComplete = true; this._callBack(this); } } }
2 在web.config中添加相應的配置,注意path指的是.ashx所在的路徑,指的是相應的文件類型
<httpHandlers> <add verb="*" path="AsyncThreadInAsp.net.Handler1.ashx" type="AsyncThreadInAsp.net.Handler1"/> </httpHandlers>
3 最后在頁面中訪問