作者 | RocketMQ 官微
來源|阿里巴巴雲原生公眾號
2019 年 1 月,孵化 6 個月的 RocketMQ-Spring 作為 Apache RocketMQ 的子項目正式畢業,發布了第一個 Release 版本 2.0.1。該項目是把 RocketMQ 的客戶端使用 Spring Boot 的方式進行了封裝,可以讓用戶通過簡單的 annotation 和標准的 Spring Messaging API 編寫代碼來進行消息的發送和消費。當時 RocketMQ 社區同學請 Spring 社區的同學對 RocketMQ-Spring 代碼進行 review,引出一段羅美琪(RocketMQ)和春波特(Spring Boot)的故事。
時隔兩年,RocketMQ-Spring 正式發布 2.2.0。在這期間,RocketMQ-Spring 迭代了數個版本,以 RocketMQ-Spring 為基礎實現的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了 Spring 的官網,Spring 布道師 baeldung 向國外同學介紹如何使用 RocketMQ-Spring,越來越多國內外的同學開始使用 RocketMQ-Spring 收發消息,RocketMQ-Spring 倉庫的 star 數也在短短兩年時間內超越了 Spring-Kafka 和 Spring-AMQP(注:兩者均由 Spring 社區維護),成為 Apache RocketMQ 最受歡迎的生態項目之一。
RocketMQ-Spring 的受歡迎一方面得益於支持豐富業務場景的 RocketMQ 與微服務生態 Spring 的完美契合,另一方面也與 RocketMQ-Spring 本身嚴格遵循 Spring Messaging API 規范,支持豐富的消息類型分不開。
遵循 Spring Messaging API 規范
Spring Messaging 提供了一套抽象的 API,對消息發送端和消息接收端的模式進行規定,不同的消息中間件提供商可以在這個模式下提供自己的 Spring 實現:在消息發送端需要實現的是一個 XXXTemplate 形式的 Java Bean,結合 Spring Boot 的自動化配置選項提供多個不同的發送消息方法;在消息的消費端是一個 XXXMessageListener 接口(實現方式通常會使用一個注解來聲明一個消息驅動的 POJO),提供回調方法來監聽和消費消息,這個接口同樣可以使用 Spring Boot 的自動化選項和一些定制化的屬性。
1. 發送端
RocketMQ-Spring 在遵循 Spring Messaging API 規范的基礎上結合 RocketMQ 自身的功能特點提供了相應的 API。在消息的發送端,RocketMQ-Spring 通過實現 RocketMQTemplate 完成消息的發送。如下圖所示,RocketMQTemplate 繼承 AbstractMessageSendingTemplate 抽象類,來支持 Spring Messaging API 標准的消息轉換和發送方法,這些方法最終會代理給 doSend 方法,doSend 方法會最終調用 syncSend,由 DefaultMQProducer 實現。
除 Spring Messaging API 規范中的方法,RocketMQTemplate 還實現了 RocketMQ 原生客戶端的一些方法,來支持更加豐富的消息類型。值得注意的是,相比於原生客戶端需要自己去構建 RocketMQ Message(比如將對象序列化成 byte 數組放入 Message 對象),RocketMQTemplate 可以直接將對象、字符串或者 byte 數組作為參數發送出去(對象序列化操作由 RocketMQ-Spring 內置完成),在消費端約定好對應的 Schema 即可正常收發。
RocketMQTemplate Send API:
SendResult syncSend(String destination, Object payload)
SendResult syncSend(String destination, Message<?> message)
void asyncSend(String destination, Message<?> message, SendCallback sendCallback)
void asyncSend(String destination, Message<?> message, SendCallback sendCallback)
……
2. 消費端
在消費端,需要實現一個包含 @RocketMQMessageListener 注解的類(需要實現 RocketMQListener 接口,並實現 onMessage 方法,在注解中進行 topic、consumerGroup 等屬性配置),這個 Listener 會一對一的被放置到 DefaultRocketMQListenerContainer 容器對象中,容器對象會根據消費的方式(並發或順序),將 RocketMQListener 封裝到具體的 RocketMQ 內部的並發或者順序接口實現。在容器中創建 RocketMQ DefaultPushConsumer 對象,啟動並監聽定制的 Topic 消息,完成約定 Schema 對象的轉換,回調到 Listener 的 onMessage 方法。
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumer received: %s \n", message);
}
}
除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQ-Spring 實現了 RocketMQ Lite Pull Consumer。通過在配置文件中進行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主動 Pull 消息。
配置文件resource/application.properties:
rocketmq.name-server=localhost:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
Pull Consumer代碼:
while(!isStop) {
List<String> messages = rocketMQTemplate.receive(String.class);
System.out.println(messages);
}
豐富的消息類型
RocketMQ Spring 消息類型支持方面與 RocketMQ 原生客戶端完全對齊,包括同步/異步/one-way、順序、延遲、批量、事務以及 Request-Reply 消息。在這里,主要介紹較為特殊的事務消息和 request-reply 消息。
1. 事務消息
RocketMQ 的事務消息不同於 Spring Messaging 中的事務消息,依然采用 RocketMQ 原生事務消息的方案。如下所示,發送事務消息時需要實現一個包含 @RocketMQTransactionListener 注解的類,並實現 executeLocalTransaction 和 checkLocalTransaction 方法,從而來完成執行本地事務以及檢查本地事務執行結果。
// Build a SpringMessage for sending in transaction
Message msg = MessageBuilder.withPayload(..)...;
// In sendMessageInTransaction(), the first parameter transaction name ("test")
// must be same with the @RocketMQTransactionListener's member field 'transName'
rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
// Define transaction listener with the annotation @RocketMQTransactionListener
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return bollback, commit or unknown
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return bollback, commit or unknown
return RocketMQLocalTransactionState.COMMIT;
}
}
在 2.1.0 版本中,RocketMQ-Spring 重構了事務消息的實現,如下圖所示,舊版本中每一個 group 對應一個 TransactionProducer,而在新版本中改為每一個 RocketMQTemplate 對應一個 TransationProducer,從而解決了並發使用多個事務消息的問題。當用戶需要在單進程使用多個事務消息時,可以使用 ExtRocketMQTemplate 來完成(一般情況下,推薦一個進程使用一個 RocketMQTemplate,ExtRocketMQTemplate 可以使用在同進程中需要使用多個 Producer / LitePullConsumer 的場景,可以為 ExtRocketMQTemplate 指定與標准模版 RocketMQTemplate 不同的 nameserver、group 等配置),並在對應的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 為 ExtRocketMQTemplate 的 BeanName。
2. Request-Reply 消息
在 2.1.0 版本中,RocketMQ-Spring 開始支持 Request-Reply 消息。Request-Reply 消息指的是上游服務投遞消息后進入等待被通知的狀態,直到消費端返回結果並返回給發送端。在 RocketMQ-Spring 中,發送端通過 RocketMQTemplate 的 sendAndReceivce 方法進行發送,如下所示,主要有同步和異步兩種方式。異步方式中通過實現 RocketMQLocalRequestCallback 進行回調。
// 同步發送request並且等待String類型的返回值
String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
// 異步發送request並且等待User類型的返回值
rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
@Override public void onSuccess(User message) {
……
}
@Override public void onException(Throwable e) {
……
}
});
在消費端,仍然需要實現一個包含 @RocketMQMessageListener 注解的類,但需要實現的接口是 RocketMQReplyListener<T, R> 接口(普通消息為 RocketMQListener
@Service
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
……
return "reply string";
}
}
RocketMQ-Spring 遵循 Spring 約定大於配置(Convention over configuration)的理念,通過啟動器(Spring Boot Starter)的方式,在 pom 文件引入依賴(groupId:org.apache.rocketmq,artifactId:rocketmq-spring-boot-starter)便可以在 Spring Boot 中集成所有 RocketMQ 客戶端的所有功能,通過簡單的注解使用即可完成消息的收發。在 RocketMQ-Spring Github Wiki 中有更加詳細的用法和常見問題解答。
據統計,從 RocketMQ-Spring 發布第一個正式版本以來,RocketMQ-Spring 完成 16 個 bug 修復,37 個 imporvement,其中包括事務消息重構,消息過濾、消息序列化、多實例 RocketMQTemplate 優化等重要優化,歡迎更多的小伙伴能參與到 RocketMQ 社區的建設中來,羅美琪(RocketMQ)和春波特(Spring Boot)的故事還在繼續...釘釘搜索群號:21982288,即可進群和眾多開發者交流!