使用RabbitMQ實現訂單超時取消(延遲隊列)


使用RabbitMQ實現訂單超時取消,大致流程:

 

生產者生產一條設置了TTL的延遲取消訂單消息=>延遲隊列交換機(通過綁定路由鍵)=>消息投遞至延遲隊列=>消息延遲隊列時間到期=>經過死信隊列交換機(通過綁定路由鍵)=>投遞至死信隊列=>消費者監聽死信隊列消息即時消費(做取消訂單邏輯)。

 

下面來看代碼:

一、先聲明交換機、隊列以及他們的綁定關系:

@Configuration
public class RabbitMQConfig {

    // 聲明延時隊列交換機
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";


    //延時隊列c
    public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec";


    //延時隊列c路由key
    public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey";

    //聲明死信隊列交換機
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";


    // 死信隊列c
    public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec";


    //死信交換機 的 不設時間路由key
    public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey";

    // 聲明延時Exchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 聲明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }



    // 聲明延時隊列C 不設置TTL
    // 並綁定到對應的死信交換機
    @Bean("delayQueueC")
    public Queue delayQueueC(){
        Map<String, Object> args = new HashMap<>(3);
        // x-dead-letter-exchange    這里聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這里聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
    }


    // 聲明死信隊列C 用於接收延時任意時長處理的消息
    @Bean("deadLetterQueueC")
    public Queue deadLetterQueueC(){
        return new Queue(DEAD_LETTER_QUEUEC_NAME);
    }



    // 聲明延時隊列C綁定關系
    @Bean
    public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    }


    // 聲明死信隊列C綁定關系
    @Bean
    public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
    }
}

二、設置延遲隊列配置綁定關系

@Configuration
public class DelayedRabbitMQConfig {
    public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";
    public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";

    @Bean
    public Queue immediateQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
                                 @Qualifier("customExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

三、生產者發送消息

@Component
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMsg(String msg, Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
            a.getMessageProperties().setDelay(delayTime);
            return a;
        });
    }
}

 

 

四、設置消費者監聽

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},取消訂單,msg:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 
}
}

五、調用接口

@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private DelayMessageSender sender;


    /**
     * 發送延遲取消訂單消息
     * @param msg 消息體
     * @param delayTime 自定義延遲取消訂單時間(毫秒)
     */
    @RequestMapping("delayMsg")
    public void delayMsg2(String msg, Integer delayTime) {
         msg = msg +"=>"+ (int)(Math.random() * 90000.0D + 10000.0D);
        log.info("當前時間:{},生成訂單,msg:{},delayTime:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), msg, delayTime);
        sender.sendDelayMsg(msg, delayTime);
    }
}

 

輸出結果:我設置的是5秒后取消訂單

 

rmq配置:

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM