RabbitMQ學習總結(4)-消息處理機制


 

1. 正常的消息流程

  

  上面這張圖,是一個正常的消息從生產到消息流程。在上一篇文章RabbitMQ學習總結(3)-集成SpringBoot中,代碼里使用消息確認,消息回退機制,現在詳細說一下。

2.1 消息發送確認

  消息確認機制,需要實現RabbitTemplate類的一個內部接口ConfirmCallback,這個接口的作用是生產者把消息發送到交換機的結果回調。(Producer ----->  Exchange)

  

  參數包括:

  CorrelationData correlationData:可以封裝業務ID信息,需要在發送消息時傳入此參數,這里才能接收到,否則是null

  boolean ack:消息發送的結果狀態,發送成功是true,失敗是false

  String cause:發送失敗的描述信息,如果發送成功是null。

    /**
     * A callback for publisher confirmations.
     */
    @FunctionalInterface
    public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);

    }

 

2.2 失敗回調

  失敗回調,需要實現RabbitTemplate類的一個內部接口ReturnCallback,這個接口的作用是消息從交換機到隊列的結果回調。(Exchange  -----> Queue )。消息從交換到隊列失敗,失敗原因可能是路由鍵不存在,通道未綁定等等,一般都跟配置有關系。

  參數包括:

  Message message:發送的消息內容 + 發送消息的配置信息

  int replyCode:狀態碼,發送成功是200

  String replyText:發送消息失敗的描述信息

  String exchange:消息使用的交換機

  String routingKey:消息使用的路由鍵

    /**
     * A callback for returned messages.
     */
    @FunctionalInterface
    public interface ReturnCallback {

        /**
         * Returned message callback.
         * @param message the returned message.
         * @param replyCode the reply code.
         * @param replyText the reply text.
         * @param exchange the exchange.
         * @param routingKey the routing key.
         */
        void returnedMessage(Message message, int replyCode, String replyText,
                String exchange, String routingKey);
    }

 

2. 異常消息流程

2.1 消息退回

  在消費者接收到消息之后,如果這個消息有問題,或者不是消費者想要的消息,可以把消息退回到原隊列。退回原隊列要確保有其他的服務可以再次消費這條消息。

  參數說明:

  long deliveryTag:消息唯一標識,這是RabbitMQ自動生成的,不需要人為管理,只需要從message.getMessageProperties().getDeliveryTag() 就可以獲得。

  boolean multiple:是否批量退回,不開啟就使用false,開啟批量退回需要增加自己的業務判斷邏輯(比如:攢夠幾條再批量回退,或者設置等待間隔等等)

  boolean requeue:是否退回到消息隊列,退回就使用true,就是交給其他消費者處理。

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

 

2.2 死信交換機

  死信交換機,顧名思義,就是不能再使用的消息,都會到這里來。其實謂死信交換機, 只是對應的隊列設置了對應的交換機是死信交換機, 對於交換機來講, 他還是一個普通的交換機 。

 

  一般進入死信交換機有3種情況:

  • 1. 消息被退回,又沒有被退回到原隊列。就是2.1中的requeue參數設置成false的情況。
  • 2. 當消息過期了。隊列在創建時如果設置了消息過期時間,消息超過這個時間還沒有被消息的情況。
  • 3. 消息超過隊列容量。隊列設置了最大長度、最大容量,如果超出容量存不下的情況。

  死信交換機的定義,在創建隊列時,給隊列指定它的死信交換機。

    @Bean
    public Queue queue() {
        Map<String,Object> map = new HashMap<>();
        //設置消息的過期時間 單位毫秒
        map.put("x-message-ttl",10000);
        //設置附帶的死信交換機
        map.put("x-dead-letter-exchange","exchange.dlx");
        //指定重定向的路由建 消息作廢之后可以決定需不需要更改他的路由建 如果需要 就在這里指定
        map.put("x-dead-letter-routing-key","dead.order");
        return new Queue("testQueue", true,false,false,map);
    }

  注意:這里的死信交換機名稱“exchange.dlx”是要提前定義好的,並且綁定它自己的隊列,否則會報錯。

 

3. RabbitMQ常用配置參數

3.1 隊列參數

 

參數名

配置作用

x-dead-letter-exchange

死信交換機

x-dead-letter-routing-key

死信消息重定向路由鍵

x-expires

隊列在指定毫秒數后被刪除

x-ha-policy

創建HA隊列

x-ha-nodes

HA隊列的分布節點

x-max-length

隊列的最大消息數

x-message-ttl

毫秒為單位的消息過期時間,隊列級別

x-max-priority

最大優先值為255的隊列優先排序功能

 

3.2 消息參數

參數名

配置作用

content-type

消息體的MIME類型,如application/json

content-encoding

消息的編碼類型

message-id

消息的唯一性標識,由應用進行設置

correlation-id

一般用做關聯消息的message-id,常用於消息的響應

timestamp

消息的創建時刻,整形,精確到秒

 

 

 

 

 

 

 

 

其實我們剛剛發現所謂死信交換機,只是對應的隊列設置了對應的交換機是死信交換機,對於交換機來講,他還是一個普通的交換機


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM