多線程添加元素到隊列中,隊列根據綁定
的事件進行自動處理,可以設置WorkSequential屬性來實現對隊列處理的單線程(嚴格順序處理)或者多線程處理(循序出隊,但是
多線程處理,不保證對隊列元素的處理順利)的選擇。
另外,這段程序不能輸出0,所以,最后的結果是999行,不包含0,原因是if (!item.Equals(default(T))),而default(T)恰恰是0.
代碼 /***********多線程的工作隊列*************** * 此工作隊列保證線程安全性 * * * * * *******/ namespace WorkQueue { using System.Collections.Generic; using System; using System.Threading; public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e); public class WorkQueue<T> { private bool IsWorking; //表明處理線程是否正在工作 private object lockIsWorking = new object();//對IsWorking的同步對象 private Queue<T> queue; //實際的隊列 private object lockObj = new object(); //隊列同步對象 /// <summary> /// 綁定用戶需要對隊列中的item對象 /// 施加的操作的事件 /// </summary> public event UserWorkEventHandler<T> UserWork; public WorkQueue(int n) { queue = new Queue<T>(n); } public WorkQueue() { queue = new Queue<T>(); } /// <summary> /// 謹慎使用此函數, /// 只保證此瞬間,隊列值為空 /// </summary> /// <returns></returns> public bool IsEmpty() { lock (lockObj) { return queue.Count == 0; } } private bool isOneThread; /// <summary> /// 隊列處理是否需要單線程順序執行 /// ture表示單線程處理隊列的T對象 /// 默認為false,表明按照順序出隊,但是多線程處理item /// *****注意不要頻繁改變此項**** /// </summary> public bool WorkSequential { get { return isOneThread; } set { isOneThread = value; } } /// <summary> /// 向工作隊列添加對象, /// 對象添加以后,如果已經綁定工作的事件 /// 會觸發事件處理程序,對item對象進行處理 /// </summary> /// <param name="item">添加到隊列的對象</param> public void EnqueueItem(T item) { lock (lockObj) { queue.Enqueue(item); } lock (lockIsWorking) { if (!IsWorking) { IsWorking = true; ThreadPool.QueueUserWorkItem(doUserWork); } } } /// <summary> /// 處理隊列中對象的函數 /// </summary> /// <param name="o"></param> private void doUserWork(object o) { try { T item; while (true) { lock (lockObj) { if (queue.Count > 0) { item = queue.Dequeue(); } else { return; } } if (!item.Equals(default(T))) { if (isOneThread) { if (UserWork != null) { UserWork(this, new EnqueueEventArgs(item)); } } else { ThreadPool.QueueUserWorkItem(obj => { if (UserWork != null) { UserWork(this, new EnqueueEventArgs(obj)); } }, item); } } } } finally { lock (lockIsWorking) { IsWorking = false; } } } /// <summary> /// UserWork事件的參數,包含item對象 /// </summary> public class EnqueueEventArgs : EventArgs { public T Item { get; private set; } public EnqueueEventArgs(object item) { try { Item = (T)item; } catch (Exception) { throw new InvalidCastException("object to T 轉換失敗"); } } } } }
代碼 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.IO; using WorkQueue; namespace Program { class Program { private static List<string> list=new List<string>(1000); static StreamWriter sw = new StreamWriter(new FileStream("test.dat", FileMode.Create)); static void Main(string[] args) { WorkQueue<int> workQueue=new WorkQueue<int>(1000); workQueue.UserWork += new UserWorkEventHandler<int>(workQueue_UserWork); // workQueue.WorkSequential = true; ThreadPool.QueueUserWorkItem(o => { for (int i = 0; i < 1000; i++) { workQueue.EnqueueItem(i); } }); Console.ReadLine(); list.ForEach(str=>sw.WriteLine(str)); Console.WriteLine(workQueue.IsEmpty()); sw.Close(); } static void workQueue_UserWork(object sender, WorkQueue<int>.EnqueueEventArgs e) { StringBuilder sb=new StringBuilder(); sb.Append(e.Item).Append("\t\t").Append(DateTime.Now.ToString("u")+"\t\t").Append(Thread.CurrentThread.ManagedThreadId); list.Add(sb.ToString()); Thread.Sleep(15); } } }