情景:一個線程不斷獲取數據,另一個線程不斷處理這些數據。
常規方法:數據列表加鎖,兩個線程獲取鎖,拿到操作權;類似代碼如下:(不推薦)
static void Main(string[] args)
{
lockClass l = new lockClass();
for (int i = 0; i < 1000000; i++)
{
l.Equeue(i.ToString());
}
}
public class lockClass
{
Queue<string> currentQueue = new Queue<string>(10000000);//當前要插入數據的隊列
static readonly object objlock = new object();
FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None);
StreamWriter writer;
public lockClass()
{
writer = new StreamWriter(f);
var backgroundWorker = new BackgroundWorker();
backgroundWorker.DoWork += backgroundWorker_DoWork;
backgroundWorker.RunWorkerAsync();
}
void backgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
while (true)
{
lock (objlock)
{
if (currentQueue.Count > 0)
{
var item = currentQueue.Dequeue();
Console.WriteLine(item);
writer.WriteLine(item);
}
}
}
}
public void Equeue(string item)
{
lock (objlock)
{
currentQueue.Enqueue(item);
}
}
}
方法2:雙緩存隊列處理,意思就是說,用兩個隊列,一個隊列用於獲取數據,另一個隊列用於操作數據,通過信號量來處理線程調度,來取消“鎖”帶來的資源切換浪費,參考代碼如下:
static void Main(string[] args)
{
var test = new DoubleBufferedQueue();
for (int i = 0; i < 1000000; i++)
{
test.Equeue(i.ToString());
}
}
public class DoubleBufferedQueue
{
public readonly Queue<string> Queue1 = new Queue<string>(10000000);
public readonly Queue<string> Queue2 = new Queue<string>(10000000);
private readonly ManualResetEvent lock1 = new ManualResetEvent(true);//一開始可以執行
private readonly ManualResetEvent lock2 = new ManualResetEvent(false);
private readonly AutoResetEvent _autoReset = new AutoResetEvent(true);
private volatile Queue<string> currentQueue = new Queue<string>(10000000);//當前要插入數據的隊列
FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None);
StreamWriter writer;
public DoubleBufferedQueue()
{
writer = new StreamWriter(f);
currentQueue = Queue1;
var backgroundWorker = new BackgroundWorker();
backgroundWorker.DoWork += backgroundWorker_DoWork;
backgroundWorker.RunWorkerAsync();
}
void backgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
while (true)
{
this._autoReset.WaitOne();//沒有成員入隊列時不進行其他操作;
this.lock2.Reset();
this.lock1.WaitOne();
var readQueue = currentQueue;
currentQueue = (currentQueue == Queue1) ? Queue2 : Queue1;
this.lock2.Set();
writeToConsonle(currentQueue);
}
}
void writeToConsonle(Queue<string> readQueue)
{
while (readQueue.Count > 0)
{
var item= readQueue.Dequeue();
Console.WriteLine(item);
writer.WriteLine(item);
}
}
public void Equeue(string item)
{
this.lock2.WaitOne();
this.lock1.Reset();
currentQueue.Enqueue(item);
lock1.Set();
_autoReset.Set();
}
}
方法3:用微軟提供的BlockingCollection(線程安全的,可阻塞的資源的),個人理解就是資源安全的隊列,並且當沒有操作的時候(隊列空閑的時候)不耗費資源,個人覺得和方法2原理類似(推薦使用)
static void Main(string[] args)
{
var block = new blockingCollectionClass();
for (int i = 0; i < 10000; i++)
{
block.Add(i.ToString());
}
Console.ReadKey();
}
public class blockingCollectionClass
{
BlockingCollection<string> blockingCollection = new BlockingCollection<string>();
FileStream f = new FileStream("D://1.txt", FileMode.Create, FileAccess.Write, FileShare.None);
StreamWriter writer;
public void Add(string Item)
{
blockingCollection.Add(Item);
}
public blockingCollectionClass()
{
writer = new StreamWriter(f);
var backgroundWorker = new BackgroundWorker();
backgroundWorker.DoWork += backgroundWorker_DoWork;
backgroundWorker.RunWorkerAsync();
}
void backgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine(value);
writer.WriteLine(value);
}
}
}
情景2:秒殺活動、搶票等活動時,並發性很高,導致服務器阻塞,用戶請求丟失;
策略1:可以采用以上隊列的形式處理服務器高並發問題,所有的請求先加入隊列,排隊,后台線程來處理隊列里面的請求;
策略2:夠建一個隊列容器,接收請求的線程從容器中取一個空的對列,當隊列填滿后,放回到容器中,再次從容器中取一個空隊列;處理線程需要從容器中取出非空的隊列,處理完隊列為空,放回到容器去;從容器中取放隊列需要加鎖。如果要保證處理的順序,容器可以選隊列(放隊列的隊列);
