RabbitMQ Queue分發多個Consumer


多個Consumer的消息分發

之前講過一個queue對應一個consumer的小例子, 但是在實際項目中,一個consumer肯定是不夠的,queue中的消息過多。一個consumer明顯會處理過慢,等待時間過長。這時候就需要多個consumer來緩解壓力。

 

消息發布端

無論是創建connection還是創建channel與之前的步驟都是一樣的,在上面我們使用的是默認的交換機。在這里可以自己聲明一個交換機

這里是與上個例子不同的地方,創建了exchange,把queue綁定到了交換機上。然后去發布一百個消息

//聲明一個direct類型的交換機
channel.ExchangeDeclare("firstExchange", "direct", true, false, null);

//聲明隊列
 channel.QueueDeclare("firstTest", true, false, false, null);

 //綁定隊列
  channel.QueueBind("firstTest", "firstExchange", "firstExchange_Demo_firstTest", null);

//發布一百個消息

for (var i = 0; i < 100; i++)
 {
      var msg = Encoding.UTF8.GetBytes($"{i} :Hello RabbitMQ");
      channel.BasicPublish("firstExchange", routingKey: "firstExchange_Demo_firstTest", basicProperties: null, body: msg);
}

 

Consumer

這邊大部分也與上個例子的代碼一致。

//使用訂閱的方式
//這里的創建隊列,是為了防止 消費 在 生產 之前
channel.QueueDeclare("firstTest", true, false, false, null);
//綁定隊列 
channel.ExchangeDeclare("firstExchange", "direct", true, false, null); 
channel.QueueBind("firstTest", "firstExchange", "firstExchange_Demo_firstTest", null); 

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (sender, e) =>
{
       var msg = Encoding.UTF8.GetString(e.Body);

        Console.WriteLine(msg);
};
//進行消費
 channel.BasicConsume("firstTest", true, consumer);

 

 

然后可以運行多個consumer,然后再打開消息發布程序。觀察我們不同的consumer窗口

可以看到queue是把第n個消息發送給了第n個consumer。如果這時候有三個consumer。那么它們收到的消息順序分別是

consumer1    consumer2   consumer3

      1                      2                    3

      4                      5                    6

image

 

這會帶來一個問題,當consumer過多的時候,消息就會分配的不均勻,導致某些concumer非常忙,有些特別閑。而且consumer也會有掉線的情況,甚至queue和rabbitmq也會有崩潰的情況,這時候應該如此保持我們的消息的有效性、持久性、以及准確性呢。這些在下一篇博文中會詳細說到


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM