目錄
本系列向大家介紹RabbitMQ的簡單用法;
1. RabbitMQ的簡單實踐
2. RabbitMQ的輪詢模式和公平分發
3. RabbitMQ的發布訂閱模式(Publish/Subscribe)
4. RabbitMQ路由模式(Routing)
5. RabbitMQ的主題(Topic)模式
一、定義
我們先來看一張圖:
這張圖有幾個不同的routingkey,每個routingkey都是由兩段組成的;
下面我們來編寫代碼來實現相關功能;
二、生產者生產消息
生產者發送四條消息,routingkey不相同,代碼如下:
class Program
{
static void Main(string[] args)
{
Console.WriteLine("msg send start!");
TopicPublisher();
Console.WriteLine("msg send end!");
}
static string EXCHANGE_NAME = "routing_exchange_topic";
static void TopicPublisher()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
var routingKeyList = new List<string>() { "usa.news", "usa.weather", "europe.news", "europe.weather" };
for (var i = 0; i < routingKeyList.Count; i++)
{
var msg = $"hello topic {routingKeyList[i]}!";
channel.BasicPublish(
exchange: EXCHANGE_NAME,
routingKey: routingKeyList[i],
basicProperties: null,
body: Encoding.UTF8.GetBytes(msg));
Console.WriteLine($"消息發送:" + msg);
}
Console.WriteLine($"消息發送完成!");
channel.Close();
conn.Close();
}
}
結果運行如圖:
三、消費者
3.1 消費者1,綁定的routingkey為europe.#
,代碼如下:
static string EXCHANGE_NAME = "routing_exchange_topic";
static string QUEUE_NAME = "routing_queue_topic1";
static void TopicConsumer1()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
channel.QueueDeclare(queue: QUEUE_NAME);
// 綁定routingKey
string routingKey = "europe.#";
channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"Topic Consumer1 收到消息: {message},時間:{DateTime.Now}");
};
channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
Console.ReadKey();
channel.Close();
conn.Close();
}
}
3.2 消費者2,綁定的routingkey為#.weather
,代碼如下:
static string EXCHANGE_NAME = "routing_exchange_topic";
static string QUEUE_NAME = "routing_queue_topic2";
static void TopicConsumer2()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
channel.QueueDeclare(queue: QUEUE_NAME);
string routingKey = "#.weather";
// 可以綁定多個routingKey
channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"Topic Consumer2 收到消息: {message},時間:{DateTime.Now}");
};
channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
Console.ReadKey();
channel.Close();
conn.Close();
}
3.3 查看運行結果
(1)隊列綁定到交換機的結果如圖:
(2)消費者1的接收到的消息:
(3)消費者2的接收到的消息:
通過結果我們可以看到,消費者1和消費者2分別接收到了不同key的消息;
四、小結
1、topic模式,我們可以通過模糊匹配的,來接收自己想要的消息;使用Topic模式生產者在聲明隊列時需要制定消息到達隊列方式為topic;
2、Consumer消費者,匹配消息時:#匹配一個或者多個關鍵字,*匹配一個關鍵字
3、參考文檔:https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html