RabbitMQ交換機、死信隊列、延遲隊列、消息可靠性


RabbitMQ的四種交換機

交換機的作用是接收消息,並轉發到綁定的隊列,四種類型:Direct, Topic, Headers and Fanout

Direct

Direct類型的Exchange交換機,在生產者發送消息時,會去嚴格匹配生產者所綁定的隊列queue名稱

Topic(最為靈活)

給隊列綁定routing_key(路由key),發送消息時,就根據發送消息傳回的參數去匹配這個routing_key,然后根據匹配情況把消息分配到對應的消息隊列中;

舉個例子:

與交換機綁定的queue是 #.abc,生產者發消息當routing_key設置為 any.abc時(這里any可以為任何值),那么這個最終生產者所發的消息都會發送給綁定到 #.abc的隊列中去

Headers

生產者發出消息時,無論routing_key設置成什么,這個消息都是根據生產者與隊列的headers參數比對結果來判斷是否發送消息。

比對的方式,通過在綁定隊列時設置頭,有兩種設置方式:

x-match:all  --------------> 表示要頭全部匹配上才發送到隊列中

x-match:any  --------------> 表示要頭只要有一個匹配上就發送到隊列中

Fanout

生產者發出消息時,無論routing_key設置成什么,這個消息都會被等量復制發送給所有綁定到Fanout類型Exchange的隊列queue中去

 


 

RabbitMQ死信隊列

死信定義

dead-letter,死信,以下幾種情況會發生死信並讓死信進入死信隊列:

1.消費方調用channel.basicNack或者channel.basicReject時,並且requeue參數設置為false

2.消息在隊列中存在時間超過TTL(time-to-live)

3.消息超過了隊列允許的最大長度;

死信隊列需要在配置隊列queue時,設置死信隊列信息

如何處理死信

1.配置死信隊列交換機,死信隊列queue,死信隊列其實和普通的隊列本質上一樣,只是可以專門來處理死信而已;

2.為正常隊列設置死信隊列信息,需要用map設置以下參數:

x-dead-letter-exchange:死信隊列交換機

x-dead-letter-routing-key:死信隊列routing-key,注意:如果配置了這個參數,那么死信進來之后其routing-key也會替換成這個參數,否則就保留其本身的routing-key

 


 

RabbitMQ延遲隊列

延遲隊列是什么

  延遲隊列指的是,隊列需要在指定時間之后才被消費。

  在特有的場景下可以使用延遲隊列,例如一些定時通知的業務,可以通過延遲隊列實現。

TTL實現延遲隊列

  首先,  TTL 是什么。TTL是英文 time to live 的縮寫,就是最大存活時間。在上一節有說到消息隊列在隊列中存活超過TTL設置的時間之后,會進入死信隊列。而延遲隊列則正是通過給隊列設置TTL過期時間,然后在這個時間過期之后,這個消息成為死信並進入到 死信隊列 中。這樣就實現了死信隊列。

  但是,TTL實現延遲隊列有以下幾個問題:

  1.在隊列上配置TTL,有不可擴展性,每有一個業務需要不同的TTL就需要一個新的隊列來配置,這樣不合理。於是,可以在消息本身設置TTL。

  2.隊列有先入先出的特性,在第一條消息被處理成死信之前,第二條消息無法被處理。例如,隊列A先進來,設置了TTL 10秒,隨后立刻讓隊列B進來,設置了TTL 1秒。這個時候會發生這種情況,A隊列在被處理成死信之前(需要10秒時間),B隊列設置TTL是1秒,理論上來說B隊列在1秒之后會被處理成死信,但是,實際上RabbitMQ在處理的時候會先處理A,后面的隊列依次等待。於是呢,需要RabbitMQ的延遲隊列插件來實現,具體可以看下面的鏈接和方法。

插件實現延遲隊列

  首先,在官網下載插件: rabbitmq_delayed_message_exchange ,https://www.rabbitmq.com/community-plugins.html,然后放到其插件目錄中再重啟即可。

  安裝成功之后,就可以像上面第2條所說的那樣,后入隊的B隊列1秒之后就被處理成了死信,先入隊的A隊列在10秒之后被處理成死信,就不存在處理順序的問題了。

 


 

RabbitMQ消息可靠性

什么是消息可靠性

  咱們的系統在實際工作中難免會遇到各種各樣的問題,比如宕機、網絡波動等等。那么在遇到這些問題的時候,RabbitMQ如果不做消息可靠性的處理的話,那么會丟失消息,一些不重要的業務還好,要是遇到涉及到錢或者核心業務的時候,消息可靠性就很重要了。

  RabbitMQ的消息可靠性問題可能會出現在哪些地方呢?

  RabbitMQ分為生產者和消費者,生產者出問題就不會發消息了,這個沒有問題。大部分問題就出在生產者發出消息之后的流程中。

   生產者發消息 ---> 路由 -> 隊列 -> 消費者監聽隊列 -> 消費者消費 

  這里面每個環節都可能出問題,比如路由錯了,隊列錯了,消費者掛了,等等。於是就需要多種方法去解決這個問題來保證消息的可靠性。

處理消息可靠性的幾種方式

  首先,消費者成功消費之后,有兩種方式讓生產者知道成功消費了,從而保證了消息的可靠性,這兩種方式分別是RabbitMQ事務機制RabbitMQ回調機制待會在下文詳說怎么配置。

  其次,還有就是消息從生產者路由還沒到消費者的時候出問題。這種情況可以通過兩種方式解決:消息發送失敗強制回調備份路由的機制。當備份路由與強制回調同時開啟的情況,優先使用備份路由。

RabbitMQ事務機制

  要實現RabbitMQ事務機制,首先需要注入 RabbitTransactionManager 

@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

  還需要給RabbitTemplate設置channelTransactional的值

@PostConstruct
private void init() {
    rabbitTemplate.setChannelTransacted(true);
}

  最后,給對應的方法上加上注解 @Transactional ,這樣就實現了RabbitMQ的事務。

  就我個人的理解的話,這個事務僅僅是在消息發送方(生產者)的一個事務,只要消息發送到了隊列並提交事務成功之后,這個事務就算是成功了,不管消費者有沒有消費。而事務的開銷非常大,開事務比不開事務要多很多網絡等開銷,所以效率低下。於是有了下面的RabbitMQ消費成功的回調機制。

RabbitMQ回調機制

  RabbitMQ在消費者消費成功並確認消息時,可以回調RabbitMQ設置開啟的回調函數,來確認消費了。回調函數是通過一個唯一id來判斷是回調的哪一個隊列。

  首先,開啟配置publisher-confirms: true  

spring:
  rabbitmq:
    host: 192.168.1.1
    port: 5672
    username: root
    password: 123456
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    publisher-confirms: true

  然后,實現RabbitTemplate.ConfirmCallback接口,實現對應的confirm()方法,並將其賦值給RabbitTemplate的成員變量

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
private void init() {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            //CorrelationData包含唯一id信息,生產者發消息的時候需要生成唯一id,並將id信息和消息一起發送
            log.info("唯一id:{}", correlationData.getId());
        }
    });
}

  還要注意,在發消息的時候,需要生成唯一id,用於回調時根據id判斷是哪個隊列。

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, "direct", msg, correlationData);

路由失敗強制回調

  首先,配置文件開啟強制回調,注意,也可以通過調用RabbitTemplate方法setMandatory(true)來設置。

spring:
  rabbitmq:
    host: 192.168.99.100
    port: 5672
    username: root
    password: Tykj@Rabbit
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    publisher-confirms: true
    template:
      mandatory: true

  然后,給RabbitTemplate設置RabbitTemplate.ReturnCallBack的實現類,就可以實現當路由尋找隊列失敗時,回調returnedMessage()方法

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息路由失敗");
    }
});

備份路由

  在配置路由的時候,通過配置備份路由參數。備份路由也需要像其他正常路由一樣,進行注入配置。關鍵是配置 alternate-exchange 參數

@Bean("directExchange")
public DirectExchange directExchange(){
    Map<String, Object> args = new HashMap<>();
    args.put("alternate-exchange", "backup.exchange");
    return new DirectExchange(DIRECT_EXCHANGE,true, false, args);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM