RabbitMQ 常用知識點總結


基礎

為什么使用 MQ?

1、削峰:在某個模塊接收到超過最大承受的並發量時,可以通過 MQ 排隊來使這些削減同一時刻處理的消息量。減小並發量。

2、解耦:在發送 MQ 處理業務時,可以使業務代碼與當前的代碼解耦,便於維護和拓展。

3、異步:異步使得在調用 MQ 后可以去處理其他操作,在 MQ 執行完后會自動反饋結果。

 

MQ缺點

1、復雜性提高,引入了其他問題。如消息丟失、重復消費、消息順序執行等。這些解決方案下面會說到。

2、宕機后不可用。可以創建集群來解決。

 

幾種 MQ 實現總結

ActiveMQ:老牌的 MQ,可靠性高,但支持的並發量低,目前維護也比較少。適用於並發量低的項目。

Kafka:支持超高並發場景,但是消息可靠性較差(消費失敗后不支持重試)。適用於產生大量數據的數據收集服務。

RocketMQ:支持超高並發場景,可靠性高。但支持的客戶端語言不多。適用於高並發的互聯網項目。

RabbitMQ:支持一般的高並發場景(萬級),可靠性高。支持客戶端語言較多。但是其實現是通過 Erlang 語言,不方便學習。適用於中小型項目。

 

完整架構圖

Publisher:生產者,生產消息的組件。

Exchange:交換機,對生產者傳來的消息進行解析並傳給隊列,交換機類型分為 fanout、direct、Topic、headers,其中headers交換機是根據消息對象的 headers 屬性值進行匹配的,性能較差,一般不使用。

Queue:隊列,因為其是 FIFO 結構,所以消息會按先進先出的順序被發送給消費者消費。

Binding:交換機與隊列的綁定關系,生產者在發送消息時會攜帶一個 RoutingKey ,在消息到達交換機后,交換機會根據 RoutingKey 匹配對應 BindingKey 的隊列,然后把消息發送給該隊列。

Virtual Host:又稱為 vhost,相當於一個文件夾,包含一個或多個 Exchange 與 Queue 的組合。

Broker:表示消息隊列服務器實體。

Consumer:消費者,專門消費消息的組件。

Connection:隊列與消費者之間的組件,在由隊列向消費者發送消息時,需要先建立連接,創建連接對象。

Channel:通道。消息由隊列發送直消費者時,用於傳輸的通道對象。

四大核心概念指的是生產者、交換機、隊列、消費者。

 

RabbitMQ 六種工作模式

1、Simple 簡單模式

不配置交換機,生產者直接發送給隊列(實際使用了默認的交換機),消費者監聽隊列,隊列與消費者是1對1的關系。

 

2、work 工作模式

和簡單模式差不多,同樣是不配置交換機,不同的是工作模式多個消費者監聽一個隊列。

公平分發:在工作模式中,默認情況下多個消費者會依次接收並消費隊列中的消息。

不公平分發:在工作模式中,可以在消費者端獲取消息時將 channel 的參數 basicQos 設為1(默認0),那么就會在消息分發時優先選擇空閑的消費者分發。如果不存在空閑隊列,那么還是按公平分發。

 

預取值:可以看作是規定的消費者等待消費隊列內部期望的隊列長度。比如消費 C1 是 2,C2 是 3,那么開始的消息會先分配給 C1,直到 C1 中等待消息的消息隊列長度為2時,下一個消息才會分配給 C2,然后C2也積累了3個消息后,繼續C1、C2輪流分配。預期值默認為0,所以默認情況就是消費者輪流被分配消息。

配置方式也是設置消費者端的 channel 對象的 basicQos 參數。

 

3、publish/subscribe 發布訂閱模式

交換機是 fanout 類型。交換機會將接收的消息發送給所有與其綁定的隊列。

 

4、routing 路由模式

交換機是 direct 類型。交換機會根據接收消息的 RoutingKey 尋找匹配的 BindingKey,然后發送給對應的隊列。BindingKey 是和 RoutingKey 完全匹配的,一對一關系。

 

5、topic 主題模式

交換機是 topic 類型。交換機會根據接收消息的 RoutingKey 尋找匹配的 BindingKey,與 routing 模式不同的是,topic 模式消息攜帶的 BindingKey 可以是一個通配符。交換機會匹配與通配符匹配的 BindingKey 對應的隊列。* 表示任意一個單次,# 表示0個或多個單次。如果 RoutingKey 不包括通配符,那么就相當於路由模式,如果 RoutingKey 是 #,那么就相當於發布訂閱模式。

 

6、RPC模式

RPC,也就是遠程調用, RabbitMQ 的 RPC 模式可以實現 RPC 的異步調用。客戶端既是發送者也是消費者,在請求發送給隊列 rpc_queue 后,服務器會監聽這個隊列,獲取后處理,處理完成將返回數據消息發給隊列 reply_to,而客戶端也會監聽這個隊列,最終實現得到結果數據。

 

高級

死信隊列

死信隊列,是指消息在變成死信消息后會被發給與其綁定好的死信交換機,然后重新被死信交換機發送至新的隊列,最后被消費者消費。而消息在變成死信消息的過程消耗的時間就成為了延期時間,所以常常用於實現延時隊列。

死信來源

1、消息TTL過期。

2、隊列內等待消費的消息達到最大長度(默認隊列無長度限制)

3、消息在消費者被拒絕(Nack 或 reject),且不重新加入隊列

 

延時隊列實現

1、為消息設置過期時間

這種方式就是在生產者發送消息時指定消息的過期時間,等到消息在死信隊列中過期后會被發送給死信交換機。

配置隊列:

發送方:

2、直接為隊列設置消息的過期時間,進入隊列的消息則會到達設置的過期時間后自動變成死信消息。

兩種方式的比較:

1、為消息設置過期時間會有一個缺陷,因為隊列是先進先出結構,所以如果為消息設置過期時間,那么先進的消息一定會先被執行,后面的一定會先等到前面的消息執行完成后才被執行,如果前面的消息過期時間長於后面的,那么后面的消息即使到達過期時間后也不會被執行,必須等到前面的消息發送完才能執行。所以只適用於發送的延時消息按過期時間遞增順序的場景。

2、直接為隊列設置過期時間,因為是進入隊列的消息都會被分配相同的過期時間,所以不會產生上面的問題,所以也存在弊端。如果需要配置多個過期時間,那么每次都需要重新聲明一個死信交換機、死信隊列以及綁定關系。這樣會造成配置臃腫。所以只適用於配置過期時間種類數較少的場景。

3、可以看出這兩種方式都存在不足之處,有沒有一種完美的方案呢?在 1 中,可以將消息按過期時間發送放在交換機里執行。因為交換機並不存在順序執行,所以就避免了 1 的問題。

實現:

配置案例代碼:

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //自定義交換機 我們在這里定義的是一個延遲交換機
    @Bean
    public CustomExchange delayedExchange() { 
        Map<String, Object> args = new HashMap<>();
        //自定義交換機的類型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,args);
    }
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange
                                               delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    } 
}

生產者案例代碼:

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    public RabbitTemplate rabbitTemplate;
    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                correlationData ->{
                    correlationData.getMessageProperties().setDelay(delayTime);
                    return correlationData;
                });
        log.info(" 當 前 時 間 : {}, 發 送 一 條 延 遲 {} 毫秒的信息給隊列 delayed.queue:{}", new Date(),delayTime, message);
    }

消費者案例代碼:

public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("當前時間:{},收到延時隊列的消息:{}", new Date().toString(), msg);
    }

 

消息可靠性

使用 RabbitMQ 來進行部分業務的執行,尤其是一些重要的業務,如果消息在 MQ 中丟失,就會對整個系統造成比較嚴重的影響。保證消息可靠性主要分為保證各組件的持久化以及避免消息的丟失。

1、組件持久化

保證交換機、隊列、消息的持久化。針對於交換機、隊列,在聲明時就可以將交換機、隊列聲明為持久化的。

而消息的持久化,需要在已開啟交換機、隊列的持久化后,在發送消息時將消息的 BasicProperties 參數的 deliveryMode 設為 2,就可以實現持久化

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

而在 SpringBoot 封裝好的 RabbitTemplate 的 convertAndSend 中,默認就已經將 deliveryMode 設為了2。

 

2、生產者到交換機(消息是否到達交換機都會觸發,回調方法參數會返回是否成功的 boolean值)

配置:

1)配置文件打開配置 spring.rabbitmq.publisher-confirm-type=correlated(老版本是spring.rabbitmq.publisher-confirms=true)。

⚫ NONE

禁用發布確認模式,是默認值

⚫ CORRELATED

發布消息成功到交換器后會觸發回調方法

⚫ SIMPLE

經測試有兩種效果,其一效果和 CORRELATED 值一樣會觸發回調方法,其二在發布消息成功后使用 rabbitTemplate 調用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie 方法如果返回 false 則會關閉 channel,則接下來無法發送消息到 broker

2)配置回調接口並加入到容器

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 交換機不管是否收到消息的一個回調方法
     * CorrelationData
     * 消息相關數據
     * ack
     * 交換機是否收到消息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id=correlationData!=null?correlationData.getId():"";
        if(ack){
            log.info("交換機已經收到 id 為:{}的消息",id);
        }else{
            log.info("交換機還未收到 id 為:{}消息,由於原因:{}",id,cause);
        } 
    } 
}

3)配置生產者(在生產者發送時定義的 CorrelationData 對象可以在回調接口中獲取到,如果沒有定義回調接口接收的就是空對象)。以及將回調接口注冊到 rabbitTemplate 對象中

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyCallBack myCallBack;
    //依賴注入 rabbitTemplate 之后再設置它的回調對象
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(myCallBack);
    }
    
    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //指定消息 id 為 1
        CorrelationData correlationData1=new CorrelationData("1");
        String routingKey="key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
        CorrelationData correlationData2=new CorrelationData("2");
        routingKey="key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
        log.info("發送消息內容:{}",message);
    }
}

 

3、交換機到隊列(消息未找到匹配隊列觸發)

如果消息傳到交換機后,沒有找到對應的隊列,那么這個消息默認會丟失,而如果配置了 Mandatory 參數可以在消息在交換機丟失時觸發回調方法。

配置

1)打開配置

#開啟回調函數
spring.rabbitmq.publisher-returns=true
#是否在交換機沒有匹配合適的隊列后返回給異步函數,false表示丟棄
spring.rabbitmq.template.mandatory=true

2)實現 ReturnCallback 接口,定義回調接口並加入容器,這里就在上一個中增加的組件類上實現

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    /**
     * 交換機不管是否收到消息的一個回調方法
     * CorrelationData
     * 消息相關數據
     * ack
     * 交換機是否收到消息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id=correlationData!=null?correlationData.getId():"";
        if(ack){
            log.info("交換機已經收到 id 為:{}的消息",id);
        }else{
            log.info("交換機還未收到 id 為:{}消息,由於原因:{}",id,cause);
        } 
    }
    //當消息無法路由的時候的回調方法
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String
            exchange, String routingKey) {
        log.error(" 消 息 {}, 被 交 換 機 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}",new String(message.getBody()),exchange,replyText,routingKey);
    } 
}

3)將這個組件配置到 rabbitTemplate 對象中。

 

4、隊列到消費者(手動確認)

默認情況下,消息發送到消費者后會立刻返回給隊列一個確認標識,顯示簽收。而如果消費者在確認標識返回成功后,執行業務到一半時發生異常,那么這條消息就沒有執行完,所以需要關閉自動確認,等到業務執行完畢后才進行手動的確認。在SpringBoot 對 RabbitMQ 封裝的依賴中,提供了隊列的補償機制,如果隊列在一段時間沒有收到消費者的確認消息,那么就會重新發送消息。

手動確認又分為三種方式,單個確認、批量確認和異步確認。

單個確認:

批量確認:方式和單個確認一樣,因為 waitForConfirms 方法作用的就是當前消息以及之前的所有未確認消息。

異步確認:通過添加回調接口來在執行完畢失敗后自動返回結果。

  

而在 SpringBoot 的繼承中,單個確認與批量確認都是使用 channel的 basicAck 方法。

使用配置:

1)在配置文件中將手動確認打開

2)在業務最后添加手動確認的代碼。

multiple表示簽收是否批量,也就是是否包括前面未簽收的消息。deleveryTag 是一個自增的消息唯一標識

此外,如果發生異常,可以取消這次確認,並選擇是否重新加入隊列。拒絕確認有兩種方式。一種Nack,一種是 Reject。區別是 Nack 會將當前消息之前的所有未確認的消息也取消確認,而 Reject 只針對於當前消息。(未確認/取消確認的消息會被標記為 unacked 狀態,即使宕機也不會丟失,發出的消息如果沒有接收到返回信息每隔一段時間會重新發送一次)。

 

5、開啟RabbitMQ 的事務(生產者端到MQ,不推薦)

channel.txSelect()聲明啟動事務模式;

channel.txComment()提交事務;

channel.txRollback()回滾事務。

try {
  channel.txSelect();
  channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
  channel.txCommit();
} catch (Exception e) {
  e.printStackTrace();
  channel.txRollback();
}

使用事務可以有效生產者端發送消息的可靠性,但是其不適用於多線程執行的場景,多個線程執行效率會很低。所以一般不推薦。

 

消息補償機制

1、SpringBoot 封裝的補償機制

在 SpringBoot 為 RabbitMQ 封裝的依賴中,提供一種補償機制,如果消息在消費者端拋出異常,那么該消息就會進行重發。默認情況下會隔5秒一直進行重發,直到消費者正常執行完畢。

我們可以通過自定義配置參數來修改默認的補償機制

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true       # 自動觸發補償機制
          max-attempts: 5     # 補償機制嘗試次數
          max-interval: 10000   # 重試最大間隔時間
          initial-interval: 2000  # 重試初始間隔時間
          multiplier: 2 # 間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設置的最大間隔時間

 

2、自定義補償機制。對於封裝的補償機制存在一些不足,因為其是無差別補償,也就是只要消費者沒有響應就會重發,但是對於一些異常導致沒有響應即使發幾次都會導致沒有響應(如數據計算異常,數據類型轉換異常),這樣的補償機制就會消耗 CPU 資源。所以對於這些異常可以捕獲然后直接處理。對於其他異常(如調用第三方接口失敗)則可以進行補償重試。

 

對於MQ 整個模塊的補償機制,可以參考下面的架構圖

各步驟:

1、發生業務操作,業務數據寫入數據庫

2、生產者將消息發送給MQ的隊列Q1

3、發送了一條與step2中一摸一樣的延遲消息到對了Q3

4、消費者監聽Q1,獲取到了step2中發送的業務消息

5、消費者在收到生產者的業務消息后,發送了一條確認消息(記錄收到的消息信息)到Q2

6、回調檢查服務監聽了Q2,獲取到了消費者發送的確認消息

7、回調檢查服務將這條確認消息寫入數據庫等待之后的比對

8、Q3中的延遲消息延遲時間已到,被回調檢查服務接收到,之后就拿着這條延遲消息在數據庫中比對,如果比對成功,證明消費者接收到了生產者的業務消息並處理成功(如果不處理成功誰會傻了吧唧發送確認消息呢);如果比對失敗,證明消費者沒有接收到生產者的業務消息,或者說消費者接收到了業務消息之后始終沒有處理成功相關的業務並發送確認消息。這時回調檢查服務就會調用生產者的相關業務接口,讓生產者再次發送這條失敗的消息

9、有一種最極端的情況,step2和step3的消息都發送失敗了或者說在消息傳遞過程中發生意外丟失了!定時檢查服務會一直輪詢保存確認消息的數據庫中的消息數據,並於生產者的業務數據庫中的業務數據進行比對,如果兩者比對數量一致,則代表業務執行沒有問題;如果比對不一致,確認消息數據庫的數據量小於生產者業務數據量的話,就證明消費者沒有接收到生產者發送的消息。這時定時檢查服務會通知生產者再次發送消息到MQ的隊列Q1

 

消息冪等性

由於消息補償機制的存在,可以更加有效保證消息可以被消費,但是帶來的問題是可能某個消息執行的比較久,導致同一條消息再次被發送給了消費者,而前一條消息順利執行完,這樣一條消息就會被多次執行,所以消費者端的方法需要涉及成冪等性,也就是對於一條消息,無論被消費者消費幾次,效果都是一樣的。實現方案主要有兩種。

1、唯一ID+指紋碼。

唯一ID指的是使用 UUID、或者操作數據的主鍵,而指紋碼是與業務相關的ID,比如雪花算法就是根據當前時間戳生成的,生成的ID就屬於指紋碼。

在生產者發送時創建 Messgae 對象,將業務數據以及唯一ID+指紋碼保存到Meaasge對象中進行發送

Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
amqpTemplate.convertAndSend(queueName, message);

然后再消費者端接收,獲取ID,以其為主鍵添加到mysql中,如果拋出異常說明已經執行過,沒有拋出異常繼續執行。

缺點是高並發場景下會受到性能瓶頸限制。可以通過分庫分表解決。

2、redis 操作。

在消費者方法開始使用 redis 的 setnx 方法來處理判斷數據可以一步到位,是實現冪等性的最佳方案。

 

消息順序執行

如果多個消費者監聽同一個隊列,那么默認下消息會依次順序分配給消費者。

上面提到預取值概念,通過配置消費者端的 channel 的 basicQos 參數來修改,但是這會收到消費者執行快慢、生產者發送消息到隊列的順序等因素影響,所以並不可靠。

所以實現消息順序執行的方式就是增加隊列,拆分消費者,使每個消費者只監聽一個隊列。

 

消息積壓

如果發現隊列中積壓了很多消息沒有處理,那么該如何解決。

1、對於積壓的消息,首先需要先檢查對應的消費者端,解決其執行慢導致阻塞的問題后,增加臨時隊列和消費者來處理積壓的消息,等到恢復后再將 MQ 改成原來架構。

2、對於設置了 TTL 的消息,在其因消息積壓過期丟失后,在 MQ 空閑時將過期丟失的消息進行重發。

 

備用交換機

在上面說到過,在消息發給交換機后,如果交換機沒有找到匹配的隊列,那么這個消息默認會丟失,可以配置消息在交換機上沒有匹配到隊列后的回調消息,以及將此條消息重新發回生產者。但是也可以配置一個備用交換機,在沒有匹配到隊列后發給備用交換機。

配置案例:

在同時配置備用交換機、returnCallBack 回調接口下,如果消息沒有匹配到對應的消息,那么會優先采用備用交換機。

 

 

 

參考博客:

RabbitMQ應用問題——消息補償機制以及冪等性的保證簡單介紹

RabbitMQ重試機制


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM