C#多線程(12):線程池


線程池

線程池全稱為托管線程池,線程池受 .NET 通用語言運行時(CLR)管理,線程的生命周期由 CLR 處理,因此我們可以專注於實現任務,而不需要理會線程管理。

線程池的應用場景:任務並行庫 (TPL)操作、異步 I/O 完成、計時器回調、注冊的等待操作、使用委托的異步方法調用和套接字連接。

很多人不清楚 Task、Task<TResult> 原理,原因是沒有好好了解線程池。

ThreadPool 常用屬性和方法

屬性:

屬性 說明
CompletedWorkItemCount 獲取迄今為止已處理的工作項數。
PendingWorkItemCount 獲取當前已加入處理隊列的工作項數。
ThreadCount 獲取當前存在的線程池線程數。

方法:

方法 說明
BindHandle(IntPtr) 將操作系統句柄綁定到 ThreadPool。
BindHandle(SafeHandle) 將操作系統句柄綁定到 ThreadPool。
GetAvailableThreads(Int32, Int32) 檢索由 GetMaxThreads(Int32, Int32) 方法返回的最大線程池線程數和當前活動線程數之間的差值。
GetMaxThreads(Int32, Int32) 檢索可以同時處於活動狀態的線程池請求的數目。 所有大於此數目的請求將保持排隊狀態,直到線程池線程變為可用。
GetMinThreads(Int32, Int32) 發出新的請求時,在切換到管理線程創建和銷毀的算法之前檢索線程池按需創建的線程的最小數量。
QueueUserWorkItem(WaitCallback) 將方法排入隊列以便執行。 此方法在有線程池線程變得可用時執行。
QueueUserWorkItem(WaitCallback, Object) 將方法排入隊列以便執行,並指定包含該方法所用數據的對象。 此方法在有線程池線程變得可用時執行。
QueueUserWorkItem(Action, TState, Boolean) 將 Action 委托指定的方法排入隊列以便執行,並提供該方法使用的數據。 此方法在有線程池線程變得可用時執行。
RegisterWaitForSingleObject(WaitHandle, WaitOrTimerCallback, Object, Int32, Boolean) 注冊一個等待 WaitHandle 的委托,並指定一個 32 位有符號整數來表示超時值(以毫秒為單位)。
SetMaxThreads(Int32, Int32) 設置可以同時處於活動狀態的線程池的請求數目。 所有大於此數目的請求將保持排隊狀態,直到線程池線程變為可用。
SetMinThreads(Int32, Int32) 發出新的請求時,在切換到管理線程創建和銷毀的算法之前設置線程池按需創建的線程的最小數量。
UnsafeQueueNativeOverlapped(NativeOverlapped) 將重疊的 I/O 操作排隊以便執行。
UnsafeQueueUserWorkItem(IThreadPoolWorkItem, Boolean) 將指定的工作項對象排隊到線程池。
UnsafeQueueUserWorkItem(WaitCallback, Object) 將指定的委托排隊到線程池,但不會將調用堆棧傳播到輔助線程。
UnsafeRegisterWaitForSingleObject(WaitHandle, WaitOrTimerCallback, Object, Int32, Boolean) 注冊一個等待 WaitHandle 的委托,並使用一個 32 位帶符號整數來表示超時時間(以毫秒為單位)。 此方法不將調用堆棧傳播到輔助線程。

線程池說明和示例

通過 System.Threading.ThreadPool 類,我們可以使用線程池。

ThreadPool 類是靜態類,它提供一個線程池,該線程池可用於執行任務、發送工作項、處理異步 I/O、代表其他線程等待以及處理計時器。

理論的東西這里不會說太多,你可以參考官方文檔地址: https://docs.microsoft.com/zh-cn/dotnet/api/system.threading.threadpool?view=netcore-3.1

ThreadPool 有一個 QueueUserWorkItem() 方法,該方法接受一個代表用戶異步操作的委托(名為 WaitCallback ),調用此方法傳入委托后,就會進入線程池內部隊列中。

WaitCallback 委托的定義如下:

public delegate void WaitCallback(object state);

現在我們來寫一個簡單的線程池示例,再扯淡一下。

    class Program
    {
        static void Main(string[] args)
        {
            ThreadPool.QueueUserWorkItem(MyAction);

            ThreadPool.QueueUserWorkItem(state =>
            {
                Console.WriteLine("任務已被執行2");
            });
            Console.ReadKey();
        }
        // state 表示要傳遞的參數信息,這里為 null
        private static void MyAction(Object state)
        {
            Console.WriteLine("任務已被執行1");
        }
    }

十分簡單對不對~

這里有幾個要點:

  • 不要將長時間運行的操作放進線程池中;
  • 不應該阻塞線程池中的線程;
  • 線程池中的線程都是后台線程(又稱工作者線程);

另外,這里一定要記住 WaitCallback 這個委托。

我們觀察創建線程需要的時間:

        static void Main()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();
            for (int i = 0; i < 10; i++)
                new Thread(() => { }).Start();
            watch.Stop();
            Console.WriteLine("創建 10 個線程需要花費時間(毫秒):" + watch.ElapsedMilliseconds);
            Console.ReadKey();
        }

筆者電腦測試結果大約 160。

線程池線程數

線程池中的 SetMinThreads()SetMaxThreads() 可以設置線程池工作的最小和最大線程數。其定義分別如下:

// 設置線程池最小工作線程數線程
public static bool SetMinThreads (int workerThreads, int completionPortThreads);
// 獲取
public static void GetMinThreads (out int workerThreads, out int completionPortThreads);

workerThreads:要由線程池根據需要創建的新的最小工作程序線程數。

completionPortThreads:要由線程池根據需要創建的新的最小空閑異步 I/O 線程數。

SetMinThreads() 的返回值代表是否設置成功。


// 設置線程池最大工作線程數
public static bool SetMaxThreads (int workerThreads, int completionPortThreads);
// 獲取
public static void GetMaxThreads (out int workerThreads, out int completionPortThreads);

workerThreads:線程池中輔助線程的最大數目。

completionPortThreads:線程池中異步 I/O 線程的最大數目。

SetMaxThreads() 的返回值代表是否設置成功。


這里就不給出示例了,不過我們也看到了上面出現 異步 I/O 線程 這個關鍵詞,下面會學習到相關知識。


線程池線程數說明

關於最大最小線程數,這里有一些知識需要說明。在此前,我們來寫一個示例:

    class Program
    {
        static void Main(string[] args)
        {
            // 不斷加入任務
            for (int i = 0; i < 8; i++)
                ThreadPool.QueueUserWorkItem(state =>
                {
                    Thread.Sleep(100);
                    Console.WriteLine("");
                });
            for (int i = 0; i < 8; i++)
                ThreadPool.QueueUserWorkItem(state =>
                {
                    Thread.Sleep(TimeSpan.FromSeconds(1));
                    Console.WriteLine("");
                });

            Console.WriteLine("     此計算機處理器數量:" + Environment.ProcessorCount);

            // 工作項、任務代表同一個意思
            Console.WriteLine("     當前線程池存在線程數:" + ThreadPool.ThreadCount);
            Console.WriteLine("     當前已處理的工作項數:" + ThreadPool.CompletedWorkItemCount);
            Console.WriteLine("     當前已加入處理隊列的工作項數:" + ThreadPool.PendingWorkItemCount);
            int count;
            int ioCount;
            ThreadPool.GetMinThreads(out count, out ioCount);
            Console.WriteLine($"     默認最小輔助線程數:{count},默認最小異步IO線程數:{ioCount}");

            ThreadPool.GetMaxThreads(out count, out ioCount);
            Console.WriteLine($"     默認最大輔助線程數:{count},默認最大異步IO線程數:{ioCount}");
            Console.ReadKey();
        }
    }

運行后,筆者電腦輸出結果(我們的運行結果可能不一樣):

     此計算機處理器數量:8
     當前線程池存在線程數:8
     當前已處理的工作項數:2
     當前已加入處理隊列的工作項數:8
     默認最小輔助線程數:8,默認最小異步IO線程數:8
     默認最大輔助線程數:32767,默認最大異步IO線程數:1000

我們結合運行結果,來了解一些知識點。

線程池最小線程數,默認是當前計算機處理器數量。另外我們也看到了。當前線程池存在線程數為 8 ,因為線程池創建后,無論有沒有任務,都有 8 個線程存活。

如果將線程池最小數設置得過大(SetMinThreads()),會導致任務切換開銷變大,消耗更多得性能資源。

如果設置得最小值小於處理器數量,則也可能會影響性能。

Environment.ProcessorCount 可以確定當前計算機上有多少個處理器數量(例如CPU是四核八線程,結果就是八)。

SetMaxThreads() 設置的最大工作線程數或 I/O 線程數,不能小於 SetMinThreads() 設置的最小工作線程數或 I/O 線程數。

設置線程數過大,會導致任務切換開銷變大,消耗更多得性能資源。

如果加入的任務大於設置的最大線程數,那么將會進入等待隊列。

不能將工作線程或 I/O 完成線程的最大數目設置為小於計算機上的處理器數。

不支持的線程池異步委托

扯淡了這么久,我們從設置線程數中,發現有個 I/O 異步線程數,這個線程數限制的是執行異步委托的線程數量,這正是本節要介紹的。

異步編程模型(Asynchronous Programming Model,簡稱 APM),在日常擼碼中,我們可以使用 asyncawaitTask 一把梭了事。

.NET Core 不再使用 BeginInvoke 這種模式。你可以跟着筆者一起踩坑先。

筆者在看書的時候,寫了這個示例:

很多地方也在使用這種形式的示例,但是在 .NET Core 中用不了,只能在 .NET Fx 使用。。。

    class Program
    {
        private delegate string MyAsyncDelete(out int thisThreadId);
        static void Main(string[] args)
        {
            int threadId;
            // 不是異步調用
            MyMethodAsync(out threadId);

            // 創建自定義的委托
            MyAsyncDelete myAsync = MyMethodAsync;

            // 初始化異步的委托
            IAsyncResult result = myAsync.BeginInvoke(out threadId, null, null);

            // 當前線程等待異步完成任務,也可以去掉
            result.AsyncWaitHandle.WaitOne();
            Console.WriteLine("異步執行");

            // 檢索異步執行結果
            string returnValue = myAsync.EndInvoke(out threadId, result);

            // 關閉
            result.AsyncWaitHandle.Close();

            Console.WriteLine("異步處理結果:" + returnValue);
        }
        private static string MyMethodAsync(out int threadId)
        {
            // 獲取當前線程在托管線程池的唯一標識
            threadId = Thread.CurrentThread.ManagedThreadId;
            // 模擬工作請求
            Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 5)));
            // 返回工作完成結果
            return "喜歡我的讀者可以關注筆者的博客歐~";
        }
    }

目前百度到的很多文章也是 .NET FX 時代的代碼了,要注意 C# 在版本迭代中,對異步這些 API ,做了很多修改,不要看別人的文章,學完后才發現不能在 .NET Core 中使用(例如我... ...),浪費時間。

上面這個代碼示例,也從側面說明了,以往 .NET Fx (C# 5.0 以前)中使用異步是很麻煩的。

.NET Core 是不支持異步委托的,具體可以看 https://github.com/dotnet/runtime/issues/16312

官網文檔明明說支持的https://docs.microsoft.com/zh-cn/dotnet/api/system.iasyncresult?view=netcore-3.1#examples,而且示例也是這樣,搞了這么久,居然不行,我等下一刀過去。

關於為什么不支持,可以看這里:https://devblogs.microsoft.com/dotnet/migrating-delegate-begininvoke-calls-for-net-core/

不支持就算了,我們跳過,后面學習異步時再仔細討論。

任務取消功能

這個取消跟線程池池無關。

CancellationToken:傳播有關應取消操作的通知。

CancellationTokenSource:向應該被取消的 CancellationToken 發送信號。

兩者關系如下:

        CancellationTokenSource cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;  

這個取消,在於信號的發生和信號的捕獲,任務的取消不是實時的。

示例代碼如下:

CancellationTokenSource 實例化一個取消標記,然后傳遞 CancellationToken 進去;

被啟動的線程,每個階段都判斷 .IsCancellationRequested,然后確定是否停止運行。這取決於線程的自覺性。

    class Program
    {
        static void Main()
        {
            CancellationTokenSource cts = new CancellationTokenSource();

            Console.WriteLine("按下回車鍵,將取消任務");

            new Thread(() => { CanceTask(cts.Token); }).Start();
            new Thread(() => { CanceTask(cts.Token); }).Start();

            Console.ReadKey();
            
            // 取消執行
            cts.Cancel();
            Console.WriteLine("完成");
            Console.ReadKey();
        }

        private static void CanceTask(CancellationToken token)
        {
            Console.WriteLine("第一階段");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;

            Console.WriteLine("第二階段");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;

            Console.WriteLine("第三階段");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;

            Console.WriteLine("第四階段");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;

            Console.WriteLine("第五階段");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;
        }
    }

這個取消標記,在前面的很多同步方式中,都用的上。

計時器

常用的定時器有兩種,分別是:System.Timers.Timer 和 System.Thread.Timer。

System.Threading.Timer是一個普通的計時器,它是線程池中的線程中。

System.Timers.Timer包裝了System.Threading.Timer,並提供了一些用於在特定線程上分派的其他功能。

什么線程安全不安全。。。俺不懂這個。。。不過你可以參考https://stackoverflow.com/questions/19577296/thread-safety-of-system-timers-timer-vs-system-threading-timer

如果你想認真區分兩者的關系,可以查看:https://web.archive.org/web/20150329101415/https://msdn.microsoft.com/en-us/magazine/cc164015.aspx

兩者主要使用區別:

大多數情況下使用 System.Threading.Timer,因為它比較“輕”,另外就是 .NET Core 1.0 時,System.Timers.Timer 被取消了,NET Core 2.0 時又回來了。主要是為了 .NET FX 和 .NET Core 遷移方便,才加上去的。所以,你懂我的意思吧。

System.Threading.Timer 其中一個構造函數定義如下:

public Timer (System.Threading.TimerCallback callback, object state, uint dueTime, uint period);

callback:要定時執行的方法;

state:要傳遞給線程的信息(參數);

dueTime:延遲時間,避免一創建計時器,馬上開始執行方法;

period:設置定時執行方法的時間間隔;

計時器示例:

    class Program
    {
        static void Main()
        {
            Timer timer = new Timer(TimeTask,null,100,1000);
            
            Console.ReadKey();
        }

        // public delegate void TimerCallback(object? state);
        private static void TimeTask(object state)
        {
            Console.WriteLine("www.whuanle.cn");
        }
    }

Timer 有不少方法,但不常用,可以查看官方文檔:https://docs.microsoft.com/zh-cn/dotnet/api/system.threading.timer?view=netcore-3.1#methods


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM