RabbitMQ如何保證消息被正確消費


在實際開發中我們大部分情況下都是將RabbitMQ和Springbooot集成使用,下面的例子皆以此環境為例

消息的生產和消費路徑很長且復雜,怎么保證消息最終被正確的消費?

上圖列出了ABCDE 5個風險點,當你的消息需要確保正確送達必須要控制好這幾個點

  • A:確保消息被正確的發送到RabbitMQ的Exchange中
  • B:防止Exchange中還未來得及放置到queue中的消息意外丟失(服務異常停止)
  • C:確保消息能正常投遞到queue中(未匹配上bindingKey)
  • D: 防止Queue中還未被消費的消息丟失(服務異常停止)
  • E:確保消息被Consumer服務正常消費(代碼錯誤、業務邏輯異常、服務異常終止)

為解決這些問題,rabbitMQ提供了獨立的配置和功能來處理各個環節

(A) 發送消息異步確認

除了異步確認還有事務、同步確認和批量同步確認3種方式,但都會極大降低效率 一般不采用

配置文件增加publisher-confirm-type 配置項

spring:
  rabbitmq:
    port: 5672
    host: zhangfr.iok.la
    username: admin
    password: admin1
    virtual-host: /
    # 打開消息確認機制,通知消息是否到達Exchange
    publisher-confirm-type: correlated

系統初始化時設置一個ConfirmCallback

@Configuration
public class RabbitInitializingBean implements InitializingBean {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void afterPropertiesSet() throws Exception {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("Confirm====消息唯一標識: {}", correlationData);
                log.info("Confirm====確認狀態: {}", ack);
                log.info("Confirm====造成原因: {}", cause);
            }
        });
    }
}

示例:發送兩條消息,一條發送給正確的Exchange,另一條發送給一個不存在的Exchange

public void p4_send() {
	User user = new User();
	user.setId(1L);

	CorrelationData correlationData = new CorrelationData();
	correlationData.setId(user.getId().toString());

	// 這里的 空字符串是一個默認的路由名
	rabbitTemplate.convertAndSend("",RabbitMQConfig.QUEUE_TEST,user,correlationData);
	log.info("====發送消息");
}

public void p5_send() {
	User user = new User();
	user.setId(2L);

	CorrelationData correlationData = new CorrelationData();
	correlationData.setId(user.getId().toString());
	// 123是一個不存在的路由
	rabbitTemplate.convertAndSend("123",RabbitMQConfig.QUEUE_TEST,user,correlationData);
}

ConfirmCallback打印日志如下圖

我們可以根據發送情況進行補償處理:比如重試、通知、撤回等等

(C) 消息失敗退回

開啟消息失敗回退后,當消息未成功送達Queue時仍然會通知我們

先在配置文件開啟失敗回退配置

spring:
  rabbitmq:
    port: 5672
    host: zhangfr.iok.la
    username: admin
    password: admin1
    virtual-host: /
    # 開啟發送失敗退回,確保消息到達 Queue
    publisher-returns: true

設置一個ReturnCallback,路由消息到Queue時將會回調

@Configuration
public class RabbitInitializingBean implements InitializingBean {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void afterPropertiesSet() throws Exception {
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("Return====消息主體: {}", message);
                log.info("Return====回復編碼: {}", replyCode);
                log.info("Return====回復內容: {}", replyText);
                log.info("Return====交換器: {}", exchange);
                log.info("Return====路由鍵: {}", routingKey);
            }
        });
    }
}

示例:發送給默認路由一個 不能匹配上 Queue的消息

public void p5_send() {
	User user = new User();
	user.setId(2L);

	CorrelationData correlationData = new CorrelationData();
	correlationData.setId(user.getId().toString());
	// 空字符串是一個默認的路由名
	// abc不能匹配上Queue的routingKey
	rabbitTemplate.convertAndSend("","abc",user,correlationData);
	log.info("====發送消息 -> {}",user.toString() );
}

ReturnCallback收到異常通知如下

(E) 消費端顯示確認消息

默認配置下消費者收到消息就確認消息被消費(消息到達方法入口就自動確認消費了消息),實際業務中該方法可能報錯、程序中斷等等。消息並未被正確消費,造成事實上的消息丟失。
對准確性要求高的業務我們就需要開啟顯示確認:當消費業務方法執行完成后再顯示的確認消息被消費

配置如下:

spring:
  rabbitmq:
    port: 5672
    host: zhangfr.iok.la
    username: admin
    password: admin1
    virtual-host: /
    # 開啟手動確認:在收到消息處理完業務后再調用 channel.basicAck() 確認消。否則將再收到消息后自動確認,業務可能會處理失敗造成數據丟失
    listener:
      simple:
        acknowledge-mode: manual

使用示例:

@RabbitListener(queues = RabbitMQConfig.QUEUE_TEST)
public void c3(Message msg, Channel channel) throws IOException {
	byte[] byteMsg = msg.getBody();
	//todo 執行業務....
	
	//業務執行完成后再顯示確認
	channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
	log.info("c3 確認成功 -> {}",byteMsg);
}

(E) 消費端顯示確認消息-進階

上文示例只列舉了basicAck這一種方式:他的意思是肯定消息被正常消費,RabbitMQ服務端收到此消息就會刪除掉這一條消息
實際上確認還包含了否定確認的場景。

  • 拒絕此消息、第一個參數為消息ID ,第二個參數是 是否重試(為false時將成為死信)
@RabbitListener(queues = RabbitMQConfig.QUEUE_TEST)
public void c2(Message msg, Channel channel) throws IOException {
	byte[] byteMsg = msg.getBody();
	//todo 執行業務....

	channel.basicReject(msg.getMessageProperties().getDeliveryTag(),true);
	log.info("c2 確認失敗需要重試 -> {}",byteMsg);
}

執行結果

日志應該很好的展現了問題,拒絕此次推送后,又被推送過來了。如此循環(實際上如果有多個消費者的話,消息有機會被推送到其他消費者正常消費,從而終止循環)

  • 批量拒絕消息,方法同上:多了一個multiple 參數(第二個參數)
@RabbitListener(queues = RabbitMQConfig.QUEUE_TEST)
public void c3(Message msg, Channel channel) throws IOException {
	byte[] byteMsg = msg.getBody();
	//todo 執行業務....

	channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
	log.info("c2 確認失敗丟棄 -> {}",byteMsg);
}
  • 拒絕並告知RabbitMQ盡量投遞給其他消費者(如果只有一個消費者那就等同於 basicReject了 233....)
@RabbitListener(queues = RabbitMQConfig.QUEUE_TEST)
public void c1(Message msg, Channel channel) throws IOException {
	byte[] byteMsg = msg.getBody();
	//todo 執行業務....

	//Recover: 拒絕消息
	channel.basicRecover(true);
	log.info("c1 確認失敗重新放回 -> {}",byteMsg);
}

(B & D)這兩個放在一起講

交換機和隊列創建時設置為需持久化。 這樣MQ突然宕機或被關閉,下次啟動時會自動恢復數據。
當然,要是服務器掛了、磁盤壞了就不頂用了,這種情況下需要鏡像隊列、異地多活來解決了

/**
 * 創建一個Queue
 * @return
 */
@Bean
public Queue QUEUE_TEST(){
	//第二個參數:是否持久化
	return new Queue(QUEUE_TEST,true);
}

/**
 * 創建一個Exchange
 * @return
 */
@Bean
public DirectExchange DIRECT_EXCHANGE_TEST(){
	//第二個參數:是否持久化
	return new DirectExchange(DIRECT_EXCHANGE_TEST,true,false);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM