RabbitMQ的四種交換機
交換機的作用是接收消息,並轉發到綁定的隊列,四種類型:Direct, Topic, Headers and Fanout
Direct
Direct類型的Exchange交換機,在生產者發送消息時,會去嚴格匹配生產者所綁定的隊列queue名稱
Topic(最為靈活)
給隊列綁定routing_key(路由key),發送消息時,就根據發送消息傳回的參數去匹配這個routing_key,然后根據匹配情況把消息分配到對應的消息隊列中;
舉個例子:
與交換機綁定的queue是 #.abc,生產者發消息當routing_key設置為 any.abc時(這里any可以為任何值),那么這個最終生產者所發的消息都會發送給綁定到 #.abc的隊列中去
Headers
生產者發出消息時,無論routing_key設置成什么,這個消息都是根據生產者與隊列的headers參數比對結果來判斷是否發送消息。
比對的方式,通過在綁定隊列時設置頭,有兩種設置方式:
x-match:all --------------> 表示要頭全部匹配上才發送到隊列中
x-match:any --------------> 表示要頭只要有一個匹配上就發送到隊列中
Fanout
生產者發出消息時,無論routing_key設置成什么,這個消息都會被等量復制發送給所有綁定到Fanout類型Exchange的隊列queue中去
RabbitMQ死信隊列
死信定義
dead-letter,死信,以下幾種情況會發生死信並讓死信進入死信隊列:
1.消費方調用channel.basicNack或者channel.basicReject時,並且requeue參數設置為false
2.消息在隊列中存在時間超過TTL(time-to-live)
3.消息超過了隊列允許的最大長度;
死信隊列需要在配置隊列queue時,設置死信隊列信息
如何處理死信
1.配置死信隊列交換機,死信隊列queue,死信隊列其實和普通的隊列本質上一樣,只是可以專門來處理死信而已;
2.為正常隊列設置死信隊列信息,需要用map設置以下參數:
x-dead-letter-exchange:死信隊列交換機
x-dead-letter-routing-key:死信隊列routing-key,注意:如果配置了這個參數,那么死信進來之后其routing-key也會替換成這個參數,否則就保留其本身的routing-key
RabbitMQ延遲隊列
延遲隊列是什么
延遲隊列指的是,隊列需要在指定時間之后才被消費。
在特有的場景下可以使用延遲隊列,例如一些定時通知的業務,可以通過延遲隊列實現。
TTL實現延遲隊列
首先, TTL 是什么。TTL是英文 time to live 的縮寫,就是最大存活時間。在上一節有說到消息隊列在隊列中存活超過TTL設置的時間之后,會進入死信隊列。而延遲隊列則正是通過給隊列設置TTL過期時間,然后在這個時間過期之后,這個消息成為死信並進入到 死信隊列 中。這樣就實現了死信隊列。
但是,TTL實現延遲隊列有以下幾個問題:
1.在隊列上配置TTL,有不可擴展性,每有一個業務需要不同的TTL就需要一個新的隊列來配置,這樣不合理。於是,可以在消息本身設置TTL。
2.隊列有先入先出的特性,在第一條消息被處理成死信之前,第二條消息無法被處理。例如,隊列A先進來,設置了TTL 10秒,隨后立刻讓隊列B進來,設置了TTL 1秒。這個時候會發生這種情況,A隊列在被處理成死信之前(需要10秒時間),B隊列設置TTL是1秒,理論上來說B隊列在1秒之后會被處理成死信,但是,實際上RabbitMQ在處理的時候會先處理A,后面的隊列依次等待。於是呢,需要RabbitMQ的延遲隊列插件來實現,具體可以看下面的鏈接和方法。
插件實現延遲隊列
首先,在官網下載插件: rabbitmq_delayed_message_exchange ,https://www.rabbitmq.com/community-plugins.html,然后放到其插件目錄中再重啟即可。
安裝成功之后,就可以像上面第2條所說的那樣,后入隊的B隊列1秒之后就被處理成了死信,先入隊的A隊列在10秒之后被處理成死信,就不存在處理順序的問題了。
RabbitMQ消息可靠性
什么是消息可靠性
咱們的系統在實際工作中難免會遇到各種各樣的問題,比如宕機、網絡波動等等。那么在遇到這些問題的時候,RabbitMQ如果不做消息可靠性的處理的話,那么會丟失消息,一些不重要的業務還好,要是遇到涉及到錢或者核心業務的時候,消息可靠性就很重要了。
RabbitMQ的消息可靠性問題可能會出現在哪些地方呢?
RabbitMQ分為生產者和消費者,生產者出問題就不會發消息了,這個沒有問題。大部分問題就出在生產者發出消息之后的流程中。
生產者發消息 ---> 路由 -> 隊列 -> 消費者監聽隊列 -> 消費者消費
這里面每個環節都可能出問題,比如路由錯了,隊列錯了,消費者掛了,等等。於是就需要多種方法去解決這個問題來保證消息的可靠性。
處理消息可靠性的幾種方式
首先,消費者成功消費之后,有兩種方式讓生產者知道成功消費了,從而保證了消息的可靠性,這兩種方式分別是RabbitMQ事務機制和RabbitMQ回調機制。待會在下文詳說怎么配置。
其次,還有就是消息從生產者到路由還沒到消費者的時候出問題。這種情況可以通過兩種方式解決:消息發送失敗強制回調和備份路由的機制。當備份路由與強制回調同時開啟的情況,優先使用備份路由。
RabbitMQ事務機制
要實現RabbitMQ事務機制,首先需要注入 RabbitTransactionManager
@Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); }
還需要給RabbitTemplate設置channelTransactional的值
@PostConstruct private void init() { rabbitTemplate.setChannelTransacted(true); }
最后,給對應的方法上加上注解 @Transactional ,這樣就實現了RabbitMQ的事務。
就我個人的理解的話,這個事務僅僅是在消息發送方(生產者)的一個事務,只要消息發送到了隊列並提交事務成功之后,這個事務就算是成功了,不管消費者有沒有消費。而事務的開銷非常大,開事務比不開事務要多很多網絡等開銷,所以效率低下。於是有了下面的RabbitMQ消費成功的回調機制。
RabbitMQ回調機制
RabbitMQ在消費者消費成功並確認消息時,可以回調RabbitMQ設置開啟的回調函數,來確認消費了。回調函數是通過一個唯一id來判斷是回調的哪一個隊列。
首先,開啟配置publisher-confirms: true
spring: rabbitmq: host: 192.168.1.1 port: 5672 username: root password: 123456 listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual publisher-confirms: true
然后,實現RabbitTemplate.ConfirmCallback接口,實現對應的confirm()方法,並將其賦值給RabbitTemplate的成員變量
@Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //CorrelationData包含唯一id信息,生產者發消息的時候需要生成唯一id,並將id信息和消息一起發送 log.info("唯一id:{}", correlationData.getId()); } }); }
還要注意,在發消息的時候,需要生成唯一id,用於回調時根據id判斷是哪個隊列。
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, "direct", msg, correlationData);
路由失敗強制回調
首先,配置文件開啟強制回調,注意,也可以通過調用RabbitTemplate方法setMandatory(true)來設置。
spring: rabbitmq: host: 192.168.99.100 port: 5672 username: root password: Tykj@Rabbit listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual publisher-confirms: true template: mandatory: true
然后,給RabbitTemplate設置RabbitTemplate.ReturnCallBack的實現類,就可以實現當路由尋找隊列失敗時,回調returnedMessage()方法
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息路由失敗"); } });
備份路由
在配置路由的時候,通過配置備份路由參數。備份路由也需要像其他正常路由一樣,進行注入配置。關鍵是配置 alternate-exchange 參數
@Bean("directExchange") public DirectExchange directExchange(){ Map<String, Object> args = new HashMap<>(); args.put("alternate-exchange", "backup.exchange"); return new DirectExchange(DIRECT_EXCHANGE,true, false, args); }