1.解決的痛點
那么有沒有一種新技術,能讓我們不在關注具體的MQ細節,我們只需要用一種適配綁定的方式,自動的給我們在MQ內進行切換。這個時候就是Springcloud Stream要大顯身手的時候。
2.概述
一句話:屏蔽底層消息中間件的差異,降低切換成本,同一消息的編程模型。
官方定義:springcloud Stream 是一個構建消息驅動的微服務框架。
通過我們配置binding(綁定),而springcloud Stream 的binder對象負責與消息中間件交互。
所以,我們只需要搞清楚如何與springcloud stream交互就可以方便的使用消息驅動方式。
通過spring integration 來連接消息代理中間件,以實現消息消息事件驅動。
springcloud stream為一些供應商的消息中間件產品提供了個性化自動配置實現,引用了發布-訂閱、消費組、分區三個核心概念,目前僅支持RabbitMQ、 Kafka
https://spring.io/projects/spring-cloud-stream
4.stream憑什么可以統一差異?
通過定義綁定器作為中間層,完美的實現了應用程序與消息中間件細節之間的隔離。
通過向應用程序暴露統一的Channel通道,使得應用程序不需要再考慮各種不同的消息中間件實現。
通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件細節之間的隔離。
input對應消費者;
output對應生產者;
6.Stream消息通信方式是什么?
它遵循了發布-訂閱模式,通過Topic主題進行廣播推送。
7.Stream遵循的標准流程
1.binder 很方便的連接中間件,屏蔽差異。
2.Channel 通道,是隊列QUEUE的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過channel隊列進行配置。
3.source和sink 簡單的可以理解為參照對象是springcloud stream自身,從stream發布消息就是輸出,接受消息就是輸入。
8.API和常用的注解
組成 | 說明 |
---|---|
Middleware | 中間件,目前只支持RabbitMQ和Kafka |
Binder | binder是應用於消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現。 |
@Input | 注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序 |
@Output | 注解標識輸出通道,發布的消息將通過該通道離開應用程序 |
@StreamListener | 監聽隊列,用於消費者的隊列的消息接收。 |
@EnableBinding |
9.代碼
第一步:創建eureka服務模塊cloud-eureka-server7001、cloud-eureka-server7002
1.POM文件:
<dependencies> <!--eureka-server--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--引入熱部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.創建application.yml
server: port: 7001 # eureka配置 eureka: instance: hostname: eureka7001.com #eureka服務端的實例名稱 client: # false表示不想注冊中心注冊自己 register-with-eureka: false # false表示自己端就是注冊中心,我的職責就是維護服務實例,並不需要檢索服務 fetch-registry: false service-url: # 設置Eureka Server交互的地址查詢服務和注冊服務都需要依賴這個地址 #defaultZone: http://eureka7002.com:7002/eureka/ # 單機就是7001自己守望自己 defaultZone: http://eureka7001.com:7001/eureka/ server: enable-self-preservation: false # 禁用自我保護模式 eviction-interval-timer-in-ms: 2000
3.創建主入口函數
package com.seegot.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-09 22:15 */ @SpringBootApplication @EnableEurekaServer // 表示自己就是注冊中心 public class EurekaMain7001 { public static void main(String[] args) { SpringApplication.run(EurekaMain7001.class,args); } }
7002和上面配置基本一致,只是端口號不一致,直接clone就行。
第二步:新建消息生產者模塊cloud-stream-rabbitmq-provider8801
1.POM
<dependencies> <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <version>3.0.6.RELEASE</version> </dependency> <!--springcloud stream--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- 客戶端 config --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency> <!--注入eureka client 依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--引入熱部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.創建application.yml
1 server: 2 port: 8801 3 spring: 4 application: 5 name: cloud-stream-provider 6 cloud: 7 stream: 8 binders: # 在此處配置需要綁定的rabbitmq的服務消息 9 defaultRabbit: # 表示定義的名稱,用於binding整合 10 type: rabbit # 消息組件類型 11 environment: # 設置rabbitmq的相關環境配置 12 spring: 13 rabbitmq: 14 host: localhost 15 port: 5672 16 username: guest 17 password: guest 18 bindings: # 服務的整合處理 19 output: # 這個名字是一個通道的名稱 20 destination: studyExchange # 標識要使用的Exchange名稱定義 21 content-type: application/json #設置消息類型,本次為json,本文則設置為“text/plain” 22 binder: defaultRabbit # 設置要綁定的消息服務的具體設置 23 eureka: 24 client: 25 service-url: 26 defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka # 集群版 27 instance: 28 lease-renewal-interval-in-seconds: 2 # 設置心跳時間間隔,默認是30秒 29 lease-expiration-duration-in-seconds: 5 # 如果超過了5秒間隔,默認是90秒 30 instance-id: send-8801.com # 在信息列表時顯示主機名稱 31 prefer-ip-address: true #訪問路徑變為IP地址
3.創建主入口函數
package com.seegot.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 10:45 */ @SpringBootApplication @EnableEurekaClient public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class,args); } }
4.創建服務類 IMessageProvider
package com.seegot.springcloud.service; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 10:47 */ public interface IMessageProvider { public String send(); }
創建服務實現類
package com.seegot.springcloud.service.impl; import com.seegot.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import javax.annotation.Resource; import java.util.UUID; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 10:48 */ @EnableBinding(Source.class) // 定義消息的推送管道。類似output public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 屬於消息發送管道。 // 綁定消息 @Override public String send() { String uuid = UUID.randomUUID().toString();// 發送的內容 output.send(MessageBuilder.withPayload(uuid).build()); System.out.println("@@@@@uuid: "+ uuid); return null; } }
5.創建業務類
1 package com.seegot.springcloud.controller; 2 3 import com.seegot.springcloud.service.IMessageProvider; 4 import lombok.extern.slf4j.Slf4j; 5 import org.springframework.web.bind.annotation.GetMapping; 6 import org.springframework.web.bind.annotation.RestController; 7 8 import javax.annotation.Resource; 9 10 /** 11 * @program: cloud2020 12 * @description: 13 * @author: PP Zhang 14 * @create: 2020-06-30 10:56 15 */ 16 @RestController 17 @Slf4j 18 public class SendMessageController { 19 @Resource 20 private IMessageProvider messageProvider; 21 @GetMapping(value = "/sendMessage") 22 public String sendMessage(){ 23 return messageProvider.send(); 24 } 25 }
6.測試。http://localhost:8801/sendMessage
2020-06-30 13:22:54.601 INFO 2980 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration @@@@@uuid: baef8eab-67c7-40ce-b6ad-f290ad5ffeba @@@@@uuid: c457a864-45a4-4af1-99f8-ed0cf430429c @@@@@uuid: 803cfd67-ab44-4d63-98c0-b08880142316 @@@@@uuid: 818e3d27-6853-4878-9d24-6c73f7349480 @@@@@uuid: 45a28a16-d074-43ce-afd9-74c74d79259f @@@@@uuid: bcaad91e-fa08-4f5f-98d0-2232843f052b @@@@@uuid: 89b6b301-ee96-41e8-a99a-5f2c0e579cfa @@@@@uuid: 55d8c5aa-2093-44f5-ac8c-58392ec45249 @@@@@uuid: 8d79cfe9-e0fa-4880-aad4-bc1342919ec7 @@@@@uuid: 73202336-05b0-4682-bcfc-22627a3141c3 @@@@@uuid: 40206a09-5e06-4f02-95bb-68f7c78e141e @@@@@uuid: c768c1d1-961c-4a3c-8250-cba4052b046f
第三步:新建消費者模塊cloud-stream-rabbitmq-consumer8802、cloud-stream-rabbitmq-consumer8803
1.POM
<dependencies> <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <version>3.0.6.RELEASE</version> </dependency> <!--springcloud stream--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- 客戶端 config --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency> <!--注入eureka client 依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--引入熱部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.新建application.yml
在分布式環境下,一條消息只希望被消費一次,避免重復消費,此時需要注意對group的使用,要將消費者統一在同一個組group內,也就是group名稱一致。另外,group還可以實現消息的持久化。可以通過停用消息消費者,然后通過消息生產者發送多條消息,然后重新啟動消息消費者,會看到消息消費者接收到了相應的消息,這就是持久化;反之如果刪除group,那么重新停用消息消費者,然后在通過消息生產者發送消息,再次啟動消息消費者,此時就接收不到相應的消息。
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置需要綁定的rabbitmq的服務消息 defaultRabbit: # 表示定義的名稱,用於binding整合 type: rabbit # 消息組件類型 environment: # 設置rabbitmq的相關環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 標識要使用的Exchange名稱定義 content-type: application/json #設置消息類型,本次為json,本文則設置為“text/plain” binder: defaultRabbit # 設置要綁定的消息服務的具體設置 group: seegotB #分組並且還可以實現消息持久化。 eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka # 集群版 instance: lease-renewal-interval-in-seconds: 2 # 設置心跳時間間隔,默認是30秒 lease-expiration-duration-in-seconds: 5 # 如果超過了5秒間隔,默認是90秒 instance-id: receive-8802.com # 在信息列表時顯示主機名稱 prefer-ip-address: true #訪問路徑變為IP地址
3.新建主入口函數
package com.seegot.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 11:19 */ @SpringBootApplication @EnableEurekaClient public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class,args); } }
4.新建業務類
package com.seegot.springcloud.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.web.bind.annotation.RestController; /** * @program: cloud2020 * @description: * @author: PP Zhang * @create: 2020-06-30 11:21 */ @RestController @EnableBinding(Sink.class)// 定義消息的接受管道。類似input public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) // 監聽 public void input(Message<String> message){ System.out.println("消費者1號,---->接收的消息:"+message.getPayload()+"\t port:"+serverPort); } }