本篇體驗使用C#的泛型隊列Queue<T>實現生產消費模式。
如果把生產消費想像成自動流水生產線的話,生產就是流水線的物料,消費就是某種設備對物料進行加工的行為,流水線就是隊列。
現在,要寫一個體現生產消費模式的泛型幫助類,比如叫ProducerConsumer<T>。
該類肯定會維護一個有關生產、物料的Queue<T>類型的字段,還存在一個有關消費、Action<T>類型的字段。
在ProducerConsumer類的構造函數中,為Action<T>類型的字段賦值,並開啟后台有關消費的線程。
ProducerConsumer類肯定存在一個進隊列的方法,並且要保證在多線程情況下,同一時間只有一個生產或物料進入隊列。
ProducerConsumer類還存在一個有關消費的方法,並且保證在多線程情況下,同一時間只有一個生產或物料出列,並消費它。
另外,在生產或物料在出隊列的時候,可能會出現隊列中暫時沒有生產或物料的情況,這時候我們希望線程阻塞一下,這需要通過AutoResetEvent實現。AutoResetEvent的大致原理是:當生產或物料進入隊列的時候需要告訴AutoResetEvent一下,當隊列中暫時沒有生產或物料的時候,也需要告訴AutoResetEvent,讓它來阻塞線程。
//有關生產消費的泛型類public class ProducerConsumer<T>{//用來存儲生產者的隊列private readonly Queue<T> queue = new Queue<T>();//鎖private readonly object queueLocker = new object();//消費行為private readonly Action<T> consumerAction;//出列的時候需要檢查隊列中是否有元素,如果沒有,需要阻塞private readonly AutoResetEvent queueWaitHandle = new AutoResetEvent(false);public ProducerConsumer(Action<T> consumerAction){if (consumerAction == null){throw new ArgumentNullException("consumerAction");}this.consumerAction = consumerAction;//后台開啟一個線程開始消費生產者new Thread(this.ConsumeItems){IsBackground = true}.Start();}//進列public void Enqueue(T item){//確保同一時間只有一個生產者進列lock (queueLocker){queue.Enqueue(item);//每次進列都要設置AutoResetEvent事件this.queueWaitHandle.Set();}}//消費動作private void ConsumeItems(){while (true){T nextItem = default(T);//標志,確認隊列中的生產者是否存在bool doesItemExist;//確保同一時間只有一個生產者出列lock (this.queueLocker){//先確認隊列中的生產者是否存在doesItemExist = this.queue.Count > 0;if (doesItemExist){nextItem = this.queue.Dequeue();}}//如果生產者存在,才消費生產者if (doesItemExist){this.consumerAction(nextItem);}else//否則的話,再等等下一個隊列中的生產者{this.queueWaitHandle.WaitOne();}}}}
客戶端,針對多線程情形。
class Program{static void Main(string[] args){//實例化一個int類型的生產消費實例var producerConsumer = new ProducerConsumer<int>(i => Console.WriteLine("正在消費" + i));Random random = new Random();//開啟進隊列線程var t1 = new Thread(() =>{for (int i = 0; i < 100; i++)
{producerConsumer.Enqueue(i);Thread.Sleep(random.Next(0,5));}});var t2 = new Thread(() =>
{for (int i = 0; i > -100; i--){producerConsumer.Enqueue(i);Thread.Sleep(random.Next(0, 5));}});t1.Start();t2.Start();t1.Join();t2.Join();Thread.Sleep(50);Console.ReadKey();}}