一、簡介
Spring Cloud Stream 是一個用來為微服務應用構建消息驅動能力的框架。
Spring Cloud Stream 為一些供應商的消息中間件產品(目前集成了 RabbitMQ 和 Kafka)提供了個性化的自動化配置實現,並且引入了發布/訂閱、消費組以及消息分區這三個核心概念。簡單地說,Spring Cloud Stream 本質上就是整合了 Spring Boot 和 Spring Integration, 實現了一套輕量級的消息驅動的微服務框架。
通過使用 Spring Cloud Stream,可以忽略消息中間件的差異,有效簡化開發人員對消息中間件的使用復雜度,讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。
二、快速入門
1. pom.yml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
2. application.yml
配置消息中間件的連接信息:
spring:
application:
name: cloud-stream
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
3. 消息監聽/消費
@EnableBinding({Source.class, Sink.class})
public class SinkReceiver {
private Logger log = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public Object processInput(String message) {
log.info("Input Stream Receiver:{}", message);
return message;
}
@StreamListener(Source.OUTPUT)
public void processOutPut(String message) {
log.info("Output Stream Receiver:{}", message);
}
}
- @EnableBinding:實現對消息通道(Channel) 的綁定,其中 Sink 是 Spring Cloud Stream 默認的輸入通道,Source 是 Spring Cloud Stream 中默認的輸出通道。
- @StreamListener:將被修飾的方法注冊為消息中間件上數據流的事件監聽器,注解中的屬性值對應了監聽的消息通道名。如果不設置屬性值,將默認使用方法名作為消息通道名。
- @SendTo:很多時候在處理完輸入消息之后, 需要反饋一個消息給對方, 這時候可以通過 @SendTo 注解來指定返回內容的輸出通道。
4.消息生產
消息生產有兩種方式,一種是利用注入消息通道來發送消息,如下:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamApplication.class)
public class SinkOutputTest {
@Autowired
private Sink sink;
@Autowired
private Source source;
@Test
public void sink() {
sink.input().send(MessageBuilder.withPayload("From SinkSender").build());
}
@Test
public void source() {
source.output().send(MessageBuilder.withPayload("From SourceSender").build());
}
}
另外一種是使用 Spring Integration 的原生支持 — @InboundChannelAdapter
@EnableBinding(value = {Source.class})
@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>("2019/08/06");
}
}
三、綁定器
Spring Cloud Stream 構建的應用程序與消息中間件之間是通過綁定器 Binder 相關聯的,綁定器對於應用程序而言起到了隔離作用, 它使得不同消息中間件的實現細節對應用程序來說是透明的。所以對於每一個 Spring Cloud Stream 的應用程序來說, 它不需要知曉消息中間件的通信細節,它只需知道 Binder 對應程序提供的抽象概念來使用消息中間件來實現業務邏輯即可,而這個抽象概念就是在快速入門中我們提到的消息通道:Channel。如下圖所示,在應用程序和 Binder 之間定義了兩條輸入通道和三條輸出通道來傳遞消息,而綁定器則是作為這些通道和消息中間件之間的橋梁進行通信。
通過定義綁定器作為中間層,完美地實現了應用程序與消息中間件細節之間的隔離。通過向應用程序暴露統一的 Channel 通道,使得應用程序不需要再考慮各種不同的消息中間件的實現。當需要升級消息中間件,或是更換其他消息中間件產品時,我們要做的就是更換它們對應的 Binder 綁定器而不需要修改任何 SpringBoot 的應用邏輯。
四、消費組
Spring Cloud Stream中的消息通信方式遵循了發布-訂閱模式,當一條消息被投遞到消息中間件之后,它會通過共享的 Topic 主題進行廣播,消息消費者在訂閱的主題中收到它並觸發自身的業務邏輯處理。(這里提到的 Topic 指的是 Stream 的抽象概念,可以是 RabbitMQ 中的 Exchange,也可以是 Kafka 中的 Topic)。
發布-訂閱模式會帶來一個問題。因為在微服務架構中,我們的每一個微服務應用為了實現高可用和負載均衡, 實際上都會部署多個實例。按照消息廣播的性質,多個實例都會接收到消息,從而導致重復消費。為了解決這個問題, 在Spring Cloud Stream中提供了消費組的概念。
如果在同一個主題上的應用需要啟動多個實例的時候,我們可以通過 spring.cloud.stream.bindings.
spring:
application:
name: cloud-stream
cloud:
stream:
bindings:
input:
# 設置消費組,保證只有一個實例消費到消息
# 如果不設置消費組,Stream 將會為每個實例生成一個消費組
group: ${spring.application.name}
五、消息分區
通過引入消費組的概念,我們已經能夠在多實例的清況下,保障每個消息只被組內的一個實例消費。但是消費組無法控制消息具體被哪個實例消費。也就是說,對於同一條消息,它多次到達之后可能是由不同的實例進行消費的。但是對於一些業務場景,需要對一些具有相同特征的消息設置每次都被同一個消費實例處理。
消息分區的引入就是為了解決這樣的問題:當生產者將消息數據發送給多個消費者實例時,保證擁有共同特征的消息數據始終是由同一個消費者實例接收和處理。
消費者分區:
spring:
application:
name: cloud-stream
cloud:
stream:
instance-count: 1
instance-index: 0
bindings:
input:
consumer:
partitioned: true
- spring.cloud.stream.bindings.input.consumer.partitioned = true 開啟消費者分區功能。
- spring.cloud.stream.instance-count = 1 當前消費者的總實例個數,即應用程序部署的實例數量。
- spring.cloud.stream.instance-index = 0 當前實例的索引號,從 0 開始,最大為 -1 。用於消息生產的時候鎖定該實例。(消息生產的時候 "hashCode(key) % partitionCount" 的計算值等於該設置的值,即轉發到該實例上)
生產者分區:
spring:
application:
name: cloud-stream
cloud:
stream:
bindings:
output:
producer:
partitionCount: 1
partitionKeyExtractorName: keyStrategy
- spring.cloud.stream.bindings.output.producer.partitionCount = 1 ,消息生產需要廣播的消費者數量。即消息分區的數量。
- spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName = keyStrategy ,Spring Bean — 用來消息的特征值計算。(分區選擇計算規則為 "hashCode(key) % partitionCount" , 這里的 key 根據 partitionKeyExpression 或 partitionKeyExtractorName 的配置計算得到)
@Component
public class KeyStrategy implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message<?> message) {
return message.getPayload();
}
}
- 演示源代碼:https://github.com/JMCuixy/spring-cloud-demo
- 內容參考:《Spring Cloud 微服務實戰》