生產消費模式:多線程讀寫隊列ConcurrentQueue


需求:現需要將多個數據源的數據導入到目標數據庫,這是一個經典的生產消費應用的例子。

直接上代碼,看下實現:

            // 初始化列隊緩沖區 隊列大小為100
            IDataCollection<List<T>> queue = new QueueCollection<List<T>>(100);

            //開啟X個后台任務,讀取RabbitMQ隊列信息, 把列隊信息插入緩沖區隊列
            var count = 1;
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Producer<List<T>>(queue).Start(new RabbitSource<List<T>>().Get));
            }

            //開啟X個后台任務,主動獲取數據庫數據,作為數據生產者,插入到緩沖區隊列,
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Producer<List<T>>(queue).Start(new DatabaseSource<List<T>>().Get));
            }

            //開啟X個后台任務,主動獲取讀取緩沖區列隊,作為數據消息者,把數據插入到ES庫,
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Customer<List<T>>(queue).Start(new Elastic().Insert));
            }

隊列我們采用線程安全的ConcurrentQueue隊列:

/// <summary>
    /// 緩沖區隊列
    /// ConcurrentQueue線程安全,不用考慮鎖的問題
    /// </summary>
    public class QueueCollection<T> :IDataCollection<T>
    {
        //隊列最大值
        private int _maxSize;

        /// <summary>
        /// 線程安全的隊列
        /// </summary>
        private ConcurrentQueue<T> _queue;

        public QueueCollection(int maxSize)
        {
            this._maxSize = maxSize;
            _queue = new ConcurrentQueue<T>();
        }
        
        public bool isPopWaiting()
        {
            return !_queue.Any();
        }

        public bool isPushWaiting()
        {
            return this._maxSize == _queue.Count;
        }
        
        public T Pop()
        {
            T _obj = default(T);
            if (!_queue.IsEmpty)
                _queue.TryDequeue(out _obj);

            return _obj;
        }

        public void Push(T t)
        {
            if (this._maxSize > _queue.Count)
            {
                _queue.Enqueue(t);
            }
        }
    }

如果我們不使用這個隊列,只要滿足IDataCollection接口,也可以進行替換:

public interface IDataCollection<T>
    {
        /// <summary>
        /// 插入數據 
        /// </summary>
        /// <param name="t"></param>
        void Push(T t);

        /// <summary>
        /// 取出數據
        /// </summary>
        /// <returns></returns>
        T Pop();

        /// <summary>
        /// 是否插入數據等待
        /// </summary>
        /// <returns></returns>
        bool isPushWaiting();

        /// <summary>
        /// 是否取出數據等待
        /// </summary>
        /// <returns></returns>
        bool isPopWaiting();
        
    }

生產者:

 public class Producer<T> : ITransientDependency
    {
        private int sleep;

        private IDataCollection<T> bufferQueue;

        public Producer(IDataCollection<T> queue)
        {
            sleep = 3000;
            bufferQueue = queue;
        }

        public void Start(Action<Action<T>> methodCall)
        {
            //入隊
            methodCall((bills) => 
            {
                this.Enqueue(bills);
            });
        }

        private void Enqueue(T t)
        {
            var isWaiting = true;

            while (isWaiting)
            {
                if (!bufferQueue.isPushWaiting())
                {
                    this.bufferQueue.Push(t);
                    isWaiting = false;
                }
                else
                {
                    //生產者等待時間
                    Thread.Sleep(sleep);
                }
            }
        }
    }

消費者:

/// <summary>
    /// 消費者
    /// </summary>
    public class Customer<T>
    {
        //產品緩存隊列
        private IDataCollection<T> _queue;
        
        //消費者等待時間
        private int Spead = 5000;//消費者等待時間

        public Customer(IDataCollection<T> queue)
        {
            this._queue = queue;
        }

        public void Start(Action<T> method)
        {
            while (true)
            {
                if (!_queue.isPopWaiting())
                {
                    T box = this._queue.Pop();

                    method(box);
                }
                else
                {
                    Thread.Sleep(Spead);
                }
            }
        }
    }

方法委托,也寫了個基類,其實意義並不大,只是為了規范, 防止方法命名隨意起。

    public interface IDataSource<T>
    {
        void Get(Action<T> func);
    }

最后,在DataSource的get方法中,調用 func即可。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM