最近公司做项目,涉及到下订单的功能,项目不大,用的人也不多,其实可以不用引入rabbit mq的,但本着闲着也是闲着的态度,即使项目规模不大咱也专业点。其实之前做过类似需求的功能,当时的实现方式是每个一分钟查询一次数据库,判断当前记录的下单时间是否超时了,然后更改订单状态,是不是不太professional😄。闲扯就到这里了,下面开始正文。
关于rabbit mq的安装在这里就不说了,如果有需要请给我留言,可以单独辅导😄。
1、引入rabbit mq依赖:

1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-amqp</artifactId> 4 </dependency>

1 spring: 2 #整合rabbitmq 3 rabbitmq: 4 host: 127.0.0.1 5 port: 5672 6 username: guest 7 password: guest
2、rabbit mq配置类编写

1 @Configuration 2 public class RabbitMqConfig { 3 4 // 支付超时延时交换机 5 public static final String Delay_Exchange_Name = "delay.exchange"; 6 7 // 超时订单关闭队列 8 public static final String Timeout_Trade_Queue_Name = "close_trade"; 9 10 11 @Bean 12 public Queue delayPayQueue() { 13 return new Queue(RabbitMqConfig.Timeout_Trade_Queue_Name, true); 14 } 15 16 17 // 定义广播模式的延时交换机 无需绑定路由 18 @Bean 19 FanoutExchange delayExchange(){ 20 Map<String, Object> args = new HashMap<String, Object>(); 21 args.put("x-delayed-type", "direct"); 22 FanoutExchange topicExchange = new FanoutExchange(RabbitMqConfig.Delay_Exchange_Name, true, false, args); 23 topicExchange.setDelayed(true); 24 return topicExchange; 25 } 26 27 // 绑定延时队列与交换机 28 @Bean 29 public Binding delayPayBind() { 30 return BindingBuilder.bind(delayPayQueue()).to(delayExchange()); 31 } 32 33 // 定义消息转换器 34 @Bean 35 Jackson2JsonMessageConverter jsonMessageConverter() { 36 return new Jackson2JsonMessageConverter(); 37 } 38 39 // 定义消息模板用于发布消息,并且设置其消息转换器 40 @Bean 41 RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { 42 final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 43 rabbitTemplate.setMessageConverter(jsonMessageConverter()); 44 return rabbitTemplate; 45 } 46 @Bean 47 RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { 48 return new RabbitAdmin(connectionFactory); 49 } 50 51 }
这块注意一下,必须要安装rabbitmq_delayed_message_exchange插件,下载地址:点我。不然启动的时候会报错。
3、实体类代码:

1 import java.io.Serializable; 2 import java.math.BigDecimal; 3 4 public class OrderMasterDo implements Serializable { 5 6 private String orderId; 7 private BigDecimal amt; 8 private Integer payStatus; 9 10 public String getOrderId() { 11 return orderId; 12 } 13 14 public void setOrderId(String orderId) { 15 this.orderId = orderId; 16 } 17 18 public BigDecimal getAmt() { 19 return amt; 20 } 21 22 public void setAmt(BigDecimal amt) { 23 this.amt = amt; 24 } 25 26 public Integer getPayStatus() { 27 return payStatus; 28 } 29 30 public void setPayStatus(Integer payStatus) { 31 this.payStatus = payStatus; 32 } 33 }
4、消息生产者代码:

1 @RestController 2 @RequestMapping("/basic") 3 public class BasicInfoController { 4 5 private final Logger logger = LoggerFactory.getLogger(BasicInfoController.class); 6 7 @Autowired 8 private RabbitTemplate rabbitTemplate; 9 10 @GetMapping("/test") 11 public String createOrderTest() { 12 13 // 创建订单 14 OrderMasterDo orderMaster = new OrderMasterDo(); 15 //未支付 16 orderMaster.setPayStatus(0); 17 orderMaster.setOrderId("001"); 18 orderMaster.setAmt(new BigDecimal(1000)); 19 20 // 发送订单到消息队列 21 rabbitTemplate.convertAndSend(RabbitMqConfig.Delay_Exchange_Name, "", orderMaster, message ->{ 22 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); 23 message.getMessageProperties().setDelay(10*1000); // 毫秒为单位,指定此消息的延时时长 24 return message; 25 }); 26 27 System.out.println("创建订单成功"); 28 29 return "创建订单成功"; 30 } 31 }
5、消费者代码:

1 @Component 2 public class OrderReceiver { 3 4 @RabbitListener(queues = RabbitMqConfig.Timeout_Trade_Queue_Name) 5 public void process(OrderMasterDo orderMasterDo, Message message, Channel channel) throws IOException{ 6 try { 7 System.out.println("开始执行订单[{}]的支付超时订单关闭......"); 8 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 9 System.out.println("超时订单处理完毕"); 10 } catch (Exception e) { 11 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); 12 } 13 } 14 }
6、测试效果:
可以看到,生产者生产的消息在10s之后被消费。
总结:
以上就实现了基本的功能需求,但还不完美,因为没有考虑到消息丢失、重复等问题,关于这块下一篇会说到。
参考资料: