ThreadPool 類
提供一個線程池,該線程池可用於執行任務、發送工作項、處理異步 I/O、代表其他線程等待以及處理計時器。
命名空間: System.Threading
程序集: mscorlib(位於 mscorlib.dll)版本信息
.NET Framework
自 1.1 起可用
可移植類庫
在 可移植 .NET 平台 中受支持
Silverlight
自 2.0 起可用
Windows Phone Silverlight
自 7.0 起可用
一邊說着要用技術安身立命,一邊感嘆自己的野生屬性。好吧,知之為知之,不知就不知。我"以為"是這樣這樣那樣那樣,這樣說真是沒意思。現在的疑惑有以下幾點:
- 1、線程池內部有幾個工作線程?
- 2、使用線程池的正確姿勢(場景和控制)?
- 3、有必要自己封裝一個不?
開始看MSDN文檔
[HostProtectionAttribute(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)] public static class ThreadPool
方法
偽代碼
#region 程序集 mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 // C:\Program Files (x86)\Reference Assemblies\Microsoft\Framework\.NETFramework\v4.0\mscorlib.dll #endregion using System.Runtime.InteropServices; using System.Security; namespace System.Threading { // // 摘要: // 提供一個線程池,該線程池可用於發送工作項、處理異步 I/O、代表其他線程等待以及處理計時器。 public static class ThreadPool { // // 摘要: // 將操作系統句柄綁定到 System.Threading.ThreadPool。 // // 參數: // osHandle: // 保存操作系統句柄的 System.Runtime.InteropServices.SafeHandle。在非托管端必須為重疊 I/O 打開該句柄。 // // 返回結果: // 如果綁定了句柄,則為 true;否則為 false。 // // 異常: // T:System.ArgumentNullException: // osHandle 為 null。 [SecuritySafeCritical] public static bool BindHandle(SafeHandle osHandle); // // 摘要: // 將操作系統句柄綁定到 System.Threading.ThreadPool。 // // 參數: // osHandle: // 持有句柄的 System.IntPtr。在非托管端必須為重疊 I/O 打開該句柄。 // // 返回結果: // 如果綁定了句柄,則為 true;否則為 false。 // // 異常: // T:System.Security.SecurityException: // 調用方沒有所要求的權限。 [Obsolete("ThreadPool.BindHandle(IntPtr) has been deprecated. Please use ThreadPool.BindHandle(SafeHandle) instead.", false)] [SecuritySafeCritical] public static bool BindHandle(IntPtr osHandle); // // 摘要: // 檢索由 System.Threading.ThreadPool.GetMaxThreads(System.Int32@,System.Int32@) 方法返回的最大線程池線程數和當前活動線程數之間的差值。 // // 參數: // workerThreads: // 可用輔助線程的數目。 // // completionPortThreads: // 可用異步 I/O 線程的數目。 [SecuritySafeCritical] public static void GetAvailableThreads(out int workerThreads, out int completionPortThreads); // // 摘要: // 檢索可以同時處於活動狀態的線程池請求的數目。所有大於此數目的請求將保持排隊狀態,直到線程池線程變為可用。 // // 參數: // workerThreads: // 線程池中輔助線程的最大數目。 // // completionPortThreads: // 線程池中異步 I/O 線程的最大數目。 [SecuritySafeCritical] public static void GetMaxThreads(out int workerThreads, out int completionPortThreads); // // 摘要: // 檢索線程池在新請求預測中維護的空閑線程數。 // // 參數: // workerThreads: // 當前由線程池維護的空閑輔助線程的最小數目。 // // completionPortThreads: // 當前由線程池維護的空閑異步 I/O 線程的最小數目。 [SecuritySafeCritical] public static void GetMinThreads(out int workerThreads, out int completionPortThreads); // // 摘要: // 將方法排入隊列以便執行。此方法在有線程池線程變得可用時執行。 // // 參數: // callBack: // 一個 System.Threading.WaitCallback,表示要執行的方法。 // // 返回結果: // 如果此方法成功排隊,則為 true;如果未能將該工作項排隊,則引發 System.NotSupportedException。 // // 異常: // T:System.ArgumentNullException: // callBack 為 null。 // // T:System.NotSupportedException: // 承載公共語言運行時 (CLR) 的宿主不支持此操作。 [SecuritySafeCritical] public static bool QueueUserWorkItem(WaitCallback callBack); // // 摘要: // 將方法排入隊列以便執行,並指定包含該方法所用數據的對象。此方法在有線程池線程變得可用時執行。 // // 參數: // callBack: // System.Threading.WaitCallback,它表示要執行的方法。 // // state: // 包含方法所用數據的對象。 // // 返回結果: // 如果此方法成功排隊,則為 true;如果未能將該工作項排隊,則引發 System.NotSupportedException。 // // 異常: // T:System.NotSupportedException: // 承載公共語言運行時 (CLR) 的宿主不支持此操作。 // // T:System.ArgumentNullException: // callBack 為 null。 [SecuritySafeCritical] public static bool QueueUserWorkItem(WaitCallback callBack, object state); // // 摘要: // 注冊一個等待 System.Threading.WaitHandle 的委托,並指定一個 System.TimeSpan 值來表示超時時間。 // // 參數: // waitObject: // 要注冊的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 參數終止時調用的 System.Threading.WaitOrTimerCallback 委托。 // // state: // 傳遞給委托的對象。 // // timeout: // System.TimeSpan 表示的超時時間。如果 timeout 為 0(零),則函數將測試對象的狀態並立即返回。如果 timeout 為 -1,則函數的超時間隔永遠不過期。 // // executeOnlyOnce: // 如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。 // // 返回結果: // 封裝本機句柄的 System.Threading.RegisteredWaitHandle。 // // 異常: // T:System.ArgumentOutOfRangeException: // timeout 參數小於 -1。 // // T:System.NotSupportedException: // timeout 參數大於 System.Int32.MaxValue。 [SecuritySafeCritical] public static RegisteredWaitHandle RegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, TimeSpan timeout, bool executeOnlyOnce); // // 摘要: // 注冊一個等待 System.Threading.WaitHandle 的委托,並指定一個 64 位有符號整數來表示超時值(以毫秒為單位)。 // // 參數: // waitObject: // 要注冊的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 參數終止時調用的 System.Threading.WaitOrTimerCallback 委托。 // // state: // 傳遞給委托的對象。 // // millisecondsTimeOutInterval: // 以毫秒為單位的超時。如果 millisecondsTimeOutInterval 參數為 0(零),函數將測試對象的狀態並立即返回。如果 millisecondsTimeOutInterval // 為 -1,則函數的超時間隔永遠不過期。 // // executeOnlyOnce: // 如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。 // // 返回結果: // 封裝本機句柄的 System.Threading.RegisteredWaitHandle。 // // 異常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 參數小於 -1。 [SecuritySafeCritical] public static RegisteredWaitHandle RegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, long millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 注冊一個等待 System.Threading.WaitHandle 的委托,並指定一個 32 位有符號整數來表示超時值(以毫秒為單位)。 // // 參數: // waitObject: // 要注冊的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 參數終止時調用的 System.Threading.WaitOrTimerCallback 委托。 // // state: // 傳遞給委托的對象。 // // millisecondsTimeOutInterval: // 以毫秒為單位的超時。如果 millisecondsTimeOutInterval 參數為 0(零),函數將測試對象的狀態並立即返回。如果 millisecondsTimeOutInterval // 為 -1,則函數的超時間隔永遠不過期。 // // executeOnlyOnce: // 如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。 // // 返回結果: // 封裝本機句柄的 System.Threading.RegisteredWaitHandle。 // // 異常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 參數小於 -1。 [SecuritySafeCritical] public static RegisteredWaitHandle RegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, int millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 指定表示超時(以毫秒為單位)的 32 位無符號整數,注冊一個委托等待 System.Threading.WaitHandle。 // // 參數: // waitObject: // 要注冊的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 參數終止時調用的 System.Threading.WaitOrTimerCallback 委托。 // // state: // 傳遞給委托的對象。 // // millisecondsTimeOutInterval: // 以毫秒為單位的超時。如果 millisecondsTimeOutInterval 參數為 0(零),函數將測試對象的狀態並立即返回。如果 millisecondsTimeOutInterval // 為 -1,則函數的超時間隔永遠不過期。 // // executeOnlyOnce: // 如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。 // // 返回結果: // System.Threading.RegisteredWaitHandle,可用於取消已注冊的等待操作。 // // 異常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 參數小於 -1。 [CLSCompliant(false)] [SecuritySafeCritical] public static RegisteredWaitHandle RegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, uint millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 設置可以同時處於活動狀態的線程池的請求數目。所有大於此數目的請求將保持排隊狀態,直到線程池線程變為可用。 // // 參數: // workerThreads: // 線程池中輔助線程的最大數目。 // // completionPortThreads: // 線程池中異步 I/O 線程的最大數目。 // // 返回結果: // 如果更改成功,則為 true;否則為 false。 [SecuritySafeCritical] public static bool SetMaxThreads(int workerThreads, int completionPortThreads); // // 摘要: // 設置線程池在新請求預測中維護的空閑線程數。 // // 參數: // workerThreads: // 要由線程池維護的新的最小空閑輔助線程數。 // // completionPortThreads: // 要由線程池維護的新的最小空閑異步 I/O 線程數。 // // 返回結果: // 如果更改成功,則為 true;否則為 false。 [SecuritySafeCritical] public static bool SetMinThreads(int workerThreads, int completionPortThreads); // // 摘要: // 將重疊的 I/O 操作排隊以便執行。 // // 參數: // overlapped: // 要排隊的 System.Threading.NativeOverlapped 結構。 // // 返回結果: // 如果成功地將此操作排隊到 I/O 完成端口,則為 true;否則為 false。 [CLSCompliant(false)] [SecurityCritical] public static bool UnsafeQueueNativeOverlapped(NativeOverlapped* overlapped); // // 摘要: // 將指定的委托排隊到線程池,但不會將調用堆棧傳播到輔助線程。 // // 參數: // callBack: // 一個 System.Threading.WaitCallback,表示當線程池中的線程選擇工作項時調用的委托。 // // state: // 在接受線程池服務時傳遞給委托的對象。 // // 返回結果: // 如果方法成功,則為 true;如果未能將該工作項排隊,則引發 System.OutOfMemoryException。 // // 異常: // T:System.Security.SecurityException: // 調用方沒有所要求的權限。 // // T:System.ApplicationException: // 遇到了內存不足的情況。 // // T:System.OutOfMemoryException: // 未能將該工作項排隊。 // // T:System.ArgumentNullException: // callBack 為 null。 [SecurityCritical] public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object state); // // 摘要: // 注冊一個等待 System.Threading.WaitHandle 的委托,並指定一個 64 位有符號整數來表示超時值(以毫秒為單位)。不將調用堆棧傳播到輔助線程。 // // 參數: // waitObject: // 要注冊的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 參數終止時調用的委托。 // // state: // 傳遞給委托的對象。 // // millisecondsTimeOutInterval: // 以毫秒為單位的超時。如果 millisecondsTimeOutInterval 參數為 0(零),函數將測試對象的狀態並立即返回。如果 millisecondsTimeOutInterval // 為 -1,則函數的超時間隔永遠不過期。 // // executeOnlyOnce: // 如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。 // // 返回結果: // System.Threading.RegisteredWaitHandle 對象,可用於取消已注冊的等待操作。 // // 異常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 參數小於 -1。 // // T:System.Security.SecurityException: // 調用方沒有所要求的權限。 [SecurityCritical] public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, long millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 指定表示超時(以毫秒為單位)的 32 位無符號整數,注冊一個委托等待 System.Threading.WaitHandle。不將調用堆棧傳播到輔助線程。 // // 參數: // waitObject: // 要注冊的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 參數終止時調用的委托。 // // state: // 傳遞給委托的對象。 // // millisecondsTimeOutInterval: // 以毫秒為單位的超時。如果 millisecondsTimeOutInterval 參數為 0(零),函數將測試對象的狀態並立即返回。如果 millisecondsTimeOutInterval // 為 -1,則函數的超時間隔永遠不過期。 // // executeOnlyOnce: // 如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。 // // 返回結果: // System.Threading.RegisteredWaitHandle 對象,可用於取消已注冊的等待操作。 // // 異常: // T:System.Security.SecurityException: // 調用方沒有所要求的權限。 [CLSCompliant(false)] [SecurityCritical] public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, uint millisecondsTimeOutInterval, bool executeOnlyOnce); // // 摘要: // 注冊一個等待 System.Threading.WaitHandle 的委托,並指定一個 System.TimeSpan 值來表示超時時間。不將調用堆棧傳播到輔助線程。 // // 參數: // waitObject: // 要注冊的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 參數終止時調用的委托。 // // state: // 傳遞給委托的對象。 // // timeout: // System.TimeSpan 表示的超時時間。如果 timeout 為 0(零),則函數將測試對象的狀態並立即返回。如果 timeout 為 -1,則函數的超時間隔永遠不過期。 // // executeOnlyOnce: // 如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。 // // 返回結果: // System.Threading.RegisteredWaitHandle 對象,可用於取消已注冊的等待操作。 // // 異常: // T:System.ArgumentOutOfRangeException: // timeout 參數小於 -1。 // // T:System.NotSupportedException: // timeout 參數大於 System.Int32.MaxValue。 // // T:System.Security.SecurityException: // 調用方沒有所要求的權限。 [SecurityCritical] public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, TimeSpan timeout, bool executeOnlyOnce); // // 摘要: // 注冊一個等待 System.Threading.WaitHandle 的委托,並使用一個 32 位帶符號整數來表示超時時間(以毫秒為單位)。不將調用堆棧傳播到輔助線程。 // // 參數: // waitObject: // 要注冊的 System.Threading.WaitHandle。使用 System.Threading.WaitHandle 而非 System.Threading.Mutex。 // // callBack: // waitObject 參數終止時調用的委托。 // // state: // 傳遞給委托的對象。 // // millisecondsTimeOutInterval: // 以毫秒為單位的超時。如果 millisecondsTimeOutInterval 參數為 0(零),函數將測試對象的狀態並立即返回。如果 millisecondsTimeOutInterval // 為 -1,則函數的超時間隔永遠不過期。 // // executeOnlyOnce: // 如果為 true,表示在調用了委托后,線程將不再在 waitObject 參數上等待;如果為 false,表示每次完成等待操作后都重置計時器,直到注銷等待。 // // 返回結果: // System.Threading.RegisteredWaitHandle 對象,可用於取消已注冊的等待操作。 // // 異常: // T:System.ArgumentOutOfRangeException: // millisecondsTimeOutInterval 參數小於 -1。 // // T:System.Security.SecurityException: // 調用方沒有所要求的權限。 [SecurityCritical] public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callBack, object state, int millisecondsTimeOutInterval, bool executeOnlyOnce); } }
“承載公共語言運行時 (CLR) 的宿主不支持此操作”出現了好多次吧。試試反編譯mscorlib.dll,看看有什么發現。
[SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool AdjustThreadsInPool(uint QueueLength); [CLSCompliant(false), SecurityCritical] public unsafe static bool UnsafeQueueNativeOverlapped(NativeOverlapped* overlapped) { } [SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool ShouldUseNewWorkerPool(); [SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool CompleteThreadPoolRequest(uint QueueLength); [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern bool NotifyWorkItemComplete(); [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern void ReportThreadStatus(bool isWorking); [SecuritySafeCritical] internal static void NotifyWorkItemProgress() { } [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern void NotifyWorkItemProgressNative(); [SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool ShouldReturnToVm(); [ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success), SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern bool SetAppDomainRequestActive(); [ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success), SecurityCritical, SuppressUnmanagedCodeSecurity] [DllImport("QCall", CharSet = CharSet.Unicode)] internal static extern void ClearAppDomainRequestActive(); [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern bool IsThreadPoolHosted(); [ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success), SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] internal static extern void SetNativeTpEvent();
[MethodImplAttribute(MethodImplOptions.Synchronized)]標簽應用到實例方法,相當於對當前實例加鎖 lock(this)
[MethodImplAttribute(MethodImplOptions.Synchronized)]標簽應用到靜態方法,相當於當前類型加鎖。如 WithDraw 是靜態方法,就相當於 lock (typeof(Account))
接下來我們再來看看SynchronizationAttribute類:
MSDN對SynchronizationAttribute的解釋為:為當前上下文和所有共享同一實例的上下文強制一個同步域。
SynchronizationAttribute 的類:一個在 System.Runtime.Remoting.Contexts 命名空間中,另一個在 System.EnterpriseServices 命名空間中。System.EnterpriseServices.SynchronizationAttribute 類僅支持同步調用,並且只可與接受服務的組件一起使用。System.Runtime.Remoting.Contexts.SynchronizationAttribute 同時支持同步調用和異步調用,並且只可與上下文綁定對象一起使用。
毛發現都沒有,看來還是功力尚淺,資質平庸啊。發現了一堆空方法,什么鬼?都是沒有具體實現的。看來先猜上一猜了:SetMaxThreads和SetMinThreads說明可以設置工作線程的數量,線程的內部使用了完成端口(沒文化的我理解為ConcurrentQueue,大白話就是說是一個線程安全的隊列)。那么完成端口編程模式號稱是windows系統最優秀的編程模型,會不會非常智能呢?是不是不調用SetMaxThreads和SetMinThreads操作系統就根據你機器的CPU核心數來自己設定最大值呢?
回到前面的幾個問題,偶還是搞不清楚啊。誰能告訴我,什么是什么,什么是什么...咦,這兄台唱上了吧。^_^
反編譯大神實現的CoreThreadPool
public class CoreThreadPool : IDisposable { /// <summary> /// 隊列元素申明 /// </summary> [StructLayout(LayoutKind.Sequential)] private class PoolData { /// <summary> /// 外部要求放入隊列的數據 /// </summary> public object Data; /// <summary> /// 需要執行的命令(Exit/Command(自定義)) /// </summary> public CoreThreadPool.PoolCommand Command; public PoolData() { this.Command = CoreThreadPool.PoolCommand.Exit; } public PoolData(object data) { this.Data = data; this.Command = CoreThreadPool.PoolCommand.Command; } public PoolData(CoreThreadPool.PoolCommand cmd) { this.Command = cmd; } } protected enum PoolCommand { Command, Exit } protected SafeFileHandle complatePort; /// <summary> /// 線程池主線程 /// </summary> protected Thread thread; protected volatile bool isOpened; [method: CompilerGenerated] [CompilerGenerated] public event Action<object> Exceute; [method: CompilerGenerated] [CompilerGenerated] public event Action<object> ExitExceute; /// <summary> /// 線程池是否正在運行 /// </summary> public bool IsOpened { get { return this.isOpened; } set { this.isOpened = value; } } [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)] private static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads); [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)] private static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort, out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey, out IntPtr lpOverlapped, uint dwMilliseconds); [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped); /// <summary> /// 啟動線程池的主線程 /// </summary> public void Start() { isOpened = true; if (thread != null) { throw new Exception("線程池已經是啟動狀態!"); } complatePort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 0u); if (complatePort.IsInvalid) { throw new Exception(string.Format("創建IOCP出錯!原因是:{0}", Marshal.GetLastWin32Error().ToString())); } thread = new Thread(new ParameterizedThreadStart(this.Run)); thread.Start(complatePort); } /// <summary> /// 外部提交數據對象到隊列 /// </summary> /// <param name="data"></param> public void Post(object data) { PostData(new CoreThreadPool.PoolData(data)); } /// <summary> /// 線程池主線程執行邏輯 /// </summary> /// <param name="CompletionPortID"></param> private void Run(object CompletionPortID) { SafeFileHandle completionPort = (SafeFileHandle)CompletionPortID; while (IsOpened) { uint num; IntPtr intPtr; IntPtr value; //從隊列里取出最前面的對象 CoreThreadPool.GetQueuedCompletionStatus(completionPort, out num, out intPtr, out value, 4294967295u); if (num > 0u) { GCHandle gCHandle = GCHandle.FromIntPtr(value); CoreThreadPool.PoolData poolData = (CoreThreadPool.PoolData)gCHandle.Target; gCHandle.Free(); if (poolData.Command != CoreThreadPool.PoolCommand.Command) { IsOpened = false; break; } RaiseExecute(poolData.Data); } } RaiseExitExecute("線程池已經停止。"); isOpened = false; thread = null; } /// <summary> /// 觸發Execute事件 /// </summary> /// <param name="data"></param> private void RaiseExecute(object data) { Exceute?.Invoke(data); } /// <summary> /// 觸發ExitExecute事件 /// </summary> /// <param name="data"></param> private void RaiseExitExecute(object data) { ExitExceute?.Invoke(data); } /// <summary> /// 結束線程池主線程 /// </summary> public void Stop() { PostData(new PoolData(PoolCommand.Exit)); IsOpened = false; } /// <summary> /// 內部提交數據到線程池隊列中 /// </summary> /// <param name="data"></param> private void PostData(PoolData data) { if (complatePort.IsClosed) { return; } GCHandle value = GCHandle.Alloc(data); PostQueuedCompletionStatus(complatePort, (uint)IntPtr.Size, IntPtr.Zero, GCHandle.ToIntPtr(value)); } public void Dispose() { if (this.thread != null && this.thread.ThreadState != System.Threading.ThreadState.Stopped) { this.Stop(); } } }
經過幾個小時測試,最終得出一個結論:ThreadPool的性能已經上了天。微軟威武!不用再去重復造輪子了。這個線程池的使用場景是生產線上的高速生產線的采集器上用的,毫秒級別的,但也是1秒幾十個產品而已。
測試代碼
class Program { private static Stopwatch sw = new Stopwatch(); static void Main(string[] args) { int id = Thread.CurrentThread.ManagedThreadId; Console.WriteLine("CurrentThread.ManagedThreadId是:" + id.ToString()); ThreadPool.QueueUserWorkItem(Pool_Exceute, null); Action task = () => { Thread thread = new Thread(() => { while (true) { object queueObj; queueObj = (object)DateTime.Now.Ticks; sw.Reset(); sw.Start(); if (ThreadPool.QueueUserWorkItem(Pool_Exceute, queueObj)) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine(DateTime.Now + "->成功拋入隊列,拋入的委托對象的參數是:" + queueObj.ToString()); } Thread.Sleep(1); } }); thread.Start(); }; Parallel.Invoke(task, task, task, task, task); Console.ReadLine(); } private static void Pool_Exceute(object obj) { if (obj != null) { Console.ResetColor(); int id = Thread.CurrentThread.ManagedThreadId; Console.WriteLine("CurrentThread.ManagedThreadId是:" + id.ToString()); Console.WriteLine(DateTime.Now + "->委托對象是:" + obj.ToString()); int workThread_Count = 0; int id_IOCP = 0; ThreadPool.GetMaxThreads(out workThread_Count, out id_IOCP); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine(DateTime.Now + string.Format("->線程池最大工作線程數是{0}.當前完成端口的ID是{1}.", workThread_Count, id_IOCP)); ThreadPool.GetAvailableThreads(out workThread_Count, out id_IOCP); Console.WriteLine(DateTime.Now + string.Format("->線程池當前可用的工作線程數是{0}.當前完成端口的ID是{1}.", workThread_Count, id_IOCP)); sw.Stop(); Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine("從入隊到出隊耗時:" + sw.ElapsedMilliseconds); sw.Reset(); } } }