Spring Cloud Stream
簡介
在微服務的開發過程中,可能會經常用到消息中間件,通過消息中間件在服務與服務之間傳遞消息,不管你使用的是哪款消息中間件,比如RabbitMQ、Kafka和RocketMQ,那么消息中間件和服務之間都有一點耦合性,這個耦合性就是指如果我原來使用的RabbitMQ,現在要替換為RocketMQ,那么我們的微服務都需要修改,變動會比較大,因為這兩款消息中間件有一些區別,如果我們使用Spring Cloud Stream來整合我們的消息中間件,那么這樣就可以降低微服務和消息中間件的耦合性,做到輕松在不同消息中間件間切換,當然Spring Cloud Stream官方只支持rabbitmq 和 kafka,spring cloud alibaba新寫了一個starter可以支持RocketMQ;
按照官方的定義,Spring Cloud Stream 是一個構建消息驅動微服務的框架;
Spring Cloud Stream解決了開發人員無感知的使用消息中間件的問題,因為Spring Cloud Stream對消息中間件的進一步封裝,可以做到代碼層面對消息中間件的無感知,甚至於動態的切換中間件(rabbitmq切換為rocketmq或者kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程;
核心概念
Spring Cloud Stream 內部有幾個概念:Binder 、Binding、input、output;
1、Binder: 跟外部消息中間件集成的組件,用來創建Binding,各消息中間件都有自己的 Binder 實現;
比如 Kafka 的實現 KafkaMessageChannelBinder,RabbitMQ 的實現 RabbitMessageChannelBinder 以及 RocketMQ 的實現 RocketMQMessageChannelBinder;
2、Binding: 包括 Input Binding 和 Output Binding;
Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋梁,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據即可,屏蔽了開發者與底層消息中間件的接觸;
3、input
應用程序通過input(相當於消費者consumer)與Spring Cloud Stream中Binder交互,而Binder負責與消息中間件交互,因此,我們只需關注如何與Binder交互即可,而無需關注與具體消息中間件的交互。
4、Output
output(相當於生產者producer)與Spring Cloud Stream中Binder交互;
組成 | 說明 |
---|---|
Binder | Binder是應用與消息中間件之間的封裝,目前實現了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現; |
@Input | 該注解標識輸入通道,通過該輸入通道接收消息進入應用程序 |
@Output | 該注解標識輸出通道,發布的消息將通過該通道離開應用程序 |
@StreamListener | 監聽隊列,用於消費者的隊列的消息接收 |
@EnableBinding | 將信道channel和exchange、topic綁定在一起 |
Spring Cloud Stream 應用
消息生產者
1、創建SpringBoot應用31-rocket-spring-cloud-stream;
2、添加依賴:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
配置文件
# 應用名稱
spring.application.name=stream
########## RocketMQ 通用配置
# 客戶端接入點,必填 -rocketmq 連接地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
# 日志級別
logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO
########## Consumer Config 消費者
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group
########## Produce Config 生產者
# output 的配置如下: bingdings具體生產,消費的橋梁
spring.cloud.stream.bindings.output.destination=test-topic //目的地保持一致
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.group=test-group
兼容性問題:
注意版本需要使用springboot2.2.5
<spring-boot.version>2.2.5.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
消息發送:
@EnableBinding(Source.class)
@Service
public class SenderService {
@Autowired
private Source source;
public void send(String msg) throws Exception {
boolean flag = source.output().send(MessageBuilder.*withPayload*(msg).build());
System.*out*.println("消息發送:" + flag);
}
}
消息接收:
@EnableBinding(Sink.class)
public class ReceiveService {
@StreamListener("input")
public void receiveInput1(String receiveMsg) {
System.*out*.println("input 接收到的消息: " + receiveMsg);
}
}
可以通過調用SenderService中的方法進行發送信息,也可以通過在啟動類中的Main方法中進行調用SenderService的方法進行發送信息:
@SuppressWarnings("all")
@EnableBinding(value = {Source.class, Sink.class}) //使得Source生效
@SpringBootApplication
public class StreamApplication implements CommandLineRunner {
@Autowired
private SenderService senderService;
@Autowired
private ReceiveService receiveService;
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
//在main中調用發送信息方法
@Override
public void run(String... args) throws Exception {
senderService.send("hello rocketmq");
}
}
Spring Cloud Stream自定義信道
在前面的案例中,我們已經實現了一個基礎的 Spring Cloud Stream 消息傳遞處理操作,但在操作之中使用的是系統提供的 Source (output)、Sink(input),接下來我們來看一下自定義信道名稱;
public interface MySource {
String OUTPUT1 = "output1";
@Output(MySource.OUTPUT1)
MessageChannel output1();
String OUTPUT2 = "output2";
@Output(MySource.OUTPUT2)
MessageChannel output2();
}
public interface MySink {
String INPUT1 = "input1";
@Input(MySink.INPUT1)
SubscribableChannel input1();
String INPUT2 = "input1";
@Input(MySink.INPUT2)
SubscribableChannel input2();
}
server.port=8090
# 應用名稱
spring.application.name=stream
########## RocketMQ 通用配置
# 客戶端接入點,必填 -rocketmq 連接地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
# 日志級別
logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO
########## Consumer Config 消費者
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group
########## Produce Config 生產者
# output 的配置如下: bingdings具體生產,消費的橋梁
spring.cloud.stream.bindings.output.destination=test-topic //目的地保持一致
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.group=test-group
########## 自定義
# input 的配置:
spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
# output 的配置:
spring.cloud.stream.bindings.output1.destination=test-topic1
spring.cloud.stream.bindings.output1.content-type=text/plain
spring.cloud.stream.bindings.output1.group=test-group1
SpringCloudStream RocketMQ事務消息
Apache RocketMQ在4.3.0版中已經支持分布式事務消息,這里RocketMQ采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示:
上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程;
1.事務消息發送及提交:
(1) 發送消息(half消息);
(2) 服務端響應消息寫入結果;
(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行);
(4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)
2.補償流程:
(1) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查”;
(2) Producer收到回查消息,檢查回查消息對應的本地事務的狀態;
(3) 根據本地事務狀態,重新Commit或者Rollback;
其中,補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況;
事務消息一共有三種狀態:提交狀態、回滾狀態、中間狀態;
TransactionStatus.CommitTransaction: 提交事務,代表消費者可以消費此消息;
TransactionStatus.RollbackTransaction: 回滾事務,代表消息將被刪除,不能被消費;
TransactionStatus.Unknown: 中間狀態,代表需要檢查消息隊列來確定狀態;
MQ內部邏輯:
package com.springcloud.stream.stream.Transaction;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
//MQ接收,並根據結果運行內部邏輯
@SuppressWarnings("all")
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 執行本地事務:也就是執行本地業務邏輯
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object num = msg.getHeaders().get("test");
if ("1".equals(num)) {
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN;
}
else if ("2".equals(num)) {
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 回調檢查
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("check: " + new String((byte[]) msg.getPayload()));
return RocketMQLocalTransactionState.COMMIT;
}
}
消息發送:
@Component
public class Sender {
@Autowired
private MySource mySource;
public <T> void sendTransactionalMsg(T msg ,int num) throws Exception{
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader("test",String.valueOf(num));
//.setHeader(RocketMQHeaders.TAGS,"binder");
Message message = builder.build();
mySource.outputTX().send(message);
}
}
自定義信道-重寫Source
public interface MySource {
String OUTPUTTX = "outputTX";
@Output(MySource.OUTPUTTX)
MessageChannel outputTX();
}
自定義信道-重寫Sink
public interface MySink {
String INPUTTX = "inputTX";
@Input(MySink.INPUTTX)
SubscribableChannel inputTX();
}
消費者接收消息:
@EnableBinding({MySink.class})
public class ReceiveService {
//spring cloud stream 里面發消息通過sink發送
@Autowired
private MySink mySink;
//消費者端接收到的消息
@StreamListener("inputTX")
public void receiveTransactionMessage(String receiveMsg) {
System.out.println("Transaction_input 接收到的消息: " + receiveMsg);
}
}
Spring Cloud Stream RocketMQ 配置選項
RocketMQ Binder Properties
*spring.cloud.stream.rocketmq.binder.name-server*
RocketMQ NameServer 地址(老版本使用 namesrv-addr 配置項);
Default: 127.0.0.1:9876.
*spring.cloud.stream.rocketmq.binder.access-key*
阿里雲賬號 AccessKey。
Default: null.
*spring.cloud.stream.rocketmq.binder.secret-key*
阿里雲賬號 SecretKey。
Default: null.
*spring.cloud.stream.rocketmq.binder.enable-msg-trace*
是否為 Producer 和 Consumer 開啟消息軌跡功能
Default: true.
*spring.cloud.stream.rocketmq.binder.customized-trace-topic*
消息軌跡開啟后存儲的 topic 名稱。
Default: RMQ_SYS_TRACE_TOPIC.
RocketMQ Consumer Properties
下面的這些配置是以 spring.cloud.stream.rocketmq.bindings.
*enable*
是否啟用 Consumer;
默認值: true.
*tags*
Consumer 基於 TAGS 訂閱,多個 tag 以 || 分割;
默認值: empty.
*sql*
Consumer 基於 SQL 訂閱;
默認值: empty.
*broadcasting*
Consumer 是否是廣播消費模式。如果想讓所有的訂閱者都能接收到消息,可以使用廣播模式;
默認值: false.
*orderly*
Consumer 是否同步消費消息模式;
默認值: false.
*delayLevelWhenNextConsume*
異步消費消息模式下消費失敗重試策略:
-1,不重復,直接放入死信隊列
0,broker 控制重試策略
>0,client 控制重試策略
默認值: 0.
*suspendCurrentQueueTimeMillis*
同步消費消息模式下消費失敗后再次消費的時間間隔;
默認值: 1000.
RocketMQ Provider Properties
下面的這些配置是以 spring.cloud.stream.rocketmq.bindings.
*enable*
是否啟用 Producer;
默認值: true.
*group*
Producer group name;
默認值: empty.
*maxMessageSize*
消息發送的最大字節數;
默認值: 8249344.
*transactional*
是否發送事務消息;
默認值: false.
*sync*
是否使用同步得方式發送消息;
默認值: false.
*vipChannelEnabled*
是否在 Vip Channel 上發送消息;
默認值: true.
*sendMessageTimeout*
發送消息的超時時間(毫秒);
默認值: 3000.
*compressMessageBodyThreshold*
消息體壓縮閥值(當消息體超過 4k 的時候會被壓縮);
默認值: 4096.
*retryTimesWhenSendFailed*
在同步發送消息的模式下,消息發送失敗的重試次數;
默認值: 2.
*retryTimesWhenSendAsyncFailed*
在異步發送消息的模式下,消息發送失敗的重試次數;
默認值: 2.
*retryNextServer*
消息發送失敗的情況下是否重試其它的 broker;
默認值: false.