最近公司做項目,涉及到下訂單的功能,項目不大,用的人也不多,其實可以不用引入rabbit mq的,但本着閑着也是閑着的態度,即使項目規模不大咱也專業點。其實之前做過類似需求的功能,當時的實現方式是每個一分鍾查詢一次數據庫,判斷當前記錄的下單時間是否超時了,然后更改訂單狀態,是不是不太professional😄。閑扯就到這里了,下面開始正文。
關於rabbit mq的安裝在這里就不說了,如果有需要請給我留言,可以單獨輔導😄。
1、引入rabbit mq依賴:

1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-amqp</artifactId> 4 </dependency>

1 spring: 2 #整合rabbitmq 3 rabbitmq: 4 host: 127.0.0.1 5 port: 5672 6 username: guest 7 password: guest
2、rabbit mq配置類編寫

1 @Configuration 2 public class RabbitMqConfig { 3 4 // 支付超時延時交換機 5 public static final String Delay_Exchange_Name = "delay.exchange"; 6 7 // 超時訂單關閉隊列 8 public static final String Timeout_Trade_Queue_Name = "close_trade"; 9 10 11 @Bean 12 public Queue delayPayQueue() { 13 return new Queue(RabbitMqConfig.Timeout_Trade_Queue_Name, true); 14 } 15 16 17 // 定義廣播模式的延時交換機 無需綁定路由 18 @Bean 19 FanoutExchange delayExchange(){ 20 Map<String, Object> args = new HashMap<String, Object>(); 21 args.put("x-delayed-type", "direct"); 22 FanoutExchange topicExchange = new FanoutExchange(RabbitMqConfig.Delay_Exchange_Name, true, false, args); 23 topicExchange.setDelayed(true); 24 return topicExchange; 25 } 26 27 // 綁定延時隊列與交換機 28 @Bean 29 public Binding delayPayBind() { 30 return BindingBuilder.bind(delayPayQueue()).to(delayExchange()); 31 } 32 33 // 定義消息轉換器 34 @Bean 35 Jackson2JsonMessageConverter jsonMessageConverter() { 36 return new Jackson2JsonMessageConverter(); 37 } 38 39 // 定義消息模板用於發布消息,並且設置其消息轉換器 40 @Bean 41 RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { 42 final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 43 rabbitTemplate.setMessageConverter(jsonMessageConverter()); 44 return rabbitTemplate; 45 } 46 @Bean 47 RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { 48 return new RabbitAdmin(connectionFactory); 49 } 50 51 }
這塊注意一下,必須要安裝rabbitmq_delayed_message_exchange插件,下載地址:點我。不然啟動的時候會報錯。
3、實體類代碼:

1 import java.io.Serializable; 2 import java.math.BigDecimal; 3 4 public class OrderMasterDo implements Serializable { 5 6 private String orderId; 7 private BigDecimal amt; 8 private Integer payStatus; 9 10 public String getOrderId() { 11 return orderId; 12 } 13 14 public void setOrderId(String orderId) { 15 this.orderId = orderId; 16 } 17 18 public BigDecimal getAmt() { 19 return amt; 20 } 21 22 public void setAmt(BigDecimal amt) { 23 this.amt = amt; 24 } 25 26 public Integer getPayStatus() { 27 return payStatus; 28 } 29 30 public void setPayStatus(Integer payStatus) { 31 this.payStatus = payStatus; 32 } 33 }
4、消息生產者代碼:

1 @RestController 2 @RequestMapping("/basic") 3 public class BasicInfoController { 4 5 private final Logger logger = LoggerFactory.getLogger(BasicInfoController.class); 6 7 @Autowired 8 private RabbitTemplate rabbitTemplate; 9 10 @GetMapping("/test") 11 public String createOrderTest() { 12 13 // 創建訂單 14 OrderMasterDo orderMaster = new OrderMasterDo(); 15 //未支付 16 orderMaster.setPayStatus(0); 17 orderMaster.setOrderId("001"); 18 orderMaster.setAmt(new BigDecimal(1000)); 19 20 // 發送訂單到消息隊列 21 rabbitTemplate.convertAndSend(RabbitMqConfig.Delay_Exchange_Name, "", orderMaster, message ->{ 22 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); 23 message.getMessageProperties().setDelay(10*1000); // 毫秒為單位,指定此消息的延時時長 24 return message; 25 }); 26 27 System.out.println("創建訂單成功"); 28 29 return "創建訂單成功"; 30 } 31 }
5、消費者代碼:

1 @Component 2 public class OrderReceiver { 3 4 @RabbitListener(queues = RabbitMqConfig.Timeout_Trade_Queue_Name) 5 public void process(OrderMasterDo orderMasterDo, Message message, Channel channel) throws IOException{ 6 try { 7 System.out.println("開始執行訂單[{}]的支付超時訂單關閉......"); 8 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 9 System.out.println("超時訂單處理完畢"); 10 } catch (Exception e) { 11 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); 12 } 13 } 14 }
6、測試效果:
可以看到,生產者生產的消息在10s之后被消費。
總結:
以上就實現了基本的功能需求,但還不完美,因為沒有考慮到消息丟失、重復等問題,關於這塊下一篇會說到。
參考資料: