原文:https://www.cnblogs.com/yanbigfeg/p/9674238.html#_label3
首先我們知道隊列是先進先出的機制,所以在處理並發是個不錯的選擇。然后就寫兩個隊列的簡單應用。
Queue
命名空間
命名空間:System.Collections,不在這里做過多的理論解釋,這個東西非常的好理解。
可以看下官方文檔:https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.queue?view=netframework-4.7.2
示例代碼
我這里就是為了方便記憶做了一個基本的例子,首先創建了QueueTest類:
包含了獲取隊列的數量,入隊和出隊的實現
1 public class QueueTest
2 {
3 public static Queue<string> q = new Queue<string>();
4
5 #region 獲取隊列數量
6 public int GetCount()
7 {
8
9 return q.Count;
10 }
11 #endregion
12
13 #region 隊列添加數據
14 public void IntoData(string qStr)
15 {
16 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
17 q.Enqueue(qStr);
18 Console.WriteLine($"隊列添加數據: {qStr};當前線程id:{threadId}");
19 }
20 #endregion
21
22 #region 隊列輸出數據
23
24 public string OutData()
25 {
26 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
27 string str = q.Dequeue();
28 Console.WriteLine($"隊列輸出數據: {str};當前線程id:{threadId}");
29 return str;
30 }
31 #endregion
32
33 }
為了模擬並發情況下也不會出現重復讀取和插入混亂的問題所以寫了TaskTest類里面開辟了兩個異步線程進行插入和讀取:
這里只是證明了多線程插入不會造成丟失。無憂證明並發的先進先出
1 class TaskTest
2 {
3
4 #region 隊列的操作模擬
5 public static void QueueMian()
6 {
7 QueueA();
8 QueueB();
9 }
10 private static async void QueueA()
11 {
12 QueueTest queue = new QueueTest();
13 var task = Task.Run(() =>
14 {
15 for (int i = 0; i < 20; i++)
16 {
17 queue.IntoData("QueueA" + i);
18 }
19 });
20 await task;
21 Console.WriteLine("QueueAA插入完成,進行輸出:");
22
23 while (queue.GetCount() > 0)
24 {
25 queue.OutData();
26 }
27 }
28
29 private static async void QueueB()
30 {
31 QueueTest queue = new QueueTest();
32 var task = Task.Run(() =>
33 {
34 for (int i = 0; i < 20; i++)
35 {
36 queue.IntoData("QueueB" + i);
37 }
38 });
39 await task;
40 Console.WriteLine("QueueB插入完成,進行輸出:");
41
42 while (queue.GetCount() > 0)
43 {
44 queue.OutData();
45 }
46 }
47 #endregion
48
49 }
效果展示
然后在main函數直接調用即可:

通過上面的截圖可以看出插入線程是無先后的。

這張圖也是線程無先后。
補充:通過園友的提問,我發現我一開始測試的不太仔細,只注意多線程下的插入,沒有注意到輸出其實不是跟插入的順序一致,對不起,這說明queue不是線程安全的,所以這個就當是入隊,出隊的基礎例子並不能說明並發。后面有一個補充的ConcurrentQueue隊列是說明了並發線程的先進先出。
MSMQ
msmq是微軟提供的消息隊列,本來在windows系統中就存在,但是默認沒有開啟。需要開啟。
開啟安裝
打開控制面板=>程序和功能=> 啟動或關閉windows功能 => Microsoft Message Queue(MSMQ)服務器=>Microsoft Message Queue(MSMQ)服務器核心
一般選擇:MSMQ Active Directory域服務繼承和MSMQ HTTP支持即可。

點擊確定等待安裝成功。
命名空間
需要引用System.Messaging.DLL
命名空間:System.Messaging
官方資料文檔:https://docs.microsoft.com/zh-cn/dotnet/api/system.messaging.messagequeue?view=netframework-4.7.2
示例代碼
與上面queue同樣的示例方式,創建一個MSMQ類,實現創建消息隊列,查詢數據,入列,出列功能:
1 /// <summary>
2 /// MSMQ消息隊列
3 /// </summary>
4 class MSMQ
5 {
6 static string path = ".\\Private$\\myQueue";
7 static MessageQueue queue;
8 public static void Createqueue(string queuePath)
9 {
10 try
11 {
12 if (MessageQueue.Exists(queuePath))
13 {
14 Console.WriteLine("消息隊列已經存在");
15 //獲取這個消息隊列
16 queue = new MessageQueue(queuePath);
17 }
18 else
19 {
20 //不存在,就創建一個新的,並獲取這個消息隊列對象
21 queue = MessageQueue.Create(queuePath);
22 path = queuePath;
23 }
24 }
25 catch (Exception e)
26 {
27 Console.WriteLine(e.Message);
28 }
29
30 }
31
32
33 #region 獲取消息隊列的數量
34 public static int GetMessageCount()
35 {
36 try
37 {
38 if (queue != null)
39 {
40 int count = queue.GetAllMessages().Length;
41 Console.WriteLine($"消息隊列數量:{count}");
42 return count;
43 }
44 else
45 {
46 return 0;
47 }
48 }
49 catch (MessageQueueException e)
50 {
51
52 Console.WriteLine(e.Message);
53 return 0;
54 }
55
56
57 }
58 #endregion
59
60 #region 發送消息到隊列
61 public static void SendMessage(string qStr)
62 {
63 try
64 {
65 //連接到本地隊列
66
67 MessageQueue myQueue = new MessageQueue(path);
68
69 //MessageQueue myQueue = new MessageQueue("FormatName:Direct=TCP:192.168.12.79//Private$//myQueue1");
70
71 //MessageQueue rmQ = new MessageQueue("FormatName:Direct=TCP:121.0.0.1//private$//queue");--遠程格式
72
73 Message myMessage = new Message();
74
75 myMessage.Body = qStr;
76
77 myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
78
79 //發生消息到隊列中
80
81 myQueue.Send(myMessage);
82
83 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
84 Console.WriteLine($"消息發送成功: {qStr};當前線程id:{threadId}");
85 }
86 catch (MessageQueueException e)
87 {
88 Console.WriteLine(e.Message);
89 }
90 }
91 #endregion
92
93 #region 連接消息隊列讀取消息
94 public static void ReceiveMessage()
95 {
96 MessageQueue myQueue = new MessageQueue(path);
97
98
99 myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
100
101 try
102
103 {
104
105 //從隊列中接收消息
106
107 Message myMessage = myQueue.Receive(new TimeSpan(10));// myQueue.Peek();--接收后不消息從隊列中移除
108 myQueue.Close();
109
110 string context = myMessage.Body.ToString();
111 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
112 Console.WriteLine($"--------------------------消息內容: {context};當前線程id:{threadId}");
113
114 }
115
116 catch (System.Messaging.MessageQueueException e)
117
118 {
119
120 Console.WriteLine(e.Message);
121
122 }
123
124 catch (InvalidCastException e)
125
126 {
127
128 Console.WriteLine(e.Message);
129
130 }
131
132 }
133 #endregion
134 }
這里說明一下path這個字段,這是消息隊列的文件位置和隊列名稱,我這里寫的“.”(點)就是代表的位置MachineName字段,,代表本機的意思

然后TaskTest類修改成這個樣子:
1 class TaskTest
2 {
3
4 #region 消息隊列的操作模擬
5 public static void MSMQMian()
6 {
7 MSMQ.Createqueue(".\\Private$\\myQueue");
8 MSMQA();
9 MSMQB();
10 Console.WriteLine("MSMQ結束");
11 }
12 private static async void MSMQA()
13 {
14 var task = Task.Run(() =>
15 {
16 for (int i = 0; i < 20; i++)
17 {
18 MSMQ.SendMessage("MSMQA" + i);
19 }
20 });
21 await task;
22 Console.WriteLine("MSMQA發送完成,進行讀取:");
23
24 while (MSMQ.GetMessageCount() > 0)
25 {
26 MSMQ.ReceiveMessage();
27 }
28 }
29
30 private static async void MSMQB()
31 {
32 var task = Task.Run(() =>
33 {
34 for (int i = 0; i < 20; i++)
35 {
36 MSMQ.SendMessage("MSMQB" + i);
37 }
38 });
39 await task;
40 Console.WriteLine("MSMQB發送完成,進行讀取:");
41
42 while (MSMQ.GetMessageCount() > 0)
43 {
44 MSMQ.ReceiveMessage();
45 }
46 }
47 #endregion
效果展示


本機查看消息隊列
創建成功的消息隊列我們可以在電腦上查看:我的電腦=>管理 =>計算機管理 =>服務與應用程序 =>消息隊列 =>專用隊列就看到我剛才創建的消息隊列

補充感謝
感謝 virtual1988 提出的queue不是線程安全這個問題,是我沒搞清楚。線程安全要使用ConcurrentQueue隊列。
謝謝提出的寶貴意見。
ConcurrentQueue
所以我有修改了一下寫了個ConcurrentQueue隊列的:
修改代碼如下:
//public static Queue<string> q = new Queue<string>();
public static ConcurrentQueue<string> q = new ConcurrentQueue<string>();
//public static Queue q =Queue.Synchronized(new Queue());
#region 獲取隊列數量
public static int GetCount()
{
return q.Count;
}
#endregion
#region 隊列添加數據
public static void IntoData(string qStr)
{
string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
q.Enqueue(qStr);
System.Threading.Thread.Sleep(10);
Console.WriteLine($"隊列添加數據: {qStr};當前線程id:{threadId}");
}
#endregion
#region 隊列輸出數據
public static string OutData2()
{
string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
foreach (var item in q)
{
Console.WriteLine($"------隊列輸出數據: {item};當前線程id:{threadId}");
string d="";
q.TryDequeue( out d);
}
return "211";
}
#endregion
task類:
#region 隊列的操作模擬
public static async void QueueMian()
{
QueueA();
QueueB();
}
private static async void QueueA()
{
var task = Task.Run(() =>
{
for (int i = 0; i < 20; i++)
{
QueueTest.IntoData("QueueA" + i);
}
});
await task;
Console.WriteLine("QueueA插入完成,進行輸出:");
}
private static async void QueueB()
{
var task = Task.Run(() =>
{
for (int i = 0; i < 20; i++)
{
QueueTest.IntoData("QueueB" + i);
}
});
await task;
Console.WriteLine("QueueB插入完成,進行輸出:");
}
public static void QueueC()
{
Console.WriteLine("Queue插入完成,進行輸出:");
while (QueueTest.GetCount() > 0)
{
QueueTest.OutData2();
}
}
#endregion
Main函數調用:
static void Main(string[] args)
{
try
{
Stopwatch stopWatch = new Stopwatch();
TaskTest.QueueMian();
Console.ReadLine();
TaskTest.QueueC();
Console.ReadLine();
}
catch (Exception e)
{
throw;
}
}
插入效果:

輸出效果:


