04消息隊列系列-RabbitMQ 利用MQ實現事物補償
一、介紹
本篇使用SpringBoot整合RabbitMQ,為后續業務處理開發做鋪墊。
二、整合實戰
2.1 創建一個gradle項目,引入amqp依賴
implementation 'org.springframework.boot:spring-boot-starter-amqp'
2.2 在application.properties文件里添加RabbitMQ的配置信息
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2.3 編寫RabbitUtil工具類
package com.lucky.spring.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Map;
/**
* Created by zhangdd on 2020/10/7
*/
public class RabbitUtil {
private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 創建 Exchange
*
* @param exchangeType
* @param exchangeName
*/
public void addExchange(String exchangeType, String exchangeName) {
Exchange exchange = createExchange(exchangeType, exchangeName);
rabbitAdmin.declareExchange(exchange);
}
/**
* 刪除一個Exchange
*
* @param exchangeName
* @return
*/
public boolean deleteExchange(String exchangeName) {
return rabbitAdmin.deleteExchange(exchangeName);
}
/**
* 創建一個指定的Queue
*
* @param queueName
*/
public void addQueue(String queueName) {
Queue queue = createQueue(queueName);
rabbitAdmin.declareQueue(queue);
}
/**
* 刪除一個 queue
*
* @param queueName
* @return
*/
public boolean deleteQueue(String queueName) {
return rabbitAdmin.deleteQueue(queueName);
}
/**
* 按照篩選條件,刪除隊列
*
* @param queueName
* @param unused
* @param empty
*/
public void deleteQueue(String queueName, boolean unused, boolean empty) {
rabbitAdmin.deleteQueue(queueName, unused, empty);
}
/**
* 清空某個隊列中的消息,注意,清空的消息並沒有被消費
*
* @param queueName
*/
public void purgeQueue(String queueName) {
rabbitAdmin.purgeQueue(queueName, false);
}
/**
* 判斷指定的隊列是否存在
*
* @param queueName
* @return
*/
public boolean existQueue(String queueName) {
return rabbitAdmin.getQueueProperties(queueName) == null ? false : true;
}
/**
* 綁定一個隊列到一個匹配型交換器使用一個routingKey
*
* @param exchangeType
* @param exchangeName
* @param queueName
* @param routingKey
* @param isWhereAll
* @param headers EADERS模式類型設置,其他模式類型傳空
*/
public void addBinding(String exchangeType, String exchangeName, String queueName,
String routingKey, boolean isWhereAll, Map<String, Object> headers) {
Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
rabbitAdmin.declareBinding(binding);
}
/**
* 聲明綁定
*
* @param binding
*/
public void addBinding(Binding binding) {
rabbitAdmin.declareBinding(binding);
}
/**
* 解除交換器和隊列的綁定
*
* @param exchangeType
* @param exchangeName
* @param queueName
* @param routingKey
* @param isWhereAll
* @param headers
*/
public void removeBinding(String exchangeType, String exchangeName, String queueName,
String routingKey, boolean isWhereAll, Map<String, Object> headers) {
Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
removeBinding(binding);
}
/**
* 解除交換器與隊列的綁定
*
* @param binding
*/
public void removeBinding(Binding binding) {
rabbitAdmin.removeBinding(binding);
}
/**
* create a exchange,queue and bind queue at the same time
*
* @param exchangeType
* @param exchangeName
* @param queueName
* @param routingKey
* @param isWhereAll
* @param headers
*/
public void addExchangeBindingQueue(String exchangeType, String exchangeName, String queueName,
String routingKey, boolean isWhereAll, Map<String, Object> headers) {
//聲明交換器
addExchange(exchangeType, exchangeName);
//declare queue
addQueue(queueName);
//declare relationship of binding
addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
}
/**
* send message
*
* @param exchange
* @param routingKey
* @param object
*/
public void convertAndSend(String exchange, String routingKey, final Object object) {
rabbitTemplate.convertAndSend(exchange, routingKey, object);
}
/**
* switch message object
*
* @param messageType
* @param msg
* @return
*/
public Message getMessage(String messageType, Object msg) {
MessageProperties properties = new MessageProperties();
properties.setContentType(messageType);
Message message = new Message(msg.toString().getBytes(), properties);
return message;
}
/**
* declare exchange
*
* @param exchangeType
* @param exchangeName
* @return
*/
private Exchange createExchange(String exchangeType, String exchangeName) {
if (ExchangeTypes.DIRECT.equals(exchangeType)) {
return new DirectExchange(exchangeName);
}
if (ExchangeTypes.TOPIC.equals(exchangeType)) {
return new TopicExchange(exchangeName);
}
if (ExchangeTypes.HEADERS.equals(exchangeType)) {
return new HeadersExchange(exchangeName);
}
if (ExchangeTypes.FANOUT.equals(exchangeType)) {
return new FanoutExchange(exchangeName);
}
return null;
}
/**
* declare relation of binding
*
* @param exchangeType
* @param exchangeName
* @param queueName
* @param routingKey
* @param isWhereAll
* @param headers
* @return
*/
private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName,
String routingKey, boolean isWhereAll, Map<String, Object> headers) {
if (ExchangeTypes.DIRECT.equals(exchangeType)) {
return BindingBuilder.bind(new Queue(queueName))
.to(new DirectExchange(exchangeName))
.with(routingKey);
}
if (ExchangeTypes.TOPIC.equals(exchangeType)) {
return BindingBuilder.bind(new Queue(queueName))
.to(new TopicExchange(exchangeName))
.with(routingKey);
}
if (ExchangeTypes.HEADERS.equals(exchangeType)) {
if (isWhereAll) {
return BindingBuilder.bind(new Queue(queueName))
.to(new HeadersExchange(exchangeName))
.whereAll(headers)
.match();
} else {
return BindingBuilder.bind(new Queue(queueName))
.to(new HeadersExchange(exchangeName))
.whereAny(headers)
.match();
}
}
if (ExchangeTypes.FANOUT.equals(exchangeType)) {
return BindingBuilder.bind(new Queue(queueName))
.to(new FanoutExchange(exchangeName));
}
return null;
}
/**
* declare queue
*
* @param queueName
* @return
*/
private Queue createQueue(String queueName) {
return new Queue(queueName);
}
}
2.4 編寫RabbitMQ配置類
package com.lucky.spring.config;
import com.lucky.spring.util.RabbitUtil;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by zhangdd on 2020/10/7
*/
@Configuration
public class RabbitConfig {
/**
* init factory of connection
*
* @param host
* @param port
* @param username
* @param password
* @return
*/
@Bean
public ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.username}") String username,
@Value("${spring.rabbitmq.password}") String password) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
/**
* 重新實例化 RabbitAdmin操作類
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
/**
* 重新實例化 RabbitTemplate操作類
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//數據轉換為json存入消息隊列
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public RabbitUtil rabbitUtil() {
return new RabbitUtil();
}
}
2.5 編寫隊列監聽類(靜態)
我們已經知道這個概念隊列監聽這個概念,只需要在方法上加上@RabbitListener(queues = "")
即可收到對應隊列的消息。但此時的隊列是已經知道了的所以稱之為監聽靜態隊列。
package com.lucky.spring.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
/**
* Created by zhangdd on 2020/10/7
*/
@Configuration
public class DirectConsumeListener {
private Logger log = LoggerFactory.getLogger(DirectConsumeListener.class);
/**
* 監聽指定隊列 mq.direct.1
*
* @param message
*/
@RabbitListener(queues = "mq.direct.1")
public void consume(Message message) {
log.info("DirectConsumeListener,收到消息: {}", message.toString());
}
}
2.6 編寫隊列監聽類(動態)
在服務運行過程中,動態的新增隊列,然后在監聽新增的隊列的行為,就是監聽動態隊列。
package com.lucky.spring.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by zhangdd on 2020/10/7
*/
@Configuration
public class DynamicConsumeListener {
private Logger log = LoggerFactory.getLogger(DynamicConsumeListener.class);
/**
* 使用 SimpleMessageListenerContainer 實現動態監聽
*
* @param connectionFactory
* @return
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(message -> {
log.info("ConsumerMessageListen,收到消息: {}", message.toString());
});
return container;
}
}
如果想向SimpleMessageListenerContainer
添加監聽隊列或者移除隊列,只需要通過如下方式即可操作。
package com.lucky.spring.controller;
import com.lucky.spring.entity.ConsumerInfo;
import com.lucky.spring.util.RabbitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* Created by zhangdd on 2020/10/7
*/
@RestController
public class ConsumerController {
@Autowired
private SimpleMessageListenerContainer container;
@Autowired
private RabbitUtil rabbitUtil;
private Logger log = LoggerFactory.getLogger(ConsumerController.class);
/**
* 添加隊列到監聽器
*
* @param consumerInfo
*/
@PostMapping("/consume/addQueue")
public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
if (!existQueue) {
throw new RuntimeException("當前隊列不存在");
}
//添加mq監聽的隊列
container.addQueueNames(consumerInfo.getQueueName());
//打印監聽容器中正在監聽到隊列
log.info("container-queue:{}", container.getQueueNames());
}
/**
* 移除正在監聽的隊列
*
* @param consumerInfo
*/
@PostMapping("/consume/removeQueue")
public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
//移除mq監聽的隊列
container.removeQueueNames(consumerInfo.getQueueName());
//打印監聽容器中正在監聽到隊列
log.info("container-queue:{}", container.getQueueNames());
}
/**
* 查詢監聽容器中正在監聽到的隊列
*/
@PostMapping("/consume/queryListenerQueue")
public void queryListenerQueue() {
log.info("container-queue:{}", container.getQueueNames());
}
}
2.7 發送消息到交換器
通過如下方式發送消息到交換器。
- 先編寫一個請求參數實體類
package com.lucky.spring.entity;
import java.io.Serializable;
/**
* Created by zhangdd on 2020/10/7
*/
public class ProduceInfo implements Serializable {
private static final long serialVersionUID = -5816966739399349770L;
/**
* 交換器名稱
*/
private String exchangeName;
/**
* 路由鍵key
*/
private String routingKey;
/**
* 消息內容
*/
public String msg;
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
- 編寫API接口
package com.lucky.spring.controller;
import com.lucky.spring.entity.ConsumerInfo;
import com.lucky.spring.util.RabbitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* Created by zhangdd on 2020/10/7
*/
@RestController
public class ConsumerController {
@Autowired
private SimpleMessageListenerContainer container;
@Autowired
private RabbitUtil rabbitUtil;
private Logger log = LoggerFactory.getLogger(ConsumerController.class);
/**
* 添加隊列到監聽器
*
* @param consumerInfo
*/
@PostMapping("/consume/addQueue")
public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
if (!existQueue) {
throw new RuntimeException("當前隊列不存在");
}
//添加mq監聽的隊列
container.addQueueNames(consumerInfo.getQueueName());
//打印監聽容器中正在監聽到隊列
log.info("container-queue:{}", container.getQueueNames());
}
/**
* 移除正在監聽的隊列
*
* @param consumerInfo
*/
@PostMapping("/consume/removeQueue")
public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
//移除mq監聽的隊列
container.removeQueueNames(consumerInfo.getQueueName());
//打印監聽容器中正在監聽到隊列
log.info("container-queue:{}", container.getQueueNames());
}
/**
* 查詢監聽容器中正在監聽到的隊列
*/
@PostMapping("/consume/queryListenerQueue")
public void queryListenerQueue() {
log.info("container-queue:{}", container.getQueueNames());
}
}
2.8 交換器、隊列維護操作
如果想通過接口對RabbitMQ中的交換器、隊列以及綁定關系進行維護,通過如下方式接口操作,即可實現。
- 先編寫一個請求參數實體類
package com.lucky.spring.entity;
import java.io.Serializable;
/**
* Created by zhangdd on 2020/10/7
*/
public class QueueConfig implements Serializable {
private static final long serialVersionUID = -6576396650731444495L;
/**
* 交換器類型
*/
private String exchangeType;
/**
* 交換器名稱
*/
private String exchangeName;
/**
* 隊列名稱
*/
private String queueName;
/**
* 路由鍵key
*/
private String routingKey;
public String getExchangeType() {
return exchangeType;
}
public void setExchangeType(String exchangeType) {
this.exchangeType = exchangeType;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
}
- 編寫接口API
package com.lucky.spring.controller;
import com.lucky.spring.entity.QueueConfig;
import com.lucky.spring.util.RabbitUtil;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
/**
* Created by zhangdd on 2020/10/7
*/
@RestController("/config")
public class RabbitController {
@Autowired
private RabbitUtil rabbitUtil;
/**
* 創建交換器
*
* @param config
*/
@PostMapping("addExchange")
public void addExchange(@RequestBody QueueConfig config) {
rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());
}
/**
* 刪除交換器
*
* @param config
*/
@PostMapping("deleteExchange")
public void deleteExchange(@RequestBody QueueConfig config) {
rabbitUtil.deleteExchange(config.getExchangeName());
}
/**
* 添加隊列
*
* @param config
*/
@PostMapping("addQueue")
public void addQueue(@RequestBody QueueConfig config) {
rabbitUtil.addQueue(config.getQueueName());
}
/**
* 刪除隊列
*
* @param config
*/
@PostMapping("deleteQueue")
public void deleteQueue(@RequestBody QueueConfig config) {
rabbitUtil.deleteQueue(config.getQueueName());
}
/**
* 清空隊列數據
*
* @param config
*/
@PostMapping("purgeQueue")
public void purgeQueue(@RequestBody QueueConfig config) {
rabbitUtil.purgeQueue(config.getQueueName());
}
/**
* 添加綁定
*
* @param config
*/
@PostMapping("addBinding")
public void addBinding(@RequestBody QueueConfig config) {
rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
}
/**
* 解除綁定
*
* @param config
*/
@PostMapping("removeBinding")
public void removeBinding(@RequestBody QueueConfig config) {
rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
}
/**
* 創建頭部類型的交換器
* 判斷條件是所有的鍵值對都匹配成功才發送到隊列
*
* @param config
*/
@PostMapping("andExchangeBindingQueueOfHeaderAll")
public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {
HashMap<String, Object> header = new HashMap<>();
header.put("queue", "queue");
header.put("bindType", "whereAll");
rabbitUtil.addExchangeBindingQueue(ExchangeTypes.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);
}
/**
* 創建頭部類型的交換器
* 判斷條件是只要有一個鍵值對匹配成功就發送到隊列
*
* @param config
*/
@PostMapping("andExchangeBindingQueueOfHeaderAny")
public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {
HashMap<String, Object> header = new HashMap<>();
header.put("queue", "queue");
header.put("bindType", "whereAny");
rabbitUtil.addExchangeBindingQueue(ExchangeTypes.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);
}
}
三、利用MQ實現事物補償
上面的操作只是告訴我們怎么使用 rabbitMQ!
當你仔細回想整個過程的時候,其實還是回到最初那個問題,什么時候使用 MQ ?
以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:支付訂單、扣減庫存、生成相應單據、發紅包、發短信通知等等。
在業務發展初期這些邏輯可能放在一起同步執行,隨着業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取 MQ 的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。
這種是利用 MQ 實現業務解耦,其它的場景包括最終一致性、廣播、錯峰流控等等。
利用 MQ 實現業務解耦的過程其實也很簡單。
- 當主流程結束之后,將消息推送到發紅包、發短信交換器中即可
package com.lucky.spring.service;
import com.lucky.spring.entity.Order;
import com.lucky.spring.util.RabbitUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* Created by zhangdd on 2020/10/7
*/
@Service
public class OrderService {
@Autowired
private RabbitUtil rabbitUtil;
@Transactional
public void createOrder(Order order) {
//1、創建訂單
//2、調用庫存接口,減庫存
//3、向客戶發放紅包
rabbitUtil.convertAndSend("exchange.send.bonus", null, order);
//4、發短信通知
rabbitUtil.convertAndSend("exchange.sms.message", null, order);
}
}
- 監聽發紅包操作
/**
* 監聽發紅包
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "exchange.send.bonus")
public void consume(Message message, RabbitProperties.Cache.Channel channel) throws IOException {
String msgJson = new String(message.getBody(),"UTF-8");
log.info("收到消息: {}", message.toString());
//調用發紅包接口
}
- 監聽發短信操作
/**
* 監聽發短信
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "exchange.sms.message")
public void consume(Message message, AMQImpl.Channel channel) throws IOException {
String msgJson = new String(message.getBody(),"UTF-8");
log.info("收到消息: {}", message.toString());
//調用發短信接口
}
既然 MQ 這么好用,那是不是完全可以將以前的業務也按照整個模型進行拆分呢?
答案顯然不是!
當引入 MQ 之后業務的確是解耦了,但是當 MQ 一旦掛了,所有的服務基本都掛了,是不是很可怕!所以這時候就需要RabbitMQ的集群搭建和部署,保證消息幾乎100%的投遞和消費。
到此結束