Stream是什么及Binder介紹
什么是Spring Cloud Stream?
官方定義Spring Cloud Stream是一個構建消息驅動微服務的框架。
應用程序通過inputs或者 outputs 來與Spring Cloud Stream中binder對象交互。
通過我們配置來binding(綁定),而Spring Cloud Stream 的binder對象負責與消息中間件交互。所以,我們只需要搞清楚如何與Spring Cloud Stream交互就可以方便使用消息驅動的方式。
通過使用Spring Integration來連接消息代理中間件以實現消息事件驅動。
Spring Cloud Stream為一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發布-訂閱、消費組、分區的三個核心概念。
目前僅支持RabbitMQ、 Kafka。
總結:其實總體來說就是類似於JDBC的規范,通過這個Stream驅動組件去訪問消息中間件,從而達到與中間件的分離
Stream的設計思想
標准MQ
- 生產者/消費者之間靠消息媒介傳遞信息內容
- 消息必須走特定的通道 - 消息通道 Message Channel
- 消息通道里的消息如何被消費呢,誰負責收發處理 - 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器所訂閱。
為什么用Cloud Stream?
比方說我們用到了RabbitMQ和Kafka,由於這兩個消息中間件的架構上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分區。
這些中間件的差異性導致我們實際項目開發給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,后面的業務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候Spring Cloud Stream給我們提供了—種解耦合的方式。
Stream憑什么可以統一底層差異?
在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,由於各消息中間件構建的初衷不同,它們的實現細節上會有較大的差異性通過定義綁定器作為中間層,完美地實現了應用程序與消息中間件細節之間的隔離。通過向應用程序暴露統一的Channel通道,使得應用程序不需要再考慮各種不同的消息中間件實現。
通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件細節之間的隔離。
Binder:
- INPUT對應於消費者
- OUTPUT對應於生產者
Stream中的消息通信方式遵循了發布-訂閱模式
Topic主題進行廣播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic
Stream編碼常用注解簡介
Spring Cloud Stream標准流程套路
- Binder - 很方便的連接中間件,屏蔽差異。
- Channel - 通道,是隊列Queue的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過Channel對隊列進行配置。
- Source和Sink - 簡單的可理解為參照對象是Spring Cloud Stream自身,從Stream發布消息就是輸出,接受消息就是輸入。
編碼API和常用注解
組成 |
說明 |
Middleware |
中間件,目前只支持RabbitMQ和Kafka |
Binder |
Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現 |
@Input |
注解標識輸入通道,通過該輸乎通道接收到的消息進入應用程序 |
@Output |
注解標識輸出通道,發布的消息將通過該通道離開應用程序 |
@StreamListener |
監聽隊列,用於消費者的隊列的消息接收 |
@EnableBinding |
指信道channel和exchange綁定在一起 |
案例說明
准備RabbitMQ環境
工程中新建三個子模塊
- cloud-stream-rabbitmq-provider8801,作為生產者進行發消息模塊
- cloud-stream-rabbitmq-consumer8802,作為消息接收模塊
- cloud-stream-rabbitmq-consumer8803,作為消息接收模塊
Stream消息驅動之生產者
新建8801工程
新建Module:cloud-stream-rabbitmq-provider8801
修改POM.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>com.dance.springcloud</artifactId> <groupId>com.dance</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8801</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
新建yml配置
(application.yml)
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 output: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置 eureka: client: # 客戶端進行Eureka注冊的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka instance: lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒) instance-id: send-8801.com # 在信息列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址
新建主啟動類
package com.dance.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
新建業務類
新建發送消息接口
package com.dance.springcloud.service; public interface IMessageProvider { public String send(); }
新建實現類
package com.dance.springcloud.service.impl; import com.dance.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import javax.annotation.Resource; import java.util.UUID; /** * 定義消息的推送管道 */ @EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { /** * 消息發送管道 */ @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*****serial: " + serial); return null; } }
新建Controller
package com.dance.springcloud.controller; import com.dance.springcloud.service.IMessageProvider; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class SendMessageController { @Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); } }
測試
- 啟動 Eureka集群
- 啟動 rabbitmq
- 啟動 8801
- 訪問 - http://localhost:8801/sendMessage
- 后台將打印serial: UUID字符串
Stream消息驅動之消費者
新建8802工程
新建Module:cloud-stream-rabbitmq-consumer8802
修改POM.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>com.dance.springcloud</artifactId> <groupId>com.dance</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
新建yml配置
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置 eureka: client: # 客戶端進行Eureka注冊的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka instance: lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒) instance-id: receive-8802.com # 在信息列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址
新建主啟動類
package com.dance.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); } }
新建業務類Controller
package com.dance.springcloud.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消費者1號,----->接受到的消息: " + message.getPayload() + "\t port: " + serverPort); } }
測試
- 啟動Eureka集群
- 啟動StreamMQMain8801
- 啟動StreamMQMain8802
- 8801發送8802接收消息
Stream之消息重復消費
參考8802,創建8803
依照8802,克隆出來一份運行8803名稱為:cloud-stream-rabbitmq-consumer8803。
測試
啟動
- RabbitMQ
- 服務注冊 - Eureka集群
- 消息生產 - 8801
- 消息消費 - 8802
- 消息消費 - 8802
運行后有兩個問題
- 有重復消費問題
- 提供者
- 消費者
- 消息持久化問題
消費
- http://localhost:8801/sendMessage
- 目前是8802/8803同時都收到了,存在重復消費問題
- 如何解決:分組和持久化屬性group(重要)
生產實際案例
比如在如下場景中,訂單系統我們做集群部署,都會從RabbitMQ中獲取訂單信息,那如果一個訂單同時被兩個服務獲取到,那么就會造成數據錯誤,我們得避免這種情況。這時我們就可以使用Stream中的消息分組來解決。
注意在Stream中處於同一個group中的多個消費者是競爭關系,就能夠保證消息只會被其中一個應用消費一次。不同組是可以全面消費的(重復消費)。
Stream之group解決消息重復消費
原理
微服務應用放置於同一個group中,就能夠保證消息只會被其中一個應用消費一次。
不同的組是可以重復消費的,同一個組內會發生競爭關系,只有其中一個可以消費。
8802/8803都變成不同組,group兩個不同
group: A_Group、B_Group
8802修改YML
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置 group: A_Group # <--------------------分組設置 eureka: client: # 客戶端進行Eureka注冊的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka instance: lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒) instance-id: receive-8802.com # 在信息列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址
8803修改YML
server: port: 8803 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務信息; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置 group: B_Group # <--------------------分組設置 eureka: client: # 客戶端進行Eureka注冊的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka instance: lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒) instance-id: receive-8803.com # 在信息列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址
測試
結論:還是重復消費,應為設置的是不同組,所以不存在競爭關系
修改8803為A_Group
8802/8803都變成相同組,group兩個相同
group: A_Group
8802修改YMLgroup: A_Group
8803修改YMLgroup: A_Group
測試
發送8次消息
查看結果
消費者1
消費者2
結論:同一個組的多個微服務實例,每次只會有一個拿到
8802/8803實現了輪詢分組,每次只有一個消費者,8801模塊的發的消息只能被8802或8803其中一個接收到,這樣避免了重復消費。
Stream之消息持久化
添加分組后自動支持持久化
測試
- 啟動Eureka集群
- 啟動8801 發送4條消息
- 刪除8802的分組配置后啟動
- 可以發現 在啟動過程中 完全沒有 消費之前發送的四條消息,所以會導致消息丟失
- 8803沒有刪除分組,直接啟動
- 可以看到 消息被消費了,沒有丟失 ID不一樣是應為我之前的8803是啟動者的,后來停止了 8801重新發送完成消息后,啟動的8803
作者:彼岸舞
時間:2021\09\05
內容關於:Spring Cloud H版
本文屬於作者原創,未經允許,禁止轉發