RabbitMQ的各個參數


簡介

原文:https://blog.csdn.net/vbirdbest/article/details/78670550

本節主要討論隊列聲明的各個參數

queueDeclare(String queue, 
            boolean durable, 
            boolean exclusive, 
            Map<String, Object> arguments);

    queue: 隊列名稱

    durable: 是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫

    exclusive:是否排外的,有兩個作用,一:當連接關閉時connection.close()該隊列是否會自動刪除;二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題,如果是排外的,會對當前隊列加鎖,其他通道channel是不能訪問的,如果強制訪問會報異常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等於true的話用於一個隊列只能有一個消費者來消費的場景

    autoDelete:是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除,可以通過RabbitMQ Management,查看某個隊列的消費者數量,當consumers = 0時隊列就會自動刪除

    arguments:
    隊列中的消息什么時候會自動被刪除?

        Message TTL(x-message-ttl):設置隊列中的所有消息的生存周期(統一為整個隊列的所有消息設置生命周期), 也可以在發布消息的時候單獨為某個消息指定剩余生存時間,單位毫秒, 類似於redis中的ttl,生存時間到了,消息會被從隊里中刪除,注意是消息被刪除,而不是隊列被刪除, 特性Features=TTL, 單獨為某條消息設置過期時間AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
        channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

        Auto Expire(x-expires): 當隊列在指定的時間沒有被訪問(consume, basicGet, queueDeclare…)就會被刪除,Features=Exp

        Max Length(x-max-length): 限定隊列的消息的最大值長度,超過指定長度將會把最早的幾條刪除掉, 類似於mongodb中的固定集合,例如保存最新的100條消息, Feature=Lim

        Max Length Bytes(x-max-length-bytes): 限定隊列最大占用的空間大小, 一般受限於內存、磁盤的大小, Features=Lim B

        Dead letter exchange(x-dead-letter-exchange): 當隊列消息長度大於最大長度、或者過期的等,將從隊列中刪除的消息推送到指定的交換機中去而不是丟棄掉,Features=DLX

        Dead letter routing key(x-dead-letter-routing-key):將刪除的消息推送到指定交換機的指定路由鍵的隊列中去, Feature=DLK

        Maximum priority(x-max-priority):優先級隊列,聲明隊列時先定義最大優先級值(定義最大值一般不要太大),在發布消息的時候指定該消息的優先級, 優先級更高(數值更大的)的消息先被消費,
        Lazy mode(x-queue-mode=lazy): Lazy Queues: 先將消息保存到磁盤上,不放在內存中,當消費者開始消費的時候才加載到內存中
        Master locator(x-queue-master-locator)

注意

關於隊列的聲明,如果使用同一套參數進行聲明了,就不能再使用其他參數來聲明,要么刪除該隊列重新刪除,可以使用命令行刪除也可以在RabbitMQ Management上刪除,要么給隊列重新起一個名字。
隊列持久化

重啟RabbitMQ服務器(可以通過rabbitmqctl stop_app關閉服務器,rabbitmqctl start_app重啟服務器),可以登錄RabbitMQ Management—> Queues中可以看到之前聲明的隊列還存在

boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, arguments);

消息持久化

設置消息持久化必須先設置隊列持久化,要不然隊列不持久化,消息持久化,隊列都不存在了,消息存在還有什么意義。消息持久化需要將交換機持久化、隊列持久化、消息持久化,才能最終達到持久化的目的

方式一:設置deliveryMode=2

channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT,true);
    channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    String message = "Hello RabbitMQ: ";
    // 設置消息持久化
    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
    properties.deliveryMode(2);
    // 設置消息是否持久化,1: 非持久化 2:持久化
    channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));

方式二:設置BasicProperties為MessageProperties.PERSISTENT_TEXT_PLAIN

    channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    String message = "Hello RabbitMQ: ";
    channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

Message TTL消息剩余生存時間

統一設置隊列中的所有消息的過期時間,例如設置10秒,10秒后這個隊列的消息清零

方式一:為該隊列的所有消息統一設置相同的聲明周期

    Map<String, Object> arguments = new HashMap<String, Object>(); 
    arguments.put("x-message-ttl",10000);
    // 聲明隊列時指定隊列中的消息過期時間 
    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

 

方式二:單獨為某條消息單獨設置時間

// expiration: 設置單條消息的過期時間

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); 
    for(int i = 1; i <= 5; i++) { AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties() .builder().expiration( i * 1000 + "");
        channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8")); }

Auto Expire自動過期

x-expires用於當多長時間沒有消費者訪問該隊列的時候,該隊列會自動刪除,可以設置一個延遲時間,如僅啟動一個生產者,10秒之后該隊列會刪除,或者啟動一個生產者,再啟動一個消費者,消費者運行結束后10秒,隊列也會被刪除

   Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-expires", 10000); 
    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

Max Length最大長度

x-max-length:用於指定隊列的長度,如果不指定,可以認為是無限長,例如指定隊列的長度是4,當超過4條消息,前面的消息將被刪除,給后面的消息騰位

 Map<String, Object> arguments = new HashMap<String, Object>(); 
    arguments.put("x-max-length", 4); 
    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
    for(int i = 1; i <= 5; i++) { channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes("UTF-8")); }

Max Length Bytes代碼片段

x-max-length-bytes: 用於指定隊列存儲消息的占用空間大小,當達到最大值是會刪除之前的數據騰出空間

 Map<String, Object> arguments = new HashMap<String, Object>();
    rguments.put("x-max-length-bytes", 1024); 
    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

Maximum priority最大優先級

x-max-priority: 設置消息的優先級,優先級值越大,越被提前消費。

正常情況下不適用優先級
Hello RabbitMQ: 1
Hello RabbitMQ: 2
Hello RabbitMQ: 3
Hello RabbitMQ: 4
Hello RabbitMQ: 5

使用優先級順序正好相反
Hello RabbitMQ: 5
Hello RabbitMQ: 4
Hello RabbitMQ: 3
Hello RabbitMQ: 2
Hello RabbitMQ: 1

   Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-max-priority", 10); 
    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 
    for(int i = 1; i <= 5; i++) { AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties() .builder().priority(i); 
        channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8")); }

 

Dead letter exchange(死亡交換機) 和 Dead letter routing key(死亡路由鍵)

當隊列中的消息過期,或者達到最大長度而被刪除,或者達到最大空間時而被刪除時,可以將這些被刪除的信息推送到其他交換機中,讓其他消費者訂閱這些被刪除的消息,處理這些消息

public void testBasicPublish() throws IOException, TimeoutException, InterruptedException { 
        ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); 
        factory.setPassword("mengday"); Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); 
        // 聲明一個接收被刪除的消息的交換機和隊列
        String EXCHANGE_DEAD_NAME = "exchange.dead";
        String QUEUE_DEAD_NAME = "queue_dead"; 
        channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT); 
        channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null); 
        channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead"); 
        String EXCHANGE_NAME = "exchange.fanout"; 
        String QUEUE_NAME = "queue_name"; 
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); 
        Map<String, Object> arguments = new HashMap<String, Object>(); 
        arguments.put("x-message-ttl", 15000); 
        arguments.put("x-max-length", 4);
        arguments.put("x-max-length-bytes", 1024); 
        arguments.put("x-expires", 30000);
        arguments.put("x-dead-letter-exchange", "exchange.dead"); 
        arguments.put("x-dead-letter-routing-key", "routingkey.dead"); 
        channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); 
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 
        String message = "Hello RabbitMQ: "; 
        for(int i = 1; i <= 5; i++) { channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes("UTF-8")); 
        } channel.close(); connection.close(); 
    }

剛開始由於隊列長度是4,總共發送5條消息,所以最早進入隊列的消息1將被刪除掉,被推送到“死亡隊列”中,所以看到普通隊列的消息為4條,死亡隊列的消息為1條,是消息1

隨着時間的流逝,普通隊列中的消息都該過期了,所以消息2、3、4、5都被推送到死亡隊列,所以死亡隊列消息是5條,普通隊列的消息條數為0

 

查看死亡隊列的消息可以得知,消息一死亡的原因是maxlen達到了最大長度,消息2、3、4、5都是因為生存時間到了導致死亡的

 

 

一個比較雜的綜合示例

關於消費者就不用代碼來獲取消息了,直接在RabbitMQ Management點擊某個隊列的名字,然后Get Message(s) 即可獲取

該示例使用很多參數配置,可能實際使用不會像這樣用,因為這樣好像不太配套。

public class Producer {
    @Test
    public void testBasicPublish() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 聲明一個接收被刪除的消息的交換機和隊列
        String EXCHANGE_DEAD_NAME = "exchange.dead";
        String QUEUE_DEAD_NAME = "queue_dead";
        channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
        channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");
        String EXCHANGE_NAME = "exchange.fanout";
        String QUEUE_NAME = "queue_name";
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        Map<String, Object> arguments = new HashMap<String, Object>();
        // 統一設置隊列中的所有消息的過期時間
        arguments.put("x-message-ttl", 30000);
        // 設置超過多少毫秒沒有消費者來訪問隊列,就刪除隊列的時間
        arguments.put("x-expires", 20000);
        // 設置隊列的最新的N條消息,如果超過N條,前面的消息將從隊列中移除掉
        arguments.put("x-max-length", 4);
        // 設置隊列的內容的最大空間,超過該閾值就刪除之前的消息
        //
        arguments.put("x-max-length-bytes", 1024);
        // 將刪除的消息推送到指定的交換機,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同時設置
        arguments.put("x-dead-letter-exchange", "exchange.dead");
        // 將刪除的消息推送到指定的交換機對應的路由鍵
        arguments.put("x-dead-letter-routing-key", "routingkey.dead");
        // 設置消息的優先級,優先級大的優先被消費 arguments.put("x-max-priority", 10);
        channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        String message = "Hello RabbitMQ: ";
        for (int i = 1; i <= 5; i++) {
            // expiration: 設置單條消息的過期時間
            AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().priority(i).expiration(i * 1000 + "");
            channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }

 

 運行效果

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

疑惑: 單獨使用arguments.put(“x-max-length”, 4); arguments.put(“x-dead-letter-exchange”, “exchange.dead”);arguments.put(“x-dead-letter-routing-key”, “routingkey.dead”);發現消息1先會觸發maxlen條件,而被推送到queue_dead隊列中,由此可以得出,當達到最大長度時,先刪除的是先被添加到隊列的消息。但是如果很多條件一塊同時使用可能現象不太好解釋,如上例如,實際結果是消息5因為maxlen而被推送到死亡隊列中,消息1、2、3、4都是由於expired過期導致的,難道不是消息1由於maxlen被推送到希望隊列,而2、3、4、5是由於過期導致的嗎?還有上面代碼如果將x-max-len該為3,在死亡隊列中獲取消息的先后順序是4、5、3、2、1不是優先級高的先被消費嗎,為啥不是5、4、3、2、1 難道是條件用的太多了,都亂了??? 明白的同學請留言,謝謝!

經過實際測試,參數單獨用或者和其他參數合理搭配使用都沒問題,如果是像上例一塊都用,大雜燴,搞不懂結果。

// 檢查隊列是否存在,不存在拋異常
channel.queueDeclarePassive(“queue_name”);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM