一、前言
RabbitMQ是一個開源的消息隊列,輕量級且易於部署,並支持多種消息協議。RabbitMQ可以部署在分布式和聯合配置中,以滿足高規模、高可用性的需求。本文整合RabbitMQ實現延遲消息的過程,以發送延遲消息取消超時訂單為例.
二、RabbitMQ的安裝和使用
1、安裝Erlang,下載地址:http://erlang.org/download/otp_win_64_21.3.exe
2、安裝RabbitMQ,下載地址:https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.14/rabbitmq-server-3.7.14.exe
3、啟動RabbitMQ,在cmd命令提示框中進入RabbitMQ安裝目錄下的sbin目錄,執行命令:rabbitmq-plugins enable rabbitmq_management
4、登錄rabbitmq管理界面,訪問地址:http://localhost:15672
5、輸入賬號密碼:guest guest,登錄成功后創建賬號admin
點擊Admin
6、創建一個新的虛擬host:/shop
點擊右邊的Virtual Hosts
7、給新創建的admin賬號設置host
點擊新創建的賬號admin,進入配置頁面
至此,RabbitMQ的安裝和配置完成。
三、RabbitMQ的消息模型
標志 |
中文名 | 英文名 | 描述 |
p | 生產者 | Producer |
消息的發送者,可以將消息發送到交換機 |
C | 消費者 | Consumer |
消息的接收者,從隊列中獲取消息進行消費 |
X | 交換機 | Exchange | 接收生產者發送到的消息,並根據路由鍵 發送給指定隊列 |
Q | 隊列 |
Queue | 存儲從交換機發來的消息 |
type | 交換機類型 | type |
direct表示直接根據路由鍵(orange/black) 發送消息 |
四、項目整合RabbitMQ
1、業務場景說明,RabbitMQ本次主要用於解決用戶下單后,訂單超時如何取消訂單的問題
· 用戶進行下單操作(鎖定商品庫存,使用優惠券、積分等)
· 生成訂單,獲取訂單的id
· 獲取到設置的訂單超時時間(假設設置60分鍾不支付即取消訂單)
· 按訂單超時時間發送一個延遲消息給RabbitMQ,讓它在訂單超時后觸發取消訂單
· 如果用戶沒有支付,進行取消訂單操作(釋放鎖定商品庫存、返優惠券、返回積分一系列操作)
2、在pom.xml中添加依賴
<!--rabbitmq相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok相關依賴-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
3、修改application.yml文件,在spring節點下添加RabbitMQ相關配置
rabbitmq: host: localhost # rabbitmq的連接地址 port: 5672 # rabbitmq的連接端口號 virtual-host: /shop # rabbitmq的虛擬host username: admin # rabbitmq的用戶名 password: 123456 # rabbitmq的密碼 publisher-confirms: true # 如果對異步消息需要回調設置為true
4、在com.zzb.test.admin.dto下添加消息隊列的枚舉配置類QueueEnum
package com.zzb.test.admin.dto; import lombok.Getter; /** * 消息隊列枚舉配置 * 用於延遲消息隊列及處理取消訂單消息隊列的常量定義,包括交換機名稱、隊列名稱、路由鍵名稱等 * Created by zzb on 2019/12/17 11:26 */ @Getter public enum QueueEnum { /** * 消息通知隊列 */ QUEUE_ORDER_CANCEL("shop.order.direct", "shop.order.cancel", "shop.order.cancel"), /** * 消息通知ttl隊列 */ QUEUE_TTL_ORDER_CANCEL("shop.order.direct.ttl", "shop.order.cancel.ttl", "shop.order.cancel.ttl"); /** * 交換名稱 */ private String exchange; /** * 隊列名稱 */ private String name; /** * 路由鍵 */ private String routeKey; QueueEnum(String exchange, String name, String routeKey){ this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
5、在com.zzb.test.admin.config包下添加rabbitmq的配置類RabbitMqConfig
package com.zzb.test.admin.config; import com.zzb.test.admin.dto.QueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息隊列配置 * 用戶配置交換機、隊列及隊列與交換機的綁定關系 * Created by zzb on 2019/12/17 11:40 */ @Configuration public class RabbitMqConfig { /** * 訂單消息實際消費隊列所綁定的交換機 * @return */ @Bean DirectExchange orderDirect(){ return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 訂單延遲隊列所綁定的交換機 * @return */ @Bean DirectExchange orderTtlDirect(){ return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 訂單實際消費隊列 * @return */ @Bean public Queue orderQueue(){ return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 訂單延遲隊列(死信隊列) * @return */ @Bean public Queue orderTtlQueue(){ return QueueBuilder .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()) .build(); } /** * 將訂單隊列綁定到交換機 * @return */ @Bean Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /** * 將訂單延遲隊列綁定到交換機 * @param orderTtlDirect * @param orderTtlQueue * @return */ @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } }
6、在com.zzb.test.admin.common包下添加延遲消息的發送類CancelOrderSender
package com.zzb.test.admin.common; import com.zzb.test.admin.dto.QueueEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 取消訂單消息的發出者 * 用於向訂單延遲消息隊列(shop.order.cancel.ttl)里發送消息 * Created by zzb on 2019/12/17 14:30 */ @Component public class CancelOrderSender { private static Logger logger = LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(Long orderId, long dalayTimes){ //給延遲隊列發送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //給消息設置延遲毫秒值 message.getMessageProperties().setExpiration(String.valueOf(dalayTimes)); return message; } }); logger.info((dalayTimes/1000)+"秒后發送取消訂單的消息給訂單:{}",orderId); } }
7、在com.zzb.test.admin.common包下添加取消訂單消息的接受類CancelOrderReceiver
package com.zzb.test.admin.common; import com.zzb.test.admin.service.OmsOrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 取消訂單消息的接受者 * 用於從取消訂單的隊列里(shop.order.cancel)接收消息 * Created by zzb on 2019/12/17 14:40 */ @Component @RabbitListener(queues = "shop.order.cancel") public class CancelOrderReceiver { private static Logger logger = LoggerFactory.getLogger(CancelOrderReceiver.class); @Autowired private OmsOrderService omsOrderService; @RabbitHandler public void handle(Long orderId){ logger.info("接收隊列取消訂單的消息:{}", orderId); omsOrderService.cancelOrder(orderId); } }
8、在service包下添加訂單管理接口OmsOrderService
package com.zzb.test.admin.service; import com.zzb.test.admin.common.CommonResult; import com.zzb.test.admin.dto.OrderParam; import org.springframework.transaction.annotation.Transactional; /** * 訂單管理接口 * Created by zzb on 2019/12/17 14:47 */ public interface OmsOrderService { /** * 下單生成訂單 * @param orderParam * @return */ @Transactional CommonResult generateOrder(OrderParam orderParam); /** * 取消單個超時訂單 * @param orderId */ @Transactional void cancelOrder(Long orderId); }
9、在impl包下添加其實現類OmsOrderServiceImpl
package com.zzb.test.admin.service.impl; import com.zzb.test.admin.common.CancelOrderSender; import com.zzb.test.admin.common.CommonResult; import com.zzb.test.admin.dto.OrderParam; import com.zzb.test.admin.service.OmsOrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * 訂單管理接口實現類 * Created by zzb on 2019/12/17 14:49 */ @Service public class OmsOrderServiceImpl implements OmsOrderService { private static Logger logger = LoggerFactory.getLogger(OmsOrderServiceImpl.class); @Autowired private CancelOrderSender cancelOrderSender; @Override public CommonResult generateOrder(OrderParam orderParam) { // TODO: 2019/12/17 下單生成訂單 logger.info("下單成功,獲取到訂單id:{}", 1L); //設置延遲發送時間,測試設置為30秒 long delayTimes = 30*1000; //發送延遲消息 cancelOrderSender.sendMessage(1L, delayTimes); return CommonResult.success("下單成功"); } @Override public void cancelOrder(Long orderId) { // TODO: 2019/12/17 取消單個超時訂單 logger.info("根據orderId取消超時訂單:{}", orderId); } }
10、在dto包下添加訂單傳入參數類OrderParam
package com.zzb.test.admin.dto; import lombok.Getter; import lombok.Setter; /** * 生成訂單時傳入的參數 * Created by zzb on 2019/12/17 14:57 */ @Getter @Setter public class OrderParam { //收貨地址id private Long memeberAddressId; //優惠券id private Long couponId; //使用的積分 private Integer useIntegration; //支付的方式 private Integer payType; }
11、在controller包下添加訂單管理控制器OmsOrderController
package com.zzb.test.admin.controller; import com.zzb.test.admin.common.CommonResult; import com.zzb.test.admin.dto.OrderParam; import com.zzb.test.admin.service.OmsOrderService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; /** * 訂單管理Controller * Created by zzb on 2019/12/17 15:10 */ @Controller @Api(tags = "OmsOrderController", description = "訂單管理") public class OmsOrderController { @Autowired private OmsOrderService omsOrderService; @ApiOperation("下單生成訂單") @RequestMapping(value = "/admin/oms/generateOrder", method = RequestMethod.POST) @ResponseBody public CommonResult generateOrder(@RequestBody OrderParam orderParam){ return omsOrderService.generateOrder(orderParam); } }
五、測試
1、啟動項目,在RabbitMQ管理界面查看自動生產的交換機及隊列
交換機及隊列說明
-
mall.order.direct(取消訂單消息隊列所綁定的交換機):綁定的隊列為mall.order.cancel,一旦有消息以mall.order.cancel為路由鍵發過來,會發送到此隊列。
-
mall.order.direct.ttl(訂單延遲消息隊列所綁定的交換機):綁定的隊列為mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl為路由鍵發送過來,會轉發到此隊列,並在此隊列保存一定時間,等到超時后會自動將消息發送到mall.order.cancel(取消訂單消息消費隊列)。
2、訪問swagger-ui.html測試頁,登錄,登錄方法詳見:https://www.cnblogs.com/zzb-yp/p/11899880.html
登錄成功后訪問下單接口
下單成功后發送消息
30秒后自動觸發取消訂單方法
項目github地址:https://github.com/18372561381/shoptest