spring boot集成RabbitMQ實現訂單超時取消完整版


    最近公司做項目,涉及到下訂單的功能,項目不大,用的人也不多,其實可以不用引入rabbit mq的,但本着閑着也是閑着的態度,即使項目規模不大咱也專業點。其實之前做過類似需求的功能,當時的實現方式是每個一分鍾查詢一次數據庫,判斷當前記錄的下單時間是否超時了,然后更改訂單狀態,是不是不太professional😄。閑扯就到這里了,下面開始正文。

關於rabbit mq的安裝在這里就不說了,如果有需要請給我留言,可以單獨輔導😄。

1、引入rabbit mq依賴:

1         <dependency>
2             <groupId>org.springframework.boot</groupId>
3             <artifactId>spring-boot-starter-amqp</artifactId>
4         </dependency>
pom引入依賴

 

1 spring:
2   #整合rabbitmq
3   rabbitmq:
4     host: 127.0.0.1
5     port: 5672
6     username: guest
7     password: guest
yml配置文件

 

 2、rabbit mq配置類編寫

 1 @Configuration
 2 public class RabbitMqConfig {
 3 
 4     // 支付超時延時交換機
 5     public static final String Delay_Exchange_Name = "delay.exchange";
 6 
 7     // 超時訂單關閉隊列
 8     public static final String Timeout_Trade_Queue_Name = "close_trade";
 9 
10 
11     @Bean
12     public Queue delayPayQueue() {
13         return new Queue(RabbitMqConfig.Timeout_Trade_Queue_Name, true);
14     }
15 
16 
17     // 定義廣播模式的延時交換機 無需綁定路由
18     @Bean
19     FanoutExchange delayExchange(){
20         Map<String, Object> args = new HashMap<String, Object>();
21         args.put("x-delayed-type", "direct");
22         FanoutExchange topicExchange = new FanoutExchange(RabbitMqConfig.Delay_Exchange_Name, true, false, args);
23         topicExchange.setDelayed(true);
24         return topicExchange;
25     }
26 
27     // 綁定延時隊列與交換機
28     @Bean
29     public Binding delayPayBind() {
30         return BindingBuilder.bind(delayPayQueue()).to(delayExchange());
31     }
32 
33     // 定義消息轉換器
34     @Bean
35     Jackson2JsonMessageConverter jsonMessageConverter() {
36         return new Jackson2JsonMessageConverter();
37     }
38 
39     // 定義消息模板用於發布消息,並且設置其消息轉換器
40     @Bean
41     RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
42         final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
43         rabbitTemplate.setMessageConverter(jsonMessageConverter());
44         return rabbitTemplate;
45     }
46     @Bean
47     RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) {
48         return new RabbitAdmin(connectionFactory);
49     }
50 
51 }
配置類代碼

這塊注意一下,必須要安裝rabbitmq_delayed_message_exchange插件,下載地址:點我。不然啟動的時候會報錯。

 

3、實體類代碼:

 1 import java.io.Serializable;
 2 import java.math.BigDecimal;
 3 
 4 public class OrderMasterDo implements Serializable {
 5 
 6     private String orderId;
 7     private BigDecimal amt;
 8     private Integer payStatus;
 9 
10     public String getOrderId() {
11         return orderId;
12     }
13 
14     public void setOrderId(String orderId) {
15         this.orderId = orderId;
16     }
17 
18     public BigDecimal getAmt() {
19         return amt;
20     }
21 
22     public void setAmt(BigDecimal amt) {
23         this.amt = amt;
24     }
25 
26     public Integer getPayStatus() {
27         return payStatus;
28     }
29 
30     public void setPayStatus(Integer payStatus) {
31         this.payStatus = payStatus;
32     }
33 }
實體類代碼

 

4、消息生產者代碼:

 1 @RestController
 2 @RequestMapping("/basic")
 3 public class BasicInfoController {
 4 
 5     private final Logger logger = LoggerFactory.getLogger(BasicInfoController.class);
 6 
 7     @Autowired
 8     private RabbitTemplate rabbitTemplate;
 9 
10     @GetMapping("/test")
11     public String createOrderTest() {
12 
13         // 創建訂單
14         OrderMasterDo orderMaster = new OrderMasterDo();
15         //未支付
16         orderMaster.setPayStatus(0);
17         orderMaster.setOrderId("001");
18         orderMaster.setAmt(new BigDecimal(1000));
19 
20         // 發送訂單到消息隊列
21         rabbitTemplate.convertAndSend(RabbitMqConfig.Delay_Exchange_Name, "", orderMaster, message ->{
22             message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
23             message.getMessageProperties().setDelay(10*1000);   // 毫秒為單位,指定此消息的延時時長
24             return message;
25         });
26 
27         System.out.println("創建訂單成功");
28 
29         return "創建訂單成功";
30     }
31 }
生產者代碼

 

5、消費者代碼:

 1 @Component
 2 public class OrderReceiver {
 3 
 4     @RabbitListener(queues = RabbitMqConfig.Timeout_Trade_Queue_Name)
 5     public void process(OrderMasterDo orderMasterDo, Message message, Channel channel) throws IOException{
 6         try {
 7             System.out.println("開始執行訂單[{}]的支付超時訂單關閉......");
 8             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 9             System.out.println("超時訂單處理完畢");
10         } catch (Exception e) {
11             channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
12         }
13     }
14 }
消費者代碼

 

6、測試效果:

 可以看到,生產者生產的消息在10s之后被消費。

 

總結:

以上就實現了基本的功能需求,但還不完美,因為沒有考慮到消息丟失、重復等問題,關於這塊下一篇會說到。

 

參考資料:

https://segmentfault.com/a/1190000016072908


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM