RabbitMQ入門_10_隊列長度限制


參考資料:https://www.rabbitmq.com/maxlength.html

RabbitMQ 有兩種方式限制隊列長度,第一種是對隊列中消息總數進行限制:

gordon.study.rabbitmq.features.TestQueueLengthLimit.java

public class TestQueueLengthLimit {
 
    private static final String QUEUE_NAME = "queueLengthLimit";
 
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel senderChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
 
        // 設置隊列最大消息數量為5
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-length", 5);
        senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
        // 發布6個消息
        for (int i = 0; i < 6;) {
            String message = "NO. " + ++i;
            senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        }
 
        // 獲取的消息為 NO. 2,說明隊列頭部第一條消息被拋棄
        Thread.sleep(100);
        GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, false);
        String message = new String(resp.getBody(), "UTF-8");
        System.out.printf("consume: %s\n", message);
        System.out.printf("queue size: %d\n", resp.getMessageCount());
 
        // 現在隊列中有4個 Ready消息,1個 Unacked消息。此時再發布兩條消息,應該只有 NO. 3 被拋棄。
        senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 7".getBytes("UTF-8"));
        senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 8".getBytes("UTF-8"));
        Thread.sleep(100);
        GetResponse resp2 = consumerChannel.basicGet(QUEUE_NAME, false);
        message = new String(resp2.getBody(), "UTF-8");
        System.out.printf("consume: %s\n\n", message);
 
        // 現在隊列中有4個 Ready消息,2個 Unacked消息。
        // 此時Nack,消息2、4取消退回隊列頭導致隊列消息數量超過設定值,誰能留下?
        consumerChannel.basicNack(resp2.getEnvelope().getDeliveryTag(), true, true);
        Thread.sleep(100);
        while (true) {
            resp = consumerChannel.basicGet(QUEUE_NAME, true);
            if (resp == null) {
                break;
            } else {
                message = new String(resp.getBody(), "UTF-8");
                System.out.printf("consume: %s\n", message);
            }
        }
    }
}

顧名思義,x-max-length 參數限制了一個隊列的消息總數,當消息總數達到限定值時,隊列頭的消息會被拋棄

此外,處於 Unacked 狀態的消息不納入消息總數計算。但是,當 Unacked 消息被 reject 並重新入隊時,就會受 x-max-length 參數限制,可能回不了隊列。


第二種是對隊列中消息體總字節數進行限制:

        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-length-bytes ", 1000);
        senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);

只計算消息體的字節數,不算消息頭、消息屬性等字節數。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM