原文:https://my.oschina.net/u/3266761/blog/1926588
rabbitMq是受欢迎的消息中间件之一,相比其他的消息中间件,具有高并发的特性(天生具备高并发高可用的erlang语言编写),除此之外,还可以持久化,保证消息不易丢失,高可用,实现集群部署,提供灵活的路由和可靠性,可视化管理等等的优点。
相比于其他的消息队列,rabbitmq最大的特色就是加入了exchange(交换器)这个东西,AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。
RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。
fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。
direct:把消息投递到那些binding key与routing key完全匹配的队列中。
topic:将消息路由到binding key与routing key模式匹配的队列中。
言归正传,延时队列如何通过rabbitmq来实现呢?
分析:首先rabbitmq自己是不具备延时的功能的,除了使用官方提供的插件之外,我们还可以通过ttl(设置超时时间的方式)+ DLX(一个死信队列)的方式来实现 + Router(转发队列)
其中,ttl可以设置在消息上,也可以设置在队列上,设置在消息上可以提供更大的灵活性,但是如果同时设置超时时间的话,就取最小的超时时间为准。
此外,死信队列是一个普通的队列,它没有消费者,用来存储有超时时间信息的消息,并且可以设置当消息超时(ttl),转发到另一个指定队列(此处设置转发到router, 当发送消息之后(发送时,带上要延时的队列名称),等待消息超时,将消息转发到指定的Router队列。
最后,转发队列,用来接收死信队列超时消息,在接收到之后,消费者将消息解析,获取queueName,body,再向所获取的queueName队列发送一条消息,内容为body.
下面是代码:
生产者:
package cn.chinotan.service.delayQueueRabbitMQ; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @program: test * @description: 生产者 * @author: xingcheng * @create: 2018-08-12 12:33 **/ @Service public class Producr { private static final Logger LOGGER = LoggerFactory.getLogger(Producr.class); @Autowired private AmqpTemplate amqpTemplate; public void send(String msg, long time, String delayQueueName) { //rabbit默认为毫秒级 long times = time * 1000; MessagePostProcessor processor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(String.valueOf(times)); return message; } }; // 拼装msg msg = StringUtils.join(msg, ":", delayQueueName); amqpTemplate.convertAndSend(MqConstant.MY_EXCHANGE, MqConstant.DEAD_LETTER_QUEUE, msg, processor); } }
消费队列1:
package cn.chinotan.service.delayQueueRabbitMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; /** * @program: test * @description: queueOne消费者 * @author: xingcheng * @create: 2018-08-12 12:35 **/ @Service public class MyQueueOneConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueOneConsumer.class); @RabbitListener(queues=MqConstant.MY_QUEUE_ONE) @RabbitHandler public void process(String content) { LOGGER.info("延迟时间到,queueOne开始执行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } }
消费队列2:
package cn.chinotan.service.delayQueueRabbitMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; /** * @program: test * @description: queueTwo队列 * @author: xingcheng * @create: 2018-08-12 12:35 **/ @Service public class MyQueueTwoConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueTwoConsumer.class); @Autowired private Producr producr; @RabbitListener(queues=MqConstant.MY_QUEUE_TWO) @RabbitHandler public void process(String content) { LOGGER.info("延迟时间到,queueTwo开始执行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } }
转发队列:
package cn.chinotan.service.delayQueueRabbitMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; /** * @program: test * @description: 转发队列 * @author: xingcheng * @create: 2018-08-12 12:35 **/ @Service public class TradeProcess { private static final Logger LOGGER = LoggerFactory.getLogger(TradeProcess.class); @Autowired private AmqpTemplate amqpTemplate; @RabbitListener(queues=MqConstant.MY_TRANS_QUEUE) @RabbitHandler public void process(String content) { String msg = content.split(":")[0]; String delayQueueName = content.split(":")[1]; amqpTemplate.convertAndSend(MqConstant.MY_EXCHANGE, delayQueueName, msg); LOGGER.info("进行转发 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } }
队列配置:
package cn.chinotan.service.delayQueueRabbitMQ; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @program: test * @description: 延时队列rabbitMQ配置 * @author: xingcheng * @create: 2018-08-12 12:27 **/ @Configuration public class RabbitConfig { @Bean public DirectExchange myExchange() { return new DirectExchange(MqConstant.MY_EXCHANGE, true, false); } @Bean public Queue myQueueOne() { return new Queue(MqConstant.MY_QUEUE_ONE, true, false, false); } @Bean public Queue myQueueTwo() { return new Queue(MqConstant.MY_QUEUE_TWO, true, false, false); } @Bean public Queue myTransQueue() { return new Queue(MqConstant.MY_TRANS_QUEUE, true, false, false); } @Bean public Queue deadLetterQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", MqConstant.MY_EXCHANGE); map.put("x-dead-letter-routing-key", MqConstant.MY_TRANS_QUEUE); Queue queue = new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false, map); System.out.println("arguments :" + queue.getArguments()); return queue; } @Bean public Binding queueOneBinding() { return BindingBuilder.bind(myQueueOne()).to(myExchange()).with(MqConstant.MY_QUEUE_ONE); } @Bean public Binding queueTwoBinding() { return BindingBuilder.bind(myQueueTwo()).to(myExchange()).with(MqConstant.MY_QUEUE_TWO); } @Bean public Binding queueDeadBinding() { return BindingBuilder.bind(deadLetterQueue()).to(myExchange()).with(MqConstant.DEAD_LETTER_QUEUE); } @Bean public Binding queueTransBinding() { return BindingBuilder.bind(myTransQueue()).to(myExchange()).with(MqConstant.MY_TRANS_QUEUE); } }
队列常量配置:
package cn.chinotan.service.delayQueueRabbitMQ; /** * @program: test * @description: rabbitMq常量 * @author: xingcheng * @create: 2018-08-12 12:30 **/ public class MqConstant { public static final String MY_EXCHANGE = "my_exchange"; public static final String MY_QUEUE_ONE = "my_queue_one"; public static final String MY_QUEUE_TWO = "my_queue_two"; public static final String DEAD_LETTER_QUEUE = "dead_letter_queue"; public static final String MY_TRANS_QUEUE = "my_trans_queue"; }
测试延时controller:
package cn.chinotan.controller; import cn.chinotan.service.delayQueueRabbitMQ.MqConstant; import cn.chinotan.service.delayQueueRabbitMQ.Producr; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.text.SimpleDateFormat; import java.util.Date; /** * @program: test * @description: 延时队列启动类 * @author: xingcheng * @create: 2018-08-12 15:41 **/ @RestController @RequestMapping("/delayQueue") public class DelayQueueController { private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueController.class); @Autowired private Producr producr; @GetMapping("/send/{time}") public String send(@PathVariable("time") int time){ LOGGER.info("{}秒后, 发送延迟消息,当前时间{}", time, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); producr.send("我是延时消息...", time, MqConstant.MY_QUEUE_TWO); return "ok"; } }
测试结果: