SmartThreadPool


示例

using Amib.Threading;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;

namespace SmartThreadPoolDemo
{
    class Program
    {
        // SmartThreadPool網址 https://github.com/amibar/SmartThreadPool
        static void Main(string[] args)
        {
            #region 這個例子將演示傳入數個參數並且等待運行然后傳出的全過程    
            /*
                SmartThreadPool特性如下:
                池中的線程數量會根據負載自動增減
                任務異步執行后可以返回值
                處於任務隊列中未執行的任務可以取消
                回調函數可以等待多個任務都執行完成后再觸發
                任務可以有優先級(priority)
                任務可以分組
                支持泛型Action<T> 和 Func<T>
                有性能監測機制             
            */

            SmartThreadPool stp = new SmartThreadPool();

            IWorkItemResult<string> resultCallback = stp.QueueWorkItem(
                new Func<string, string, string>(GetResultstring), "hello ", "world"
                );

            var stopWtch = new Stopwatch();

            stopWtch.Start();

            Console.WriteLine("操作開始...");

            stp.Start();
            stp.WaitForIdle();//等待該實例下的所有結果返回

            Console.WriteLine(resultCallback.Result);

            stopWtch.Stop();

            Console.WriteLine("操作結束,耗時 {0}", stopWtch.ElapsedMilliseconds);
            stp.Shutdown();

            #endregion

            #region 這個例子將演示一批參數的傳入一批線程並且等待執行結束返回值

            SmartThreadPool stp2 = new SmartThreadPool();
            List<IWorkItemResult> t_lResultItem = new List<IWorkItemResult>();//不對IWorkItemResult定義其類型,其結果需要自己做類型轉換
            for (int i = 0; i < 100; i++)
            {
                t_lResultItem.Add(stp2.QueueWorkItem(
                    new WorkItemCallback(GetObjectString), new string[] { "hello ", i.ToString() }));
            }

            stp2.Start();

            //等待所需的結果返回
            if (SmartThreadPool.WaitAll(t_lResultItem.ToArray()))
            {
                foreach (IWorkItemResult t in t_lResultItem)
                {
                    Console.WriteLine("{0} : {1}", t.State, t.Result);
                }
            }

            #endregion

            #region 這個例子將演示處理線程執行過程中出現的錯誤

            SmartThreadPool stp3 = new SmartThreadPool();//如果需要將線程池設置為調用start的時候才運行,需要設置其StartSuspended參數為true,然后為其調用start方法來啟動
            IWorkItemResult<double> ret = stp3.QueueWorkItem(
                new Func<double, double, double>(Diverse), 10, 0);
            //接收錯誤的句柄
            stp3.Start();

            double resule = ret.GetResult(out Exception e);//在取出結果的時候判斷是否有錯誤產生
            if (e != null)
            {
                //在這里進行錯誤處理,錯誤在InnerException中
                Console.WriteLine("產生錯誤:" + e.InnerException.Message);
            }
            else
            {
                Console.WriteLine(resule.ToString());
            }

            stp3.Shutdown();

            #endregion

            #region 方法屬性解釋

            //SmartThreadPool的部分解釋
            SmartThreadPool smartThreadPool = new SmartThreadPool();

            //獲取當前線程池中的工作線程數,與InUseThreads可能會有差別,因為InUseThreads不包含Idle狀態的線程
            int threadNum = smartThreadPool.ActiveThreads;

            //取消所有工作項,如果工作項在執行,那么等待工作項執行完
            smartThreadPool.Cancel();

            //如果不想等待工作項執行完,
            smartThreadPool.Cancel(true);

            //線程池的最大並發數,即MaxWorkerThreads,
            //如果修改后的Concurrency小於MinWorkerThreads,那么MinWorkerThreads也會隨之改變
            smartThreadPool.Concurrency = 25;

            //創建一個工作組,最大並發為3,工作組在后面會詳細說明,
            smartThreadPool.CreateWorkItemsGroup(3);

            //卸載線程池
            smartThreadPool.Dispose();

            //反回所有未執行的工作項的參數對象
            smartThreadPool.GetStates();

            //獲取線程池中正在工作的線程數,與ActiveThreads會有差別,因為ActiveThreads可能包含Idle狀態的線程
            int useThreadNum = smartThreadPool.InUseThreads;

            //當線程池用沒有工作項時,反回true,否則,反回false
            bool IsIdle = smartThreadPool.IsIdle;

            //同時並行執行多個方法,並且阻塞到所有工作項都執行完,這里會有多少個工作項就會創造多少個線程,
            smartThreadPool.Join(new Action[] { new Action(Test1) });

            //獲取或設置最大線程數,即MaxWorkerThreads,
            smartThreadPool.MaxThreads = 25;

            //最小線程數,當沒有工作項時,線程池最多剩余的線程數
            smartThreadPool.MinThreads = 0;

            //線程池的名稱,沒什么特殊的用處,
            smartThreadPool.Name = "StartThreadPool";

            //當線程池中沒有工作項(即閑置)時觸發的事件
            smartThreadPool.OnIdle += new WorkItemsGroupIdleHandler(smartThreadPool_OnIdle);

            //當線程池啟動一個線程時,觸發的事件
            smartThreadPool.OnThreadInitialization += new ThreadInitializationHandler(smartThreadPool_OnThreadInitialization);

            //當線程池釋放一個線程時,所觸發的事件
            smartThreadPool.OnThreadTermination += new ThreadTerminationHandler(smartThreadPool_OnThreadTermination);

            //與Join方法類似,並行執行多個帶參數的方法,這里會有多少個工作項就會創造多少個線程
            smartThreadPool.Pipe<object>(new object(), new Action<object>[] { new Action<object>(Test) });

            //卸載線程池
            smartThreadPool.Shutdown();

            //啟動線程池
            smartThreadPool.Start();

            //STPStartInfo對象的只讀實例
            STPStartInfo stpStartInfo = smartThreadPool.STPStartInfo;

            //等待所有的工作項執行完成(即IsIdle為true)
            smartThreadPool.WaitForIdle();

            //獲取還未執行的工作項數量
            int wiNum = smartThreadPool.WaitingCallbacks;

            //WorkItemGroup的啟動信息的只讀實力
            WIGStartInfo wigStartInfo = smartThreadPool.WIGStartInfo;

            #endregion

            #region 簡單調用實例


            //****************************使用示例****************************
            //創建一個線程池
            smartThreadPool = new SmartThreadPool();

            //執行任務
            smartThreadPool.QueueWorkItem(() =>
            {
                Console.WriteLine("Hello World!");
            });

            //****************************分割線****************************
            //創建一個線程池 帶返回值的任務
            smartThreadPool = new SmartThreadPool();

            //執行任務
            var result = smartThreadPool.QueueWorkItem(() =>
            {
                var sum = 0;
                for (var i = 0; i < 10; i++)
                {
                    sum += i;
                }
                return sum;
            });
            Console.WriteLine(result.Result); //輸出計算結果

            //****************************分割線****************************
            // 創建一個線程池 等待多個任務執行完成
            smartThreadPool = new SmartThreadPool();

            // 執行任務
            var result1 = smartThreadPool.QueueWorkItem(() =>
            {
                //模擬計算較長時間
                Thread.Sleep(5000);
                return 3;
            });

            var result2 = smartThreadPool.QueueWorkItem(() =>
            {
                //模擬計算較長時間
                Thread.Sleep(3000);
                return 5;
            });

            bool success = SmartThreadPool.WaitAll(new IWorkItemResult<int>[] { result1, result2 });
            if (success)
            {
                //輸出結果
                Console.WriteLine(result1.Result);
                Console.WriteLine(result2.Result);
            }
            Console.ReadLine();

            #endregion

            Test();
        }

        #region 事件

        private static void Test1()
        {
        }

        private static void Test(object obj)
        {
        }

        private static void smartThreadPool_OnThreadTermination()
        {
        }

        private static void smartThreadPool_OnThreadInitialization()
        {
        }

        private static void smartThreadPool_OnIdle(IWorkItemsGroup workItemsGroup)
        {
        }

        #endregion

        static Random _random = new Random();

        /// <summary>
        /// 類似 易語言的寫法 可能會出現 bug
        /// </summary>
        private static void Test()
        {
            var idleCount = 0;
            var workingCount = 0;
            var queueWorkCount = 0;
            var job = new ConcurrentQueue<int>();

            for (int i = 0; i < 100; i++)
                job.Enqueue(i);//模擬1000個任務

            #region 只投遞空閑線程數

            var smartThreadPool = new SmartThreadPool(new STPStartInfo
            {
                MinWorkerThreads = 1, //最小線程數
                MaxWorkerThreads = 10, //最大線程數
                AreThreadsBackground = true, //設置為后台線程
            });

            #region 手動釋放寫法

            while (true)
            {
                idleCount = smartThreadPool.MaxThreads - smartThreadPool.InUseThreads;//最大線程數 減去 正在執行的線程數 等於 空閑線程數

                if (idleCount <= 0)
                {
                    smartThreadPool.WaitForIdle(1000);
                    continue;
                }

                if (job.Count <= 0)//剩下任務數為0
                {
                    if (smartThreadPool.IsIdle)//判斷是否空閑
                        break;

                    smartThreadPool.WaitForIdle(1000);
                    continue;
                }

                if (job.Count > idleCount)
                    workingCount = idleCount;
                else
                    workingCount = job.Count;

                for (int i = 0; i < workingCount; i++)
                {
                    queueWorkCount++;
                    smartThreadPool.QueueWorkItem(() =>
                    {
                        job.TryDequeue(out int result);
                        Thread.Sleep(_random.Next(10000));//模擬工作耗時
                    });
                }

                if (queueWorkCount > 10)//模擬撥號 
                {
                    smartThreadPool.WaitForIdle();//等待所有任務執行完畢

                    // adsl撥號程序
                    Console.WriteLine("adsl");

                    Thread.Sleep(_random.Next(5000));//模擬撥號

                    queueWorkCount = 0;
                }

                smartThreadPool.WaitForIdle(1000);
            }

            smartThreadPool.Shutdown();
            smartThreadPool.Dispose();
            #endregion

            #region 自動釋放寫法

            using (smartThreadPool = new SmartThreadPool(new STPStartInfo
            {
                MinWorkerThreads = 1, //最小線程數
                MaxWorkerThreads = 10, //最大線程數
                AreThreadsBackground = true, //設置為后台線程
            }))
            {
                while (true)
                {
                    idleCount = smartThreadPool.MaxThreads - smartThreadPool.InUseThreads;//最大線程數 減去 正在執行的線程數 等於 空閑線程數

                    if (idleCount <= 0)
                    {
                        smartThreadPool.WaitForIdle(1000);
                        continue;
                    }

                    if (job.Count <= 0)//剩下任務數為0
                    {
                        if (smartThreadPool.IsIdle)//判斷是否空閑
                            break;

                        smartThreadPool.WaitForIdle(1000);
                        continue;
                    }

                    if (job.Count > idleCount)
                        workingCount = idleCount;
                    else
                        workingCount = job.Count;

                    for (int i = 0; i < workingCount; i++)
                    {
                        queueWorkCount++;
                        smartThreadPool.QueueWorkItem(() =>
                        {
                            job.TryDequeue(out int result);
                            Thread.Sleep(_random.Next(10000));//模擬工作耗時
                        });
                    }

                    if (queueWorkCount > 10)//模擬撥號 
                    {
                        smartThreadPool.WaitForIdle();//等待所有任務執行完畢

                        // adsl撥號程序
                        Console.WriteLine("adsl");

                        Thread.Sleep(_random.Next(5000));//模擬撥號

                        queueWorkCount = 0;
                    }

                    smartThreadPool.WaitForIdle(1000);
                }
            }

            #endregion

            #endregion


            #region 投遞所有任務

            using (smartThreadPool = new SmartThreadPool(new STPStartInfo
            {
                MinWorkerThreads = 1, //最小線程數
                MaxWorkerThreads = 10, //最大線程數
                AreThreadsBackground = true, //設置為后台線程
            }))
            {
                for (int i = 0; i < job.Count; i++)
                {
                    smartThreadPool.QueueWorkItem(() =>
                    {
                        job.TryDequeue(out int result);
                        Thread.Sleep(_random.Next(10000));//模擬工作耗時
                    });
                }

                smartThreadPool.WaitForIdle();//等待所有任務執行完畢
            }

            #endregion
        }

        private static string GetResultstring(string str, string str2)
        {
            string result = "";
            for (int i = 0; i < 20; i++)
            {
                System.Threading.Thread.Sleep(200);
                result += i + ",";
            }
            return str + str2 + result.TrimEnd(',');
        }

        private static object GetObjectString(object obj)
        {
            return string.Format("{0}{1}", (obj as string[])[0], (obj as string[])[1]);
        }

        private static double Diverse(double x, double y)
        {
            return x / y;
        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM