RabbitMQ入門-競爭消費者模式


上一篇講了個 哈嘍World,現在來看看如果存在多個消費者的情況。

 

生產者:

package com.example.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 競爭消費者模式
 */
public class CompetingSend {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();    // 連接工廠
        factory.setHost("localhost");
        Connection connection = factory.newConnection();        // 獲取連接
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);    // 聲明隊列,只有他不存在的時候創建
        String msg = "Hello World!";
        // 發送多條消息
        for (int i = 0; i < 5; i++){
            channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
            System.out.println("Sending:" + msg);
        }

        channel.close();
        connection.close();

    }
}

 

消費者:

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 一個生產者,多個消費者
 */
public class CompetingReceiveA {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();    // 連接工廠
        factory.setHost("localhost");
        Connection connection = factory.newConnection();        // 獲取連接
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);    // 聲明隊列,只有他不存在的時候創建

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String recv = new String(body, "UTF-8");
                System.out.println("Receive:" + recv);
                try {
                    doWork(recv);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Done");
                }
            }
        };

        // true代表接收到消息后,給兔子發消息,讓這條消息失效
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

    // 模擬每條消息處理時間不一樣
    private static void doWork(String msg) throws InterruptedException {
        char c = msg.charAt(msg.length() - 1);
        for (int i = 0; i < Integer.parseInt(c+""); i++)
            Thread.sleep(1000);
    }

}

 

先啟動兩個消費者,再啟動生產者,查看控制台:

消費者A

消費者B

生產者(這里不必有疑問,這里打印的是修改之前的消息)

 

 要說明的是什么觀點呢?

默認情況下,RabbitMQ將按順序將每條消息發送給下一個使用者。一般來說,每個消費者得到的消息是一樣多。但是,並不是說每個消費者的任務重量是平均的。很有可能出現A總在處理耗時任務,B一直吃西瓜的情況。

因為兔子不知道每個消息的耗時,他就會傻傻的派遣任務。

 不過,官方也有解決辦法。

為了解決這個問題,我們可以使用basicQos方法,設置prefetchCount = 1這告訴RabbitMQ不要向消費者發送多於一條消息。換句話說,在它處理並確認了前一個消息之前,不要向工作人員發送新消息。

如果當前消費者正在忙碌(沒有確認消息),它會將其分派給空閑下一個消費者。

 

int prefetchCount = 1;
channel.basicQos(prefetchCount);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM