在前面一章介紹了在.Net Core中如何使用RabbitMQ,至此入門的的部分就完成了,我們內心中一定還有很多疑問:如果多個消費者消費同一個隊列怎么辦?如果這幾個消費者分任務的權重不同怎么辦?怎么把同一個隊列不同級別的任務分發給不同的消費者?如果消費者異常離線怎么辦?不要着急,后面將慢慢解開面紗。我們將結合實際的應用場景來講解更多的高級用法。
任務分發機制
設想如果把每個消息當做一個任務,生產者把任務發布到RabbitMQ,然后Consumer接收消息處理任務,如果我們發現一個Consumer不能完成任務處理怎么辦呢,我們會增加Consumer的數量。由一個Consumer增加到兩個Consumer,如圖由C變為C1和C2共同來分單工作。如果C1和C2是完全一樣的,那RabbitMQ會將任務平均分發到兩個消費者。

如下我們
新建ProductAckDemo開發布訂閱內容
新建ConsumerAckDemo1和ConsumerAckDemo2項目來訂閱同一個隊列在接收到消息后sleep1秒模擬任務處理的時間。
ProductAckDemo代碼,生產100條帶編號的消息:
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace ProductAckDemo { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String routeKeyName = "wytRouteKey"; String message = "Task --"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "direct", durable: true, autoDelete: false, arguments:null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; for (int i = 0; i < 100; i++) { Byte[] body = Encoding.UTF8.GetBytes(message+i); channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName, basicProperties: properties, body: body); } } } } } }
ConsumerAckDemo1與ConsumerAckDemo2代碼(一樣的)
using System; using System.Text; using System.Threading; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace ConsumerAckDemo1 { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String queueName = "wytQueue"; String routeKeyName = "wytRouteKey"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "direct", durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Byte[] body = ea.Body; String message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); Thread.Sleep(1000); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }
打開兩個中斷窗口分別執行ConsumerAckDemo1和ConsumerAckDemo2。確定兩個程序處於訂閱狀態,然后執行ProductAckDemo程序。

看到上面的圖結果就一目了然了。因為兩個程序的時間相同所以任務是完全平均分發到兩個消費者的。我們修改下ConsumerAckDemo2腳本的sleep時間為2秒,看下結果會怎么樣。

可以看到ConsumerAckDemo1共收到66條消息,ConsumerAckDemo2腳本收到34條消息,基本是按照2:1來分配。那RabbitMQ是如何來保證這樣的分發機制呢,下面看RabbitMQ是如何通過ACK確認機制來實現任務分發的。
ACK消息確認機制
首先RabbitMQ支持消息確認機制來本證消息被consumer正常處理,當然也可以通過no-ack不使用確認機制。RabbitMQ默認是使用ACK確認機制的。當Consumer接收到RabbitMQ發布的消息時需要在適當的時機發送一個ACK確認的包來告知RabbitMQ,自己接收到了消息並成功處理。所以前面講到適當的時機建議是在處理完消息任務后發送。正如我們之前的代碼
EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {
... channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手動發送ACK應答
... }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);//不自動應答
那如果不發送會怎樣呢?
在RabbitMQ中有一個prefetch_count的概念,這個參數的意思是允許Consumer最多同時處理幾個任務。我的版本的RabbitMQ默認這個參數是1,也就是說如果某一個Consumer在收到消息后沒有發送ACK確認包,RabbitMQ就會任務Consumer還在處理任務,當有1個消息都沒有發送ACK確認包時,RabbitMQ就不會再發送消息給該Consumer。
我們把ConsumerAckDemo2的sleep時間改回1秒,並且注釋掉ACK確認。

發現ConsumerAckDemo2只收到1條消息。通過WEB管理工具也可以看到有1條消息是沒有被ACK確認的

當然任務並不會一直卡在這里,在這是RabbitMQ任務ConsumerAckDemo2在處理這1個任務。如果ConsumerAckDemo2忽然終止RabbitMQ會重新分發任務。如果我終止ConsumerAckDemo2,1條任務被重新分發到了ConsumerAckDemo1。再查看下WEB管理工具,unackd已經為0

如果Consumer數量很多或者希望每個Consumer同時只處理一個任務可以通過在Consumer中設置PrefetchCount來實現更加均勻的任務分發。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
如下我修改PrefetchCount為1。在WEB管理插件中可以看到已經有一個Consumer的PrefetchCount為1了。

