情景:一個線程不斷獲取數據,另一個線程不斷處理這些數據。
常規方法:數據列表加鎖,兩個線程獲取鎖,拿到操作權;類似代碼如下:(不推薦)
static void Main(string[] args) { lockClass l = new lockClass(); for (int i = 0; i < 1000000; i++) { l.Equeue(i.ToString()); } } public class lockClass { Queue<string> currentQueue = new Queue<string>(10000000);//當前要插入數據的隊列 static readonly object objlock = new object(); FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None); StreamWriter writer; public lockClass() { writer = new StreamWriter(f); var backgroundWorker = new BackgroundWorker(); backgroundWorker.DoWork += backgroundWorker_DoWork; backgroundWorker.RunWorkerAsync(); } void backgroundWorker_DoWork(object sender, DoWorkEventArgs e) { while (true) { lock (objlock) { if (currentQueue.Count > 0) { var item = currentQueue.Dequeue(); Console.WriteLine(item); writer.WriteLine(item); } } } } public void Equeue(string item) { lock (objlock) { currentQueue.Enqueue(item); } } }
方法2:雙緩存隊列處理,意思就是說,用兩個隊列,一個隊列用於獲取數據,另一個隊列用於操作數據,通過信號量來處理線程調度,來取消“鎖”帶來的資源切換浪費,參考代碼如下:
static void Main(string[] args) { var test = new DoubleBufferedQueue(); for (int i = 0; i < 1000000; i++) { test.Equeue(i.ToString()); } } public class DoubleBufferedQueue { public readonly Queue<string> Queue1 = new Queue<string>(10000000); public readonly Queue<string> Queue2 = new Queue<string>(10000000); private readonly ManualResetEvent lock1 = new ManualResetEvent(true);//一開始可以執行 private readonly ManualResetEvent lock2 = new ManualResetEvent(false); private readonly AutoResetEvent _autoReset = new AutoResetEvent(true); private volatile Queue<string> currentQueue = new Queue<string>(10000000);//當前要插入數據的隊列 FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None); StreamWriter writer; public DoubleBufferedQueue() { writer = new StreamWriter(f); currentQueue = Queue1; var backgroundWorker = new BackgroundWorker(); backgroundWorker.DoWork += backgroundWorker_DoWork; backgroundWorker.RunWorkerAsync(); } void backgroundWorker_DoWork(object sender, DoWorkEventArgs e) { while (true) { this._autoReset.WaitOne();//沒有成員入隊列時不進行其他操作; this.lock2.Reset(); this.lock1.WaitOne(); var readQueue = currentQueue; currentQueue = (currentQueue == Queue1) ? Queue2 : Queue1; this.lock2.Set(); writeToConsonle(currentQueue); } } void writeToConsonle(Queue<string> readQueue) { while (readQueue.Count > 0) { var item= readQueue.Dequeue(); Console.WriteLine(item); writer.WriteLine(item); } } public void Equeue(string item) { this.lock2.WaitOne(); this.lock1.Reset(); currentQueue.Enqueue(item); lock1.Set(); _autoReset.Set(); } }
方法3:用微軟提供的BlockingCollection(線程安全的,可阻塞的資源的),個人理解就是資源安全的隊列,並且當沒有操作的時候(隊列空閑的時候)不耗費資源,個人覺得和方法2原理類似(推薦使用)
static void Main(string[] args) { var block = new blockingCollectionClass(); for (int i = 0; i < 10000; i++) { block.Add(i.ToString()); } Console.ReadKey(); } public class blockingCollectionClass { BlockingCollection<string> blockingCollection = new BlockingCollection<string>(); FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None); StreamWriter writer; public void Add(string Item) { blockingCollection.Add(Item); } public blockingCollectionClass() { writer = new StreamWriter(f); var backgroundWorker = new BackgroundWorker(); backgroundWorker.DoWork += backgroundWorker_DoWork; backgroundWorker.RunWorkerAsync(); } void backgroundWorker_DoWork(object sender, DoWorkEventArgs e) { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine(value); writer.WriteLine(value); } } }
情景2:秒殺活動、搶票等活動時,並發性很高,導致服務器阻塞,用戶請求丟失;
策略1:可以采用以上隊列的形式處理服務器高並發問題,所有的請求先加入隊列,排隊,后台線程來處理隊列里面的請求;
策略2:夠建一個隊列容器,接收請求的線程從容器中取一個空的對列,當隊列填滿后,放回到容器中,再次從容器中取一個空隊列;處理線程需要從容器中取出非空的隊列,處理完隊列為空,放回到容器去;從容器中取放隊列需要加鎖。如果要保證處理的順序,容器可以選隊列(放隊列的隊列);