官網:https://spring.io/projects/spring-cloud-stream
1.簡介
1.什么是Springcloud Stream
Springcloud Stream 是一個構建消息驅動微服務的框架。說白了就是操作MQ的,可以屏蔽底層的MQ類型。
應用程序通過inputs 或者 outputs 與SpringcloudStream中binder對象交互。所以我們通過配置來綁定(binding),而與SpringcloudStream 的binder對象負責與消息中間件交互。所以我們只需要了解如何與Stream的binder交互就可以了,屏蔽了與底層MQ交互。
Springcloud Stream為一些MQ提供了個性化的自動化配置實現,引用了發布-訂閱、消費組、分區的三個核心概念。
2.為什么引入Stream
屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。類似於Hibernate的作用一樣, 我們更多的注重業務開發。Hibernate屏蔽了數據庫的差異,可以很好的實現數據庫的切換。Stream屏蔽了底層MQ的區別,可以很好的實現切換。目前主流的有ActiveMQ、RocketMQ、RabbitMQ、Kafka,Stream支持的有RabbitMQ和Kafka。
3.設計思想
1. 標准的MQ
(1)生產者/消費者通過消息媒介傳遞消息內容
(2)消息必須走特定的通道Channel
2.引入Stream
通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件細節的隔離。
組成 | 說明 |
---|---|
Middleware | 中間件,目前只支持RabbitMQ和Kafka |
Binder | Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現 |
@Input | 注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序 |
@Output | 注解標識輸出通道,發布的消息將通過該通道離開應用程序 |
@StreamListener | 監聽隊列,用於消費者的隊列的消息接收 |
@EnableBinding | 指信道channel和exchange綁定在一起 |
3.Stream 的消息通信模式遵循了發布-訂閱模式,也就是Topic模式。在RabbitMQ中是Exchange交換機,在Kafka是Topic。
4. 術語
(1)Binder 綁定器,通過Binder可以很方便的連接中間件,屏蔽差異
(2)Channel: 通道,是Queue的一種抽象,主要實現存儲和轉發的媒介,通過Channel對隊列進行配置
(3)Source和Sink 簡單的理解為參照對象是Spring Cloud Stream自身,從Stream發布消息就是輸出,接收消息就是輸入。
5.過程可以理解為下圖
2.使用
1.RabbitMQ環境安裝
參考: docker安裝rabbitMQ
2.建立生產者項目
1.新建模塊 cloud-stream-rabbitmq-provider8801
2.修改pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud</artifactId> <groupId>cn.qz.cloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8801</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
3.新建application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務信息;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 消息組件類型
environment: # 設置rabbitmq的相關的環境配置
spring:
rabbitmq:
host: 192.168.99.100
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設置消息類型,文本則設置“text/plain”
binder: defaultRabbit # 設置要綁定的消息服務的具體設置
eureka:
client: # 客戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
instance-id: send-8801.com # 在信息列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址
4.主啟動類:
package cn.qz.cloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @Author: qlq * @Description * @Date: 21:39 2020/11/9 */ @SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
5.業務類:
(1)Service
接口:
package cn.qz.cloud.service; /** * @Author: qlq * @Description * @Date: 21:41 2020/11/9 */ public interface IMessageProvider { String send(); }
實現類:
package cn.qz.cloud.service.impl; import cn.qz.cloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import javax.annotation.Resource; import java.util.UUID; /** * @Author: qlq * @Description * @Date: 21:41 2020/11/9 */ @EnableBinding(Source.class) //定義消息的推送管道 public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 消息發送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); return serial; } }
(2)Controller
package cn.qz.cloud.controller; import cn.qz.cloud.service.IMessageProvider; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @Author: qlq * @Description * @Date: 21:46 2020/11/9 */ @RestController public class MessageController { @Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); } }
6.測試:
(1)啟動后可以到RabbitMQ查看有一個交換機
(2)測試發消息
$ curl http://localhost:8801/sendMessage % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 36 100 36 0 0 765 0 --:--:-- --:--:-- --:--:-- 2250f68f21d0-7fc7-4eaf-8f43-6e672617d0df
發出消息后到MQ看不到消息,因為Exchange本身沒有存儲消息的功能。此時還沒有隊列。但是可以通過MQ的Message rates波峰查看:
3.建立消費者一
1.新建模塊 cloud-stream-rabbitmq-consumer8802
2.修改pom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
3.新建application.yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務信息;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 消息組件類型
environment: # 設置rabbitmq的相關的環境配置
spring:
rabbitmq:
host: 192.16899.100
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設置消息類型,如果是文本則設置“text/plain”
binder: defaultRabbit # 設置要綁定的消息服務的具體設置
eureka:
client: # 客戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
instance-id: receive-8802.com # 在信息列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址
4.主啟動類:
package cn.qz.cloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @Author: qlq * @Description * @Date: 22:01 2020/11/9 */ @SpringBootApplication public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); } }
5.業務類:
package cn.qz.cloud.listener; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** * @Author: qlq * @Description * @Date: 22:03 2020/11/9 */ @Component @EnableBinding(Sink.class) //定義消息的接收管道 public class MQMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println(message.getPayload() + "\t port: " + serverPort); } }
4.建立消費者二
cloud-stream-rabbitmq-consumer8803 模塊,和上面項目一模一樣。只是端口不同,模擬集群部署一個項目。
5.測試
1.啟動兩個消費者 8802、8803
2.啟動一個生產者 8801
3.RabbitMQ查看
(1)交換機
(2)隊列:會生成兩個隨機隊列
查看其中一個隊列:(查看RoutingKey 為#, 也就是匹配任何隊列,類似於fanout廣播類型。也就是接受該交換機的任何消息。並且該隊列自動刪除[auto-delete=true],沒有消費者隊列會自動刪除)
4.生產者發送消息:
$ curl http://localhost:8801/sendMessage % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 36 100 36 0 0 1161 0 --:--:-- --:--:-- --:--:-- 3600095847545-72ca-41a2-9c24-6940455151c8
5.查看兩個消費者:
6.這種模式實際有個重復消費的情況。也就是某個應用多實例部署,每個實例都會收到消息。從MQ的本質Queue不可重復消費,但是多個實例監聽的是不同的queue,所以出現重復消費。解決辦法就是讓多個實例監聽同一個queue,解決辦法,兩個消費者端都加上group屬性監聽相同的隊列。
server:
port: 8803
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務信息;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 消息組件類型
environment: # 設置rabbitmq的相關的環境配置
spring:
rabbitmq:
host: 192.168.99.100
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設置消息類型,如果是文本則設置“text/plain”
binder: defaultRabbit # 設置要綁定的消息服務的具體設置
group: testGroup
eureka:
client: # 客戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
instance-id: receive-8803.com # 在信息列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址
啟動后查看RabbitMQ:
(1)兩個消費者只有一個queue,實際上上面的group指定了一個queue
(2)可以看到有兩個消費者,並且由一個重要的特性就是該隊列不會自動刪除,也就是沒有消費者,隊列仍然會保存消息(間接的保證了消息的持久化)。這個很好理解,設置了group屬性的隊列,不會自動刪除,也就是即使Consumer斷開連接,隊列仍然存在,Exchange本身不具備存儲的能力,只負責轉發,所以在隊列存在的情況下隊列可以保存消息;當消費者上線后會自動消費隊列中的消息。
(3)測試:這種相同group的消費同一個queue的時候是輪詢的方式,每個實例一條輪着消費。