【RabbitMQ】如何進行消息可靠投遞【上篇】


說明

前幾天,突然發生線上報警,釘釘連發了好幾條消息,一看是RabbitMQ相關的消息,心頭一緊,難道翻車了?

u=1091165172,1855706818&fm=26&gp=0.jpg

[橙色報警] 應用[xxx]在[08-15 16:36:04]發生[錯誤日志異常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]觸發。
應用xxx 可能原因如下
服務名為:
 異常為:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620
 產生原因如下:
1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
||Consumer received fatal=false exception on startup:
...
應用xxx 可能原因如下
服務名為:
 異常為:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1160
 產生原因如下:
1.Stopping container from aborted consumer||Stopping container from aborted consumer:

定睛一看,看樣子像是消費者莫名其妙斷開了連接,正逢公司搬家之際,難道是機房又雙叒叕。。。。斷電了?於是趕緊聯系了運維,咨詢RabbitMQ是否發生了調整。幾分鍾后,得到了運維的回復,由於一些不可描述的原因,RabbitMQ進行了重啟,emmmm,雖然重啟只持續了10分鍾,但是導致該集群下所有消費者都掛了,需要將項目重啟后才能正常進行消費。

項目重啟后,一切似乎又正常運轉起來,但好景不長,沒過多久,工單就找上了門來,經過排查,發現是生產者在RabbitMQ重啟期間消息投遞失敗,導致消息丟失,需要手動處理和恢復。

於是,我開始思考,如何才能進行RabbitMQ的消息可靠投遞呢?特別是在這樣比較極端的情況,RabbitMQ集群不可用的時候,無法投遞的消息該如何處理呢?

可靠投遞

先來說明一個概念,什么是可靠投遞呢?在RabbitMQ中,一個消息從生產者發送到RabbitMQ服務器,需要經歷這么幾個步驟:

  1. 生產者准備好需要投遞的消息。
  2. 生產者與RabbitMQ服務器建立連接。
  3. 生產者發送消息。
  4. RabbitMQ服務器接收到消息,並將其路由到指定隊列。
  5. RabbitMQ服務器發起回調,告知生產者消息發送成功。

所謂可靠投遞,就是確保消息能夠百分百從生產者發送到服務器。

{6582FAF9-A46E-4239-810B-E1D6883ED070}.png.jpg

為了避免爭議,補充說明一下,如果沒有設置Mandatory參數,是不需要先路由消息才發起回調的,服務器收到消息后就會進行回調確認。

2、3、5步都是通過TCP連接進行交互,有網絡調用的地方就會有事故,網絡波動隨時都有可能發生,不管是內部機房停電,還是外部光纜被切,網絡事故無法預測,雖然這些都是小概率事件,但對於訂單等敏感數據處理來說,這些情況下導致消息丟失都是不可接受的。

20170716034945131.jpg

RabbitMQ中的消息可靠投遞

默認情況下,發送消息的操作是不會返回任何信息給生產者的,也就是說,默認情況下生產者是不知道消息有沒有正確地到達服務器。

那么如何解決這個問題呢?

對此,RabbitMQ中有一些相關的解決方案:

  1. 使用事務機制來讓生產者感知消息被成功投遞到服務器。
  2. 通過生產者確認機制實現。

在RabbitMQ中,所有確保消息可靠投遞的機制都會對性能產生一定影響,如使用不當,可能會對吞吐量造成重大影響,只有通過執行性能基准測試,才能在確定性能與可靠投遞之間的平衡。

在使用可靠投遞前,需要先思考以下問題:

  1. 消息發布時,保證消息進入隊列的重要性有多高?
  2. 如果消息無法進行路由,是否應該將該消息返回給發布者?
  3. 如果消息無法被路由,是否應該將其發送到其他地方稍后再重新進行路由?
  4. 如果RabbitMQ服務器崩潰了,是否可以接受消息丟失?
  5. RabbitMQ在處理新消息時是否應該確認它已經為發布者執行了所有請求的路由和持久化?
  6. 消息發布者是否可以批量投遞消息?
  7. 在可靠投遞上是否有可以接受的平衡性?是否可以接受一部分的不可靠性來提升性能?

只考慮平衡性不考慮性能是不行的,至於這個平衡的度具體如何把握,就要具體情況具體分析了,比如像訂單數據這樣敏感的信息,對可靠性的要求自然要比一般的業務消息對可靠性的要求高的多,因為訂單數據是跟錢直接相關的,可能會導致直接的經濟損失。

所以不僅應該知道有哪些保證消息可靠性的解決方案,還應該知道每種方案對性能的影響程度,以此來進行方案的選擇。

RabbitMQ的事務機制

RabbitMQ是支持AMQP事務機制的,在生產者確認機制之前,事務是確保消息被成功投遞的唯一方法。

在SpringBoot項目中,使用RabbitMQ事務其實很簡單,只需要聲明一個事務管理的Bean,並將RabbitTemplate的事務設置為true即可。

配置文件如下:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual

先來配置一下交換機和隊列,以及事務管理器。

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

    // 聲明業務Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 聲明業務隊列
    @Bean("businessQueue")
    public Queue businessQueue(){
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
    }

    // 聲明業務隊列綁定關系
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }


    /**
     * 配置啟用rabbitmq事務
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

然后創建一個消費者,來監聽消息,用以判斷消息是否成功發送。

@Slf4j
@Component
public class BusinessMsgConsumer {


    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到業務消息:{}", msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}

然后是消息生產者:

@Slf4j
@Component
public class BusinessMsgProducer{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setChannelTransacted(true);
    }

    @Transactional
    public void sendMsg(String msg) {
        rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg);
        log.info("msg:{}", msg);
        if (msg != null && msg.contains("exception"))
            throw new RuntimeException("surprise!");
        log.info("消息已發送 {}" ,msg);
    }
}

這里有兩個注意的地方:

  1. 在初始化方法里,通過使用 rabbitTemplate.setChannelTransacted(true); 來開啟事務。
  2. 在發送消息的方法上加上 @Transactional 注解,這樣在該方法中發生異常時,消息將不會發送。

在controller中加一個接口來生產消息:

@RestController
public class BusinessController {

    @Autowired
    private BusinessMsgProducer producer;

    @RequestMapping("send")
    public void sendMsg(String msg){
        producer.sendMsg(msg);
    }
}

來驗證一下:

msg:1
消息已發送 1
收到業務消息:1
msg:2
消息已發送 2
收到業務消息:2
msg:3
消息已發送 3
收到業務消息:3
msg:exception

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause

java.lang.RuntimeException: surprise!
	at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
    ...

msg 的值為 exception 時, 在調用rabbitTemplate.convertAndSend 方法之后,程序拋出了異常,消息並沒有發送出去,而是被當前事務回滾了。

當然,你可以將事務管理器注釋掉,或者將初始化方法的開啟事務注釋掉,這樣事務就不會生效,即使在調用了發送消息方法之后,程序發生了異常,消息也會被正常發送和消費。

RabbitMQ中的事務使用起來雖然簡單,但是對性能的影響是不可忽視的,因為每次事務的提交都是阻塞式的等待服務器處理返回結果,而默認模式下,客戶端是不需要等待的,直接發送就完事了,除此之外,事務消息需要比普通消息多4次與服務器的交互,這就意味着會占用更多的處理時間,所以如果對消息處理速度有較高要求時,盡量不要采用事務機制。

RabbitMQ的生產者確認機制

RabbitMQ中的生產者確認功能是AMQP規范的增強功能,當生產者發布給所有隊列的已路由消息被消費者應用程序直接消費時,或者消息被放入隊列並根據需要進行持久化時,一個Basic.Ack請求會被發送到生產者,如果消息無法路由,代理服務器將發送一個Basic.Nack RPC請求用於表示失敗。然后由生產者決定該如何處理該消息。

也就是說,通過生產者確認機制,生產者可以在消息被服務器成功接收時得到反饋,並有機會處理未被成功接收的消息。

在Springboot中開啟RabbitMQ的生產者確認模式也很簡單,只多了一行配置:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    publisher-confirms: true

publisher-confirms: true 即表示開啟生產者確認模式。

然后將消息生產者的代表進行部分修改:

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
//        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendCustomMsg(String exchange, String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);

        rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("消息確認成功, id:{}", id);
        } else {
            log.error("消息未成功投遞, id:{}, cause:{}", id, s);
        }
    }
}

讓生產者繼承自RabbitTemplate.ConfirmCallback 類,然后實現其confirm 方法,即可用其接收服務器回調。

需要注意的是,在發送消息時,代碼也進行了調整:

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);

這里我們為消息設置了消息ID,以便在回調時通過該ID來判斷是對哪個消息的回調,因為在回調函數中,我們是無法直接獲取到消息內容的,所以需要將消息先暫存起來,根據消息的重要程度,可以考慮使用本地緩存,或者存入Redis中,或者Mysql中,然后在回調時更新其狀態或者從緩存中移除,最后使用定時任務對一段時間內未發送的消息進行重新投遞。

以下是我盜來的圖,原諒我偷懶不想畫了[手動狗頭]:

5b65729e0001439305000294.jpg

另外,還需要注意的是,如果將消息發布到不存在的交換機上,那么發布用的信道將會被RabbitMQ關閉。

此外,生產者確認機制跟事務是不能一起工作的,是事務的輕量級替代方案。因為事務和發布者確認模式都是需要先跟服務器協商,對信道啟用的一種模式,不能對同一個信道同時使用兩種模式。

在生產者確認模式中,消息的確認可以是異步和批量的,所以相比使用事務,性能會更好。

使用事務機制和生產者確認機制都能確保消息被正確的發送至RabbitMQ,這里的“正確發送至RabbitMQ”說的是消息成功被交換機接收,但如果找不到能接收該消息的隊列,這條消息也會丟失。至於如何處理那些無法被投遞到隊列的消息,將會在下篇進行說明。

結題

所以當公司機房“斷電”時,如何處理那些需要發送的消息呢?相信看完上文之后,你的心中已經有了答案。

一般來說,這種“斷電”不會持續較長時間,一般幾分鍾到半小時之間,很快能夠恢復,所以如果是重要消息,可以保存到數據庫中,如果是非重要消息,可以使用redis進行保存,當然,還要根據消息的數量級來進行判斷。

如果消息量比較大,可以考慮將消息發送到另一個集群的死信隊列中,事實上,所在公司就有兩個RabbitMQ集群,所以當一個集群不可用時,可以往另一個集群發消息,emmm,如果兩個機房都停電了的話,當我沒說。

111.png.jpg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM