消息的訂閱和發布是使用消息隊列的常用場景。在上一篇文章中,雖然有多個消費者,但是一個消息只會有一個消費者來處理。而訂閱和發布則是每個訂閱該消息的消費者都會收到這個消息。RabbitMQ的路由機制讓我們實現這個功能輕而易舉。
要了解RabbitMQ的路由機制,exchange是一個關鍵。exchange可以叫做交換機,也似乎可以叫做路由器,反正它是用來選擇路由的。前文說到,RabbitMQ的核心思想就是消息的發布者不是直接把消息發送到目標隊列中的,事實上,通常它並不知道消息要發到哪個隊列中,它只知道把消息隊列發送到exchange中。exchange一邊接收發送者發過來的消息,而另一邊則把消息發送到目標隊列中去。exchange一定知道哪些隊列需要接收這個消息,是加到一個隊列里還是加到好幾個隊列里,還是直接扔掉。下圖中的X就是exchange。
RabbitMQ的exchange有一些類型,這些類型決定了exchange的行為。分別是 direct, topic, headers 和 fanout四種類型。在這一篇文章中介紹的最簡單的發送和接收例子中使用的如下代碼中
//指定發送的路由,通過默認的exchange直接發送到指定的隊列中。
channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);
第一個參數我們輸入了空字符串來代表一個exchange。空字符串的exchange在RabbitMQ中時默認的exchange,類型是direct。在這個例子中它會直接將消息發送到第二個參數route_key定義的同名的隊列中。
而我們這篇介紹的訂閱和發布是用了fanout這個類型。由於默認類型是direct的,所以需要使用fanout就需要額外定義。如下代碼就是定義了一個名字叫publish的fanout類型的exchange。
channel.ExchangeDeclare("publish", "fanout");
這種exchange有分發的意思。那分發到哪些隊列中呢?因為發布者不需要知道,所以這段代碼頁就在訂閱者那邊來實現。來看訂閱者得代碼片段:
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.ExchangeDeclare("publish", "fanout");
//隨機創建一個隊列
string queue_name = channel.QueueDeclare("subscriber1", true, false, false, null);
//綁定到名字叫publish的exchange上
channel.QueueBind(queue_name, "publish", "");
//定義這個隊列的消費者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue_name, true, consumer);
while (true)
{
BasicDeliverEventArgs ea =
(BasicDeliverEventArgs)consumer.Queue.Dequeue();
byte[] bytes = ea.Body;
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
using (MemoryStream ms = new MemoryStream(bytes))
{
RequestMessage message = (RequestMessage)xs.Deserialize(ms);
Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
}
}
}
}
同樣先定義一個fanout的exchange。對於為什么要在發布者和訂閱者都要定義同名的exchange,我的理解是如果沒有定義的一方先啟動的話則會報錯說找不到那個exchange。我測試過如果有定義的先啟動,沒定義的后啟動也是沒有問題的。
因為每個訂閱者都需要一個隊列來存放發給自己的消息,所以需要創建一個隊列。通過QueueBind來和exchange關聯了。所有發送給名字為publish的exchange的消息,都會被它分發給所有與之綁定的隊列中,這樣,每個對應的消費者都會收到一個副本。
在瀏覽器的管理界面上我們可以看見,RabbitMQ為每一個消費者(訂閱者)創建了一個隊列,而沒有為發送者創建隊列。
從這里下載示例代碼