什么是Spring Cloud Stream
一個用於構建消息驅動的微服務的框架
應用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅動的方式
Spring Cloud Stream編程模型
- Destination Binder(目標綁定器)
- 與消息中間件通信的組件
- Destination Bindings(目標綁定)
- Binding是連接應用程序與消息中間件的橋梁,用於消息的消費和生產,有Binder創建
- Message(消息)
微服務集成了Stream,Stream的Destination Binder創建了兩個Binding,左邊的Binding連接Rabbit MQ,右邊的MQ連接Kafka。
左邊的Binding從Rabbit MQ處消費消息,然后經過Application處代碼的處理,把處理結果傳輸給Kafka。【從Rabbit MQ處消費消息,然后經過處理,生產到Kafka】
使用Spring Cloud Stream 實現消息收發
編寫生產者
添加依賴
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
注意groupId為com.alibaba.cloud,而不是org.springframework.cloud
添加注解
在啟動類上添加@EnableBinding注解,其中Source用來發送消息
@SpringBootApplication
@EnableBinding(Source.class)
public class Study01Application {
public static void main(String[] args) {
SpringApplication.run(Study01Application.class, args);
}
}
添加配置
rocketmq.binder.name-server RocketMQ控制台地址
output 表示生產者,用於綁定一個topic投遞消息
bindings.output.destination 指定topic
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
# 生產者
output:
# 指定topic
destination: topic-stream
代碼實現
注入Source接口,用來實現消息發送
MessageBuilder構建消息體
@Autowired
private Source source;
@GetMapping("test-stream")
public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息體")
.build()
);
return "testStream";
}
控制台查看
啟動項目,請求之后,可以在RocketMQ控制台-消息頁面看見topic為topic-stream的消息
編寫消費者
添加依賴
同生產者
添加注解
@SpringBootApplication
@EnableBinding(Sink.class)
public class Study02Application {
public static void main(String[] args) {
SpringApplication.run(Study02Application.class, args);
}
}
添加配置
input表示消費者,用於綁定一個topic消費消息
destination指定topic,要與生產者的topic相對應.在使用RocketMQ時,group必填;使用其他MQ時,可以留空
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
# 消費者
input:
# 指定topic,要與生產者的topic匹配
destination: topic-stream
# 根據業務指定
# 一定要設置,否則會啟動報錯
# 如果使用的是其他的MQ,可以留空
group: group-stream
代碼實現
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class testStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody) {
log.info("------>" + messageBody);
}
}
測試結果
Spring Cloud Stream自定義接口
在以上的例子中,我們發現只可以設置一個topic,這顯然滿足不了實際的生產需求,所以這個時候就需要用到stream的自定義接口來實現多個“input”和“output”綁定不同的topic了。
生產者發送消息時使用的是Source接口里的output方法,而消費者接收消息時使用的是Sink接口里的input方法,並且都需要配置到啟動類的@EnableBinding注解里。所以實際上我們需要自定義接口的源碼與這兩個接口的源碼幾乎一致,只是名稱有所不同而已,使用上也只是將Source和Sink改為自定義的接口即可。
自定義發送消息
自定義消息發送接口
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* 自定義發送消息接口
*/
public interface customizeSource {
String CUSTOMIZE_OUTPUT = "customize-output";
@Output(CUSTOMIZE_OUTPUT)
MessageChannel output();
}
修改注解
在啟動類的@EnableBinding注解上添加剛剛自定義的消息發送接口
@EnableBinding({Source.class, customizeSource.class})
修改配置
注意customize-output的值一定要與自定義消息發送接口中@Output注解的值相同
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
# 生產者
output:
# 指定topic
destination: topic-stream
customize-output:
destination: topic-stream-customize
代碼實現
@Autowired
private CustomizeSource customizeSource;
@GetMapping("test-stream-customize")
public String testCustomizeStream() {
customizeSource.output()
.send(
MessageBuilder
.withPayload("消息體")
.build()
);
return "testStream";
}
驗證
自定義接收消息
自定義消息接收接口
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* 自定義消費消息接口
*/
public interface CustomizeSink {
String CUSTOMIZE_INPUT = "customize-input";
@Input(CUSTOMIZE_INPUT)
SubscribableChannel input();
}
修改注解
在啟動類的@EnableBinding注解上添加剛剛自定義的消息接收接口
@EnableBinding({Sink.class, CustomizeSink.class})
修改配置
注意customize-input的值一定要與自定義消息發送接口中@Input注解的值相同
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
# 消費者
input:
# 指定topic,要與生產者的topic匹配
destination: topic-stream
# 根據業務指定
# 一定要設置,否則會啟動報錯
# 如果使用的是其他的MQ,可以留空
group: group-stream
customize-input:
destination: topic-stream-customize
group: group-stream-customize
代碼實現
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class CustomizeStreamConsumer {
@StreamListener(CustomizeSink.CUSTOMIZE_INPUT)
public void receive(String messageBody) {
log.info("------>自定義>" + messageBody);
}
}
驗證
PS:總結來說,@EnableBinding注解的Source接口實現了發送消息,Sink接口實現了接收消息.而@EnableBinding還有一個Processor接口,Processor接口繼承了Source接口和Sink接口,使用這個接口可以實現收發消息
package org.springframework.cloud.stream.messaging;
public interface Processor extends Source, Sink {
}