在實際開發中我們大部分情況下都是將RabbitMQ和Springbooot集成使用,下面的例子皆以此環境為例
消息的生產和消費路徑很長且復雜,怎么保證消息最終被正確的消費?
上圖列出了ABCDE 5個風險點,當你的消息需要確保正確送達必須要控制好這幾個點
- A:確保消息被正確的發送到RabbitMQ的Exchange中
- B:防止Exchange中還未來得及放置到queue中的消息意外丟失(服務異常停止)
- C:確保消息能正常投遞到queue中(未匹配上bindingKey)
- D: 防止Queue中還未被消費的消息丟失(服務異常停止)
- E:確保消息被Consumer服務正常消費(代碼錯誤、業務邏輯異常、服務異常終止)
為解決這些問題,rabbitMQ提供了獨立的配置和功能來處理各個環節
(A) 發送消息異步確認
除了異步確認還有事務、同步確認和批量同步確認3種方式,但都會極大降低效率 一般不采用
配置文件增加publisher-confirm-type 配置項
spring:
rabbitmq:
port: 5672
host: zhangfr.iok.la
username: admin
password: admin1
virtual-host: /
# 打開消息確認機制,通知消息是否到達Exchange
publisher-confirm-type: correlated
系統初始化時設置一個ConfirmCallback
@Configuration
public class RabbitInitializingBean implements InitializingBean {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("Confirm====消息唯一標識: {}", correlationData);
log.info("Confirm====確認狀態: {}", ack);
log.info("Confirm====造成原因: {}", cause);
}
});
}
}
示例:發送兩條消息,一條發送給正確的Exchange,另一條發送給一個不存在的Exchange
public void p4_send() {
User user = new User();
user.setId(1L);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(user.getId().toString());
// 這里的 空字符串是一個默認的路由名
rabbitTemplate.convertAndSend("",RabbitMQConfig.QUEUE_TEST,user,correlationData);
log.info("====發送消息");
}
public void p5_send() {
User user = new User();
user.setId(2L);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(user.getId().toString());
// 123是一個不存在的路由
rabbitTemplate.convertAndSend("123",RabbitMQConfig.QUEUE_TEST,user,correlationData);
}
ConfirmCallback打印日志如下圖
我們可以根據發送情況進行補償處理:比如重試、通知、撤回等等
(C) 消息失敗退回
開啟消息失敗回退后,當消息未成功送達Queue時仍然會通知我們
先在配置文件開啟失敗回退配置
spring:
rabbitmq:
port: 5672
host: zhangfr.iok.la
username: admin
password: admin1
virtual-host: /
# 開啟發送失敗退回,確保消息到達 Queue
publisher-returns: true
設置一個ReturnCallback,路由消息到Queue時將會回調
@Configuration
public class RabbitInitializingBean implements InitializingBean {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("Return====消息主體: {}", message);
log.info("Return====回復編碼: {}", replyCode);
log.info("Return====回復內容: {}", replyText);
log.info("Return====交換器: {}", exchange);
log.info("Return====路由鍵: {}", routingKey);
}
});
}
}
示例:發送給默認路由一個 不能匹配上 Queue的消息
public void p5_send() {
User user = new User();
user.setId(2L);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(user.getId().toString());
// 空字符串是一個默認的路由名
// abc不能匹配上Queue的routingKey
rabbitTemplate.convertAndSend("","abc",user,correlationData);
log.info("====發送消息 -> {}",user.toString() );
}
ReturnCallback收到異常通知如下
(E) 消費端顯示確認消息
默認配置下消費者收到消息就確認消息被消費(消息到達方法入口就自動確認消費了消息),實際業務中該方法可能報錯、程序中斷等等。消息並未被正確消費,造成事實上的消息丟失。
對准確性要求高的業務我們就需要開啟顯示確認:當消費業務方法執行完成后再顯示的確認消息被消費
配置如下:
spring:
rabbitmq:
port: 5672
host: zhangfr.iok.la
username: admin
password: admin1
virtual-host: /
# 開啟手動確認:在收到消息處理完業務后再調用 channel.basicAck() 確認消。否則將再收到消息后自動確認,業務可能會處理失敗造成數據丟失
listener:
simple:
acknowledge-mode: manual
使用示例:
@RabbitListener(queues = RabbitMQConfig.QUEUE_TEST)
public void c3(Message msg, Channel channel) throws IOException {
byte[] byteMsg = msg.getBody();
//todo 執行業務....
//業務執行完成后再顯示確認
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
log.info("c3 確認成功 -> {}",byteMsg);
}
(E) 消費端顯示確認消息-進階
上文示例只列舉了basicAck這一種方式:他的意思是肯定消息被正常消費,RabbitMQ服務端收到此消息就會刪除掉這一條消息
實際上確認還包含了否定確認的場景。
- 拒絕此消息、第一個參數為消息ID ,第二個參數是 是否重試(為false時將成為死信)
@RabbitListener(queues = RabbitMQConfig.QUEUE_TEST)
public void c2(Message msg, Channel channel) throws IOException {
byte[] byteMsg = msg.getBody();
//todo 執行業務....
channel.basicReject(msg.getMessageProperties().getDeliveryTag(),true);
log.info("c2 確認失敗需要重試 -> {}",byteMsg);
}
執行結果
日志應該很好的展現了問題,拒絕此次推送后,又被推送過來了。如此循環(實際上如果有多個消費者的話,消息有機會被推送到其他消費者正常消費,從而終止循環)
- 批量拒絕消息,方法同上:多了一個multiple 參數(第二個參數)
@RabbitListener(queues = RabbitMQConfig.QUEUE_TEST)
public void c3(Message msg, Channel channel) throws IOException {
byte[] byteMsg = msg.getBody();
//todo 執行業務....
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
log.info("c2 確認失敗丟棄 -> {}",byteMsg);
}
- 拒絕並告知RabbitMQ盡量投遞給其他消費者(如果只有一個消費者那就等同於 basicReject了 233....)
@RabbitListener(queues = RabbitMQConfig.QUEUE_TEST)
public void c1(Message msg, Channel channel) throws IOException {
byte[] byteMsg = msg.getBody();
//todo 執行業務....
//Recover: 拒絕消息
channel.basicRecover(true);
log.info("c1 確認失敗重新放回 -> {}",byteMsg);
}
(B & D)這兩個放在一起講
交換機和隊列創建時設置為需持久化。 這樣MQ突然宕機或被關閉,下次啟動時會自動恢復數據。
當然,要是服務器掛了、磁盤壞了就不頂用了,這種情況下需要鏡像隊列、異地多活來解決了
/**
* 創建一個Queue
* @return
*/
@Bean
public Queue QUEUE_TEST(){
//第二個參數:是否持久化
return new Queue(QUEUE_TEST,true);
}
/**
* 創建一個Exchange
* @return
*/
@Bean
public DirectExchange DIRECT_EXCHANGE_TEST(){
//第二個參數:是否持久化
return new DirectExchange(DIRECT_EXCHANGE_TEST,true,false);
}