Spring Cloud Alibaba - Spring Cloud Stream 整合 RocketMQ


Spring Cloud Stream

簡介

在微服務的開發過程中,可能會經常用到消息中間件,通過消息中間件在服務與服務之間傳遞消息,不管你使用的是哪款消息中間件,比如RabbitMQ、Kafka和RocketMQ,那么消息中間件和服務之間都有一點耦合性,這個耦合性就是指如果我原來使用的RabbitMQ,現在要替換為RocketMQ,那么我們的微服務都需要修改,變動會比較大,因為這兩款消息中間件有一些區別,如果我們使用Spring Cloud Stream來整合我們的消息中間件,那么這樣就可以降低微服務和消息中間件的耦合性,做到輕松在不同消息中間件間切換,當然Spring Cloud Stream官方只支持rabbitmq 和 kafka,spring cloud alibaba新寫了一個starter可以支持RocketMQ;

按照官方的定義,Spring Cloud Stream 是一個構建消息驅動微服務的框架;

Spring Cloud Stream解決了開發人員無感知的使用消息中間件的問題,因為Spring Cloud Stream對消息中間件的進一步封裝,可以做到代碼層面對消息中間件的無感知,甚至於動態的切換中間件(rabbitmq切換為rocketmq或者kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程;


核心概念

img

Spring Cloud Stream 內部有幾個概念:Binder 、Binding、input、output;

1、Binder: 跟外部消息中間件集成的組件,用來創建Binding,各消息中間件都有自己的 Binder 實現;

比如 Kafka 的實現 KafkaMessageChannelBinder,RabbitMQ 的實現 RabbitMessageChannelBinder 以及 RocketMQ 的實現 RocketMQMessageChannelBinder;

2、Binding: 包括 Input Binding 和 Output Binding;

Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋梁,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據即可,屏蔽了開發者與底層消息中間件的接觸;

3、input

應用程序通過input(相當於消費者consumer)與Spring Cloud Stream中Binder交互,而Binder負責與消息中間件交互,因此,我們只需關注如何與Binder交互即可,而無需關注與具體消息中間件的交互。

4、Output

output(相當於生產者producer)與Spring Cloud Stream中Binder交互;

組成 說明
Binder Binder是應用與消息中間件之間的封裝,目前實現了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現;
@Input 該注解標識輸入通道,通過該輸入通道接收消息進入應用程序
@Output 該注解標識輸出通道,發布的消息將通過該通道離開應用程序
@StreamListener 監聽隊列,用於消費者的隊列的消息接收
@EnableBinding 將信道channel和exchange、topic綁定在一起

Spring Cloud Stream 應用

消息生產者

1、創建SpringBoot應用31-rocket-spring-cloud-stream;

2、添加依賴:

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

配置文件

img

# 應用名稱
spring.application.name=stream

########## RocketMQ 通用配置
# 客戶端接入點,必填   -rocketmq 連接地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

# 日志級別
logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO

########## Consumer Config  消費者
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group

########## Produce Config  生產者
# output 的配置如下: bingdings具體生產,消費的橋梁
spring.cloud.stream.bindings.output.destination=test-topic    //目的地保持一致
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.group=test-group

兼容性問題:

注意版本需要使用springboot2.2.5

<spring-boot.version>2.2.5.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>

消息發送:

@EnableBinding(Source.class)
@Service
public class SenderService {

  @Autowired
  private Source source;

  public void send(String msg) throws Exception {
    boolean flag = source.output().send(MessageBuilder.*withPayload*(msg).build());
    System.*out*.println("消息發送:" + flag);
  }
}

消息接收:

@EnableBinding(Sink.class)
public class ReceiveService {

  @StreamListener("input")
  public void receiveInput1(String receiveMsg) {
    System.*out*.println("input 接收到的消息: " + receiveMsg);
  }
}

可以通過調用SenderService中的方法進行發送信息,也可以通過在啟動類中的Main方法中進行調用SenderService的方法進行發送信息:

@SuppressWarnings("all")
@EnableBinding(value = {Source.class, Sink.class}) //使得Source生效
@SpringBootApplication
public class StreamApplication implements CommandLineRunner {
    @Autowired
    private SenderService senderService;
    @Autowired
    private ReceiveService receiveService;
    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }
    //在main中調用發送信息方法
    @Override
    public void run(String... args) throws Exception {
        senderService.send("hello rocketmq");
    }
}

img


Spring Cloud Stream自定義信道

在前面的案例中,我們已經實現了一個基礎的 Spring Cloud Stream 消息傳遞處理操作,但在操作之中使用的是系統提供的 Source (output)、Sink(input),接下來我們來看一下自定義信道名稱;

public interface MySource {
    String OUTPUT1 = "output1";
    @Output(MySource.OUTPUT1)
    MessageChannel output1();
    String OUTPUT2 = "output2";
    @Output(MySource.OUTPUT2)
    MessageChannel output2();
}
public interface MySink {
    String INPUT1 = "input1";
    @Input(MySink.INPUT1)
    SubscribableChannel input1();
    String INPUT2 = "input1";
    @Input(MySink.INPUT2)
    SubscribableChannel input2();
}
server.port=8090
# 應用名稱
spring.application.name=stream

########## RocketMQ 通用配置
# 客戶端接入點,必填   -rocketmq 連接地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

# 日志級別
logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO

########## Consumer Config  消費者
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group

########## Produce Config  生產者
# output 的配置如下: bingdings具體生產,消費的橋梁
spring.cloud.stream.bindings.output.destination=test-topic    //目的地保持一致
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.group=test-group

########## 自定義
# input 的配置:
spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1

# output 的配置:
spring.cloud.stream.bindings.output1.destination=test-topic1
spring.cloud.stream.bindings.output1.content-type=text/plain
spring.cloud.stream.bindings.output1.group=test-group1

SpringCloudStream RocketMQ事務消息

Apache RocketMQ在4.3.0版中已經支持分布式事務消息,這里RocketMQ采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示:

img

上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程;

1.事務消息發送及提交:

(1) 發送消息(half消息);

(2) 服務端響應消息寫入結果;

(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行);

(4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)

2.補償流程:

(1) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查”;

(2) Producer收到回查消息,檢查回查消息對應的本地事務的狀態;

(3) 根據本地事務狀態,重新Commit或者Rollback;

其中,補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況;

事務消息一共有三種狀態:提交狀態、回滾狀態、中間狀態;

TransactionStatus.CommitTransaction: 提交事務,代表消費者可以消費此消息;

TransactionStatus.RollbackTransaction: 回滾事務,代表消息將被刪除,不能被消費;

TransactionStatus.Unknown: 中間狀態,代表需要檢查消息隊列來確定狀態;

MQ內部邏輯:

package com.springcloud.stream.stream.Transaction;


import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

//MQ接收,並根據結果運行內部邏輯
@SuppressWarnings("all")
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    /**
     * 執行本地事務:也就是執行本地業務邏輯
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object num = msg.getHeaders().get("test");

        if ("1".equals(num)) {
            System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        else if ("2".equals(num)) {
            System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 回調檢查
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        System.out.println("check: " + new String((byte[]) msg.getPayload()));
        return RocketMQLocalTransactionState.COMMIT;
    }
}

消息發送:

@Component
public class Sender {
    @Autowired
    private MySource mySource;
    public <T> void sendTransactionalMsg(T msg ,int num) throws Exception{
        MessageBuilder builder = MessageBuilder.withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader("test",String.valueOf(num));
                //.setHeader(RocketMQHeaders.TAGS,"binder");
        Message message = builder.build();

        mySource.outputTX().send(message);
    }
}

自定義信道-重寫Source

public interface MySource {
     String OUTPUTTX = "outputTX";
    @Output(MySource.OUTPUTTX)
    MessageChannel outputTX();
}

自定義信道-重寫Sink

public interface MySink {
    String INPUTTX = "inputTX";
    @Input(MySink.INPUTTX)
    SubscribableChannel inputTX();
}

消費者接收消息:

@EnableBinding({MySink.class})
public class ReceiveService {
    //spring cloud stream 里面發消息通過sink發送
    @Autowired
    private MySink mySink;
    //消費者端接收到的消息
    @StreamListener("inputTX")
    public void receiveTransactionMessage(String receiveMsg) {
        System.out.println("Transaction_input 接收到的消息: " + receiveMsg);
    }
}

Spring Cloud Stream RocketMQ 配置選項

RocketMQ Binder Properties

*spring.cloud.stream.rocketmq.binder.name-server*

RocketMQ NameServer 地址(老版本使用 namesrv-addr 配置項);

Default: 127.0.0.1:9876.

*spring.cloud.stream.rocketmq.binder.access-key*

阿里雲賬號 AccessKey。

Default: null.

*spring.cloud.stream.rocketmq.binder.secret-key*

阿里雲賬號 SecretKey。

Default: null.

*spring.cloud.stream.rocketmq.binder.enable-msg-trace*

是否為 Producer 和 Consumer 開啟消息軌跡功能

Default: true.

*spring.cloud.stream.rocketmq.binder.customized-trace-topic*

消息軌跡開啟后存儲的 topic 名稱。

Default: RMQ_SYS_TRACE_TOPIC.

RocketMQ Consumer Properties

下面的這些配置是以 spring.cloud.stream.rocketmq.bindings. .consumer. 為前綴的 RocketMQ Consumer 相關的配置。

*enable*

是否啟用 Consumer;

默認值: true.

*tags*

Consumer 基於 TAGS 訂閱,多個 tag 以 || 分割;

默認值: empty.

*sql*

Consumer 基於 SQL 訂閱;

默認值: empty.

*broadcasting*

Consumer 是否是廣播消費模式。如果想讓所有的訂閱者都能接收到消息,可以使用廣播模式;

默認值: false.

*orderly*

Consumer 是否同步消費消息模式;

默認值: false.

*delayLevelWhenNextConsume*

異步消費消息模式下消費失敗重試策略:

-1,不重復,直接放入死信隊列

0,broker 控制重試策略

>0,client 控制重試策略

默認值: 0.

*suspendCurrentQueueTimeMillis*

同步消費消息模式下消費失敗后再次消費的時間間隔;

默認值: 1000.

RocketMQ Provider Properties

下面的這些配置是以 spring.cloud.stream.rocketmq.bindings. .producer. 為前綴的 RocketMQ Producer 相關的配置;

*enable*

是否啟用 Producer;

默認值: true.

*group*

Producer group name;

默認值: empty.

*maxMessageSize*

消息發送的最大字節數;

默認值: 8249344.

*transactional*

是否發送事務消息;

默認值: false.

*sync*

是否使用同步得方式發送消息;

默認值: false.

*vipChannelEnabled*

是否在 Vip Channel 上發送消息;

默認值: true.

*sendMessageTimeout*

發送消息的超時時間(毫秒);

默認值: 3000.

*compressMessageBodyThreshold*

消息體壓縮閥值(當消息體超過 4k 的時候會被壓縮);

默認值: 4096.

*retryTimesWhenSendFailed*

在同步發送消息的模式下,消息發送失敗的重試次數;

默認值: 2.

*retryTimesWhenSendAsyncFailed*

在異步發送消息的模式下,消息發送失敗的重試次數;

默認值: 2.

*retryNextServer*

消息發送失敗的情況下是否重試其它的 broker;

默認值: false.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM