繼上文<<基於阻塞隊列的生產者消費者C#並發設計>>的並發隊列版本的並發設計,原文code是基於<<.Net中的並行編程-4.實現高性能異步隊列>>修改過來的,前面的幾篇文章也詳細介紹了並發實現的其它方案及實現。直接給code:
public class MyAsyncQueue<T> { //隊列是否正在處理數據 private int isProcessing; //有線程正在處理數據 private const int Processing = 1; //沒有線程處理數據 private const int UnProcessing = 0; //隊列是否可用 單線程下用while來判斷,多線程下用if來判斷,隨后用while來循環隊列的數量 private volatile bool enabled = true; // 消費者線程 private Task currentTask; // 消費者線程處理事件 public event Action<T> ProcessItemFunction; // public event EventHandler<EventArgs<Exception>> ProcessException; // 並發隊列 private ConcurrentQueue<T> queue; // 消費者的數量 private int _internalTaskCount; // 存儲消費者隊列 List<Task> tasks = new List<Task>(); public MyAsyncQueue() { _internalTaskCount = 3; queue = new ConcurrentQueue<T>(); Start(); } public int Count { get { return queue.Count; } } // 開啟監聽線程 private void Start() { Thread process_Thread = new Thread(PorcessItem); process_Thread.IsBackground = true; process_Thread.Start(); } // 生產者生產 public void Enqueue(T items) { if (items == null) { throw new ArgumentException("items"); } queue.Enqueue(items); DataAdded(); } //數據添加完成后通知消費者線程處理 private void DataAdded() { if (enabled) { if (!IsProcessingItem()) { // 開啟消費者消費隊列 ProcessRangeItem(); } } } //判斷是否隊列有線程正在處理 private bool IsProcessingItem() { return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0); } private void ProcessRangeItem() { for(int i=0; i< _internalTaskCount; i++) { currentTask = Task.Factory.StartNew(() => ProcessItemLoop()); tasks.Add(currentTask); } } // 消費者處理事件 private void ProcessItemLoop() { Console.WriteLine("正在執行的Task的Id: {0}", Task.CurrentId); // 隊列為空,並且隊列不可用 if (!enabled && queue.IsEmpty) { Interlocked.Exchange(ref isProcessing, 0); return; } //處理的線程數 是否小於當前最大任務數 //if (Thread.VolatileRead(ref runingCore) <= this.MaxTaskCount) //{ T publishFrame; while(enabled) { if (queue.TryDequeue(out publishFrame)) { try { // 消費者處理事件 ProcessItemFunction(publishFrame); } catch (Exception ex) { OnProcessException(ex); } } else { Console.WriteLine("線程Id{0}取隊列失敗,跳出循環", Task.CurrentId); break; } } } /// <summary> ///定時處理線程調用函數 ///主要是監視入隊的時候線程 沒有來的及處理的情況 /// </summary> private void PorcessItem(object state) { int sleepCount = 0; int sleepTime = 1000; while (enabled) { //如果隊列為空則根據循環的次數確定睡眠的時間 if (queue.IsEmpty) { // Task消費者消費完了隊列中的數據....注銷掉消費者線程 if(tasks.Count==_internalTaskCount) { Flush(); } if (sleepCount == 0) { sleepTime = 1000; } else if (sleepCount <= 3) { sleepTime = 1000 * 3; } else { sleepTime = 1000 * 50; } sleepCount++; Thread.Sleep(sleepTime); } else { //判斷是否隊列有線程正在處理 if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0) { if (!queue.IsEmpty) { currentTask = Task.Factory.StartNew(ProcessItemLoop); tasks.Add(currentTask); } else { //隊列為空,已經取完了 Interlocked.Exchange(ref isProcessing, 0); } sleepCount = 0; sleepTime = 1000; } } } } //更新並關閉消費者 public void Flush() { Stop(); foreach(var t in tasks) { if (t != null) { t.Wait(); Console.WriteLine("Task已經完成"); } } // 消費者未消費完 while (!queue.IsEmpty) { try { T publishFrame; if (queue.TryDequeue(out publishFrame)) { ProcessItemFunction(publishFrame); } } catch (Exception ex) { OnProcessException(ex); } } currentTask = null; tasks.Clear(); } public void Stop() { this.enabled = false; } private void OnProcessException(System.Exception ex) { var tempException = ProcessException; Interlocked.CompareExchange(ref ProcessException, null, null); if (tempException != null) { ProcessException(ex, new EventArgs<Exception>(ex)); } } }
調用code:
class ComInfo { public int ComId { get; set; } public DateTime Date { get; set; } } class Program { static MyAsyncQueue<ComInfo> queue = new MyAsyncQueue<ComInfo>(); static void Main(string[] args) { Console.WriteLine("開始======"); queue.ProcessItemFunction += A; queue.ProcessException += C; //new EventHandler<EventArgs<Exception>>(C); ComInfo info = new ComInfo(); for (int i = 1; i < 50; i++) { Task.Factory.StartNew((param) => { info = new ComInfo(); info.ComId = int.Parse(param.ToString()); info.Date = DateTime.Now.Date; queue.Enqueue(info); }, i); } Console.WriteLine("結束======"); Console.ReadKey(); } static void A(ComInfo info) { Console.WriteLine(info.ComId + "====" + queue.Count); } static void C(object ex, EventArgs<Exception> args) { Console.WriteLine("出錯了"); } }
並發系列應該就這樣完了,回頭整理成目錄,自己查起來也方便