RabbitMQ入門_05_多線程消費同一隊列


A. 多線程消費同一隊列

參考資料:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

消費一條消息往往比產生一條消息慢很多,為了防止消息積壓,一般需要開啟多個工作線程同時消費消息。在 RabbitMQ 中,我們可以創建多個 Consumer 消費同一隊列。示意圖如下:

workqueue

gordon.study.rabbitmq.workqueue.Sender.java

public class Sender {
 
    private static final String QUEUE_NAME = "tasks";
 
    private String name;
 
    public Sender(String name) {
        this.name = name;
    }
 
    public void work() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        for (int i = 0; i < 10;) {
            String message = "NO. " + ++i;
            TimeUnit.MILLISECONDS.sleep(100);
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.printf("(%1$s)[===>%2$s    ] %3$s\n", name, ":" + QUEUE_NAME, message);
        }
 
        channel.close();
        connection.close();
    }
}

gordon.study.rabbitmq.workqueue.Receiver.java

public class Receiver {
 
    private static final String QUEUE_NAME = "tasks";
 
    private String name;
 
    private int sleepTime;
 
    public Receiver(String name, int sleepTime) {
        this.name = name;
        this.sleepTime = sleepTime;
    }
 
    public void work() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepTime);
                } catch (InterruptedException e) {
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

gordon.study.rabbitmq.workqueue.Test01.java

public class Test01 {
 
    public static void main(String[] args) throws Exception {
        Receiver recv1 = new Receiver("A", 200);
        recv1.work();
        Receiver recv2 = new Receiver("B", 200);
        recv2.work();
        Sender sender = new Sender("S");
        sender.work();
    }
}

運行 Test01,發現 A、B 兩個消費者輪流獲取 S 發送的消息。
RabbitMQ 默認將消息順序發送給下一個消費者,這樣,每個消費者會得到相同數量的消息。即,輪詢(round-robin)分發消息。

輪詢很好,可是如果兩個消費者消費能力不一樣呢?
gordon.study.rabbitmq.workqueue.Test02SlowConsumer.java

public class Test02SlowConsumer {
 
    public static void main(String[] args) throws Exception {
        Receiver recv1 = new Receiver("A", 200);
        recv1.work();
        Receiver recv2 = new Receiver("B", 800);
        recv2.work();
        Sender sender = new Sender("S");
        sender.work();
    }
}

將消費者B 的消費時間提高到800毫秒,問題就出現了:B 依然分到了一半消息,需要運行很久才能處理完。

B. 公平分發(fair dispatch)

怎樣才能做到按照每個消費者的能力分配消息呢?聯合使用 Qos 和 Acknowledge 就可以做到。

gordon.study.rabbitmq.workqueue.QosAcknowledgeReceiver.java

public class QosAcknowledgeReceiver {
 
    private static final String QUEUE_NAME = "tasks";
 
    private String name;
 
    private int sleepTime;
 
    public QosAcknowledgeReceiver(String name, int sleepTime) {
        this.name = name;
        this.sleepTime = sleepTime;
    }
 
    public void work() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        channel.basicQos(1);
 
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepTime);
                } catch (InterruptedException e) {
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

代碼第22行的 basicQos 方法設置了當前信道最大預獲取(prefetch)消息數量為1。消息從隊列異步推送給消費者,消費者的 ack 也是異步發送給隊列,從隊列的視角去看,總是會有一批消息已推送但尚未獲得 ack 確認,Qos 的 prefetchCount 參數就是用來限制這批未確認消息數量的。設為1時,隊列只有在收到消費者發回的上一條消息 ack 確認后,才會向該消費者發送下一條消息。prefetchCount 的默認值為0,即沒有限制,隊列會將所有消息盡快發給消費者。

查看 basicQos 重載方法,發現幾個有趣的特性(參考https://www.rabbitmq.com/consumer-prefetch.html):

  • basicQos 中 prefetchSize 參數通過消息的總字節數來限制隊列推送消息的速度
  • prefetchSize 與 prefetchCount 可以同時設置,達到任何一個限制,則隊列暫停推送消息
  • global 參數表示前兩個參數的作用域,true 表示限制是針對信道的,false 表示限制是針對消費者的(我還沒試過一個信道支持多個消費者的例子,樣例代碼見下方)
  • 可以對同一個信道同時設置 global 為 true 和 false 的 Qos,表示隊列要考慮每個消費者的限制,同時還要考慮整個信道的限制
  • 看起來API注釋是錯的,因為 global 默認是 false,所以第22行代碼應該是把當前信道上每個消費者(當然,上面的例子中只有一個)的 prefetchCount 設為 1
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

第37行代碼將 autoAck 設為 false,向 Broker 發送 ack 響應的任務就交給開發人員了。

第34行代碼在任務真正完成后,調用 basicAck 方法主動通知隊列消息已成功消費。當隊列收到 ack 確認后,會把下一條消息推送過來,並將該消息從隊列中刪除。

Qos 方案示意圖如下:

Qos workqueue

gordon.study.rabbitmq.workqueue.Test03FairDispatch.java

public class Test03FairDispatch {
 
    public static void main(String[] args) throws Exception {
        QosAcknowledgeReceiver recv1 = new QosAcknowledgeReceiver("A", 200);
        recv1.work();
        QosAcknowledgeReceiver recv2 = new QosAcknowledgeReceiver("B", 800);
        recv2.work();
        Sender sender = new Sender("S");
        sender.work();
    }
}

運行Test03,可以看到 RabbitMQ 按照消費者的實際能力分配消息。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM