Springboot集成RabbitMQ實現消息的發送和消費


一、Exchange 類型

Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。

direct:一對一。消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。

topic:一對多。通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,“*”匹配不多不少一個單詞。

fanout:廣播。每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去,連routing key都不需要。

headers: 匹配 AMQP 消息的 header 而不是路由鍵,headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了。

過程:

項目結構:

 二、搭建父工程rabbitMQ-parent

依賴包pom.xml

    <packaging>pom</packaging>

    <parent>
        <artifactId>spring-boot-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.1.16.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.1.1</version>
        </dependency>
    </dependencies>

三、搭建消息發送者send

1.結構:

2.依賴pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>    

3.配置文件application.yml

server:
  port: 7070
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

4.啟動類SendApp

@SpringBootApplication
public class SendApp {
    public static void main(String[] args) {
        SpringApplication.run(SendApp.class,args);
    }
}

四、搭建消息消費者receive

1.結構

2.依賴pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>    

3.配置文件application.yml

server:
  port: 7070 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest

4.啟動類ReceiveApp

@SpringBootApplication
public class ReceiveApp {
    public static void main(String[] args) {
        SpringApplication.run(ReceiveApp.class,args);
    }
}

五、發送消息

1.send中創建配置類AmqpSendConfig 

@Configuration
public class AmqpSendConfig {
    @Bean //模擬<bean>標簽
    public DirectExchange getDirectExchange(){
        return new DirectExchange("myDirectExchange");
    }

    @Bean
    public FanoutExchange getFanoutExchange(){
        return new FanoutExchange("myFanoutExchange");
    }

    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("myTopicExchange");
    }
}

2.SendService 

@Service
public class SendService {

    //自動注入amqpTemplate模板對象
    @Resource
    private AmqpTemplate amqpTemplate;

    //direct方式
    public  String driectSend(){
        /**
        *參數一:交換機名稱,需要事先在config創建
        *參數二:routing Key  根據這個Key發送消息到指定地方
        *參數三:消息的具體內容
        **/
        amqpTemplate.convertAndSend("myDirectExchange","directKey","你吃飯了嗎?");
        return "direct發送成功了";
    }

    //fanout方式
    public String fanoutSend(){
        int []arr={1,2,3,4};
        //fanout為廣播,不需要rountkey
        this.amqpTemplate.convertAndSend("myFanoutExchange","",arr);

        return "fanout發送成功了";
    }

    //topic方式
    public String topicSend() {
        this.amqpTemplate.convertAndSend("myTopicExchange","abc.def","1111115555555");

        return "topic發送成功了";
    }


}

3.SendController

@RestController
public class SendController {

    @Autowired
    private SendService sendService;

    @RequestMapping("/direct")
    public String directExchange(){
        return this.sendService.driectSend();
    }

    @RequestMapping("/fanout")
    public String fanoutExchange(){
        return this.sendService.fanoutSend();
    }

    @RequestMapping("/topic")
    public String topicExchange() {
        return this.sendService.topicSend();
    }
}

 六、接收消息

1.receive中添加配置類

direct方式配置類DirectReceiveConfig

@Configuration
public class DirectReceiveConfig {
    //1.創建一個名字為myDirectQueue的隊列
    @Bean
    public Queue getQueue() {
        return new Queue("myDirectQueue");
    }

    //2.創建名字為myDirectExchange的交換機
    @Bean
    public Exchange getExchange() {
        return new DirectExchange("myDirectExchange");
    }

    //3.將隊列綁定到交換機
    @Bean("binding")
    public Binding binding(Queue getQueue,Exchange getExchange){
        Binding myBinding = BindingBuilder.bind(getQueue)
                .to(getExchange)
                .with("directKey")
                .noargs();
        return myBinding;
    }
}

fanout方式配置類FanoutReceiveConfig 

@Configuration
public class FanoutReceiveConfig {

    @Bean
    public Queue fanoutQueue(){
        return new Queue("myFanoutQueue");
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("myFanoutExchange");
    }

    @Bean
    public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}

topic方式配置類TopicReceiveConfig 

@Configuration
public class TopicReceiveConfig {

    //隊列一
    @Bean
    public Queue topicQueue(){
        return new Queue("myTopicQueue");
    }

    //隊列二
    @Bean
    public Queue topicQueue2(){
        return new Queue("myTopicQueue2");
    }

    //交換機
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("myTopicExchange");
    }

    //隊列一綁定到交換機
    @Bean
    public Binding binding1(Queue topicQueue,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue)
                .to(topicExchange)
                .with("abc.#");//routingKey以abc開頭的都可以接受到消息
    }

    //隊列一綁定到交換機
    @Bean
    public Binding binding2(Queue topicQueue2,TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue2)
                .to(topicExchange)
                .with("#.def");//routingKey以def結尾的都可以接受到消息
    }

}

2.ReceiveService 

@Service
public class ReceiveService {

    //direct方式
    //@RabbitListener注解用於標記當前方法為消息監聽方法,可以監聽某個隊列,當隊列中有新消息則自動完成接收,
    @RabbitListener(queues = "myDirectQueue")
    public void receive(String message){
        System.out.println("接收:"+message);

    }

    //fanout方式
    @RabbitListener(queues = "myFanoutQueue")
    public void fanoutReceive(String o){

        System.out.println("fanout接收:"+ o);
    }

    //topic方式
    @RabbitListener(queues = "myTopicQueue")
    public void topicReceive1(String oi) {

        System.out.println("topic1接收:"+ oi);
    }

    @RabbitListener(queues = "myTopicQueue2")
    public void topicReceive2(String oi) {

        System.out.println("topic2接收:"+ oi);
    }
}

七、訪問測試

1.啟動RabbitMQ服務器

2.啟動發送方

3.啟動接收方

4.瀏覽器訪問查看是否發送成功

 5.控制器查看是否接收

 

項目github:https://github.com/smiles365/rabbitMQ-parent


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM