一、概述
“Spring Cloud Stream is a framework for building message-driven microservice applications.”這是來自官方文檔對spring cloud sream的介紹,大致可以理解為Spring Cloud Stream 是一個構建消息驅動微服務的框架,該項目用於代理消息隊列的集成過程。避免業務與具體的mq產品有深刻的綁定關系,易於后期的服務切換。
二、課題
如何通過spring-cloud-starter-stream-rabbit實現一條消息,可以被多個群組同時消費?
三、過程
1、創建消息生產者項目
- pom引用
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>
- spring-cloud-stream的生產隊列配置
spring: rabbitmq: addresses: 192.168.0.114:5672 username: simm password: test cloud: stream: default-binder: rabbit bindings: # 生產-消費者 指向同一個topic bizunit-install-producter: destination: yysc-bizunit-install # group: bizunit-queue consumer: concurrency: 5
- yysc-bizunit-install消息的生產,即rabbit的topic-exchange
1 /** 2 * 生產者通道 3 * @author miscr 4 */ 5 public interface InstallCallbackOutputChannel { 6 /** 7 * 定義通道的名稱 8 */ 9 String OUTPUT = "bizunit-install-producter"; 10 11 /** 12 * 定義為輸入通道 13 * @return 14 */ 15 @Output(OUTPUT) 16 MessageChannel output(); 17 } 18 19 20 /** 21 * 生產消息 22 * @author miscr 23 */ 24 @EnableBinding(InstallCallbackOutputChannel.class) 25 public class InstallCallbackSender { 26 @Bean 27 @InboundChannelAdapter(value = InstallCallbackOutputChannel.OUTPUT,poller = @Poller(fixedDelay = "2000")) 28 public MessageSource<Date> timerMessagaSource(){ 29 return ()->new GenericMessage<>(new Date()); 30 } 31 }
2、創建兩個消費者群組,群組分別命名為 main 和 template,各設置5個消費線程
- application.yml中配置消費者相關信息
spring: rabbitmq: addresses: 192.168.0.114:5672 username: simm password: test cloud: stream: default-binder: rabbit bindings: install-consumer: destination: yysc-bizunit-install group: template consumer: concurrency: 5 bizunit-install-consumer: destination: yysc-bizunit-install group: main consumer: concurrency: 5
- 消費者 install-consumer 的綁定源碼示例
/** * 安裝狀態回調接收通道 * @author miscr */ public interface InstallCallbackInputChannel { /** * 定義通道的名稱 */ String INPUT = "install-consumer"; /** * 定義為輸入通道 * @return */ @Input(INPUT) SubscribableChannel input(); } /** * 消費者服務 * * @author miscr */ @EnableBinding(InstallCallbackInputChannel.class) public class InstallCallbackReceiver { /** * 消息監聽 * * @param message */ @StreamListener(InstallCallbackInputChannel.INPUT) private void receiver(Object message) { System.out.println("template" + message.toString()); } }
- 消費者 bizunit-install-consumer 的綁定源碼示例
/** * 安裝狀態回調接收通道 * @author miscr */ public interface BizUnitInstallCallbackInputChannel { /** * 定義通道的名稱 */ String INPUT = "bizunit-install-consumer"; /** * 定義為輸入通道 * @return */ @Input(INPUT) SubscribableChannel input(); } /** * 消費者服務 * * @author miscr */ @EnableBinding(BizUnitInstallCallbackInputChannel.class) public class BizUnitInstallCallbackReceiver { /** * 消息監聽 * * @param message */ @StreamListener(BizUnitInstallCallbackInputChannel.INPUT) private void receiver(Object message) { System.out.println("bizunit-install" + message.toString()); } }
四、啟動后監看mq的隊列生產與消費情況
- 生產者項目與消費者項目啟動后,查看mq的控制台,結果如下
- 查看java控制台的消費日志,兩個群組確實都在消費同一條消息