在上一篇文章中,演示了一個發送者和一個消費者的情況。這一篇介紹一下多個消費者在同一個消息隊列中獲取消息的情況。
在有些應用當中,消費端接收到消息任務需要長時間的處理,如果等上一個消息處理完成以后再取下一個數據進行處理的話,勢必會有一些延遲。在消息隊列中的數據也會不斷增多,延遲將越來越大。當然對於一個消費進程來說,在某些情況下可以起多個線程來處理,而在這里將介紹另一種處理方式,多個消費進程的情況。而RabbitMQ在這方面進行了很好的處理和封裝,使客戶程序可以很方便的使用。
其實,在代碼實現上和上一遍的例子中並沒有什么不同,我們只需要運行兩個Server端的程序就可以了。我們可以看到,發送的消息會平均的由兩個Server中的一個來處理。不會有重復。倘若一個Server程序關閉了,那之后發送的所有消息都會在還運行着的那個Server程序上處理。這可以解決我們一些需要負載均衡的場景,而且擴展非常方便,只要在運行一個Server也就是Worker就可以了,Worker之間的狀態同步都免了。
當我們處理一個較長時間任務的時候,程序在處理過程中如果出現異常,或程序掛了導致消息沒有處理成功,我們通常並不希望丟失該消息任務,而希望由其他Worker來處理或者等掛了的Worker重新起來以后再處理。同樣,應對該場景,RabbitMQ也提供了簡便的API方便我們處理。在RabbitMQ中,為了不讓消息丟失,它提供了消息應答的概念。當消費者獲取到了一個消息以后,需要給RabbitMQ服務一個應答的消息,告知服務我已經收到或正確處理了該消息。那么RabbitMQ可以放心的在隊列中刪除該消息。在上一篇的服務端的代碼中
channel.BasicConsume("TaskQueue", true, consumer);
的第二個參數是true,RabbitMQ服務一旦把消息送達目標隊列及認為應答了。為了演示不發送應答消息的情況,我們需要安裝一個RabbitMQ的plugin:Management UI。安裝這個插件比較簡單。
你可以再 http://www.rabbitmq.com/management.html 了解到詳細信息。
打開瀏覽器, http://localhost:55672,輸入默認的用戶名密碼 guest/guest,進入主界面。界面上信息很多,我們只看最上面的一塊:Queued Messages。
還是啟動一個發送程序和一個接受程序。把接收程序的定義消費者的代碼修改為
channel.BasicConsume("TaskQueue", false, consumer);
然后運行兩個程序,發送一個消息。由於我們在接收消息的代碼中沒有發送消息收到的應答包,所以在剛才監控的網頁上回出現如下結果:
沒有回應的消息是1個。接下來我們關閉接收程序,不修改任何代碼,然后再次運行接收程序。我們發現接收程序再次收到了那個原來的消息。而瀏覽器上顯示的狀態還是沒有變說明還是有一個消息沒有應答,因為我們第二次運行的接收程序還是沒有發應答包。關掉接收程序,修改一下代碼,在處理完消息的時候我們添加如下代碼:
channel.BasicAck(ea.DeliveryTag, false);
這時,我發現程序如我們所料,還是會收到一次那個消息包,但是監控網頁的界面卻變了:
然后,無論在運行多少次接收程序,都不會再收到該消息包了。這也就說明了,RabbitMQ真正認為該消息被正確處理了。
需要注意的是,RabbitMQ對於沒有發送應答包的消息沒有時間的限制,也就是說沒有Timeout。RabbitMQ只會在處理消息的接收程序與RabbitMQ服務端斷開連接后才會重新分配該消息。如果連接沒有斷,但處理程序幾天沒有給回應包,它也不會重新發送。所以,如果在處理程序出現異常的時候,我們可以寫代碼將與RabbitMQ的連接斷開來實現消息的重新發送(也許會發到其他負載均衡的機器上處理)。
修改后的接收端代碼如下:
public class Worker
{
public void Listen()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare("TaskQueue", true, false, false, null);
//channel.BasicQos(500, 1, false);
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("TaskQueue", false, consumer);
while (true)
{
BasicDeliverEventArgs ea =
(BasicDeliverEventArgs)consumer.Queue.Dequeue();
Console.WriteLine("Receive a Message, start to handle");
//模擬斷開連接
return;
//模擬長時間運行
Thread.Sleep(60000);
byte[] bytes = ea.Body;
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
using (MemoryStream ms = new MemoryStream(bytes))
{
RequestMessage message = (RequestMessage)xs.Deserialize(ms);
Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
}
//發送應答包
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
}
需要注意的是,該功能只有在point-point模式下才有如此效果,也就是一發一接的模式下。如果是發布和訂閱這種broadcasting的模式下,這種配置項的結果會有一些不同,我們下一篇再說。
完整示例代碼從這里下載