c# 生成者和消費者模式


 

概念:

  • 某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。
  • 產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。
  • 單單抽象出生產者和消費者,還夠不上是生產者/消費者模式。
  • 該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。
  • 生產者把數據放入緩沖區,而消費者從緩沖區取出數據。

 

 大概有三個特點:

 1.解耦

 2.支持並發

 3.

 

這里我提供了多個版本的模式;

基本模式;

1.生產者同步生產msg,進入如queene中,消費者同步從隊列中取出數據,然后進行同步消費(這里主線程同步生產msg,一個子線程負責消費msg)

主線程負責往我們的qunen中添加msg,沒添加一個就釋放一個信號,讓子線程去消費;

首先啟動的是我們的子線程的狀態;如果檢車到隊列為空,就等待,;

應該可以這么說,子線程先啟動,出去等待信號的狀態;

子線程在不斷的while(true) { ........ }

 

線程的同步是,指有序的去訪問公共變量;每次只能一個線程去訪問;獲取鎖和是方法鎖,必須是當前的同一個線程;

線程之間的互相通信,之間的協同操作,則是通過信號變量的;主線程負責釋放開始的信號,子線程接受到信號之后,開始進行運動,

線程之間的通信,則用的是EventWaitHandle

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication38
{

    /// <summary>
    /// 生產者  消息  消費者;  這個三個應該是相互獨立的對象;
    /// 在這里例子中,我們只有一個消費者; 后面我們可以將起改造成多個線程去消費我們的任務;
    /// 去處理其中的任務;
    /// 當隊列中有的數據之后,我們要想法通知我們的消費者;
    /// 還要去看更多的實例;
    /// 看別人的各種設計想法和思路;
    /// To ensure thread safety, we used a lock to protect access to the Queue<string> collection.
    /// 
    /// 這個生成這的消費者模式;produce 和 consume 並不是同步的,也就是生產一個 消費一個
    /// 生產者由主線程不斷的生成,然后加入到隊列中, 消費者一個個的從隊列中去取出數據,進行消費;
    /// 實現方式有多中滴呀;
    /// 線程的進入是同步的,    線程的消費也是同步的,而且只有一個線程;
    /// 如何做到多個線程異步生成  然后進入隊列, 然后多個線程異步消費我們的額隊列呢;
    /// </summary>

    public class ProducerConsumerQueue : IDisposable
    {

        EventWaitHandle _wh = new AutoResetEvent(false);
        Thread _worker;
        readonly object _locker = new object();     //這個鎖,是用來同步讀和寫的;集合的讀和寫的;
        Queue<string> _tasks = new Queue<string>();     //一旦隊列有了消息就要通知我們的線程去消費;


        /// <summary>
        /// 讓線程跑起來
        /// </summary>
        public ProducerConsumerQueue()
        {
            _worker = new Thread(Work);
            _worker.Start();
        }

        /// <summary>
        /// 往線程中添加任務;一旦有了任務,就通知我們的消費者;
        /// 這里例子有很多問題滴呀;
        /// </summary>
        /// <param name="task"></param>
        public void EnqueueTask(string task)
        {
            lock (_locker) { _tasks.Enqueue(task); }
            _wh.Set();
        }

        public void Work()
        {
            while (true)  //相當於就是在不斷的去輪詢我們的隊列中的信息;讓這個線程一直處於執行的狀態,應為只有一個線程,不能讓它執行一完一次任務只有就停止了
            {
                string task = null;
                lock (_locker)   //這個鎖是用來同步讀和寫的;
                {
                    if (_tasks.Count > 0) //隊列中有數據,我們就從中取出數據;
                    {
                        task = _tasks.Dequeue();// 取出任務;
                        if (task == null) return;//任務為空就停止了;
                    }
                }
                if (task != null)
                {
                    //就取執行我們的任務;
                    Console.WriteLine("始終只有一個消費者:Task:{0} ThreadID{1} and now Quen length:{2}", task, Thread.CurrentThread.ManagedThreadId, _tasks.Count);
                    Thread.Sleep(1000);

                }
                else
                {
                    _wh.WaitOne(); //隊列為空就等待
                   
                }

           } 
        }

        public void Dispose()
        {
            EnqueueTask(null);     // Signal the consumer to exit.
            _worker.Join();         // Wait for the consumer's thread to finish.
            _wh.Close();            // Release any OS resources.
            Console.WriteLine("對象銷毀完畢...");

        }

    }


    class Program
    {

        static void Test()
        {
            //主線程在往我們的隊列中添加消息。相當於就是我們的生產者; 往我們的隊列中添加消息;
            using (ProducerConsumerQueue pc = new ProducerConsumerQueue())
            {
                pc.EnqueueTask("Hello");
                for (int i = 0; i < 10; i++) pc.EnqueueTask("Say " + i);
                pc.EnqueueTask("Goodbye!");
            }
        }

        static void Main(string[] args)
        {
            Test();
            Console.ReadLine();


        }
    }
}

 

 

這幾種的設計思路:都是開辟一個線程,執行while(true){} 的循環,讓線程一直處於run的狀態,然后去poll(輪詢我們的quen;) 這個去qunen

實現方式二:主線push, 三個或者多個線程pull,前提還是要將我們的子線程先run起來,然后處於等待狀態,等待push進queue,然后進一個,釋放一個信號,然后子線程

消費一個;

這里就是我們的示例代碼:

 首先這里涉及到一個Montior.wait 和 Monitor.Pluse 的基本用法:https://www.codeproject.com/Articles/28785/Thread-synchronization-Wait-and-Pulse-demystified

 

這里有一個很好的栗子,倆解釋我們的monitor.wait  and monitor.pulse;

    //there is a good example monitor wait  and monitor pualse;
    class Program
    {
        private static readonly object _phone = new object();

        static void DoWork()
        {

            Console.WriteLine("do some work...");
        }

        static void Worker()
        {
            lock (_phone)
            {
                while (true)
                {
                    Monitor.Wait(_phone);  // Wait for a signal from the boss
                    DoWork();
                    Monitor.PulseAll(_phone); // Signal boss we are done

                }
            }

        }

        static void Boss()
        {
            lock (_phone) // Grab the phone when I have something ready for the worker
            {
                Monitor.PulseAll(_phone); // Signal worker there is work to do
                Monitor.Wait(_phone); // Wait for the work to be done
            }
        }

        static void Main(string[] args)
        {
            Worker();
            Thread.Sleep(100); //首先得讓我們的worker 處於待命的狀態;
            Boss(); //然后我們的boss 上場;

        }
    }

然后我們看到上面的栗子,得讓我們的worker先處於wait的狀態,然后我們boss的pulse才有效的滴呀; 則就是我們的Thread.sleep(100) 的目的;那么,我們有沒有更好餓辦法呢;

然后........

略微復雜版本的控制;

        //這里同樣可以添加一個復雜版本的call
        public void SmartWorker()
        {
            lock (_phone)
            {
                while (true)
                {
                    if (Monitor.Wait(_phone, 1000)) // Wait for one second at most
                    {
                        DoWork();
                        Monitor.PulseAll(_phone); // Signal boss we are done
                    }
                    else
                    {
                        // do something else.....
                    }
                }
            }
        }
        /// <summary>
        /// 沒喲耐性的老板 窩草
        /// </summary>
        public static void Impatient_Boss()
        {
            lock (_phone)
            {
                Monitor.PulseAll(_phone); // Signal worker there is work to do
                if (Monitor.Wait(_phone, 1000)) // Wait for one second at most
                    Console.WriteLine("Good work!");
            }
        }

 

到這里,你可能會發現,wait 和 pulse 都和我們的同一個_locker 相關聯滴呀;

 

中的來說,還是挺繞的一個概念;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication43
{

    //這里的qune就不再裝得是消息了;
    //而是我們的action
    //使用的也是基於我們的 Monitor.wait(locker) 和 Monitor.Pulse(locker) 方法進行同步滴呀;
   //先讓線程進入等待的狀態,然后有信息(action) 進入隊列的時候,我們就釋放一個locker;然后另外一段就獲取locker 進行消費;
   //直接將要執行的任務封裝成了我們的action,
   //要懂得區分幾個不同的概念:同步 異步  並行 並發  這個幾個基本的概念;
   //這個例子是讓消費者處於運動額狀態,讓后去poll 隊列中的信息;


     public class PCQueue
    {
        Thread[] _threads;
        Queue<Action> _itemQ = new Queue<Action>();
        private readonly object _locker = new object();
        //初始化線程,讓其進行執行;

        public  PCQueue(int count)
        {
            _threads = new Thread[count]; //先盡心初始化;

            for(var i = 0; i < count; i++)
            {
               ( _threads[i] = new Thread(Consume)).Start();  //初始化一定數量的工作者線程;
            }
        }

        public void ShutDown(bool waitForWorkers)
        {
            //是否等待線程執行完再關閉呢; 退出線程,不讓線程一直在哪里pool 一個永遠為空的 隊列,我們就先進行 
            //Enqueue one null item per worker to make each exit
            foreach (var thread in _threads)
            {
                EnqueueItem(null);
                
            }
            if (waitForWorkers)
            {
                foreach (Thread thread in _threads)
                {
                    thread.Join();  //這個主要是為了堵塞主線程,讓其進行等待;
                }
            }
                    
        }

        public void EnqueueItem(Action  action)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(action); //一旦進入了隊列之后;
                Monitor.Pulse(_locker); //重新激活; 
            }
            
        }

        private void Consume()
        {
            //讓所有的線程進行等待的狀態;
            //消費的方法,就不斷沖隊列中取出msg 然后進行各種消費低啊;
            //一開始就要讓我們的線程給跑起來;
            while (true)   
            {
                Action item;
                lock (_locker) //鎖住取數據的步驟; 這里會造成堵塞; 線程序列化的訪問;
                {
                    //去輪詢消息隊列;
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //隊列為空,讓出locker; 等待激活后重新loop condtion  讓出這個鎖,造成當前線程的堵塞;
                       //堵塞當前錢線程並且暫時四方鎖,一直到其他線程pulse通知 
                       //釋放鎖之后,還是當前線程獲取鎖,
                       //三個線程都處於等待狀態;
                    }
                    item = _itemQ.Dequeue(); 
                }
                //取出來之后,進行讀取;
                if (item == null) { return; }    // This signals our exit 讓線程退租的信號;
                item();

            }
            
        }

    }
    class Program
    {
        static void Main(string[] args)
        {
            PCQueue p = new PCQueue(3); //tdd ,基於測試驅動開發,這是一種倒退的編程模式進行的開發; 嘗試着進行倒退額方式進行編程;
                                        //相當於初始化三個線程,在線程池中 處於運動的狀態
                                        //這里模擬,初始化三個線程去消費是個方法;?如何去停止這三個方法;

            //主線程push msg 幾年對壘;
            Console.WriteLine("Enqueuing 10 items...");
            for (int i = 0; i < 10; i++)
            {
                int itemNumber = i;
                p.EnqueueItem(()=>{
                    //這里就是我們消費者,需要進行消費的各種方法;
                    Thread.Sleep(1000);    //simulate time-consuming work
                    Console.Write(" Task" + itemNumber);
                });

            }

            p.ShutDown(true);  //結束完之后,我們可以調用這個方法;
            Console.WriteLine();
            Console.WriteLine("Workers complete!");

        }
    }
}

多線程,者壇子水,很深的哦,一步小心就搞錯了;

然后我們還有一種新的方法來實現這種模式;

c# 中當然給我們提供這樣的集合不用我們手動的去寫這么多的方法和實現的呀;下面這個栗子就是我們簡易版的實現;

值得注意的一點就是我們的構造函數參數傳遞的問題:

ecause we didn’t pass anything into BlockingCollection’s constructor, it instantiated a concurrent queue automatically. 

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication49
{

   public class PCQueue : IDisposable
    {

        //If you call the constructor without passing in a collection, the class will automatically instantiate a ConcurrentQueue<T>
        BlockingCollection<Action> _tasks = new BlockingCollection<Action>();

        public PCQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueTask(Action action)
        {
            _tasks.TryAdd(action);
        }

        private void Consume()
        {
            // This sequence that we’re enumerating will block when no elements
           // are available and will end when CompleteAdding is called.
            foreach (Action action in _tasks.GetConsumingEnumerable())
            {
                action();
            }

        }

        public void Dispose()
        {

            _tasks.CompleteAdding();  //標記不再接受任何的添加
        }

    }



    class Program
    {
        static void Main(string[] args)
        {

            //模擬一個快速生成這;

            //連個慢速消費者;
            PCQueue p = new PCQueue(2);  //運行起來后,兩個thread 將處於 被堵塞的狀態;

            //主線程負責push

            for (int i = 0; i < 10; i++)
            {
                int value = i;
                p.EnqueueTask(() =>
                {
                    Console.WriteLine($"{value} 被消費了 ");  //隊列中有消息之后,我們就能通知 將集合改為非堵塞;
                });
            }

            //Thread.Sleep(3000);

            Console.ReadLine();
           

        }
    }
}

 

辣么,如果我們需要如下的功能;我們又如何進行相關的完善呢;

 

The producer/consumer that we just wrote is inflexible in that we can’t track work items after they’ve been enqueued. It would be nice if we could:

  • Know when a work item has completed.
  • Cancel an unstarted work item.
  • Deal elegantly with any exceptions thrown by a work item

 

An ideal solution would be to have the EnqueueTask method return some object giving us the functionality just described.

//這幾年的發展對一個人的性格和價值觀形成了很大的不同;所以加油吧,騷年;

 首先,這里我們添加了任務完成的通知事件

  可以取消一個沒有開始的任務項目; 

  還要我們一個異常的捕獲,

具備以上的功能之后,我們才能稱之為一基本完整的

這里涉及到一個TaskCompleteSource<TResult>的基本用法

這里是它的一個基本用法的介紹的呀:https://msdn.microsoft.com/en-us/library/dd449174(v=vs.110).aspx

 

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication50
{
    public class PCQueue : IDisposable
    {

        //封裝一個workItem;里面有三個東西 TaskCompleteSource Action CancleToken
        class WorkItem
        {
            public readonly TaskCompletionSource<object> TaskSource;
            public readonly Action Action;
            public readonly CancellationToken? CancleToken;

            public WorkItem(TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancleToken)
            {
                this.TaskSource = taskSource;
                this.Action = action;
                this.CancleToken = cancleToken;
            }
        }

        //然后是我們堵塞隊列;
        BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

        //初始化消費者數量;
        public PCQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public Task EnqueueTask(Action action)
        {
            return EnqueueTask(action,null);
        }

        public Task EnqueueTask(Action action, CancellationToken? cancelToken)
        {
            //lets us later control the task that we return to the consumer  返回了我們執行的結果,這樣方便我么進行控制滴啊;效果很好;
            var tcs=new TaskCompletionSource<object>();
            _taskQ.Add(new WorkItem(tcs,action, cancelToken));
            return tcs.Task; //返回的就是我們的這個Task滴呀;
        }
        
        //消費者的方法,這樣我們的基本任務就算基本實現滴呀;效果還是挺好的;
        private void Consume()
        {

            foreach(WorkItem workItem in this._taskQ.GetConsumingEnumerable())
            {
                if(workItem.CancleToken.HasValue && workItem.CancleToken.Value.IsCancellationRequested)
                {
                    workItem.TaskSource.SetCanceled();
                }
                else
                {

                    //要執行的基本任務;
                    try
                    {
                        workItem.Action();
                        workItem.TaskSource.SetResult(null);   // Indicate completion

                    }
                    catch (OperationCanceledException ex)
                    {
                        if (ex.CancellationToken == workItem.CancleToken)
                        {
                            workItem.TaskSource.SetCanceled();
                        }
                        else
                        {
                            workItem.TaskSource.SetException(ex);
                        }

                    }
                    catch(Exception ex)
                    {
                        workItem.TaskSource.SetException(ex);
                    }
                }
            }

        }

        public void Dispose()
        {
            _taskQ.CompleteAdding();
        }
    }


    class Program
    {
        static void Test()
        {
            var pcQ= new PCQueue(1);
            Task task = pcQ.EnqueueTask(() =>
             {
                 Console.WriteLine("easy  done,producer and consumer.....cool task!");
                 int value = 0;
                 var result = 1 / value;

             });

            //等搞task的時候,我們再深入的理解一哈這個方法;
            Console.WriteLine(task.IsCompleted);
            Console.WriteLine(task.Status);
            Console.WriteLine(task.Exception);
            Console.WriteLine(task);
            Console.ReadLine();


        }

        static void Main(string[] args)
        {
            Test();
        }
    }
}

 

我們的生產者,消費者就這樣一步一步的完善到這里了;效果整體來說不錯滴呀;

使用我們的 BlockingCollection:

處於以下的幾個原因和解決方案;

第一個:必須是線程安全的;

第二個:buffer的大小應該有一個限制;也就是提供集合的邊界問題;

第三個:Producer should discard the data or enter into sleep mode when queue size is full

第四個:Consumer should go to sleep more when buffer is empty

第五個: resume its operation when items are available in queue for processing

 

//下面這三個集合都繼承自我們的:IProducerConsumerCollection
//都可以用於我們的 生成者和消費者模式的使用;

ConcurrentQueue<Task> conQueue = new ConcurrentQueue<Task>();
ConcurrentStack<Task> conStack= new ConcurrentStack<Task>(); //我們的這幾個線程安全的集合,都繼承自我們的IProducerConsumerCollection
ConcurrentBag<Task> conBag = new ConcurrentBag<Task>();

接下來,我們再添加一種寫法;

 這里還有一個示例:

http://www.nullskull.com/a/1464/producerconsumer-queue-and-blockingcollection-in-c-40.aspx

 

還要我們的異步方式去實現這個東西;等我學了dataflow 之后,我們回過頭來研究這個東西;

https://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html

當我們的吧 dataflow的方式學習了,之后,我們隨后就開啟了 actor的大門,然后就是我們 akka.net 還有我們

函數編程的大門,簡直就是發現了我們的新大陸的感覺;

隨后,我們就要使用我們的dataflow 來實現我們的消費者和生產者模式的拉;(this is simple demo)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication54
{

    //use data flow to create producer and consumer dataFlow pattern;

    public class DataFlowProducerConsumer
    {
        public static  readonly Random random = new Random();

        /// <summary>
        /// The Produce method calls the Post method in a loop to synchronously write data to the target block
        /// </summary>
        /// <param name="target"></param>
        public static void Produce(ITargetBlock<byte[]> target)
        {

            for (int i = 0; i < 100; i++)
            {
                byte[] buffer = new byte[1024];

                //Fill the buffer with random bytes
                random.NextBytes(buffer);

                target.Post(buffer);
            }
            // After the Produce method writes all data to the target block, it calls the Complete method to indicate that the block will never have additional data available
            target.Complete(); // Post the result to the message block
        }

        //就不用你再去開啟線程去干一些事兒了;
        /// <summary>
        /// 
        /// </summary>
        /// <param name="source"></param>
        /// <returns></returns>
        public static async Task<int> ConsumeTaskAsync(ISourceBlock<byte[]> source)
        {
            int result = 0;
            //Consume method uses the async and await operators to asynchronously compute the total number of bytes that are received from the 
            //異步的去消費我們的data
            while (await source.OutputAvailableAsync().ConfigureAwait(false))  //在控制台中這么配置低呀;效果還不錯滴呀;
            {

                //統一子線程來做剩下的事情;
                byte[] data = source.Receive();
                result += data.Length;
                Console.WriteLine($"thread id {Thread.CurrentThread.ManagedThreadId}");
            }

            return result;
        }
    }

    class Program
    {
        static void Main(string[] args)
        {

            //The Produce method writes arrays that contain random bytes of data to a ITargetBlock<TInput> object and the Consume method reads bytes from a ISourceBlock<TOutput> object.

            Console.WriteLine($"Main Thread id {Thread.CurrentThread.ManagedThreadId}");
            var buffer = new BufferBlock<byte[]>();

            //相當於先開啟我們的消費者線程;
            var consumer = DataFlowProducerConsumer.ConsumeTaskAsync(buffer);

            DataFlowProducerConsumer.Produce(buffer);

            consumer.Wait();
            Console.WriteLine("Processed {0} bytes.", consumer.Result);

        }
    }
}

 

This example uses just one consumer to process the source data. If you have multiple consumers in your application, use the TryReceive method to read data from the source block, as shown in the following example. 

然后按做一點點小小的改造;效果還挺不錯滴呀;

        /// <summary>
        /// the TryReceive method returns False when no data is available. 
        /// When multiple consumers must access the source block concurrently,
        /// this mechanism guarantees that data is still available 
        /// after the call to OutputAvailableAsync.
        /// </summary>
        /// <param name="source"></param>
        /// <returns></returns>
        public static async Task<int> MoreConsumeTaskAsync(IReceivableSourceBlock<byte[]> source)
        {

            int result = 0;
            while(await source.OutputAvailableAsync())
            {

                byte [] data;
                while (source.TryReceive(out data))
                {
                    result += data.Length;
                    Console.WriteLine($"thread id {Thread.CurrentThread.ManagedThreadId}");
                }
            }

            return result;

        }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM