C# 生產者與消費者模式


情景:一個線程不斷獲取數據,另一個線程不斷處理這些數據。

常規方法:數據列表加鎖,兩個線程獲取鎖,拿到操作權;類似代碼如下:(不推薦)

  
  static void Main(string[] args)
        {
          lockClass l = new lockClass();
            for (int i = 0; i < 1000000; i++)
            {
                l.Equeue(i.ToString());

            }
       }

public class lockClass
    {
        Queue<string> currentQueue = new Queue<string>(10000000);//當前要插入數據的隊列 
        static readonly object objlock = new object();
        FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None);
        StreamWriter writer;
        public lockClass()
        {
            writer = new StreamWriter(f);
            var backgroundWorker = new BackgroundWorker();
            backgroundWorker.DoWork += backgroundWorker_DoWork;
            backgroundWorker.RunWorkerAsync();
        }
        void backgroundWorker_DoWork(object sender, DoWorkEventArgs e)
        {
            while (true)
            {
                lock (objlock)
                {
                    if (currentQueue.Count > 0)
                    {
                        var item = currentQueue.Dequeue();
                        Console.WriteLine(item);
                        writer.WriteLine(item);

                    }

                }

            }
        }

        public void Equeue(string item)
        {
            lock (objlock)
            {

                currentQueue.Enqueue(item);

            }
        }


    }        

  方法2:雙緩存隊列處理,意思就是說,用兩個隊列,一個隊列用於獲取數據,另一個隊列用於操作數據,通過信號量來處理線程調度,來取消“鎖”帶來的資源切換浪費,參考代碼如下:

  
  static void Main(string[] args)
        {
var test = new DoubleBufferedQueue();
            for (int i = 0; i < 1000000; i++)
            {
                test.Equeue(i.ToString());

            }
}


public class DoubleBufferedQueue
    {
       public readonly Queue<string> Queue1 = new Queue<string>(10000000);
       public readonly Queue<string> Queue2 = new Queue<string>(10000000);
       private readonly ManualResetEvent lock1 = new ManualResetEvent(true);//一開始可以執行
       private readonly ManualResetEvent lock2 = new ManualResetEvent(false);
       private readonly AutoResetEvent _autoReset = new AutoResetEvent(true);
       private volatile Queue<string> currentQueue = new Queue<string>(10000000);//當前要插入數據的隊列 
       FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None);
       StreamWriter writer;
       public DoubleBufferedQueue()
       {
           writer = new StreamWriter(f);
           currentQueue = Queue1;
           var backgroundWorker = new BackgroundWorker();
           backgroundWorker.DoWork += backgroundWorker_DoWork;
           backgroundWorker.RunWorkerAsync();
       }

       void backgroundWorker_DoWork(object sender, DoWorkEventArgs e)
       {
           while (true)
           {
               this._autoReset.WaitOne();//沒有成員入隊列時不進行其他操作;
               this.lock2.Reset();
               this.lock1.WaitOne();
               var readQueue = currentQueue;
               currentQueue = (currentQueue == Queue1) ? Queue2 : Queue1;
               this.lock2.Set();
               writeToConsonle(currentQueue);
           
           }
       }

       void writeToConsonle(Queue<string> readQueue)
       {

           while (readQueue.Count > 0)
           {
              var item= readQueue.Dequeue();
              Console.WriteLine(item);
              writer.WriteLine(item);
           }
       }

       public void Equeue(string item)
       {
           this.lock2.WaitOne();
           this.lock1.Reset();
           currentQueue.Enqueue(item);
           lock1.Set();
           _autoReset.Set();
       }

    }

  方法3:用微軟提供的BlockingCollection(線程安全的,可阻塞的資源的),個人理解就是資源安全的隊列,並且當沒有操作的時候(隊列空閑的時候)不耗費資源,個人覺得和方法2原理類似(推薦使用)

 static void Main(string[] args)
        {

   var block = new blockingCollectionClass();
            for (int i = 0; i < 10000; i++)
            {

                block.Add(i.ToString());
              

            }
           Console.ReadKey(); 
}

public class blockingCollectionClass
    {
        BlockingCollection<string> blockingCollection = new BlockingCollection<string>();
        FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None);
        StreamWriter writer;
        public void Add(string Item)
        {
            blockingCollection.Add(Item);
        }

        public blockingCollectionClass()
        {
            writer = new StreamWriter(f);
            var backgroundWorker = new BackgroundWorker();
            backgroundWorker.DoWork += backgroundWorker_DoWork;
            backgroundWorker.RunWorkerAsync();
        }

        void backgroundWorker_DoWork(object sender, DoWorkEventArgs e)
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
            {
                Console.WriteLine(value);
                writer.WriteLine(value);
            }
        }

    }

  情景2:秒殺活動、搶票等活動時,並發性很高,導致服務器阻塞,用戶請求丟失;

策略1:可以采用以上隊列的形式處理服務器高並發問題,所有的請求先加入隊列,排隊,后台線程來處理隊列里面的請求;

策略2:夠建一個隊列容器,接收請求的線程從容器中取一個空的對列,當隊列填滿后,放回到容器中,再次從容器中取一個空隊列;處理線程需要從容器中取出非空的隊列,處理完隊列為空,放回到容器去;從容器中取放隊列需要加鎖。如果要保證處理的順序,容器可以選隊列(放隊列的隊列);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM