1. 正常的消息流程
上面這張圖,是一個正常的消息從生產到消息流程。在上一篇文章RabbitMQ學習總結(3)-集成SpringBoot中,代碼里使用消息確認,消息回退機制,現在詳細說一下。
2.1 消息發送確認
消息確認機制,需要實現RabbitTemplate類的一個內部接口ConfirmCallback,這個接口的作用是生產者把消息發送到交換機的結果回調。(Producer -----> Exchange)
參數包括:
CorrelationData correlationData:可以封裝業務ID信息,需要在發送消息時傳入此參數,這里才能接收到,否則是null
boolean ack:消息發送的結果狀態,發送成功是true,失敗是false
String cause:發送失敗的描述信息,如果發送成功是null。
/** * A callback for publisher confirmations. */ @FunctionalInterface public interface ConfirmCallback { /** * Confirmation callback. * @param correlationData correlation data for the callback. * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause); }
2.2 失敗回調
失敗回調,需要實現RabbitTemplate類的一個內部接口ReturnCallback,這個接口的作用是消息從交換機到隊列的結果回調。(Exchange -----> Queue )。消息從交換到隊列失敗,失敗原因可能是路由鍵不存在,通道未綁定等等,一般都跟配置有關系。
參數包括:
Message message:發送的消息內容 + 發送消息的配置信息
int replyCode:狀態碼,發送成功是200
String replyText:發送消息失敗的描述信息
String exchange:消息使用的交換機
String routingKey:消息使用的路由鍵
/** * A callback for returned messages. */ @FunctionalInterface public interface ReturnCallback { /** * Returned message callback. * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */ void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey); }
2. 異常消息流程
2.1 消息退回
在消費者接收到消息之后,如果這個消息有問題,或者不是消費者想要的消息,可以把消息退回到原隊列。退回原隊列要確保有其他的服務可以再次消費這條消息。
參數說明:
long deliveryTag:消息唯一標識,這是RabbitMQ自動生成的,不需要人為管理,只需要從message.getMessageProperties().getDeliveryTag() 就可以獲得。
boolean multiple:是否批量退回,不開啟就使用false,開啟批量退回需要增加自己的業務判斷邏輯(比如:攢夠幾條再批量回退,或者設置等待間隔等等)
boolean requeue:是否退回到消息隊列,退回就使用true,就是交給其他消費者處理。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
2.2 死信交換機
死信交換機,顧名思義,就是不能再使用的消息,都會到這里來。其實謂死信交換機, 只是對應的隊列設置了對應的交換機是死信交換機, 對於交換機來講, 他還是一個普通的交換機 。
一般進入死信交換機有3種情況:
- 1. 消息被退回,又沒有被退回到原隊列。就是2.1中的requeue參數設置成false的情況。
- 2. 當消息過期了。隊列在創建時如果設置了消息過期時間,消息超過這個時間還沒有被消息的情況。
- 3. 消息超過隊列容量。隊列設置了最大長度、最大容量,如果超出容量存不下的情況。
死信交換機的定義,在創建隊列時,給隊列指定它的死信交換機。
@Bean public Queue queue() { Map<String,Object> map = new HashMap<>(); //設置消息的過期時間 單位毫秒 map.put("x-message-ttl",10000); //設置附帶的死信交換機 map.put("x-dead-letter-exchange","exchange.dlx"); //指定重定向的路由建 消息作廢之后可以決定需不需要更改他的路由建 如果需要 就在這里指定 map.put("x-dead-letter-routing-key","dead.order"); return new Queue("testQueue", true,false,false,map); }
注意:這里的死信交換機名稱“exchange.dlx”是要提前定義好的,並且綁定它自己的隊列,否則會報錯。
3. RabbitMQ常用配置參數
3.1 隊列參數
參數名 |
配置作用 |
x-dead-letter-exchange |
死信交換機 |
x-dead-letter-routing-key |
死信消息重定向路由鍵 |
x-expires |
隊列在指定毫秒數后被刪除 |
x-ha-policy |
創建HA隊列 |
x-ha-nodes |
HA隊列的分布節點 |
x-max-length |
隊列的最大消息數 |
x-message-ttl |
毫秒為單位的消息過期時間,隊列級別 |
x-max-priority |
最大優先值為255的隊列優先排序功能 |
3.2 消息參數
參數名 |
配置作用 |
content-type |
消息體的MIME類型,如application/json |
content-encoding |
消息的編碼類型 |
message-id |
消息的唯一性標識,由應用進行設置 |
correlation-id |
一般用做關聯消息的message-id,常用於消息的響應 |
timestamp |
消息的創建時刻,整形,精確到秒 |
其實我們剛剛發現所謂死信交換機,只是對應的隊列設置了對應的交換機是死信交換機,對於交換機來講,他還是一個普通的交換機。