SpringBoot整合RabbitMQ


首先本文是學習過程中的一個小demo,不涉及實際的發送短信、郵件的發送邏輯,同時,在文中 RabbitMQ 是基於發布訂閱模式。


所以如下會使用郵件、短信發送的例子,生產者對外發布發送消息的接口,根據調用的參數發送到相應的隊列中。


其實這里面還會存在一些問題,比如事務問題、重復簽收問題等等,由於是練手Demo,其他問題留在后面的文章補充。

文章目錄

1. 生產者1.1 maven依賴1.2 application.yml配置類1.3 交換機綁定隊列1.4 生產者投遞消息1.5 控制層調用代碼2. 消費者2.1 maven依賴2.2 application.yml配置2.3 郵件消費者2.4 短信消費者3. 運行測試

1. 生產者

1.1 maven依賴

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
</parent>

<dependencies>
    <!-- springboot-web組件 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 添加springboot對amqp的支持 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <!--fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.49</version>
    </dependency>
</dependencies>

1.2 application.yml配置類

spring:
  rabbitmq:
    ####連接地址
    host: 127.0.0.1
    ####端口號
    port: 5672
    ####賬號
    username: guest
    ####密碼
    password: guest
    ### 地址
    virtual-host: /

1.3 交換機綁定隊列

@Component
public class FanoutConfig {

    /**
     * 郵件隊列
     */

    private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";

    /**
     * 短信隊列
     */

    private String FANOUT_SMS_QUEUE = "fanout_sms_queue";

    /**
     * 交換機名稱
     */

    private String EXCHANGE_NAME = "fanoutExchange";

    /**
     * 1.定義郵件隊列
     * @return
     */

    @Bean
    public Queue fanOutEamilQueue() {
        return new Queue(FANOUT_EMAIL_QUEUE);
    }

    /**
     * 1.定義短信隊列
     * @return
     */

    @Bean
    public Queue fanOutSmsQueue() {
        return new Queue(FANOUT_SMS_QUEUE);
    }

    /**
     * 2.定義交換機
     * @return
     */

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    /**
     * 3.隊列與交換機綁定郵件隊列
     * @param fanOutEamilQueue
     * @param fanoutExchange
     * @return
     */

    @Bean
    Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
    }

    /**
     * 4.隊列與交換機綁定短信隊列
     * @param fanOutSmsQueue
     * @param fanoutExchange
     * @return
     */

    @Bean
    Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
    }
}

FanoutConfig 中執行如下幾步任務:

  1. 定義短信隊列
  2. 定義交換機
  3. 隊列與交換機綁定郵件|短信隊列

1.4 生產者投遞消息

@Component
public class FanoutProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 發送消息
     * @param queueName 隊列名稱
     */

    public void send(String queueName) {
        String msg = "my_fanout_msg:" + new Date();
        System.out.println(msg + ":" + msg);
        amqpTemplate.convertAndSend(queueName, msg);
    }
}

1.5 控制層調用代碼

@RestController
public class ProducerController {

    @Autowired
    private FanoutProducer fanoutProducer;

    @RequestMapping("/sendFanout")
    public String sendFanout(String queueName) {
        fanoutProducer.send(queueName);
        return "success";
    }
}

2. 消費者

2.1 maven依賴

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
</parent>

<dependencies>

    <!-- springboot-web組件 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 添加springboot對amqp的支持 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-mail</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <!--fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.49</version>
    </dependency>

</dependencies>

2.2 application.yml配置

spring:
  rabbitmq:
  ####連接地址
    host: 127.0.0.1
   ####端口號   
    port: 5672
   ####賬號 
    username: guest
   ####密碼  
    password: guest
   ### 地址
    virtual-host: /

server:
  port: 8081

2.3 郵件消費者

@Component
@RabbitListener(queues = "fanout_eamil_queue")
public class FanoutEamilConsumer {

    @RabbitHandler
    public void process(String msg) throws Exception {
        System.out.println("郵件消費者獲取生產者消息msg:" + msg);
    }
}

2.4 短信消費者

@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("短信消費者獲取生產者消息msg:" + msg);
    }
}

3. 運行測試

訪問時才會創建交換機以及隊列,並非項目啟動就會創建

生產者訪問鏈接(地址+發送的隊列名稱):
http://localhost:8080/sendFanout?queueName=fanout_sms_queue

消費者啟動:

案例代碼: https://www.lanzous.com/i5zkf7c

我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下微信公眾號哈:niceyoo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM