rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 啟用插件(需先將插件的 .ez 檔下載並放入 RabbitMQ 的 plugins 目錄):https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
生產端
// Publish msg to "direct.exchange" with routing key "notify". The post-processor
// runs on the converted message just before it is handed to the broker: it sets
// a delay of 20000 on the message properties (milliseconds, per Spring AMQP's
// MessageProperties.setDelay) and prints a timestamped trace line.
rabbitTemplate.convertAndSend(
        "direct.exchange",
        "notify",
        msg,
        outbound -> {
            outbound.getMessageProperties().setDelay(20000);
            System.out.println(sdf.format(new Date()) + " Delay sent.");
            return outbound;
        });
消費端
/**
 * Consumer side of the delayed-notification example.
 *
 * <p>Declares the delayed direct exchange {@code direct.exchange}, the queue
 * {@code notify.queue} and the binding between them (routing key
 * {@code notify}), and hosts the listener that consumes from that queue.
 *
 * <p>The class is registered under the bean name {@code delayedReceiver}.
 * {@code @Configuration} is itself a component stereotype, so a separate
 * {@code @Component} annotation is not needed.
 */
@EnableRabbit
@Configuration("delayedReceiver")
public class DelayedReceiver {

    /**
     * Timestamp layout for console output. {@code DateTimeFormatter} is
     * immutable, so one shared instance is safe even if the listener method
     * is invoked from several consumer threads.
     */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Prints the received message body with a timestamp and acknowledges the delivery.
     *
     * @param object  the delivered message; it is cast to {@link Message}, so any
     *                other runtime type fails with a {@link ClassCastException}
     * @param channel the channel the delivery arrived on, used for the acknowledgement
     * @throws Exception if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "notify.queue")
    public void handleMessageNotify(Object object, Channel channel) throws Exception {
        Message msg = (Message) object;

        // Decode with an explicit charset instead of the platform default so the
        // printed text does not depend on the JVM's locale settings.
        String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT) + " notify :" + body);

        // NOTE(review): an explicit ack presumes the listener container is configured
        // for MANUAL acknowledge mode — confirm against the container configuration.
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Declares {@code direct.exchange} as a delayed exchange with direct routing.
     *
     * <p>The return type is the concrete {@link DirectExchange} so that the bean can
     * be injected by that type (as {@link #bindingNotify} does) regardless of bean
     * creation order.
     *
     * @return the delayed direct exchange
     */
    @Bean
    public DirectExchange direct() {
        // The builder was created via directExchange(...), so the built instance
        // is a DirectExchange; the cast only narrows the declared type.
        return (DirectExchange) ExchangeBuilder.directExchange("direct.exchange")
                .delayed()
                .withArgument("x-delayed-type", "direct")
                .build();
    }

    /**
     * Declares the queue the listener consumes from.
     *
     * @return the {@code notify.queue} queue
     */
    @Bean
    public Queue notifyQueue() {
        return new Queue("notify.queue");
    }

    /**
     * Binds {@code notify.queue} to {@code direct.exchange} with routing key {@code notify}.
     *
     * @param direct      the exchange to bind to
     * @param notifyQueue the queue to bind
     * @return the binding
     */
    @Bean
    public Binding bindingNotify(DirectExchange direct, Queue notifyQueue) {
        return BindingBuilder.bind(notifyQueue).to(direct).with("notify");
    }
}