Spring Boot 整合RabbitMQ


导入RabbitMQ相关依赖,当然要确保RabbitMQ环境搭建成功了。

pom.xml:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 *  1 自动配置
 *      RabbitAutoConfiguration ---rabiitmq的自动配置类
 *      自动配置了连接工厂ConnectionFactory
 *      RabbitProperties封装了RabbitMQ的配置,默认就有一写配置
 * RabbitTemplate:给RabbitMQ发送和接受消息。
 *      AmqpAdmin:RabbitMQ系统管理功能组件。

 

我们可以在测试中对RabbitMQ进行测试:

package com.xt.springbootamqp;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootRabbitAmqpApplicationTests {

    //自动注入RabbitTemplate,由他来发送和接受消息
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *@Description:  单播模式,点对点
     *@date: 2019/6/17
     *   
     *   RabbitTemplate中发送消息send()的重载方法:
     *          @Override
     *        public void send(Message message) throws AmqpException {
     *                 send(this.exchange, this.routingKey, message);
     *          }
     *
     *           @Override
     *       public void send(String routingKey, Message message) throws AmqpException {
     *                 send(this.exchange, routingKey, message);
     *          }
     *
     *          @Override
     *      public void send(final String exchange, final String routingKey, final Message message) throws AmqpException {
     *                 send(exchange, routingKey, message, null);
     *       }
     * 
     */
    @Test
    public void contextLoads() {
    // 使用send方法 rabbitTemplate.send(exchange,rountKey,message);
      String str = "hello";
      Message message = new Message(str.getBytes(),null);
      rabbitTemplate.send("","",message);
} }

但是我们一般发送消息常用的不是send方法,而是convertAndSend(),当然该方法也有很多重载方法。如图:

 

发送消息的过程:当生产者发送消息时,首先会根据指定的交换机exchange找到所属的多个队列,又因为与队列绑定的路由键rountKey不同,消息会根据路由键发送至相对应的队列。而消费者直接面对的是队列,只需要根据队列名字获取队列相应信息就能得到消息。

由于测试,我事先已经在RabbitMQ客户端上新增了多个交换机个队列。#表示匹配多个单词   .号表示匹配单个单词

Exchanges:wqq.direct(单播)     ---> rountingkey:  key.direct  --->  queue.direct    

               ---> rountingKey: key.direct2 ---> queue.direct2

      wqq.fanout(广播)   ---> rountingkey: 广播模式与路由键无关   ---> queue.fanout

               ---> rountingkey:                                  ---> queue.fanout2

      wqq.topic(发布订阅)  ---> rountingkey: #.topic  ---> ceshi.topic           只要是路由键是xxx.topic都能匹配

                ---> rountingkey: queue.# ---> queue.#

 

单播模式:

单播模式,也就是点对点。一个消息只能发送到一个交换机下的一个队列上去。

例如:将消息发送至exchange为wqq.direct所绑定的rountkey为key.direct的消息队列上,消息体是msg

    /**
     *@Description: 测试单播模式
     *@date: 2019/6/17
     */
    @Test
    public void test1(){
        //常用的发送消息的方法
       // rabbitTemplate.convertAndSend(String exchange,String rountKey,object message);
        String msg = "测试单播directRabbitMQ";
 rabbitTemplate.convertAndSend("wqq.direct","key.direct",msg);

    }

消费者获取消息:

    @Test
    public void getMsg(){
        Object o = rabbitTemplate.receiveAndConvert("queue.direct");
        System.out.println(o);
    }

 

广播模式:

交换机广播模式下与路由键无关,因为会直接向所属的队列全部发送该消息。

    /**
     *@Description: 广播模式
     *@date: 2019/6/17
     * 在广播模式下,即使我设置了路由键,但是该交换机下的所欲队列都会受到该消息,证明路由键无效
     */
    @Test
    public void fanoutTest(){
        rabbitTemplate.convertAndSend("wqq.fanout","queue.fanout","测试广播模式");
    }

消费者接受消息:

    @Test
    public void getFanoutMsg(){
        Object o = rabbitTemplate.receiveAndConvert("queue.fanout");
        System.out.println(o);
    }

 

发布订阅模式:

发布订阅模式它会根据消息的路由键去匹配路由键绑定的队列,只要能匹配上,就能将消息发送至队列上。

    /**
     *@Description: 发布订阅模式
     *@date: 2019/6/17
     */
    @Test
    public void topicMsg(){
        // 匹配#.topic的路由键或者adc.#的路由键     匹配..topic的路由键或者adc..的路由键
        rabbitTemplate.convertAndSend("wqq.topic","abc.topic","发布订阅topic模式");  
    }
    @Test
    public void topicMsg1(){
        // 匹配#.topic的路由键或者queue.#的路由键    匹配..topic的路由键或者queue..的路由键
        rabbitTemplate.convertAndSend("wqq.topic","queue.topic","发布订阅topic模式");
    }

 


 

虽然生产者发送消息和消费者接受消息测试通过了,但是还有一个·问题是:我们一般发送消息都不太可能是字符串,一般是对象或者集合。但是经过测试后,如果发送对象,那么消息是序列换之后的结果,不利于查看。

这是由于RabbitMQ使用的是默认的jdk序列化器,所以我们可以自定义序列化器,将对象类型转换为JSON类型。

编写配置类:

@Configuration
public class MyRabbitConfig {

 @Bean public MessageConverter messageConverter(){ MessageConverter messageConverter = new Jackson2JsonMessageConverter(); return messageConverter; }

}

 

 

 


 

监听消息队列:

一般在实际开发中一般使用监听消息队列当有消息发送至RabbitMQ中时,通过监听器监听到队列中存在消息,然后将消息进行消费。

例子:如果有两个系统,订单系统、库存系统,当订单系统产生一个订单,发送一个消息至RabbitMQ,库存系统监听到RabbitMQ队列中产生了新的消息,然后对该消息进行消费,也就是减库存操作。

步骤:

开启基于RabbitMQ的注解:

package com.xt.springbootamqp;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 *@Description:
 *@date: 2019/6/17
 *  1 自动配置
 *      RabbitAutoConfiguration   ---rabiitmq的自动配置类
 *      自动配置了连接工厂ConnectionFactory
 *      RabbitProperties封装了RabbitMQ的配置,默认就有一写配置
 *      RabbitTemplate:给RabbitMQ发送和接受消息。
 *      AmqpAdmin:RabbitMQ系统管理功能组件。
 *
 *
 */
@SpringBootApplication
@EnableRabbit public class SpringBootRabbitAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitAmqpApplication.class, args);
    }

}

模拟订单系统:

package com.xt.springbootamqp.controller;

import com.xt.springbootamqp.service.OrderServcieImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Date: 2019/6/17 16:16
 * @Description:  模拟用户产生订单
 */
@RestController
public class UserController {

    @Autowired
    private OrderServcieImpl orderServcie;


    @RequestMapping(value = "/addOrder")
    public void addOrder(){
        orderServcie.addOrder();
    }

}
package com.xt.springbootamqp.service;

import com.xt.springbootamqp.pojo.Order;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Date: 2019/6/17 16:08
 * @Description:  订单系统
 */
@Service
public class OrderServcieImpl {

    /**
     *@Description: 模拟订单系统产生订单
     *@date: 2019/6/17
     */

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void addOrder(){
        //产生一个订单
        Order order = new Order();
        order.setOrderId(88);
        order.setOrderName("小马的订单");

        //将订单发送至RabbitMQ中;
        rabbitTemplate.convertAndSend("wqq.topic","queue.topic",order);
    }

}

模拟库存系统:

package com.xt.springbootamqp.service;

import com.xt.springbootamqp.pojo.Order;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.function.ObjDoubleConsumer;

/**
 * @Date: 2019/6/17 16:18
 * @Description:   模拟库存系统,监听消息队列,当订单系统查询消息时,库存系统自动减库存
 */
@Service
public class StockServiceImpl {


 @RabbitListener(queues = {"queue.topic"}) //队列可以写多个,也就是可以监听多个队列 public void reduceStock(Order order){
        //监听到消息,自动减库存
        //获取订单信息
        System.out.println(order.toString());
    }
  @RequestMapping(value = "/addOrder")
  public void addOrderMsg(Message message){
  byte[] body = message.getBody();
  MessageProperties messageProperties = message.getMessageProperties();
  }
}

 

 虽然监听消息成功了,但是我们有没有发现,我们都是以RabbitMQ中已经有了交换器和队列在测试,那么如果RabbitMQ中没有交换器和队列。而是需要我们自己手动添加组件,那该怎么办呢?


 

AmpqAdmin 管理组件:

    /**
     *@Description: 测试AmqpAdmin管理组件
     *@date: 2019/6/17
     */
    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     *@Description: 创建exchange
     *@date: 2019/6/17
     */
    @Test
    public void createExchange(){
        amqpAdmin.declareExchange(new DirectExchange(""));
        amqpAdmin.declareExchange(new FanoutExchange(""));
        amqpAdmin.declareExchange(new TopicExchange(""));
    }

 

 自定义创建Exchange、队列、绑定关系:

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     *@Description: 创建exchange、队列、绑定规则
     *@date: 2019/6/17
     */
    @Test
    public void test3(){
        //创建exchange
        amqpAdmin.declareExchange(new DirectExchange("amqp.direct_exchange"));
       /*
        amqpAdmin.declareExchange(new FanoutExchange(""));
        amqpAdmin.declareExchange(new TopicExchange(""));
        */

       //创建队列
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//是否持久化

        //创建绑定关系
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqp.direct_exchange","amapadmin_key",null));

    }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM