為什么編寫TaskSchedulerEx類?
因為.NET默認線程池只有一個線程池,如果某個批量任務一直占着大量線程,甚至耗盡默認線程池,則會嚴重影響應用程序域中其它任務或批量任務的性能。
特點:
1、使用獨立線程池,線程池中線程分為核心線程和輔助線程,輔助線程會動態增加和釋放,且總線程數不大於參數_maxThreadCount
2、無縫兼容Task,使用上和Task一樣,可以用它來實現異步,參見:C# async await 異步執行方法封裝 替代 BackgroundWorker
3、隊列中尚未執行的任務可以取消
4、通過擴展類TaskHelper實現任務分組
5、和SmartThreadPool對比,優點是無縫兼容Task類,和Task類使用沒有區別,因為它本身就是對Task、TaskScheduler的擴展,所以Task類的ContinueWith、WaitAll等方法它都支持,以及兼容async、await異步編程
6、代碼量相當精簡,TaskSchedulerEx類只有260多行代碼
7、池中的線程數量會根據負載自動增減,支持,但沒有SmartThreadPool智能,為了性能,使用了比較笨的方式實現,不知道大家有沒有既智能,性能又高的方案,我有一個思路,在定時器中計算每個任務執行平均耗時,然后使用公式(線程數 = CPU核心數 * ( 本地計算時間 + 等待時間 ) / 本地計算時間)來計算最佳線程數,然后按最佳線程數來動態創建線程,但這個計算過程可能會犧牲性能
對比SmartThreadPool:
TaskSchedulerEx類代碼(使用Semaphore實現):

using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { /// <summary> /// TaskScheduler擴展 /// 每個實例都是獨立線程池 /// </summary> public class TaskSchedulerEx : TaskScheduler, IDisposable { #region 外部方法 [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")] public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize); #endregion #region 變量屬性事件 private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>(); private int _coreThreadCount = 0; private int _maxThreadCount = 0; private int _auxiliaryThreadTimeOut = 20000; //輔助線程釋放時間 private int _activeThreadCount = 0; private System.Timers.Timer _timer; private object _lockCreateTimer = new object(); private bool _run = true; private Semaphore _sem = null; private int _semMaxCount = int.MaxValue; //可以同時授予的信號量的最大請求數 private int _semCount = 0; //可用信號量請求數 private int _runCount = 0; //正在執行的和等待執行的任務數量 /// <summary> /// 活躍線程數 /// </summary> public int ActiveThreadCount { get { return _activeThreadCount; } } /// <summary> /// 核心線程數 /// </summary> public int CoreThreadCount { get { return _coreThreadCount; } } /// <summary> /// 最大線程數 /// </summary> public int MaxThreadCount { get { return _maxThreadCount; } } #endregion #region 構造函數 /// <summary> /// TaskScheduler擴展 /// 每個實例都是獨立線程池 /// </summary> /// <param name="coreThreadCount">核心線程數(大於或等於0,不宜過大)(如果是一次性使用,則設置為0比較合適)</param> /// <param name="maxThreadCount">最大線程數</param> public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20) { _sem = new Semaphore(0, _semMaxCount); _maxThreadCount = maxThreadCount; CreateCoreThreads(coreThreadCount); } #endregion #region override GetScheduledTasks protected override IEnumerable<Task> GetScheduledTasks() { return _tasks; } #endregion #region override TryExecuteTaskInline protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } #endregion #region override QueueTask protected override void QueueTask(Task task) { _tasks.Enqueue(task); while (_semCount >= _semMaxCount) //信號量已滿,等待 { Thread.Sleep(1); } _sem.Release(); Interlocked.Increment(ref _semCount); Interlocked.Increment(ref _runCount); if (_activeThreadCount < _maxThreadCount && _activeThreadCount < _runCount) { CreateThread(); } } #endregion #region 資源釋放 /// <summary> /// 資源釋放 /// 隊列中尚未執行的任務不再執行 /// </summary> public void Dispose() { _run = false; if (_timer != null) { _timer.Stop(); _timer.Dispose(); _timer = null; } while (_activeThreadCount > 0) { _sem.Release(); Interlocked.Increment(ref _semCount); } } #endregion #region 創建核心線程池 /// <summary> /// 創建核心線程池 /// </summary> private void CreateCoreThreads(int? coreThreadCount = null) { if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value; for (int i = 0; i < _coreThreadCount; i++) { Interlocked.Increment(ref _activeThreadCount); Thread thread = null; thread = new Thread(new ThreadStart(() => { Task task; while (_run) { if (_tasks.TryDequeue(out task)) { TryExecuteTask(task); Interlocked.Decrement(ref _runCount); } else { _sem.WaitOne(); Interlocked.Decrement(ref _semCount); } } Interlocked.Decrement(ref _activeThreadCount); if (_activeThreadCount == 0) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } })); thread.IsBackground = true; thread.Start(); } } #endregion #region 創建輔助線程 /// <summary> /// 創建輔助線程 /// </summary> private void CreateThread() { Interlocked.Increment(ref _activeThreadCount); Thread thread = null; thread = new Thread(new ThreadStart(() => { Task task; DateTime dt = DateTime.Now; while (_run && DateTime.Now.Subtract(dt).TotalMilliseconds < _auxiliaryThreadTimeOut) { if (_tasks.TryDequeue(out task)) { TryExecuteTask(task); Interlocked.Decrement(ref _runCount); dt = DateTime.Now; } else { _sem.WaitOne(_auxiliaryThreadTimeOut); Interlocked.Decrement(ref _semCount); } } Interlocked.Decrement(ref _activeThreadCount); if (_activeThreadCount == _coreThreadCount) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } })); thread.IsBackground = true; thread.Start(); } #endregion #region 全部取消 /// <summary> /// 全部取消 /// 取消隊列中尚未執行的任務 /// </summary> public void CancelAll() { Task tempTask; while (_tasks.TryDequeue(out tempTask)) { Interlocked.Decrement(ref _runCount); } } #endregion } }
TaskSchedulerEx類代碼(使用AutoResetEvent實現):

using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { /// <summary> /// TaskScheduler擴展 /// 每個實例都是獨立線程池 /// </summary> public class TaskSchedulerEx : TaskScheduler, IDisposable { #region 外部方法 [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")] public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize); #endregion #region 變量屬性事件 private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>(); private int _coreThreadCount = 0; private int _maxThreadCount = 0; private int _auxiliaryThreadTimeOut = 20000; //輔助線程釋放時間 private int _activeThreadCount = 0; private System.Timers.Timer _timer; private object _lockCreateTimer = new object(); private bool _run = true; private AutoResetEvent _evt = new AutoResetEvent(false); /// <summary> /// 活躍線程數 /// </summary> public int ActiveThreadCount { get { return _activeThreadCount; } } /// <summary> /// 核心線程數 /// </summary> public int CoreThreadCount { get { return _coreThreadCount; } } /// <summary> /// 最大線程數 /// </summary> public int MaxThreadCount { get { return _maxThreadCount; } } #endregion #region 構造函數 /// <summary> /// TaskScheduler擴展 /// 每個實例都是獨立線程池 /// </summary> /// <param name="coreThreadCount">核心線程數(大於或等於0,不宜過大)(如果是一次性使用,則設置為0比較合適)</param> /// <param name="maxThreadCount">最大線程數</param> public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20) { _maxThreadCount = maxThreadCount; CreateCoreThreads(coreThreadCount); } #endregion #region override GetScheduledTasks protected override IEnumerable<Task> GetScheduledTasks() { return _tasks; } #endregion #region override TryExecuteTaskInline protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } #endregion #region override QueueTask protected override void QueueTask(Task task) { CreateTimer(); _tasks.Enqueue(task); _evt.Set(); } #endregion #region 資源釋放 /// <summary> /// 資源釋放 /// 隊列中尚未執行的任務不再執行 /// </summary> public void Dispose() { _run = false; if (_timer != null) { _timer.Stop(); _timer.Dispose(); _timer = null; } while (_activeThreadCount > 0) { _evt.Set(); } } #endregion #region 創建核心線程池 /// <summary> /// 創建核心線程池 /// </summary> private void CreateCoreThreads(int? coreThreadCount = null) { if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value; for (int i = 0; i < _coreThreadCount; i++) { Interlocked.Increment(ref _activeThreadCount); Thread thread = null; thread = new Thread(new ThreadStart(() => { Task task; while (_run) { if (_tasks.TryDequeue(out task)) { TryExecuteTask(task); } else { _evt.WaitOne(); } } Interlocked.Decrement(ref _activeThreadCount); if (_activeThreadCount == 0) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } })); thread.IsBackground = true; thread.Start(); } } #endregion #region 創建輔助線程 /// <summary> /// 創建輔助線程 /// </summary> private void CreateThread() { Interlocked.Increment(ref _activeThreadCount); Thread thread = null; thread = new Thread(new ThreadStart(() => { Task task; DateTime dt = DateTime.Now; while (_run && DateTime.Now.Subtract(dt).TotalMilliseconds < _auxiliaryThreadTimeOut) { if (_tasks.TryDequeue(out task)) { TryExecuteTask(task); dt = DateTime.Now; } else { _evt.WaitOne(_auxiliaryThreadTimeOut); } } Interlocked.Decrement(ref _activeThreadCount); if (_activeThreadCount == _coreThreadCount) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } })); thread.IsBackground = true; thread.Start(); } #endregion #region 創建定時器 private void CreateTimer() { if (_timer == null) //_timer不為空時,跳過,不走lock,提升性能 { if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount) //活躍線程數達到最大線程數時,跳過,不走lock,提升性能 { lock (_lockCreateTimer) { if (_timer == null) { _timer = new System.Timers.Timer(); _timer.Interval = _coreThreadCount == 0 ? 1 : 500; _timer.Elapsed += (s, e) => { if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount) { if (_tasks.Count > 0) { if (_timer.Interval != 20) _timer.Interval = 20; CreateThread(); } else { if (_timer.Interval != 500) _timer.Interval = 500; } } else { if (_timer != null) { _timer.Stop(); _timer.Dispose(); _timer = null; } } }; _timer.Start(); } } } } } #endregion #region 全部取消 /// <summary> /// 全部取消 /// 取消隊列中尚未執行的任務 /// </summary> public void CancelAll() { Task tempTask; while (_tasks.TryDequeue(out tempTask)) { } } #endregion } }
RunHelper類代碼:

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { /// <summary> /// 線程工具類 /// </summary> public static class RunHelper { #region 變量屬性事件 #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static Task Run(this TaskScheduler scheduler, Action<object> doWork, object arg = null, Action<Exception> errorAction = null) { return Task.Factory.StartNew((obj) => { try { doWork(obj); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); } }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static Task Run(this TaskScheduler scheduler, Action doWork, Action<Exception> errorAction = null) { return Task.Factory.StartNew(() => { try { doWork(); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); } }, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static Task<T> Run<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null) { return Task.Factory.StartNew<T>((obj) => { try { return doWork(obj); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); return default(T); } }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static Task<T> Run<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null) { return Task.Factory.StartNew<T>(() => { try { return doWork(); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); return default(T); } }, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null) { return await Task.Factory.StartNew<T>((obj) => { try { return doWork(obj); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); return default(T); } }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null) { return await Task.Factory.StartNew<T>(() => { try { return doWork(); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); return default(T); } }, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion } }
TaskHelper擴展類:

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Utils { /// <summary> /// Task幫助類基類 /// </summary> public class TaskHelper { #region 變量 /// <summary> /// 處理器數 /// </summary> private static int _processorCount = Environment.ProcessorCount; #endregion #region UI任務 private static TaskScheduler _UITask; /// <summary> /// UI任務(2-4個線程) /// </summary> public static TaskScheduler UITask { get { if (_UITask == null) _UITask = new TaskSchedulerEx(2, 4); return _UITask; } } #endregion #region 菜單任務 private static TaskScheduler _MenuTask; /// <summary> /// 菜單任務(2-4個線程) /// </summary> public static TaskScheduler MenuTask { get { if (_MenuTask == null) _MenuTask = new TaskSchedulerEx(2, 4); return _MenuTask; } } #endregion #region 計算任務 private static TaskScheduler _CalcTask; /// <summary> /// 計算任務(線程數:處理器數*2) /// </summary> public static TaskScheduler CalcTask { get { if (_CalcTask == null) _CalcTask = new LimitedTaskScheduler(_processorCount * 2); return _CalcTask; } } #endregion #region 網絡請求 private static TaskScheduler _RequestTask; /// <summary> /// 網絡請求(8-32個線程) /// </summary> public static TaskScheduler RequestTask { get { if (_RequestTask == null) _RequestTask = new TaskSchedulerEx(8, 32); return _RequestTask; } } #endregion #region 數據庫任務 private static TaskScheduler _DBTask; /// <summary> /// 數據庫任務(8-32個線程) /// </summary> public static TaskScheduler DBTask { get { if (_DBTask == null) _DBTask = new TaskSchedulerEx(8, 32); return _DBTask; } } #endregion #region IO任務 private static TaskScheduler _IOTask; /// <summary> /// IO任務(8-32個線程) /// </summary> public static TaskScheduler IOTask { get { if (_IOTask == null) _IOTask = new TaskSchedulerEx(8, 32); return _IOTask; } } #endregion #region 首頁任務 private static TaskScheduler _MainPageTask; /// <summary> /// 首頁任務(8-32個線程) /// </summary> public static TaskScheduler MainPageTask { get { if (_MainPageTask == null) _MainPageTask = new TaskSchedulerEx(8, 32); return _MainPageTask; } } #endregion #region 圖片加載任務 private static TaskScheduler _LoadImageTask; /// <summary> /// 圖片加載任務(8-32個線程) /// </summary> public static TaskScheduler LoadImageTask { get { if (_LoadImageTask == null) _LoadImageTask = new TaskSchedulerEx(8, 32); return _LoadImageTask; } } #endregion #region 瀏覽器任務 private static TaskScheduler _BrowserTask; /// <summary> /// 瀏覽器任務(2-4個線程) /// </summary> public static TaskScheduler BrowserTask { get { if (_BrowserTask == null) _BrowserTask = new TaskSchedulerEx(2, 4); return _BrowserTask; } } #endregion } }
Form1.cs測試代碼:

using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Management; using System.Reflection; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; using Utils; namespace test { public partial class Form1 : Form { private TaskSchedulerEx _taskSchedulerEx = null; private TaskSchedulerEx _taskSchedulerExSmall = null; private TaskSchedulerEx _task = null; public Form1() { InitializeComponent(); _taskSchedulerEx = new TaskSchedulerEx(50, 500); _taskSchedulerExSmall = new TaskSchedulerEx(5, 50); _task = new TaskSchedulerEx(2, 10); } private void Form1_Load(object sender, EventArgs e) { } /// <summary> /// 模擬大量網絡請求任務 /// </summary> private void button1_Click(object sender, EventArgs e) { DoTask(_taskSchedulerEx, 200000, 1000, 20); } /// <summary> /// 模擬CPU密集型任務 /// </summary> private void button2_Click(object sender, EventArgs e) { DoTask(_taskSchedulerEx, 100000, 2000, 1); } /// <summary> /// 模擬大量網絡請求任務 /// </summary> private void button3_Click(object sender, EventArgs e) { DoTask(_taskSchedulerExSmall, 2000, 100, 20); } /// <summary> /// 模擬CPU密集型任務 /// </summary> private void button4_Click(object sender, EventArgs e) { DoTask(_taskSchedulerExSmall, 2000, 100, 1); } /// <summary> /// 模擬任務 /// </summary> /// <param name="scheduler">scheduler</param> /// <param name="taskCount">任務數量</param> /// <param name="logCount">每隔多少條數據打一個日志</param> /// <param name="delay">模擬延遲或耗時(毫秒)</param> private void DoTask(TaskSchedulerEx scheduler, int taskCount, int logCount, int delay) { _task.Run(() => { Log("開始"); DateTime dt = DateTime.Now; List<Task> taskList = new List<Task>(); for (int i = 1; i <= taskCount; i++) { Task task = scheduler.Run((obj) => { var k = (int)obj; Thread.Sleep(delay); //模擬延遲或耗時 if (k % logCount == 0) { Log("最大線程數:" + scheduler.MaxThreadCount + " 核心線程數:" + scheduler.CoreThreadCount + " 活躍線程數:" + scheduler.ActiveThreadCount.ToString().PadLeft(4, ' ') + " 處理數/總數:" + k + " / " + taskCount); } }, i, (ex) => { Log(ex.Message); }); taskList.Add(task); } Task.WaitAll(taskList.ToArray()); double d = DateTime.Now.Subtract(dt).TotalSeconds; Log("完成,耗時:" + d + "秒"); }); } private void Form1_FormClosed(object sender, FormClosedEventArgs e) { if (_taskSchedulerEx != null) { _taskSchedulerEx.Dispose(); //釋放資源 _taskSchedulerEx = null; } } } }
測試截圖: