上一篇講了簡單隊列,實際工作中,這種隊列應該很少用到,因為生產者發送消息的耗時一般都很短,但是消費者收到消息后,往往伴隨着對高消息的業務邏輯處理,是個耗時的過程,這勢必會導致大量的消息積壓在一個消費者手中,從而導致業務的積壓.
所以我們需要多個消費者一起消費隊列中的消息,模型如下:(為了方便講解,暫時隱藏掉"交換機")

生產者
public class Producer { private const string QueueName = "test_work_queue"; public static void Send() { //獲取一個連接 using (IConnection connection = ConnectionHelper.GetConnection()) { //從連接中獲取一個信道 using (IModel channel = connection.CreateModel()) { //聲明隊列 channel.QueueDeclare(QueueName, false, false, false, null); for (int i = 0; i < 50; i++) { //創建消息 string msg = "hello world " + i; //發送消息 channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); } } } } }
消費者1
public class Consumer1 { private const string QueueName = "test_work_queue"; public static void Receive() { //獲取一個連接 IConnection connection = ConnectionHelper.GetConnection(); //從連接中獲取一個信道 IModel channel = connection.CreateModel(); //聲明隊列 channel.QueueDeclare(QueueName, false, false, false, null); //添加消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //注冊消費者收消息事件 consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer1 receive : " + str); Thread.Sleep(500);//休息0.5秒 }; //開啟消費者監聽 channel.BasicConsume(QueueName, true, "", false, false, null, consumer); } }
消費者2
只有一點點區別:
Console.WriteLine("consumer2 receive : " + str); Thread.Sleep(1000);//休息1秒
我們這里故意讓兩個消費者處理消息的耗時不一樣,一個0.5秒,一個1秒.
我們來看看結果:

可以非常清楚的看到,盡管兩個消費者處理消息的"耗時"不一樣,但是處理的"數量"是一樣的.
這里有幾個細節要說明一下:
1.在生產者和兩個消費者中都聲明了同一個隊列.其實,如果這個隊列之前已經存在了,那么生產者和消費者都可以不用再聲明了;
2.一定要先啟動兩個消費者,再啟動生產者.原因是,我們上面的代碼中,消費者的 BasicConsume 方法的第2個參數傳入的是 true,
這個參數就是 autoAck :是否自動確認(上面文章有講過).
所以如果先開啟生產者,那么會瞬間發送完50條消息,這時候啟動消費者1,那么會立刻"消費"掉這50條消息.有朋友肯定要問,不是"睡"了0.5秒么?
這里"睡"0.5秒,是對消息的業務邏輯處理耗時,而不是"消費"消息,消息已經在消費者啟動的那一刻從隊列中"拿"過來了;
同時,由於采用的是"自動確認",所以隊列看到50條都被"確認"了,就會將這些消息從隊列中移除.
這時候再啟動消費者2,則不會收到任何消息.
