RabbitMQ (三) 工作隊列之輪詢分發


上一篇講了簡單隊列,實際工作中,這種隊列應該很少用到,因為生產者發送消息的耗時一般都很短,但是消費者收到消息后,往往伴隨着對高消息的業務邏輯處理,是個耗時的過程,這勢必會導致大量的消息積壓在一個消費者手中,從而導致業務的積壓.

所以我們需要多個消費者一起消費隊列中的消息,模型如下:(為了方便講解,暫時隱藏掉"交換機")

 

 

生產者

    public class Producer
    {
        private const string QueueName = "test_work_queue";
        public static void Send()
        {
            //獲取一個連接
            using (IConnection connection = ConnectionHelper.GetConnection())
            {
                //從連接中獲取一個信道
                using (IModel channel = connection.CreateModel())
                {
                    //聲明隊列
                    channel.QueueDeclare(QueueName, false, false, false, null);

                    for (int i = 0; i < 50; i++)
                    {
                        //創建消息
                        string msg = "hello world " + i;
                        //發送消息
                        channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                        Console.WriteLine($"{DateTime.Now} : send {msg}");
                    }
                }
            }
        }
    }

 

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_work_queue";
        public static void Receive()
        {
            //獲取一個連接
            IConnection connection = ConnectionHelper.GetConnection();

            //從連接中獲取一個信道
            IModel channel = connection.CreateModel();

            //聲明隊列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //添加消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //注冊消費者收消息事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 receive : " + str);
                Thread.Sleep(500);//休息0.5秒
            };

            //開啟消費者監聽
            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }

 

消費者2

只有一點點區別:

                Console.WriteLine("consumer2 receive : " + str);
                Thread.Sleep(1000);//休息1秒

 

我們這里故意讓兩個消費者處理消息的耗時不一樣,一個0.5秒,一個1秒.

我們來看看結果:

 

可以非常清楚的看到,盡管兩個消費者處理消息的"耗時"不一樣,但是處理的"數量"是一樣的.

這里有幾個細節要說明一下:

1.在生產者和兩個消費者中都聲明了同一個隊列.其實,如果這個隊列之前已經存在了,那么生產者和消費者都可以不用再聲明了;

2.一定要先啟動兩個消費者,再啟動生產者.原因是,我們上面的代碼中,消費者的 BasicConsume 方法的第2個參數傳入的是 true,

這個參數就是 autoAck :是否自動確認(上面文章有講過).

所以如果先開啟生產者,那么會瞬間發送完50條消息,這時候啟動消費者1,那么會立刻"消費"掉這50條消息.有朋友肯定要問,不是"睡"了0.5秒么?

這里"睡"0.5秒,是對消息的業務邏輯處理耗時,而不是"消費"消息,消息已經在消費者啟動的那一刻從隊列中"拿"過來了;

同時,由於采用的是"自動確認",所以隊列看到50條都被"確認"了,就會將這些消息從隊列中移除.

這時候再啟動消費者2,則不會收到任何消息.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM