.Net下RabbitMQ的使用(3) -- 競爭的消費者


 

上一篇文章中,演示了一個發送者和一個消費者的情況。這一篇介紹一下多個消費者在同一個消息隊列中獲取消息的情況。

 

在有些應用當中,消費端接收到消息任務需要長時間的處理,如果等上一個消息處理完成以后再取下一個數據進行處理的話,勢必會有一些延遲。在消息隊列中的數據也會不斷增多,延遲將越來越大。當然對於一個消費進程來說,在某些情況下可以起多個線程來處理,而在這里將介紹另一種處理方式,多個消費進程的情況。而RabbitMQ在這方面進行了很好的處理和封裝,使客戶程序可以很方便的使用。

 

python-two

 

 

其實,在代碼實現上和上一遍的例子中並沒有什么不同,我們只需要運行兩個Server端的程序就可以了。我們可以看到,發送的消息會平均的由兩個Server中的一個來處理。不會有重復。倘若一個Server程序關閉了,那之后發送的所有消息都會在還運行着的那個Server程序上處理。這可以解決我們一些需要負載均衡的場景,而且擴展非常方便,只要在運行一個Server也就是Worker就可以了,Worker之間的狀態同步都免了。

 

當我們處理一個較長時間任務的時候,程序在處理過程中如果出現異常,或程序掛了導致消息沒有處理成功,我們通常並不希望丟失該消息任務,而希望由其他Worker來處理或者等掛了的Worker重新起來以后再處理。同樣,應對該場景,RabbitMQ也提供了簡便的API方便我們處理。在RabbitMQ中,為了不讓消息丟失,它提供了消息應答的概念。當消費者獲取到了一個消息以后,需要給RabbitMQ服務一個應答的消息,告知服務我已經收到或正確處理了該消息。那么RabbitMQ可以放心的在隊列中刪除該消息。在上一篇的服務端的代碼中

channel.BasicConsume("TaskQueue", true, consumer);

的第二個參數是true,RabbitMQ服務一旦把消息送達目標隊列及認為應答了。為了演示不發送應答消息的情況,我們需要安裝一個RabbitMQ的plugin:Management UI。安裝這個插件比較簡單。

你可以再 http://www.rabbitmq.com/management.html 了解到詳細信息。

打開瀏覽器, http://localhost:55672,輸入默認的用戶名密碼 guest/guest,進入主界面。界面上信息很多,我們只看最上面的一塊:Queued Messages。

還是啟動一個發送程序和一個接受程序。把接收程序的定義消費者的代碼修改為

channel.BasicConsume("TaskQueue", false, consumer);

然后運行兩個程序,發送一個消息。由於我們在接收消息的代碼中沒有發送消息收到的應答包,所以在剛才監控的網頁上回出現如下結果:

image

 

沒有回應的消息是1個。接下來我們關閉接收程序,不修改任何代碼,然后再次運行接收程序。我們發現接收程序再次收到了那個原來的消息。而瀏覽器上顯示的狀態還是沒有變說明還是有一個消息沒有應答,因為我們第二次運行的接收程序還是沒有發應答包。關掉接收程序,修改一下代碼,在處理完消息的時候我們添加如下代碼:

channel.BasicAck(ea.DeliveryTag, false);

這時,我發現程序如我們所料,還是會收到一次那個消息包,但是監控網頁的界面卻變了:

image

 

然后,無論在運行多少次接收程序,都不會再收到該消息包了。這也就說明了,RabbitMQ真正認為該消息被正確處理了。

需要注意的是,RabbitMQ對於沒有發送應答包的消息沒有時間的限制,也就是說沒有Timeout。RabbitMQ只會在處理消息的接收程序與RabbitMQ服務端斷開連接后才會重新分配該消息。如果連接沒有斷,但處理程序幾天沒有給回應包,它也不會重新發送。所以,如果在處理程序出現異常的時候,我們可以寫代碼將與RabbitMQ的連接斷開來實現消息的重新發送(也許會發到其他負載均衡的機器上處理)。

修改后的接收端代碼如下:

    public class Worker
    {
        public void Listen()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare("TaskQueue", true, false, false, null);
                    //channel.BasicQos(500, 1, false);
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("TaskQueue", false, consumer);
 
                    while (true)
                    {
                        BasicDeliverEventArgs ea =
                            (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        Console.WriteLine("Receive a Message, start to handle");
 
                        //模擬斷開連接
                        return;
 
                        //模擬長時間運行
                        Thread.Sleep(60000);
                        byte[] bytes = ea.Body;
 
                        XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                        using (MemoryStream ms = new MemoryStream(bytes))
                        {
                            RequestMessage message = (RequestMessage)xs.Deserialize(ms);
                            Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
                        }
                        //發送應答包
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }

 

需要注意的是,該功能只有在point-point模式下才有如此效果,也就是一發一接的模式下。如果是發布和訂閱這種broadcasting的模式下,這種配置項的結果會有一些不同,我們下一篇再說。

 

完整示例代碼從這里下載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM