SpringCloud微服務實戰系列教程
Spring Cloud Stream 消息驅動組件幫助我們更快速,更⽅便,更友好的去構建消息驅動微服務的。當時定時任務和消息驅動的⼀個對⽐。(消息驅動:基於消息機制做⼀些事情)MQ:消息隊列/消息中間件/消息代理,產品有很多,ActiveMQ RabbitMQ RocketMQ Kafka
一、 Stream解決的痛點問題
MQ消息中間件⼴泛應⽤在應⽤解耦合、異步消息處理、流量削峰等場景中。
不同的MQ消息中間件內部機制包括使⽤⽅式都會有所不同,⽐如RabbitMQ中有Exchange(交換機/交換器)這⼀概念,kafka有Topic、Partition分區這些概念,MQ消息中間件的差異性不利於我們上層的開發應⽤,當我們的系統希望從原有的RabbitMQ切換到Kafka時,我們會發現
切換⽐較困難,很多要操作可能重來(因為⽤程序和具體的某⼀款MQ消息中間件耦合在⼀起了)。
Spring Cloud Stream進⾏了很好的上層抽象,
可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節差異,就像Hibernate屏蔽掉了具體數據庫(Mysql/Oracle⼀樣)。如此⼀
來,我們學習、開發、維護MQ都會變得輕松。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。
二、Stream重要概念
Spring Cloud Stream 是⼀個構建消息驅動微服務的框架。應⽤程序通過inputs(相當於消息消費者consumer)或者outputs(相當於消息⽣產者producer)來與Spring Cloud Stream中的binder對象交互,⽽Binder對象是⽤來屏蔽底層MQ細節的,它負責與具體的消息中間件交互。

Binder綁定器是Spring Cloud Stream 中⾮常核⼼的概念,就是通過它來屏蔽底層不同MQ消息中間件的細節差異,當需要更換為其他消息中間件時,我們需要做的就是更換對應的Binder綁定器⽽不需要修改任何應⽤邏輯(Binder綁定器的實現是框架內置的,Spring Cloud Stream⽬前⽀持Rabbit、Kafka兩種消息隊列)
三、Stream消息通信⽅式
Stream中的消息通信⽅式遵循了發布—訂閱模式。在Spring Cloud Stream中的消息通信⽅式遵循了發布-訂閱模式,當⼀條消息被投遞到消息中間件之后,它會通過共享的 Topic 主題進⾏⼴播,消息消費者在訂閱的主題中收到它並觸發⾃身的業務邏輯處理。這⾥所提到的 Topic 主題是Spring Cloud Stream中的⼀個抽象概念,⽤來代表發布共享消息給消費者的地⽅。在不同的消息中間件中, Topic 可能對應着不同的概念,⽐如:在RabbitMQ中的它對應了Exchange、在Kakfa中則對應了Kafka中的Topic。

四、基於RabbitMQ應用
第一步:構建消息生產者
1、引入依賴pom.ml
<!--eureka client 客戶端依賴引⼊--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--spring cloud stream 依賴(rabbit)--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2、添加配置
注意:標紅的位置應設置相同的binder,藍色位置必須為output,被默認框架定義好的
server:
port: 9090
spring:
application:
name: stream-provider
cloud:
stream:
binders: # 綁定MQ服務信息(此處我們是RabbitMQ)
cityRabbitBinder: # 給Binder定義的名稱,⽤於后⾯的關聯
type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
environment: # MQ環境配置(⽤戶名、密碼等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 關聯整合通道和binder對象
output: # output是我們定義的通道名稱,此處不能亂改
destination: cityExchange # 要使⽤的Exchange名稱(消息隊列主題名稱)
content-type: text/plain # application/json # 消息類型設置,⽐如json
binder: cityRabbitBinder # 關聯MQ服務
eureka:
client:
serviceUrl: # eureka server的路徑
defaultZone: http://localhost:8761/eureka/
instance:
prefer-ip-address: true #使⽤ip注冊
3、消息發送通過source對象
package city.alber; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/9/24 2:26 PM * Source.class⾥⾯就是對輸出通道的定義(這是Spring Cloud Stream內置的通道封裝) */ @EnableBinding(Source.class) public class ProviderService { /** * 將MessageChannel的封裝對象Source注⼊到這⾥使⽤ */ @Autowired private Source source; public void sendMessage(String content) { // 向mq中發送消息(並不是直接操作mq,應該操作的是spring cloud stream) // 使⽤通道向外發出消息(指的是Source⾥⾯的output通道) source.output().send(MessageBuilder.withPayload(content).build()); } }
4、啟動類添加@EnableDiscoveryClient 注解,啟動
第二步:構建消息消費者
下面的內容是和生產者不一致的地方
1、配置,標藍地方為不同點
server:
port: 9091
spring:
application:
name: stream-consumer
cloud:
stream:
binders: # 綁定MQ服務信息(此處我們是RabbitMQ)
cityRabbitBinder: # 給Binder定義的名稱,⽤於后⾯的關聯
type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
environment: # MQ環境配置(⽤戶名、密碼等)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 關聯整合通道和binder對象
input: # output是我們定義的通道名稱,此處不能亂改
destination: cityExchange # 要使⽤的Exchange名稱(消息隊列主題名稱)
content-type: text/plain # application/json # 消息類型設置,⽐如json,自動將對象轉為json
binder: cityRabbitBinder # 關聯MQ服務
eureka:
client:
serviceUrl: # eureka server的路徑
defaultZone: http://localhost:8761/eureka/
instance:
prefer-ip-address: true #使⽤ip注冊
2、接收消費消息類
package city.albert; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/9/24 2:43 PM */ @EnableBinding(Sink.class) public class ConsumerMsg { @StreamListener(Sink.INPUT) public void recevieMessages(Message<String> message) { System.out.println("=========接收到的消息:" + message); } }
五、定義輸出
定一使用與上面配置中藍色字體的input/ouput類似,定義完成配置應該為 inputSysLog/outputSysLog
1、定義通過接口,目的是給生產者消費者調用

package city.albert; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/9/24 3:11 PM */ public interface CustomStreamConfig { String INPUT_SYS_LOG = "inputSysLog"; String OUTPUT_SYS_LOG = "outputSysLog"; @Input(INPUT_SYS_LOG) SubscribableChannel inputSysLog(); @Output(OUTPUT_SYS_LOG) MessageChannel outputSysLog(); /** * 。。。。。可以跟上面一樣定義多個通道信息 */ }
2、生產者調用

package city.albert; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/9/24 3:14 PM * */ @EnableBinding(CustomStreamConfig.class) public class CustomStreamProvider { @Autowired CustomStreamConfig customStreamConfig; public void sendMessage(String content) { // 向mq中發送消息(並不是直接操作mq,應該操作的是spring cloud stream) // 使⽤通道向外發出消息(指的是Source⾥⾯的output通道) customStreamConfig.outputSysLog().send(MessageBuilder.withPayload(content).build()); } }
3、消費者調用

package city.albert; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/9/24 3:14 PM * */ @EnableBinding(CustomStreamConfig.class) public class CustomStreamConsumer { @StreamListener(CustomStreamConfig.INPUT_SYS_LOG) public void messages(Message<String> message) { System.out.println("=========接收到的消息:" + message); } }
六、分組
同組內一條消息,只能一個消費者獲取,添加配置即可