多個Consumer的消息分發
之前講過一個queue對應一個consumer的小例子, 但是在實際項目中,一個consumer肯定是不夠的,queue中的消息過多。一個consumer明顯會處理過慢,等待時間過長。這時候就需要多個consumer來緩解壓力。
消息發布端
無論是創建connection還是創建channel與之前的步驟都是一樣的,在上面我們使用的是默認的交換機。在這里可以自己聲明一個交換機
這里是與上個例子不同的地方,創建了exchange,把queue綁定到了交換機上。然后去發布一百個消息
//聲明一個direct類型的交換機 channel.ExchangeDeclare("firstExchange", "direct", true, false, null); //聲明隊列 channel.QueueDeclare("firstTest", true, false, false, null); //綁定隊列 channel.QueueBind("firstTest", "firstExchange", "firstExchange_Demo_firstTest", null); //發布一百個消息 for (var i = 0; i < 100; i++) { var msg = Encoding.UTF8.GetBytes($"{i} :Hello RabbitMQ"); channel.BasicPublish("firstExchange", routingKey: "firstExchange_Demo_firstTest", basicProperties: null, body: msg); }
Consumer
這邊大部分也與上個例子的代碼一致。
//使用訂閱的方式 //這里的創建隊列,是為了防止 消費 在 生產 之前 channel.QueueDeclare("firstTest", true, false, false, null); //綁定隊列
channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
channel.QueueBind("firstTest", "firstExchange", "firstExchange_Demo_firstTest", null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var msg = Encoding.UTF8.GetString(e.Body); Console.WriteLine(msg); }; //進行消費 channel.BasicConsume("firstTest", true, consumer);
然后可以運行多個consumer,然后再打開消息發布程序。觀察我們不同的consumer窗口
可以看到queue是把第n個消息發送給了第n個consumer。如果這時候有三個consumer。那么它們收到的消息順序分別是
consumer1 consumer2 consumer3
1 2 3
4 5 6
這會帶來一個問題,當consumer過多的時候,消息就會分配的不均勻,導致某些concumer非常忙,有些特別閑。而且consumer也會有掉線的情況,甚至queue和rabbitmq也會有崩潰的情況,這時候應該如此保持我們的消息的有效性、持久性、以及准確性呢。這些在下一篇博文中會詳細說到