上文《.Net中的並行編程-3.ConcurrentQueue實現與分析》分析了ConcurrentQueue的實現,本章就基於ConcurrentQueue實現一個高性能的異步隊列,該隊列主要用於實時數據流的處理並簡化多線程編程模型。設計該隊列時考慮以下幾點需求(需求來自公司的一個實際項目):
1. 支持多線程入隊出隊,盡量簡化多線程編程的復雜度。
2. 支持事件觸發機制,數據入隊時才進行處理而不是使用定時處理機制, 而且內部能阻塞消費者線程。
3. 出隊時數據處理的順序要保證和入隊時是一致的。
4. 容錯性強,可以不間斷運行。
以上需求點對應的解決方案:
1.ConcurrentQueue支持多線程而且多線程環境下的性能較高,對於多線程編程模型簡化可用適配器模式可將消費者線程封裝到隊列內部,內部采用處理事件方式處理用戶的任務。
2.對於事件觸發機制首先信號量不適合,因為信號量達到指定數目時會阻塞線程,所以該部分需要自己編程實現(具體參考源碼)。
3.隊列的特性以及保證入隊和出隊順序,這里需要保證的是線程處理數據項的順序。
4.可通過注冊異常處理函數的方式解決異常的問題。
所以開發出以下代碼:
public class AsynQueue<T> { //隊列是否正在處理數據 private int isProcessing; //有線程正在處理數據 private const int Processing = 1; //沒有線程處理數據 private const int UnProcessing = 0; //隊列是否可用 private volatile bool enabled = true; private Task currentTask; public event Action<T> ProcessItemFunction; public event EventHandler<EventArgs<Exception>> ProcessException; private ConcurrentQueue<T> queue; public AsynQueue() { queue = new ConcurrentQueue<T>(); Start(); } public int Count { get { return queue.Count; } } private void Start() { Thread process_Thread = new Thread(PorcessItem); process_Thread.IsBackground = true; process_Thread.Start(); } public void Enqueue(T items) { if (items == null) { throw new ArgumentException("items"); } queue.Enqueue(items); DataAdded(); } //數據添加完成后通知消費者線程處理 private void DataAdded() { if (enabled) { if (!IsProcessingItem()) { currentTask = Task.Factory.StartNew(ProcessItemLoop); } } } //判斷是否隊列有線程正在處理 private bool IsProcessingItem() { return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0); } private void ProcessItemLoop() { if (!enabled && queue.IsEmpty) { Interlocked.Exchange(ref isProcessing, 0); return; } //處理的線程數 是否小於當前最大任務數 //if (Thread.VolatileRead(ref runingCore) <= this.MaxTaskCount) //{ T publishFrame; if (queue.TryDequeue(out publishFrame)) { try { ProcessItemFunction(publishFrame); } catch (Exception ex) { OnProcessException(ex); } } if (enabled && !queue.IsEmpty) { currentTask = Task.Factory.StartNew(ProcessItemLoop); } else { Interlocked.Exchange(ref isProcessing, UnProcessing); } } /// <summary> ///定時處理線程調用函數 ///主要是監視入隊的時候線程 沒有來的及處理的情況 /// </summary> private void PorcessItem(object state) { int sleepCount = 0; int sleepTime = 1000; while (enabled) { //如果隊列為空則根據循環的次數確定睡眠的時間 if (queue.IsEmpty) { if (sleepCount == 0) { sleepTime = 1000; } else if (sleepCount <= 3) { sleepTime = 1000 * 3; } else { sleepTime = 1000 * 50; } sleepCount++; Thread.Sleep(sleepTime); } else { //判斷是否隊列有線程正在處理 if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0) { if (!queue.IsEmpty) { currentTask = Task.Factory.StartNew(ProcessItemLoop); } else { Interlocked.Exchange(ref isProcessing, 0); } sleepCount = 0; sleepTime = 1000; } } } } public void Flsuh() { Stop(); if (currentTask != null) { currentTask.Wait(); } while (!queue.IsEmpty) { try { T publishFrame; if (queue.TryDequeue(out publishFrame)) { ProcessItemFunction(publishFrame); } } catch (Exception ex) { OnProcessException(ex); } } currentTask = null; } public void Stop() { this.enabled = false; } private void OnProcessException(System.Exception ex) { var tempException = ProcessException; Interlocked.CompareExchange(ref ProcessException, null, null); if (tempException != null) { ProcessException(ex, new EventArgs<Exception>(ex)); } } }
[Serializable] public class EventArgs<T> : System.EventArgs { public T Argument; public EventArgs() : this(default(T)) { } public EventArgs(T argument) { Argument = argument; } }
該隊列的思想是:當每次數據入隊時,隊列內部會調用DataAdded()判斷是否數據項已經開始被處理,如果已經開始處理則數據入到內部隊列后直接返回否則開啟消費者線程處理。隊列內部的消費者線程(線程池)(Task內部使用線程池實現,這里就當做線程池吧)會采用采用遞歸的方式處理數據,也就是當前數據處理完成后再將另外一個數據放到線程池去處理,這樣就形成一個處理環而且保證了每條數據都有序的進行處理。由於ConcurrentQueue的IsEmpty只是當前內存的一個快照狀態,可能當前時刻為空下一個時候不為空, 所以還需要一個守護線程process_Thread定時監視隊列內部的消費者線程(線程池)是否正在處理數據,否則會造成消費者線程已經判斷隊列為空而數據已經到達只是還沒插入隊列此時數據可能永遠得不到處理。
適用的場景:
1.適合多個生產者一個消費者的情景(當前如果需要多個消費者可以使用多個單獨線程來實現)。
2.適合處理數據速度較快的情景而對於文件寫入等IO操作不適合,因為線程池內部都是后台線程,當進程關閉時線程會同時關閉線程這時文件可能還沒寫入到磁盤。
3.適合作為流水線處理模型的基礎數據結構,隊列之間通過各自的事件處理函數進行通信(后續會專門撰寫文章介紹關於流水線模型的應用)。
注:內部的ConcurrentQueue隊列還可以使用阻塞隊列(BlockingCollection)來替代,雖然使用阻塞隊列更簡單但是內部的消費者線程比較適合使用單獨的線程不適合使用線程池,而且阻塞隊列為空時會阻塞消費者線程,當然阻塞線程池內的線程也沒什么影響只是不推薦這么做,而且阻塞的隊列的性能也沒有ConcurrentQueue的性能高。
