介紹

Producer:消息的生產者(發送消息的程序)。Queue:消息隊列,理解為一個容器,生產者向它發送消息,它把消息存儲,等待消費者消費。Consumer:消息的消費者(接收消息的程序)。
由圖所示,簡單隊列模式,一個生產者,經過一個隊列,對應一個消費者。可以看做是點對點的一種傳輸方式,相較架構模型圖,最主要的特點就是看不到 Exchange(交換機) 和 routekey(路由鍵) ,正是因為這種模式簡單,所以並不會涉及到復雜的條件分發等等,因此也不需要用戶去顯式的考慮交換機和路由鍵的問題。
- 但是要注意,這種模式並不是生產者直接對接隊列,而是用了默認的交換機,默認的交換機會把消息發送到和 routekey 名稱相同的隊列中去,這也是我們在后面代碼中在 routekey 位置填寫了隊列名稱的原因
.net 5.0 NuGet引用包
RabbitMQ.Client
簡單測試代碼:
- 生產者實現
using RabbitMQ.Client;
using System;
using System.Text;
namespace RabbitMQTest.Producer
{
/// <summary>
/// RabbitMQ測試_Producer生產者
/// </summary>
class Program
{
static void Main(string[] args)
{
// 1、創建連接工廠
IConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin",
Password = "admin",
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672,
VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機。
};
// 2、創建連接
IConnection connection = factory.CreateConnection();
// 3、創建通道
IModel channel = connection.CreateModel();
string name = "testQueue";
// 4、聲明一個隊列
channel.QueueDeclare(
queue: name, //消息隊列名稱
durable: false, //是否持久化,true持久化,隊列會保存磁盤,服務器重啟時可以保證不丟失相關信息。
exclusive: false, //是否排他,true排他的,如果一個隊列聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。
autoDelete: false, //是否自動刪除,true是自動刪除,自動刪除的前提是:致少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。
arguments: null); //設置隊列的一些其它參數。
Console.WriteLine("\n RabbitMQ連接成功,請輸入消息,輸入exit退出!");
string input;
do
{
input = Console.ReadLine();
byte[] sendBytes = Encoding.UTF8.GetBytes(input);
//發布消息
channel.BasicPublish("", name, null, sendBytes);
Console.WriteLine("消息發布完畢");
} while (input.Trim().ToLower() != "exit");
Console.WriteLine("\n RabbitMQ測試完畢!");
// 6、關閉通道
channel.Close();
// 7、關閉連接
connection.Close();
}
}
}
- queueDeclare 方法解釋
- 參數1:queue(隊列名稱),如果隊列不存在,則自動創建。
- 參數2:durable(隊列是否持久化),持久化可以保證服務器重啟后此隊列仍然存在。
- 參數3:exclusive(排他隊列)即是否獨占隊列,如果此項為 true,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。
- 參數4:autoDelete(自動刪除),最后一個消費者將消息消費完畢后,自動刪除隊列。
- 參數5:arguments(攜帶附加屬性)。
- basicPublish 方法解釋
- 參數1:exchange(交換機名稱)。
- 參數2:routingKey(路由key),此處填寫隊列名,可理解為把消息發送到和 routekey 名稱相同的隊列中去。
- 參數3:props(消息的控制狀態),可以在此處控制消息的持久化。
- 參數為:MessageProperties.PERSISTENT_TEXT_PLAIN
- 參數4:body(消息主體),類型是一個字節數組,要轉一下類型。
通過工具關閉channel和釋放連接:先關閉通道,再釋放連接。
- 消費者實現
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace RabbitMQTest.Consumer
{
/// <summary>
/// RabbitMQ測試_Consumer消費者
/// </summary>
class Program
{
static void Main(string[] args)
{
// 1、創建連接工廠
ConnectionFactory factory = new ConnectionFactory()
{
UserName = "admin", //用戶名
Password = "admin", //密碼
HostName = "192.168.1.101", //rabbitmq server ip
Port = 5672, //端口號
VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機。
};
// 2、創建連接
IConnection connection = factory.CreateConnection();
// 3、創建通道
IModel channel = connection.CreateModel();
// 4、事件基本消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
string name = "testQueue";
// 5、接收到消息事件
consumer.Received += (ch, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($"接受到消息:{message}");
Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲1s發送回執!");
Thread.Sleep(1000);
// 6、確認該消費已被消費
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"已發送回執{ea.DeliveryTag}");
};
// 7、啟動消費者 設置為手動應答消息
channel.BasicConsume(name, false, consumer);
Console.WriteLine("消費者已啟動");
Console.ReadKey();
channel.Close();
connection.Close();
}
}
}
- basicConsume 方法解釋
- 參數1:queue(隊列名稱),即消費哪個隊列的消息 。
- 參數2:autoAck(自動應答)開始消息的自動確認機制,只要消費了就從隊列刪除消息。
- 參數3:callback(消費時的回調接口),callback 的類型是 Consumer 這里使用了 DefaultConsumer 就是 Consumer 的一個實現類。其中重寫 handleDelivery 方法,就可以獲取到消費的數據內容了,這里主要使用了其中的 body,即查看消息主體,其他三個參數暫時還沒用到,有興趣可以先打印輸出一下,能先有個大概的了解。
