spring-cloud-stream-rabbit的一個topic對應多組消費者實例


一、概述

   “Spring Cloud Stream is a framework for building message-driven microservice applications.”這是來自官方文檔對spring cloud sream的介紹,大致可以理解為Spring Cloud Stream 是一個構建消息驅動微服務的框架,該項目用於代理消息隊列的集成過程。避免業務與具體的mq產品有深刻的綁定關系,易於后期的服務切換。

二、課題

  如何通過spring-cloud-starter-stream-rabbit實現一條消息,可以被多個群組同時消費?

三、過程

1、創建消息生產者項目

  •  pom引用
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
  • spring-cloud-stream的生產隊列配置
spring:
  rabbitmq:
    addresses: 192.168.0.114:5672
    username: simm
    password: test
  cloud:
    stream:
      default-binder: rabbit
      bindings:
        # 生產-消費者 指向同一個topic
        bizunit-install-producter:
          destination: yysc-bizunit-install
#          group: bizunit-queue
          consumer:
            concurrency: 5
  • yysc-bizunit-install消息的生產,即rabbit的topic-exchange
 1 /**
 2  * 生產者通道
 3  * @author miscr
 4  */
 5 public interface InstallCallbackOutputChannel {
 6     /**
 7      * 定義通道的名稱
 8      */
 9     String OUTPUT = "bizunit-install-producter";
10 
11     /**
12      * 定義為輸入通道
13      * @return
14      */
15     @Output(OUTPUT)
16     MessageChannel output();
17 }
18 
19 
20 /**
21  * 生產消息
22  * @author miscr
23  */
24 @EnableBinding(InstallCallbackOutputChannel.class)
25 public class InstallCallbackSender {
26     @Bean
27     @InboundChannelAdapter(value = InstallCallbackOutputChannel.OUTPUT,poller = @Poller(fixedDelay = "2000"))
28     public MessageSource<Date> timerMessagaSource(){
29         return ()->new GenericMessage<>(new Date());
30     }
31 }

2、創建兩個消費者群組,群組分別命名為 main 和 template,各設置5個消費線程

  •  application.yml中配置消費者相關信息
spring:
  rabbitmq:
    addresses: 192.168.0.114:5672
    username: simm
    password: test
  cloud:
    stream:
      default-binder: rabbit
      bindings:
        install-consumer:
          destination: yysc-bizunit-install
          group: template
          consumer:
            concurrency: 5
        bizunit-install-consumer:
          destination: yysc-bizunit-install
          group: main
          consumer:
            concurrency: 5
  • 消費者 install-consumer 的綁定源碼示例
/**
 * 安裝狀態回調接收通道
 * @author miscr
 */
public interface InstallCallbackInputChannel {
    /**
     * 定義通道的名稱
     */
    String INPUT = "install-consumer";

    /**
     * 定義為輸入通道
     * @return
     */
    @Input(INPUT)
    SubscribableChannel input();
}

/**
 * 消費者服務
 *
 * @author miscr
 */
@EnableBinding(InstallCallbackInputChannel.class)
public class InstallCallbackReceiver {
    /**
     * 消息監聽
     *
     * @param message
     */
    @StreamListener(InstallCallbackInputChannel.INPUT)
    private void receiver(Object message) {
        System.out.println("template" + message.toString());
    }
}
  • 消費者 bizunit-install-consumer 的綁定源碼示例
/**
 * 安裝狀態回調接收通道
 * @author miscr
 */
public interface BizUnitInstallCallbackInputChannel {
    /**
     * 定義通道的名稱
     */
    String INPUT = "bizunit-install-consumer";

    /**
     * 定義為輸入通道
     * @return
     */
    @Input(INPUT)
    SubscribableChannel input();
}

/**
 * 消費者服務
 *
 * @author miscr
 */
@EnableBinding(BizUnitInstallCallbackInputChannel.class)
public class BizUnitInstallCallbackReceiver {
    /**
     * 消息監聽
     *
     * @param message
     */
    @StreamListener(BizUnitInstallCallbackInputChannel.INPUT)
    private void receiver(Object message) {
        System.out.println("bizunit-install" + message.toString());
    }
}

四、啟動后監看mq的隊列生產與消費情況

  • 生產者項目與消費者項目啟動后,查看mq的控制台,結果如下

 

 

  • 查看java控制台的消費日志,兩個群組確實都在消費同一條消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM