导入RabbitMQ相关依赖,当然要确保RabbitMQ环境搭建成功了。
pom.xml:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
* 1 自动配置 * RabbitAutoConfiguration ---rabiitmq的自动配置类 * 自动配置了连接工厂ConnectionFactory * RabbitProperties封装了RabbitMQ的配置,默认就有一写配置 * RabbitTemplate:给RabbitMQ发送和接受消息。 * AmqpAdmin:RabbitMQ系统管理功能组件。
我们可以在测试中对RabbitMQ进行测试:
package com.xt.springbootamqp; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringBootRabbitAmqpApplicationTests { //自动注入RabbitTemplate,由他来发送和接受消息 @Autowired private RabbitTemplate rabbitTemplate; /** *@Description: 单播模式,点对点 *@date: 2019/6/17 * * RabbitTemplate中发送消息send()的重载方法: * @Override * public void send(Message message) throws AmqpException { * send(this.exchange, this.routingKey, message); * } * * @Override * public void send(String routingKey, Message message) throws AmqpException { * send(this.exchange, routingKey, message); * } * * @Override * public void send(final String exchange, final String routingKey, final Message message) throws AmqpException { * send(exchange, routingKey, message, null); * } * */ @Test public void contextLoads() {
// 使用send方法 rabbitTemplate.send(exchange,rountKey,message);
String str = "hello";
Message message = new Message(str.getBytes(),null);
rabbitTemplate.send("","",message);
} }
但是我们一般发送消息常用的不是send方法,而是convertAndSend(),当然该方法也有很多重载方法。如图:
发送消息的过程:当生产者发送消息时,首先会根据指定的交换机exchange找到所属的多个队列,又因为与队列绑定的路由键rountKey不同,消息会根据路由键发送至相对应的队列。而消费者直接面对的是队列,只需要根据队列名字获取队列相应信息就能得到消息。
由于测试,我事先已经在RabbitMQ客户端上新增了多个交换机个队列。#表示匹配多个单词 .号表示匹配单个单词
Exchanges:wqq.direct(单播) ---> rountingkey: key.direct ---> queue.direct
---> rountingKey: key.direct2 ---> queue.direct2
wqq.fanout(广播) ---> rountingkey: 广播模式与路由键无关 ---> queue.fanout
---> rountingkey: ---> queue.fanout2
wqq.topic(发布订阅) ---> rountingkey: #.topic ---> ceshi.topic 只要是路由键是xxx.topic都能匹配
---> rountingkey: queue.# ---> queue.#
单播模式:
单播模式,也就是点对点。一个消息只能发送到一个交换机下的一个队列上去。
例如:将消息发送至exchange为wqq.direct所绑定的rountkey为key.direct的消息队列上,消息体是msg
/** *@Description: 测试单播模式 *@date: 2019/6/17 */ @Test public void test1(){ //常用的发送消息的方法 // rabbitTemplate.convertAndSend(String exchange,String rountKey,object message); String msg = "测试单播directRabbitMQ"; rabbitTemplate.convertAndSend("wqq.direct","key.direct",msg); }
消费者获取消息:
@Test public void getMsg(){ Object o = rabbitTemplate.receiveAndConvert("queue.direct"); System.out.println(o); }
广播模式:
交换机广播模式下与路由键无关,因为会直接向所属的队列全部发送该消息。
/** *@Description: 广播模式 *@date: 2019/6/17 * 在广播模式下,即使我设置了路由键,但是该交换机下的所欲队列都会受到该消息,证明路由键无效 */ @Test public void fanoutTest(){ rabbitTemplate.convertAndSend("wqq.fanout","queue.fanout","测试广播模式"); }
消费者接受消息:
@Test public void getFanoutMsg(){ Object o = rabbitTemplate.receiveAndConvert("queue.fanout"); System.out.println(o); }
发布订阅模式:
发布订阅模式它会根据消息的路由键去匹配路由键绑定的队列,只要能匹配上,就能将消息发送至队列上。
/** *@Description: 发布订阅模式 *@date: 2019/6/17 */ @Test public void topicMsg(){ // 匹配#.topic的路由键或者adc.#的路由键 匹配..topic的路由键或者adc..的路由键 rabbitTemplate.convertAndSend("wqq.topic","abc.topic","发布订阅topic模式"); } @Test public void topicMsg1(){ // 匹配#.topic的路由键或者queue.#的路由键 匹配..topic的路由键或者queue..的路由键 rabbitTemplate.convertAndSend("wqq.topic","queue.topic","发布订阅topic模式"); }
虽然生产者发送消息和消费者接受消息测试通过了,但是还有一个·问题是:我们一般发送消息都不太可能是字符串,一般是对象或者集合。但是经过测试后,如果发送对象,那么消息是序列换之后的结果,不利于查看。
这是由于RabbitMQ使用的是默认的jdk序列化器,所以我们可以自定义序列化器,将对象类型转换为JSON类型。
编写配置类:
@Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ MessageConverter messageConverter = new Jackson2JsonMessageConverter(); return messageConverter; } }
监听消息队列:
一般在实际开发中一般使用监听消息队列当有消息发送至RabbitMQ中时,通过监听器监听到队列中存在消息,然后将消息进行消费。
例子:如果有两个系统,订单系统、库存系统,当订单系统产生一个订单,发送一个消息至RabbitMQ,库存系统监听到RabbitMQ队列中产生了新的消息,然后对该消息进行消费,也就是减库存操作。
步骤:
开启基于RabbitMQ的注解:
package com.xt.springbootamqp; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** *@Description: *@date: 2019/6/17 * 1 自动配置 * RabbitAutoConfiguration ---rabiitmq的自动配置类 * 自动配置了连接工厂ConnectionFactory * RabbitProperties封装了RabbitMQ的配置,默认就有一写配置 * RabbitTemplate:给RabbitMQ发送和接受消息。 * AmqpAdmin:RabbitMQ系统管理功能组件。 * * */ @SpringBootApplication @EnableRabbit public class SpringBootRabbitAmqpApplication { public static void main(String[] args) { SpringApplication.run(SpringBootRabbitAmqpApplication.class, args); } }
模拟订单系统:
package com.xt.springbootamqp.controller; import com.xt.springbootamqp.service.OrderServcieImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Date: 2019/6/17 16:16 * @Description: 模拟用户产生订单 */ @RestController public class UserController { @Autowired private OrderServcieImpl orderServcie; @RequestMapping(value = "/addOrder") public void addOrder(){ orderServcie.addOrder(); } }
package com.xt.springbootamqp.service; import com.xt.springbootamqp.pojo.Order; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @Date: 2019/6/17 16:08 * @Description: 订单系统 */ @Service public class OrderServcieImpl { /** *@Description: 模拟订单系统产生订单 *@date: 2019/6/17 */ @Autowired private RabbitTemplate rabbitTemplate; public void addOrder(){ //产生一个订单 Order order = new Order(); order.setOrderId(88); order.setOrderName("小马的订单"); //将订单发送至RabbitMQ中; rabbitTemplate.convertAndSend("wqq.topic","queue.topic",order); } }
模拟库存系统:
package com.xt.springbootamqp.service; import com.xt.springbootamqp.pojo.Order; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.util.function.ObjDoubleConsumer; /** * @Date: 2019/6/17 16:18 * @Description: 模拟库存系统,监听消息队列,当订单系统查询消息时,库存系统自动减库存 */ @Service public class StockServiceImpl { @RabbitListener(queues = {"queue.topic"}) //队列可以写多个,也就是可以监听多个队列 public void reduceStock(Order order){ //监听到消息,自动减库存 //获取订单信息 System.out.println(order.toString()); }
@RequestMapping(value = "/addOrder")
public void addOrderMsg(Message message){
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
}
}
虽然监听消息成功了,但是我们有没有发现,我们都是以RabbitMQ中已经有了交换器和队列在测试,那么如果RabbitMQ中没有交换器和队列。而是需要我们自己手动添加组件,那该怎么办呢?
AmpqAdmin 管理组件:
/** *@Description: 测试AmqpAdmin管理组件 *@date: 2019/6/17 */ @Autowired private AmqpAdmin amqpAdmin; /** *@Description: 创建exchange *@date: 2019/6/17 */ @Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("")); amqpAdmin.declareExchange(new FanoutExchange("")); amqpAdmin.declareExchange(new TopicExchange("")); }
自定义创建Exchange、队列、绑定关系:
@Autowired private AmqpAdmin amqpAdmin; /** *@Description: 创建exchange、队列、绑定规则 *@date: 2019/6/17 */ @Test public void test3(){ //创建exchange amqpAdmin.declareExchange(new DirectExchange("amqp.direct_exchange")); /* amqpAdmin.declareExchange(new FanoutExchange("")); amqpAdmin.declareExchange(new TopicExchange("")); */ //创建队列 amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//是否持久化 //创建绑定关系 amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqp.direct_exchange","amapadmin_key",null)); }