使用RabbitMQ實現訂單超時取消,大致流程:
生產者生產一條設置了TTL的延遲取消訂單消息=>延遲隊列交換機(通過綁定路由鍵)=>消息投遞至延遲隊列=>消息延遲隊列時間到期=>經過死信隊列交換機(通過綁定路由鍵)=>投遞至死信隊列=>消費者監聽死信隊列消息即時消費(做取消訂單邏輯)。
下面來看代碼:
一、先聲明交換機、隊列以及他們的綁定關系:
@Configuration public class RabbitMQConfig { // 聲明延時隊列交換機 public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; //延時隊列c public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec"; //延時隊列c路由key public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey"; //聲明死信隊列交換機 public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; // 死信隊列c public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec"; //死信交換機 的 不設時間路由key public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey"; // 聲明延時Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明延時隊列C 不設置TTL // 並綁定到對應的死信交換機 @Bean("delayQueueC") public Queue delayQueueC(){ Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當前隊列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY); return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build(); } // 聲明死信隊列C 用於接收延時任意時長處理的消息 @Bean("deadLetterQueueC") public Queue deadLetterQueueC(){ return new Queue(DEAD_LETTER_QUEUEC_NAME); } // 聲明延時隊列C綁定關系 @Bean public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY); } // 聲明死信隊列C綁定關系 @Bean public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY); } }
二、設置延遲隊列配置綁定關系
@Configuration public class DelayedRabbitMQConfig { public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue"; public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange"; public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey"; @Bean public Queue immediateQueue() { return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
三、生產者發送消息
@Component public class DelayMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayMsg(String msg, Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{ a.getMessageProperties().setDelay(delayTime); return a; }); } }
四、設置消費者監聽
@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當前時間:{},取消訂單,msg:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
五、調用接口
@Slf4j @RequestMapping("rabbitmq") @RestController public class RabbitMQMsgController { @Autowired private DelayMessageSender sender; /** * 發送延遲取消訂單消息 * @param msg 消息體 * @param delayTime 自定義延遲取消訂單時間(毫秒) */ @RequestMapping("delayMsg") public void delayMsg2(String msg, Integer delayTime) { msg = msg +"=>"+ (int)(Math.random() * 90000.0D + 10000.0D); log.info("當前時間:{},生成訂單,msg:{},delayTime:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), msg, delayTime); sender.sendDelayMsg(msg, delayTime); } }
輸出結果:我設置的是5秒后取消訂單
rmq配置: