.NetCore利用BlockingCollection實現簡易消息隊列


前言

消息隊列現今的應用場景越來越大,常用的有RabbmitMQ和KafKa。
我們用BlockingCollection來實現簡單的消息隊列。


實現消息隊列

用Vs2017創建一個控制台應用程序。創建DemoQueueBlock類,封裝一些常用判斷。

HasEle,判斷是否有元素

Add向隊列中添加元素

Take從隊列中取出元素

為了不把BlockingCollection直接暴漏給使用者,我們封裝一個DemoQueueBlock

 /// <summary>
/// BlockingCollection演示消息隊列
/// </summary>
/// <typeparam name="T"></typeparam>
public class DemoQueueBlock<T> where T : class {
private static BlockingCollection<T> Colls;
public DemoQueueBlock() {
}
public static bool IsComleted() {
if (Colls != null && Colls.IsCompleted) {
return true;
}
return false;
}
public static bool HasEle() {
if (Colls != null && Colls.Count>0)
{
return true;
}
return false;
}
public static bool Add(T msg) {
if (Colls == null)
{
Colls = new BlockingCollection<T>();
}
Colls.Add(msg);
return true;
}
public static T Take() {
if (Colls == null)
{
Colls = new BlockingCollection<T>();
}
return Colls.Take();
}
}
/// <summary>
/// 消息體
/// </summary>
public class DemoMessage {
public string BusinessType { get; set; }
public string BusinessId { get; set; }
public string Body { get; set; }
}

添加元素進隊列

通過控制台,添加元素

 //添加元素
while (true)
{
Console.WriteLine("請輸入隊列");
var read = Console.ReadLine();
if (read == "exit")
{
return;
}
DemoQueueBlock<DemoMessage>.Add(new DemoMessage() { BusinessId = read });
}

消費隊列

通過判斷IsComleted,來確定是否獲取隊列

 Task.Factory.StartNew(() =>
{
//從隊列中取元素。
while (!DemoQueueBlock<DemoMessage>.IsComleted())
{
try
{
var m = DemoQueueBlock<DemoMessage>.Take();
Console.WriteLine("已消費:" + m.BusinessId);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
});

查看運行結果

運行結果

這樣我們就實現了簡易的消息隊列。

示例源碼

簡易隊列

參考鏈接

BlockingCollection
Orleans源碼分析


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM