簡介
什么叫消息隊列?
消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。
消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
消息隊列的應用場景
可以看出消息隊列是一種應用間的異步協作機制,那什么時候需要使用 MQ 呢?
以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一起同步執行,隨着業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。
以上是用於業務解耦的情況,其它常見場景包括最終一致性、廣播、錯峰流控等等。
RabbitMQ 特點
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標准,為面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。
RabbitMQ 最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
RabbitMQ 基本概念
RabbitMQ 內部結構
1、Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
2、Publisher
消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
3、Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
4、Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
5、Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
6、Connection
網絡連接,比如一個TCP連接。
7、Channel
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
8、Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
9、Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
10、Broker
表示消息隊列服務器實體。
Exchange 類型
Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。
headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了
下面的代碼中也不會編寫 headers類型的代碼
Direct Exchange (直連型交換機)
直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。
大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然后當一個消息攜帶着路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。
Fanout Exchange(扇型交換機)
扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
Topic Exchange(主題交換機)
主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。
簡單地介紹下規則:
* (星號) 用來表示一個單詞 (必須出現的)
# (井號) 用來表示任意數量(零個或多個)單詞
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
隊列Q1 綁定鍵為 *.TT.* 隊列Q2綁定鍵為 TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;
RabbitMQ安裝
一、這是RabbitMQ的官網地址(https://www.rabbitmq.com/install-windows.html) 可以按照官網的文檔進行安裝
ps:看了下官方文檔rabbitMQ是用Erlang 語言開發的,所以需要先安裝Erlang 語言。
二、docker-compose安裝
建議通過docker進行安裝,畢竟用docker命令安裝很方便,下面就是我用docker-compose安裝rabbitmq的yml文件配置:
version: '3'
services:
my_rabbitMQ:
image: "rabbitmq:3.8.3-management"
container_name: my_rabbitMQ
restart: always
privileged: true
ports:
- "15672:15672"
- "5672:5672"
environment:
- "RABBITMQ_DEFAULT_USER=rabbitMQ"
- "RABBITMQ_DEFAULT_PASS=rabbitMQ"
docker-compose 安裝rabbitMQ命令
docker-compose -f rabbitMq-docker-compose.yml up -d
1、docker-compose安裝成功后
2、瀏覽器訪問 http://127.0.0.1:15672/ ,並輸入賬號/密碼 rabbitMQ/rabbitMQ 能正常訪問rabbitMQ管理界面,就代表安裝完成。
spring boot集成RabbitMQ(編碼)
項目有兩個rabbit-provider(生產者)和rabbit-consumer(消費者)
集成rabbitMQ 主要需要依賴spring-boot-starter-amqp;
java-testdata-generator 是我在gitee上看到 隨機測試數據生成器,包括身份證號碼,銀行卡號,姓名,漢字、手機號,電子郵箱地址和生成insert sql參數列表字符串等的工具包。主要是用來不讓測試數據看起來那么單調;
maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.4</version>
</dependency>
<!--測試接口 添加swagger start-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.6.1</version>
</dependency>
<!--測試接口 添加swagger end-->
<!--Java實現的各種隨機測試數據生成器,包括身份證號碼,銀行卡號,姓名,漢字、手機號 start-->
<dependency>
<groupId>com.github.binarywang</groupId>
<artifactId>java-testdata-generator</artifactId>
<version>1.1.2</version>
</dependency>
<!--Java實現的各種隨機測試數據生成器,包括身份證號碼,銀行卡號,姓名,漢字、手機號 end-->
application.yml配置
server:
port: 8999
spring:
#項目名稱
application:
name: rabbitmq-provider
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: rabbitMQ
password: rabbitMQ
# #虛擬host 可以不設置,使用server默認host
# virtual-host: xkq
#確認消息已發送到交換機(Exchange)
# publisher-confirm-type: SIMPLE
publisher-confirm-type: CORRELATED
#確認消息已發送到隊列(Queue)
publisher-returns: true
Direct Exchange (直連型交換機)
項目rabbit-provider(生產者)
DirectRabbitConfig配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
public static final String TestDirectQueue = "TestDirectQueue";
public static final String TestDirectExchange = "TestDirectExchange";
public static final String TestDirectRouting = "TestDirectRouting";
//隊列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
// exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
// autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
// return new Queue("TestDirectQueue",true,true,false);
//一般設置一下隊列的持久化就好,其余兩個就是默認false
return new Queue(TestDirectQueue,true);
}
//Direct交換機 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange(TestDirectExchange,true,false);
}
//綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue())
.to(TestDirectExchange()).with(TestDirectRouting);
}
}
SendDirectMessageController業務發送消息
import cn.binarywang.tools.generator.ChineseAddressGenerator;
import cn.binarywang.tools.generator.ChineseNameGenerator;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.example.rabbitmqprovider.direct.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
public class SendDirectMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法
@ResponseBody
@GetMapping("/sendDirectMessage")
public Object sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = StrUtil.format("hello 我叫:{} 我住在:{}", ChineseNameGenerator.getInstance().generate(),
ChineseAddressGenerator.getInstance()
.generate());
Map<String,Object> map = new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime", DateUtil.now());
//將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange
rabbitTemplate.convertAndSend(DirectRabbitConfig.TestDirectExchange, DirectRabbitConfig.TestDirectRouting, map);
return map;
}
}
項目rabbit-consumer(消費者)
DirectReceiver接收消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Component
public class DirectReceiver {
public static final String TestDirectQueue = "TestDirectQueue";
public static final String TestDirectExchange = "TestDirectExchange";
public static final String TestDirectRouting = "TestDirectRouting";
@RabbitListener(queues = DirectReceiver.TestDirectQueue)
@RabbitHandler
public void receiver(@Payload HashMap message) {
log.info("receiver 消費者1號 收到消息 --- message:{}" ,message);
}
@RabbitListener(queues = TestDirectQueue)
@RabbitHandler
public void receiver2(Map testMessage) {
log.info("receiver2 消費者2號 收到消息 getClass:{} --- {}" ,testMessage.getClass(), testMessage);
}
}
測試Direct Exchange (直連型交換機)
正常調用接口【http://127.0.0.1:8999/sendDirectMessage 】 ;並且rabbitMQ管理界面看到TestDirectQueue隊列有消息新增,代表消息發送成功;
看到控制台日志輸出如下,代表消費者 成功接收到推送的消息;多個消費者的情況下 默認是采用輪詢的方式進行消費。
Fanout Exchange(扇型交換機)
項目rabbit-provider(生產者)
FanoutRabbitConfig配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
public final static String fanoutExchange = "fanoutExchange";
public static final String fanout_A = "fanout.A";
public static final String fanout_B = "fanout.B";
public static final String fanout_C = "fanout.C";
/**
* 創建三個隊列 :fanout.A fanout.B fanout.C
* 將三個隊列都綁定在交換機 fanoutExchange 上
* 因為是扇型交換機, 路由鍵無需配置,配置也不起作用
*/
@Bean
public Queue queueA() {
return new Queue(fanout_A);
}
@Bean
public Queue queueB() {
return new Queue(fanout_B);
}
@Bean
public Queue queueC() {
return new Queue(fanout_C);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(fanoutExchange);
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
SendFanoutMessageController業務發送消息
import cn.hutool.core.date.DateUtil;
import com.example.rabbitmqprovider.direct.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
* 扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。
* 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
*/
@RestController
public class SendFanoutMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageData = "FanoutMessage routingKey is null";
Map<String, Object> map = new HashMap<>();
map.put("createTime", DateUtil.now());
map.put("messageData", messageData);
rabbitTemplate.convertAndSend(FanoutRabbitConfig.fanoutExchange,null, map);
return "ok";
}
@GetMapping("/sendFanoutMessage1")
public String sendFanoutMessage1() {
String messageData = "FanoutMessage routingKey is 'xxx'";
Map<String, Object> map = new HashMap<>();
map.put("createTime", DateUtil.now());
map.put("messageData", messageData);
//扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個接口綁定一下路由鍵
rabbitTemplate.convertAndSend(FanoutRabbitConfig.fanoutExchange,"xxx", map);
return "ok";
}
}
項目rabbit-consumer(消費者)
FanoutReceiver接收消息
import java.util.Map;
@Slf4j
@Component
public class FanoutReceiver {
public static final String fanout_A = "fanout.A";
public static final String fanout_B = "fanout.B";
public static final String fanout_C = "fanout.C";
@RabbitListener(queues = fanout_A)
@RabbitHandler
public void fanout_A(Map testMessage) {
log.info("fanout_A {}" , testMessage);
}
@RabbitListener(queues = fanout_B)
@RabbitHandler
public void fanout_B(Map testMessage) {
log.info("fanout_B {}" , testMessage);
}
@RabbitListener(queues = fanout_C)
@RabbitHandler
public void fanout_C(Map testMessage) {
log.info("fanout_C {}" , testMessage);
}
}
測試 Fanout Exchange(扇型交換機)
先調用http://127.0.0.1:8999/sendFanoutMessage ,在調用接口 http://127.0.0.1:8999/sendFanoutMessage1
日志輸出如下,可以看到 扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。
這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
Topic Exchange(主題交換機)
項目rabbit-provider(生產者)
TopicRabbitConfig配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
//綁定鍵
public final static String man = "topic.man";
public final static String woman = "topic.woman";
public final static String xxx = "xxx";
public final static String topicExchange = "topicExchange";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
public Queue thirdQueue() {
return new Queue(TopicRabbitConfig.xxx);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchange);
}
//將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man
//這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#
// 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
@Bean
Binding bindingExchangeMessage3() {
return BindingBuilder.bind(thirdQueue()).to(exchange()).with("#");
}
}
SendTopicMessageController業務發送消息
import com.example.rabbitmqprovider.direct.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class SendTopicMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法
@ResponseBody
@GetMapping("/sendTopicMessage1")
public Object sendTopicMessage1() {
String messageData = "message: M A N ";
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageData", messageData);
rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, "topic.man", manMap);
return "ok";
}
@ResponseBody
@GetMapping("/sendTopicMessage2")
public Object sendTopicMessage2() {
String messageData = "message: woman is all ";
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageData", messageData);
rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, "topic.woman", womanMap);
return "ok";
}
@ResponseBody
@GetMapping("/sendTopicMessage3")
public Object sendTopicMessage3() {
String messageData = "message: xxx ";
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageData", messageData);
//routingKey 設置'abc';xxx隊列 routingKey配置了# 看能否收到消息
rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, "abc", womanMap);
return "ok";
}
}
項目rabbit-consumer(消費者)
TopicReceiver接收消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class TopicReceiver {
public static final String topic_man = "topic.man";
public static final String topic_woman = "topic.woman";
public final static String xxx = "xxx";
@RabbitListener(queues = topic_man)
@RabbitHandler
public void topic_man(Map testMessage) {
log.info("我是隊列{} 收到消息:{}" ,topic_man, testMessage);
}
@RabbitListener(queues = topic_woman)
@RabbitHandler
public void topic_woman(Map testMessage) {
log.info("我是隊列{} 收到消息:{}" ,topic_woman, testMessage);
}
@RabbitListener(queues = xxx)
@RabbitHandler
public void xxx(Map testMessage) {
log.info("我是隊列{} 收到消息:{}" ,xxx, testMessage);
}
}
測試Topic Exchange(主題交換機)
- 調用sendTopicMessage1接口
routingKey設置的是"topic.man",所以三個隊列都收到消息;
- 調用sendTopicMessage2接口
routingKey設置的是"topic.woman",“topic.man”隊列設置的routingKey是“topic.man”所以沒收到消息;
- 調用sendTopicMessage3接口
routingKey設置的是"abc",隊列“xxx”配置的routingKey是“#”,所以只有隊列“xxx”收到了消息
消息可靠性
rabbitmq 的消息確認分為兩部分:發送消息確認 和 消息接收確認。
項目rabbit-provider(生產者)消息發送確認
發送消息確認:用來確認生產者 producer 將消息發送到 broker ,broker 上的交換機 exchange 再投遞給隊列 queue的過程中,消息是否成功投遞。
消息從 producer 到 rabbitmq broker有一個 confirmCallback 確認模式。
消息從 exchange 到 queue 投遞失敗有一個 returnCallback 退回模式。
我們可以利用這兩個Callback來確保消的100%送達。
1、 ConfirmCallback確認模式
- ConfirmCallback機制只確認消息是否到達exchange(交換器),不保證消息可以路由到正確的queue;
- 配置參數需要設置:publisher-confirm-type: CORRELATED;springboot版本較低的話參數設置改成:publisher-confirms: true
2、 ReturnCallback 退回模式
-
ReturnsCallback 消息機制用於處理一個不可路由的消息。在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定路由 key 路由不到,這個時候我們需要監聽這種不可達的消息
-
配置參數需要設置:publisher-returns: true
-
配置參數在application.yml文件(publisher-confirm-type、publisher-returns)
消息發送確認配置如下:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// Mandatory為true時,消息通過交換器無法匹配到隊列會返回給生產者,為false時匹配不到會直接被丟棄
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* ConfirmCallback機制只確認消息是否到達exchange(交換器),不保證消息可以路由到正確的queue;
* 需要設置:publisher-confirm-type: CORRELATED;
* springboot版本較低 參數設置改成:publisher-confirms: true
*
* 以實現方法confirm中ack屬性為標准,true到達
* config : 需要開啟rabbitmq得ack publisher-confirm-type
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("ConfirmCallback 確認結果 (true代表發送成功) : {} 消息唯一標識 : {} 失敗原因 :{}",ack,correlationData,cause);
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* ReturnsCallback 消息機制用於處理一個不可路由的消息。在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定路由 key 路由不到,這個時候我們需要監聽這種不可達的消息
* 就需要這種return機制
*
* config : 需要開啟rabbitmq發送失敗回退; publisher-returns 或rabbitTemplate.setMandatory(true); 設置為true
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
// 實現接口ReturnCallback,重寫 returnedMessage() 方法,
// 方法有五個參數
// message(消息體)、
// replyCode(響應code)、
// replyText(響應內容)、
// exchange(交換機)、
// routingKey(隊列)。
log.info("ReturnsCallback returned : {}",returned);
}
});
return rabbitTemplate;
}
}
消息發送確認測試接口:
import cn.binarywang.tools.generator.ChineseAddressGenerator;
import cn.binarywang.tools.generator.ChineseNameGenerator;
import cn.hutool.core.util.StrUtil;
import com.example.rabbitmqprovider.direct.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
public class SendCallbackMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法
@ResponseBody
@GetMapping("/sendMessageToExchangeFail")
public Object sendMessageToExchangeFail() {
String messageData = StrUtil.format("hello 我叫:{} 我住在:{}", ChineseNameGenerator.getInstance().generate(),
ChineseAddressGenerator.getInstance()
.generate());
Map<String,Object> map = new HashMap<>();
map.put("messageData",messageData);
//發送一個消息到 不存在到exchange
rabbitTemplate.convertAndSend(DirectRabbitConfig.TestDirectExchange.concat("test"), DirectRabbitConfig.TestDirectRouting, map,new CorrelationData(UUID.randomUUID().toString()));
return map;
}
@ResponseBody
@GetMapping("/sendMessageToQueueFail")
public Object sendMessageToQueueFail() {
String messageData = StrUtil.format("hello 我叫:{} 我住在:{}", ChineseNameGenerator.getInstance().generate(),
ChineseAddressGenerator.getInstance()
.generate());
Map<String,Object> map = new HashMap<>();
map.put("messageData",messageData);
//發送一個消息到 不存的隊列里;
rabbitTemplate.convertAndSend(DirectRabbitConfig.TestDirectExchange, "xxx", map,new CorrelationData(UUID.randomUUID().toString()));
return map;
}
}
消息發送確認測試
- 調用sendMessageToExchangeFail接口
2022-03-06 20:31:00.488 ERROR 32087 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TestDirectExchangetest' in vhost '/', class-id=60, method-id=40)
2022-03-06 20:31:00.489 INFO 32087 --- [nectionFactory2] c.e.r.direct.config.RabbitConfig : ConfirmCallback 確認結果 (true代表發送成功) : false 消息唯一標識 : CorrelationData [id=5aaf1d44-85cc-42ae-8ae5-cbca5074cefd] 失敗原因 :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TestDirectExchangetest' in vhost '/', class-id=60, method-id=40)
- 調用sendMessageToQueueFail接口
2022-03-06 20:31:30.186 INFO 32087 --- [nectionFactory2] c.e.r.direct.config.RabbitConfig : ReturnsCallback returned : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={spring_returned_message_correlation=8699b6b8-0b12-420f-b866-73f54ba0a002}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=TestDirectExchange, routingKey=xxx]
2022-03-06 20:31:30.186 INFO 32087 --- [nectionFactory1] c.e.r.direct.config.RabbitConfig : ConfirmCallback 確認結果 (true代表發送成功) : true 消息唯一標識 : CorrelationData [id=8699b6b8-0b12-420f-b866-73f54ba0a002] 失敗原因 :null
項目rabbit-consumer(消費者)消息接收確認
消息通過 ACK 確認是否被正確接收,每個 Message 都要被確認(acknowledged),可以手動去 ACK 或自動 ACK
消息確認模式有:
- AcknowledgeMode.NONE:自動確認
- AcknowledgeMode.AUTO:根據情況確認(默認值)
- AcknowledgeMode.MANUAL:手動確認
手動確認消息
1、basicAck
表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。
void basicAck(long deliveryTag, boolean multiple)
-
deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調遞增的正整數,delivery tag 的范圍僅限於 Channel
-
multiple:是否批量確認,值為 true 則會一次性 ack所有小於當前消息 deliveryTag 的消息。
-
舉個栗子: 假設我先發送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag為8,multiple設置為 true,會將5、6、7、8的消息全部進行確認。
2、basicNack
表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag:表示消息投遞序號。
- multiple:是否批量確認。
- requeue:值為 true 消息將重新入隊列。
3、basicReject
拒絕消息,與basicNack區別在於不能進行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
- deliveryTag:表示消息投遞序號。
- requeue:值為 true 消息將重新入隊列。
開啟手動確認消息
- application.yml配置文件開啟
acknowledge-mode設置manual
spring:
#項目名稱
application:
name: rabbitmq-custom
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: rabbitMQ
password: rabbitMQ
listener:
simple:
acknowledge-mode: manual
- 注解開啟手動確認
@RabbitListener注解中設置參數ackMode= "MANUAL"開啟
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
/**
* 先注釋 DirectReceiver的@Component
* 用TestAckDirectReceiver方法來驗證 手動ack
*/
@Slf4j
@Component
public class TestAckDirectReceiver {
public static final String TestDirectQueue = "TestDirectQueue";
public static final String TestDirectExchange = "TestDirectExchange";
public static final String TestDirectRouting = "TestDirectRouting";
@RabbitListener(queues = TestAckDirectReceiver.TestDirectQueue,
ackMode= "MANUAL")
@RabbitHandler
public void receiver(@Payload HashMap dataMsg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("消費者1號 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
try {
int i = 1/0;
} catch (Exception e) {
log.error("MyAckReceiver error : {} deliveryTag:{}",e.getMessage(),deliveryTag);
channel.basicReject(deliveryTag, true);
}
}
@RabbitListener(queues = TestAckDirectReceiver.TestDirectQueue,
ackMode= "MANUAL")
@RabbitHandler
public void receiver1(@Payload HashMap dataMsg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("消費者2號 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
channel.basicAck(deliveryTag,true);
}
}
手動消息確認消費測試
可以看到消費者1號調用basicReject方法拒絕了消息;消費者2號調用channel.basicAck方法手動確認了消費
2022-03-06 21:23:16.816 INFO 51530 --- [ntContainer#4-1] c.e.c.direct.TestAckDirectReceiver : 消費者1號 deliveryTag:5 dataMsg:{createTime=2022-03-06 21:23:16, messageId=33ae8439-4635-4499-ae7d-c167f100c26a, messageData=hello 我叫:毛兒 我住在:湖南省湘西土家族苗族自治州露勁路7725號純辟頑小區8單元1122室}
2022-03-06 21:23:16.816 ERROR 51530 --- [ntContainer#4-1] c.e.c.direct.TestAckDirectReceiver : MyAckReceiver error : / by zero deliveryTag:5
2022-03-06 21:23:16.818 INFO 51530 --- [ntContainer#3-1] c.e.c.direct.TestAckDirectReceiver : 消費者2號 deliveryTag:6 dataMsg:{createTime=2022-03-06 21:23:16, messageId=33ae8439-4635-4499-ae7d-c167f100c26a, messageData=hello 我叫:毛兒 我住在:湖南省湘西土家族苗族自治州露勁路7725號純辟頑小區8單元1122室}
代碼我已經上傳到gitee 代碼傳送門
參考資料:
https://www.jianshu.com/p/79ca08116d57
https://blog.csdn.net/qq_35387940/article/details/100514134
https://blog.csdn.net/weixin_32820639/article/details/111240447
https://www.cnblogs.com/gyjx2016/p/13705307.html
https://zhuanlan.zhihu.com/p/152325703