目錄
本系列向大家介紹RabbitMQ的簡單用法;
1. RabbitMQ的簡單實踐
2. RabbitMQ的輪詢模式和公平分發
3. RabbitMQ的發布訂閱模式(Publish/Subscribe)
4. RabbitMQ路由模式(Routing)
5. RabbitMQ的主題(Topic)模式
一、發布/訂閱(Publish/Subscribe)模式
發布訂閱是我們經常會用到的一種模式,生產者生產消息后,所有訂閱者都可以收到。RabbitMQ的發布/訂閱模型圖如下:
1、該模式下生產者並不是直接操作隊列,而是將數據發送給交換機,由交換機將數據發送給與之綁定的隊列;
2、該模式必須聲明交換機,並且設置模式: channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
fanout指分發模式(將每一條消息都發送到與交換機綁定的隊)。
3、隊列必須綁定交換機:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
二、發布消息
消息生產者向交換機(exchange)發送消息,代碼如下:
// 定義交換機名稱
static string EXCHANGE_NAME = "ps_exchange_fanout";
public static void PublishMessage()
{
try
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
// 定義exchange
channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
string msg = "hello ps!";
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(EXCHANGE_NAME, "", null, body);
Console.WriteLine("send msg:" + msg);
channel.Close();
conn.Close();
}
catch (Exception ex)
{
throw ex;
}
}
消息發送成功,截圖如下:
三、訂閱消息
在這里需要兩個消費者,消息發送后,所有的訂閱者都可以收到消息;
3.1 消費者1
和輪詢分發以及公平分發不同的是,消費者需要將隊列綁定到交換機,來訂閱消息;實現代碼如下:
static string EXCHANGE_NAME = "ps_exchange_fanout";
static string QUEUE_NAME = "ps_queue_sub1";
/// <summary>
/// 訂閱消費者1
/// </summary>
static void SubscribeConsumer1()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
// 定義exchange
channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
// 綁定queue
channel.QueueDeclare(queue: QUEUE_NAME);
channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義Consumer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model,ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"SubscribeConsumer1 收到消息: {message},時間:{DateTime.Now}");
};
//啟動消費者 設置為手動應答消息
channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
Console.WriteLine("Subscribe Consumer1 消費者已啟動");
Console.ReadKey();
channel.Dispose();
conn.Close();
}
3.2 消費者2
消費者2的代碼和1的基本相同,大家可以將1的修改一下,就可以使用,在此就不重復貼出了;
3.3 接收消息結果
消費者1接收消息截圖:
消費者2接收消息截圖:
通過上圖,我們可以看到,發布者發布消息后,訂閱者1、2均受到了相同的消息,至此功能已經完成;
四、小結
4.1 訂閱者代碼的主要流程
根據消費者的代碼,我們可以提煉流程如下
(1)創建連接
(2)聲明exchange
(3)綁定隊列到exchange
(4)聲明消費者
(5)綁定消費者到channel,監聽處理消息
(6)關閉連接
4.2 訂閱成功后,我們打開mq的管理地址可以看到,有兩個queue綁定到exchange上了:
注意:如果我們的exchange沒有消費者訂閱,發布的消息將不會被保存到任何隊列,直接丟失了;
參考鏈接:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html