RabbitMQ消息隊列(六)-消息任務分發與消息ACK確認機制(.Net Core版)


在前面一章介紹了在.Net Core中如何使用RabbitMQ,至此入門的的部分就完成了,我們內心中一定還有很多疑問:如果多個消費者消費同一個隊列怎么辦?如果這幾個消費者分任務的權重不同怎么辦?怎么把同一個隊列不同級別的任務分發給不同的消費者?如果消費者異常離線怎么辦?不要着急,后面將慢慢解開面紗。我們將結合實際的應用場景來講解更多的高級用法。

任務分發機制

設想如果把每個消息當做一個任務,生產者把任務發布到RabbitMQ,然后Consumer接收消息處理任務,如果我們發現一個Consumer不能完成任務處理怎么辦呢,我們會增加Consumer的數量。由一個Consumer增加到兩個Consumer,如圖由C變為C1和C2共同來分單工作。如果C1和C2是完全一樣的,那RabbitMQ會將任務平均分發到兩個消費者。 

 

如下我們

新建ProductAckDemo開發布訂閱內容

新建ConsumerAckDemo1和ConsumerAckDemo2項目來訂閱同一個隊列在接收到消息后sleep1秒模擬任務處理的時間。

ProductAckDemo代碼,生產100條帶編號的消息:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace ProductAckDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchange";
            String routeKeyName = "wytRouteKey";
            String message = "Task --";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection=factory.CreateConnection())
            {
                using (IModel channel=connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct", durable: true, autoDelete: false, arguments:null);

                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    for (int i = 0; i < 100; i++)
                    {
                        Byte[] body = Encoding.UTF8.GetBytes(message+i);

                        channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName, basicProperties: properties, body: body);
                    }
                }
            }
        }
    }
}
View Code

ConsumerAckDemo1與ConsumerAckDemo2代碼(一樣的)

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace ConsumerAckDemo1
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchange";
            String queueName = "wytQueue";
            String routeKeyName = "wytRouteKey";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection=factory.CreateConnection())
            {
                using (IModel channel=connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct", durable: true, autoDelete: false, arguments: null);

                    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName, arguments: null);

                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Byte[] body = ea.Body;
                        String message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] {0}", message);

                        Thread.Sleep(1000);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
}
View Code

打開兩個中斷窗口分別執行ConsumerAckDemo1和ConsumerAckDemo2。確定兩個程序處於訂閱狀態,然后執行ProductAckDemo程序。 

看到上面的圖結果就一目了然了。因為兩個程序的時間相同所以任務是完全平均分發到兩個消費者的。我們修改下ConsumerAckDemo2腳本的sleep時間為2秒,看下結果會怎么樣。 

可以看到ConsumerAckDemo1共收到66條消息,ConsumerAckDemo2腳本收到34條消息,基本是按照2:1來分配。那RabbitMQ是如何來保證這樣的分發機制呢,下面看RabbitMQ是如何通過ACK確認機制來實現任務分發的。

ACK消息確認機制

首先RabbitMQ支持消息確認機制來本證消息被consumer正常處理,當然也可以通過no-ack不使用確認機制。RabbitMQ默認是使用ACK確認機制的。當Consumer接收到RabbitMQ發布的消息時需要在適當的時機發送一個ACK確認的包來告知RabbitMQ,自己接收到了消息並成功處理。所以前面講到適當的時機建議是在處理完消息任務后發送。正如我們之前的代碼

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
  ...
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手動發送ACK應答
  ... }; channel.BasicConsume(queue: queueName, autoAck:
false, consumer: consumer);//不自動應答

那如果不發送會怎樣呢?

在RabbitMQ中有一個prefetch_count的概念,這個參數的意思是允許Consumer最多同時處理幾個任務。我的版本的RabbitMQ默認這個參數是1,也就是說如果某一個Consumer在收到消息后沒有發送ACK確認包,RabbitMQ就會任務Consumer還在處理任務,當有1個消息都沒有發送ACK確認包時,RabbitMQ就不會再發送消息給該Consumer。 
我們把ConsumerAckDemo2的sleep時間改回1秒,並且注釋掉ACK確認。

 發現ConsumerAckDemo2只收到1條消息。通過WEB管理工具也可以看到有1條消息是沒有被ACK確認的

當然任務並不會一直卡在這里,在這是RabbitMQ任務ConsumerAckDemo2在處理這1個任務。如果ConsumerAckDemo2忽然終止RabbitMQ會重新分發任務。如果我終止ConsumerAckDemo2,1條任務被重新分發到了ConsumerAckDemo1。再查看下WEB管理工具,unackd已經為0

如果Consumer數量很多或者希望每個Consumer同時只處理一個任務可以通過在Consumer中設置PrefetchCount來實現更加均勻的任務分發。

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

如下我修改PrefetchCount為1。在WEB管理插件中可以看到已經有一個Consumer的PrefetchCount為1了。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM