Springboot集成RabbitMQ实现消息的发送和消费


一、Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。

direct:一对一。消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。

topic:一对多。通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,“*”匹配不多不少一个单词。

fanout:广播。每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去,连routing key都不需要。

headers: 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。

过程:

项目结构:

 二、搭建父工程rabbitMQ-parent

依赖包pom.xml

    <packaging>pom</packaging>

    <parent>
        <artifactId>spring-boot-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.1.16.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.1.1</version>
        </dependency>
    </dependencies>

三、搭建消息发送者send

1.结构:

2.依赖pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>    

3.配置文件application.yml

server:
  port: 7070
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

4.启动类SendApp

@SpringBootApplication
public class SendApp {
    public static void main(String[] args) {
        SpringApplication.run(SendApp.class,args);
    }
}

四、搭建消息消费者receive

1.结构

2.依赖pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>    

3.配置文件application.yml

server:
  port: 7070 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest

4.启动类ReceiveApp

@SpringBootApplication
public class ReceiveApp {
    public static void main(String[] args) {
        SpringApplication.run(ReceiveApp.class,args);
    }
}

五、发送消息

1.send中创建配置类AmqpSendConfig 

@Configuration
public class AmqpSendConfig {
    @Bean //模拟<bean>标签
    public DirectExchange getDirectExchange(){
        return new DirectExchange("myDirectExchange");
    }

    @Bean
    public FanoutExchange getFanoutExchange(){
        return new FanoutExchange("myFanoutExchange");
    }

    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("myTopicExchange");
    }
}

2.SendService 

@Service
public class SendService {

    //自动注入amqpTemplate模板对象
    @Resource
    private AmqpTemplate amqpTemplate;

    //direct方式
    public  String driectSend(){
        /**
        *参数一:交换机名称,需要事先在config创建
        *参数二:routing Key  根据这个Key发送消息到指定地方
        *参数三:消息的具体内容
        **/
        amqpTemplate.convertAndSend("myDirectExchange","directKey","你吃饭了吗?");
        return "direct发送成功了";
    }

    //fanout方式
    public String fanoutSend(){
        int []arr={1,2,3,4};
        //fanout为广播,不需要rountkey
        this.amqpTemplate.convertAndSend("myFanoutExchange","",arr);

        return "fanout发送成功了";
    }

    //topic方式
    public String topicSend() {
        this.amqpTemplate.convertAndSend("myTopicExchange","abc.def","1111115555555");

        return "topic发送成功了";
    }


}

3.SendController

@RestController
public class SendController {

    @Autowired
    private SendService sendService;

    @RequestMapping("/direct")
    public String directExchange(){
        return this.sendService.driectSend();
    }

    @RequestMapping("/fanout")
    public String fanoutExchange(){
        return this.sendService.fanoutSend();
    }

    @RequestMapping("/topic")
    public String topicExchange() {
        return this.sendService.topicSend();
    }
}

 六、接收消息

1.receive中添加配置类

direct方式配置类DirectReceiveConfig

@Configuration
public class DirectReceiveConfig {
    //1.创建一个名字为myDirectQueue的队列
    @Bean
    public Queue getQueue() {
        return new Queue("myDirectQueue");
    }

    //2.创建名字为myDirectExchange的交换机
    @Bean
    public Exchange getExchange() {
        return new DirectExchange("myDirectExchange");
    }

    //3.将队列绑定到交换机
    @Bean("binding")
    public Binding binding(Queue getQueue,Exchange getExchange){
        Binding myBinding = BindingBuilder.bind(getQueue)
                .to(getExchange)
                .with("directKey")
                .noargs();
        return myBinding;
    }
}

fanout方式配置类FanoutReceiveConfig 

@Configuration
public class FanoutReceiveConfig {

    @Bean
    public Queue fanoutQueue(){
        return new Queue("myFanoutQueue");
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("myFanoutExchange");
    }

    @Bean
    public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}

topic方式配置类TopicReceiveConfig 

@Configuration
public class TopicReceiveConfig {

    //队列一
    @Bean
    public Queue topicQueue(){
        return new Queue("myTopicQueue");
    }

    //队列二
    @Bean
    public Queue topicQueue2(){
        return new Queue("myTopicQueue2");
    }

    //交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("myTopicExchange");
    }

    //队列一绑定到交换机
    @Bean
    public Binding binding1(Queue topicQueue,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue)
                .to(topicExchange)
                .with("abc.#");//routingKey以abc开头的都可以接受到消息
    }

    //队列一绑定到交换机
    @Bean
    public Binding binding2(Queue topicQueue2,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue2)
                .to(topicExchange)
                .with("#.def");//routingKey以def结尾的都可以接受到消息
    }

}

2.ReceiveService 

@Service
public class ReceiveService {

    //direct方式
    //@RabbitListener注解用于标记当前方法为消息监听方法,可以监听某个队列,当队列中有新消息则自动完成接收,
    @RabbitListener(queues = "myDirectQueue")
    public void receive(String message){
        System.out.println("接收:"+message);

    }

    //fanout方式
    @RabbitListener(queues = "myFanoutQueue")
    public void fanoutReceive(String o){

        System.out.println("fanout接收:"+ o);
    }

    //topic方式
    @RabbitListener(queues = "myTopicQueue")
    public void topicReceive1(String oi) {

        System.out.println("topic1接收:"+ oi);
    }

    @RabbitListener(queues = "myTopicQueue2")
    public void topicReceive2(String oi) {

        System.out.println("topic2接收:"+ oi);
    }
}

七、访问测试

1.启动RabbitMQ服务器

2.启动发送方

3.启动接收方

4.浏览器访问查看是否发送成功

 5.控制器查看是否接收

 

项目github:https://github.com/smiles365/rabbitMQ-parent


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM