一個C#多線程的工作隊列


多線程添加元素到隊列中,隊列根據綁定

的事件進行自動處理,可以設置WorkSequential屬性來實現對隊列處理的單線程(嚴格順序處理)或者多線程處理(循序出隊,但是

多線程處理,不保證對隊列元素的處理順利)的選擇。

另外,這段程序不能輸出0,所以,最后的結果是999行,不包含0,原因是if (!item.Equals(default(T))),而default(T)恰恰是0.

代碼

/***********多線程的工作隊列***************
 * 此工作隊列保證線程安全性
 * 
 * 
 * 
 * 
 * *******/
namespace WorkQueue
{
    using System.Collections.Generic;
    using System;
    using System.Threading;

    public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e);
    public class WorkQueue<T>
    {
        private bool IsWorking; //表明處理線程是否正在工作
        private object lockIsWorking = new object();//對IsWorking的同步對象


        private Queue<T> queue; //實際的隊列
        private object lockObj = new object(); //隊列同步對象

       



        /// <summary>
        /// 綁定用戶需要對隊列中的item對象
        /// 施加的操作的事件
        /// </summary>
        public event UserWorkEventHandler<T> UserWork;

        public WorkQueue(int n)
        {
            queue = new Queue<T>(n);
        }

        public WorkQueue()
        {
            queue = new Queue<T>();
        }

        /// <summary>
        /// 謹慎使用此函數,
        /// 只保證此瞬間,隊列值為空
        /// </summary>
        /// <returns></returns>
        public bool IsEmpty()
        {
            lock (lockObj)
            {
                return queue.Count == 0;
            }
        }

        private bool isOneThread;

        /// <summary>
        /// 隊列處理是否需要單線程順序執行
        /// ture表示單線程處理隊列的T對象
        /// 默認為false,表明按照順序出隊,但是多線程處理item
        /// *****注意不要頻繁改變此項****
        /// </summary>
        public bool WorkSequential
        {
            get
            {
                return isOneThread;
            }
            set
            {
                isOneThread = value;
            }

        }



        /// <summary>
        /// 向工作隊列添加對象,
        /// 對象添加以后,如果已經綁定工作的事件
        /// 會觸發事件處理程序,對item對象進行處理
        /// </summary>
        /// <param name="item">添加到隊列的對象</param>
        public void EnqueueItem(T item)
        {
            lock (lockObj)
            {
                queue.Enqueue(item);
            }

            lock (lockIsWorking)
            {
                if (!IsWorking)
                {
                    IsWorking = true;
                    ThreadPool.QueueUserWorkItem(doUserWork);
                }
            }



        }


        /// <summary>
        /// 處理隊列中對象的函數
        /// </summary>
        /// <param name="o"></param>
        private void doUserWork(object o)
        {
            try
            {
                T item;

                while (true)
                {
                    lock (lockObj)
                    {
                        if (queue.Count > 0)
                        {
                            item = queue.Dequeue();
                        }
                        else
                        {
                            return;
                        }
                    }
                    if (!item.Equals(default(T)))
                    {

                        if (isOneThread)
                        {
                            if (UserWork != null)
                            {
                                UserWork(this, new EnqueueEventArgs(item));
                            }
                        }
                        else
                        {
                            ThreadPool.QueueUserWorkItem(obj =>
                                                             {
                                                                 if (UserWork != null)
                                                                 {
                                                                     UserWork(this, new EnqueueEventArgs(obj));
                                                                 }
                                                             }, item);
                        }
                        

                    }

                }
            }
            finally
            {
                lock (lockIsWorking)
                {
                    IsWorking = false;
                }

            }
        }

        /// <summary>
        /// UserWork事件的參數,包含item對象
        /// </summary>
        public class EnqueueEventArgs : EventArgs
        {
            public T Item { get; private set; }
            public EnqueueEventArgs(object item)
            {
                try
                {
                    Item = (T)item;
                }
                catch (Exception)
                {

                    throw new InvalidCastException("object to T 轉換失敗");
                }
            }
        }
    }
}
代碼

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;
using WorkQueue;
namespace Program
{
    class Program
    {
        private static List<string> list=new List<string>(1000);
        static  StreamWriter sw = new StreamWriter(new FileStream("test.dat", FileMode.Create));
        static void Main(string[] args)
        {
            WorkQueue<int> workQueue=new WorkQueue<int>(1000);
            workQueue.UserWork += new UserWorkEventHandler<int>(workQueue_UserWork);
           // workQueue.WorkSequential = true;
            ThreadPool.QueueUserWorkItem(o =>
                                             {
                                                 for (int i = 0; i < 1000; i++)
                                                 {
                                                     workQueue.EnqueueItem(i);

                                                 }
                                             });
            Console.ReadLine();

            list.ForEach(str=>sw.WriteLine(str));
            Console.WriteLine(workQueue.IsEmpty());
            sw.Close();
        }

        static void workQueue_UserWork(object sender, WorkQueue<int>.EnqueueEventArgs e)
        {
           
            StringBuilder sb=new StringBuilder();
            sb.Append(e.Item).Append("\t\t").Append(DateTime.Now.ToString("u")+"\t\t").Append(Thread.CurrentThread.ManagedThreadId);
            list.Add(sb.ToString());
            Thread.Sleep(15);
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM