基於異步隊列的生產者消費者C#並發設計


繼上文<<基於阻塞隊列的生產者消費者C#並發設計>>的並發隊列版本的並發設計,原文code是基於<<.Net中的並行編程-4.實現高性能異步隊列>>修改過來的,前面的幾篇文章也詳細介紹了並發實現的其它方案及實現。直接給code:

public class MyAsyncQueue<T>
    {
        //隊列是否正在處理數據
        private int isProcessing;
        //有線程正在處理數據
        private const int Processing = 1;
        //沒有線程處理數據
        private const int UnProcessing = 0;
        //隊列是否可用 單線程下用while來判斷,多線程下用if來判斷,隨后用while來循環隊列的數量
        private volatile bool enabled = true;
        // 消費者線程
        private Task currentTask;
        // 消費者線程處理事件
        public event Action<T> ProcessItemFunction;
        //
        public event EventHandler<EventArgs<Exception>> ProcessException;
        // 並發隊列
        private ConcurrentQueue<T> queue;
        // 消費者的數量
        private int _internalTaskCount;
        // 存儲消費者隊列
        List<Task> tasks = new List<Task>();

        public MyAsyncQueue()
        {
            _internalTaskCount = 3;
            queue = new ConcurrentQueue<T>();
            Start();
        }

        public int Count
        {
            get
            {
                return queue.Count;
            }
        }
        // 開啟監聽線程
        private void Start()
        {
            Thread process_Thread = new Thread(PorcessItem);
            process_Thread.IsBackground = true;
            process_Thread.Start();
        }

        // 生產者生產
        public void Enqueue(T items)
        {
            if (items == null)
            {
                throw new ArgumentException("items");
            }

            queue.Enqueue(items);
            DataAdded();
        }

        //數據添加完成后通知消費者線程處理
        private void DataAdded()
        {
            if (enabled)
            {
                if (!IsProcessingItem())
                {
                    // 開啟消費者消費隊列
                    ProcessRangeItem();
                }
            }
        }

        //判斷是否隊列有線程正在處理 
        private bool IsProcessingItem()
        {
            return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0);
        }

        private void ProcessRangeItem()
        {
            for(int i=0; i< _internalTaskCount; i++)
            {
                currentTask = Task.Factory.StartNew(() => ProcessItemLoop());
                tasks.Add(currentTask);
            }
        }
        // 消費者處理事件
        private void ProcessItemLoop()
        {
            Console.WriteLine("正在執行的Task的Id: {0}", Task.CurrentId);
            // 隊列為空,並且隊列不可用
            if (!enabled && queue.IsEmpty)
            {
                Interlocked.Exchange(ref isProcessing, 0);
                return;
            }
            //處理的線程數 是否小於當前最大任務數
            //if (Thread.VolatileRead(ref runingCore) <= this.MaxTaskCount)
            //{
            T publishFrame;

            while(enabled)
            {
                if (queue.TryDequeue(out publishFrame))
                {
                    try
                    {
                        // 消費者處理事件
                        ProcessItemFunction(publishFrame);
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex);
                    }
                }
                else
                {
                    Console.WriteLine("線程Id{0}取隊列失敗,跳出循環", Task.CurrentId);
                    break;
                }
            }
        }

        /// <summary>
        ///定時處理線程調用函數  
        ///主要是監視入隊的時候線程 沒有來的及處理的情況
        /// </summary>
        private void PorcessItem(object state)
        {
            int sleepCount = 0;
            int sleepTime = 1000;
            while (enabled)
            {
                //如果隊列為空則根據循環的次數確定睡眠的時間
                if (queue.IsEmpty)
                {
                    // Task消費者消費完了隊列中的數據....注銷掉消費者線程
                    if(tasks.Count==_internalTaskCount)
                    {
                        Flush();
                    }
                    if (sleepCount == 0)
                    {
                        sleepTime = 1000;
                    }
                    else if (sleepCount <= 3)
                    {
                        sleepTime = 1000 * 3;
                    }
                    else
                    {
                        sleepTime = 1000 * 50;
                    }
                    sleepCount++;
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    //判斷是否隊列有線程正在處理 
                    if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0)
                    {
                        if (!queue.IsEmpty)
                        {
                            currentTask = Task.Factory.StartNew(ProcessItemLoop);
                            tasks.Add(currentTask);
                        }
                        else
                        {
                            //隊列為空,已經取完了
                            Interlocked.Exchange(ref isProcessing, 0);
                        }
                        sleepCount = 0;
                        sleepTime = 1000;
                    }
                }
            }
        }

        //更新並關閉消費者
        public void Flush()
        {
            Stop();
            foreach(var t in tasks)
            {
                if (t != null)
                {
                    t.Wait();
                    Console.WriteLine("Task已經完成");
                }
            }

            // 消費者未消費完
            while (!queue.IsEmpty)
            {
                try
                {
                    T publishFrame;
                    if (queue.TryDequeue(out publishFrame))
                    {
                        ProcessItemFunction(publishFrame);
                    }
                }
                catch (Exception ex)
                {
                    OnProcessException(ex);
                }
            }
            currentTask = null;
            tasks.Clear();
        }

        public void Stop()
        {
            this.enabled = false;
        }

        private void OnProcessException(System.Exception ex)
        {
            var tempException = ProcessException;
            Interlocked.CompareExchange(ref ProcessException, null, null);

            if (tempException != null)
            {
                ProcessException(ex, new EventArgs<Exception>(ex));
            }
        }
}

調用code:

class ComInfo
    {
        public int ComId { get; set; }

        public DateTime Date { get; set; }
    }
    class Program
    {
        static MyAsyncQueue<ComInfo> queue = new MyAsyncQueue<ComInfo>();
        static void Main(string[] args)
        {
            Console.WriteLine("開始======");
            queue.ProcessItemFunction += A;
            queue.ProcessException += C; //new EventHandler<EventArgs<Exception>>(C);

            ComInfo info = new ComInfo();

            for (int i = 1; i < 50; i++)
            {
                Task.Factory.StartNew((param) =>
                {
                    info = new ComInfo();
                    info.ComId = int.Parse(param.ToString());
                    info.Date = DateTime.Now.Date;
                    queue.Enqueue(info);
                }, i);
            }

            Console.WriteLine("結束======");
            
            Console.ReadKey();
        }

        static void A(ComInfo info)
        {
            Console.WriteLine(info.ComId + "====" + queue.Count);
        }

        static void C(object ex, EventArgs<Exception> args)
        {
            Console.WriteLine("出錯了");
        }
    }

並發系列應該就這樣完了,回頭整理成目錄,自己查起來也方便


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM