作者 | 遼天
來源 | 阿里巴巴雲原生公眾號
導讀:本文將 rocktmq-spring-boot 的設計實現做一個簡單的介紹,讀者可以通過本文了解將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發細節,然后通過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發送和消費 RocketMQ 消息。
在 Spring 生態中玩轉 RocketMQ 系列文章:
- 《如何在 Spring 生態中玩轉 RocketMQ?》
- 《羅美琪和春波特的故事...》
- 《RocketMQ-Spring 畢業兩周年,為什么能成為 Spring 生態中最受歡迎的 messaging 實現?》
本文配套可交互教程已登錄阿里雲知行動手實驗室,PC 端登錄 start.aliyun.com 在瀏覽器中立即體驗。
通過本文,您將了解到:
- Spring 的消息框架介紹
- rocketmq-spring-boot 具體實現
- 使用示例
前言
上世紀 90 年代末,隨着 Java EE(Enterprise Edition) 的出現,特別是 Enterprise Java Beans 的使用需要復雜的描述符配置和死板復雜的代碼實現,增加了廣大開發者的學習曲線和開發成本,由此基於簡單的 XML 配置和普通 Java 對象(Plain Old Java Objects)的 Spring 技術應運而生,依賴注入(Dependency Injection), 控制反轉(Inversion of Control)和面向切面編程(AOP)的技術更加敏捷地解決了傳統 Java 企業及版本的不足。
隨着 Spring 的持續演進,基於注解(Annotation)的配置逐漸取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式發布,它基於“約定大於配置”(Convention over configuration)這一理念來快速地開發、測試、運行和部署 Spring 應用,並能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以命令行的方式運行,不需再部署到獨立容器中。這種簡便直接快速構建和開發應用的過程,可以使用約定的配置並且簡化部署,受到越來越多的開發者的歡迎。
Apache RocketMQ 是業界知名的分布式消息和流處理中間件,簡單地理解,它由 Broker 服務器和客戶端兩部分組成:
其中客戶端一個是消息發布者客戶端(Producer),它負責向 Broker 服務器發送消息;另外一個是消息的消費者客戶端(Consumer),多個消費者可以組成一個消費組,來訂閱和拉取消費 Broker 服務器上存儲的消息。
為了利用 Spring Boot 的快速開發和讓用戶能夠更靈活地使用 RocketMQ 消息客戶端,Apache RocketMQ 社區推出了 spring-boot-starter 實現。隨着分布式事務消息功能在 RocketMQ 4.3.0 版本的發布,近期升級了相關的 spring-boot 代碼,通過注解方式支持分布式事務的回查和事務消息的發送。
本文將對當前的設計實現做一個簡單的介紹,讀者可以通過本文了解將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發細節,然后通過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發送和消費 RocketMQ 消息。
Spring 中的消息框架
順便在這里討論一下在 Spring 中關於消息的兩個主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它們都能夠與 Spring Boot 整合並提供了一些參考的實現。和所有的實現框架一樣,消息框架的目的是實現輕量級的消息驅動的微服務,可以有效地簡化開發人員對消息中間件的使用復雜度,讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。
1. Spring Messaging
Spring Messaging 是 Spring Framework 4 中添加的模塊,是 Spring 與消息系統集成的一個擴展性的支持。它實現了從基於 JmsTemplate 的簡單的使用 JMS 接口到異步接收消息的一整套完整的基礎架構,Spring AMQP 提供了該協議所要求的類似的功能集。在與 Spring Boot 的集成后,它擁有了自動配置能力,能夠在測試和運行時與相應的消息傳遞系統進行集成。
單純對於客戶端而言,Spring Messaging 提供了一套抽象的 API 或者說是約定的標准,對消息發送端和消息接收端的模式進行規定,不同的消息中間件提供商可以在這個模式下提供自己的 Spring 實現:在消息發送端需要實現的是一個 XXXTemplate 形式的 Java Bean,結合 Spring Boot 的自動化配置選項提供多個不同的發送消息方法;在消息的消費端是一個 XXXMessageListener 接口(實現方式通常會使用一個注解來聲明一個消息驅動的 POJO),提供回調方法來監聽和消費消息,這個接口同樣可以使用 Spring Boot 的自動化選項和一些定制化的屬性。
如果有興趣深入的了解 Spring Messaging 及針對不同的消息產品的使用,推薦閱讀這個文件。參考 Spring Messaging 的既有實現,RocketMQ 的 spring-boot-starter 中遵循了相關的設計模式並結合 RocketMQ 自身的功能特點提供了相應的 API(如順序、異步和事務半消息等)。
2. Spring Cloud Stream
Spring Cloud Stream 結合了 Spring Integration 的注解和功能,它的應用模型如下:
該圖片引自 spring cloud stream
Spring Cloud Stream 框架中提供一個獨立的應用內核,它通過輸入(@Input)和輸出(@Output)通道與外部世界進行通信,消息源端(Source)通過輸入通道發送消息,消費目標端(Sink)通過監聽輸出通道來獲取消費的消息。這些通道通過專用的 Binder 實現與外部代理連接。開發人員的代碼只需要針對應用內核提供的固定的接口和注解方式進行編程,而不需要關心運行時具體的 Binder 綁定的消息中間件。在運行時,Spring Cloud Stream 能夠自動探測並使用在 classpath 下找到的Binder。
這樣開發人員可以輕松地在相同的代碼中使用不同類型的中間件:僅僅需要在構建時包含進不同的 Binder。在更加復雜的使用場景中,也可以在應用中打包多個 Binder 並讓它自己選擇 Binder,甚至在運行時為不同的通道使用不同的 Binder。
Binder 抽象使得 Spring Cloud Stream 應用可以靈活的連接到中間件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的靈活配置配置能力,這樣的配置可以通過外部配置的屬性和 Spring Boot 支持的任何形式來提供(包括應用啟動參數、環境變量和 application.yml 或者 application.properties 文件),部署人員可以在運行時動態選擇通道連接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。
Binder SPI 的方式來讓消息中間件產品使用可擴展的 API 來編寫相應的 Binder,並集成到 Spring Cloud Steam 環境,目前 RocketMQ 還沒有提供相關的 Binder,我們計划在下一步將完善這一功能,也希望社區里有這方面經驗的同學積極嘗試,貢獻 PR 或建議。
spring-boot-starter的實現
在開始的時候我們已經知道,spring boot starter 構造的啟動器對於使用者是非常方便的,使用者只要在 pom.xml引入starter 的依賴定義,相應的編譯,運行和部署功能就全部自動引入。因此常用的開源組件都會為 Spring 的用戶提供一個 spring-boot-starter 封裝給開發者,讓開發者非常方便集成和使用,這里我們詳細的介紹一下 RocketMQ(客戶端)的 starter 實現過程。
1. spring-boot-starter 的實現步驟
對於一個 spring-boot-starter 實現需要包含如下幾個部分:
1)在 pom.xml 的定義
- 定義最終要生成的 starter 組件信息
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
- 定義依賴包
它分為兩個部分:Spring 自身的依賴包和 RocketMQ 的依賴包。
2)配置文件類
定義應用屬性配置文件類 RocketMQProperties,這個 Bean 定義一組默認的屬性值。用戶在使用最終的 starter 時,可以根據這個類定義的屬性來修改取值,當然不是直接修改這個類的配置,而是 spring-boot 應用中對應的配置文件:src/main/resources/application.properties。
3)定義自動加載類
定義 src/resources/META-INF/spring.factories 文件中的自動加載類, 其目的是讓 spring boot 更具文中中所指定的自動化配置類來自動初始化相關的 Bean、Component 或 Service,它的內容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration
在 RocketMQAutoConfiguration 類的具體實現中,定義開放給用戶直接使用的 Bean 對象包括:
- RocketMQProperties 加載應用屬性配置文件的處理類;
- RocketMQTemplate 發送端用戶發送消息的發送模板類;
- ListenerContainerConfiguration 容器 Bean 負責發現和注冊消費端消費實現接口類,這個類要求:由 @RocketMQMessageListener 注解標注;實現 RocketMQListener 泛化接口。
4)最后具體地進行 RpcketMQ 相關的封裝
在發送端(producer)和消費端(consumer)客戶端分別進行封裝,在當前的實現版本提供了對 Spring Messaging 接口的兼容方式。
2. 消息發送端實現
1)普通發送端
發送端的代碼封裝在 RocketMQTemplate POJO 中,下圖是發送端的相關代碼的調用關系圖:
為了與 Spring Messaging 的發送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象類,來支持相關的消息轉換和發送方法,這些方法最終會代理給 doSend() 方法、doSend() 以及 RocoketMQ 所特有的一些方法如異步,單向和順序等方法直接添加到 RoketMQTempalte 中,這些方法直接代理調用到 RocketMQ 的 Producer API 來進行消息的發送。
2)事務消息發送端
對於事務消息的處理,在消息發送端進行了部分的擴展,參考上面的調用關系類圖。
RocketMQTemplate 里加入了一個發送事務消息的方法 sendMessageInTransaction(),並且最終這個方法會代理到 RocketMQ 的 TransactionProducer 進行調用,在這個 Producer 上會注冊其關聯的 TransactionListener 實現類,以便在發送消息后能夠對 TransactionListener 里的方法實現進行調用。
3. 消息消費端實現
在消費端 Spring-Boot 應用啟動后,會掃描所有包含 @RocketMQMessageListener 注解的類(這些類需要集成 RocketMQListener 接口,並實現 onMessage()方法),這個 Listener 會一對一的被放置到。
DefaultRocketMQListenerContainer 容器對象中,容器對象會根據消費的方式(並發或順序),將 RocketMQListener 封裝到具體的 RocketMQ 內部的並發或者順序接口實現。在容器中創建 RocketMQ Consumer 對象,啟動並監聽定制的 Topic 消息,如果有消費消息,則回調到 Listener 的 onMessage() 方法。
使用示例
上面的一章介紹了 RocketMQ 在 spring-boot-starter 方式的實現,這里通過一個最簡單的消息發送和消費的例子來介紹如何使這個 rocketmq-spring-boot-starter。
1. RocketMQ 服務端的准備
1)啟動 NameServer 和 Broker
要驗證 RocketMQ 的 Spring-Boot 客戶端,首先要確保 RocketMQ 服務正確的下載並啟動。可以參考 RocketMQ 主站的快速開始來進行操作。確保啟動 NameServer 和 Broker 已經正確啟動。
2)創建實例中所需要的 Topics
在執行啟動命令的目錄下執行下面的命令行操作:
bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic
2. 編譯 rocketmq-spring-boot-starter
目前的 spring-boot-starter 依賴還沒有提交的 Maven 的中心庫,用戶使用前需要自行下載 git 源碼,然后執行 mvn clean install 安裝到本地倉庫。
git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-spring-boot-starter
mvn clean install
3. 編寫客戶端代碼
用戶如果使用它,需要在消息的發布和消費客戶端的 maven 配置文件 pom.xml 中添加如下的依賴:
屬性 spring-boot-starter-rocketmq-version 的取值為:1.0.0-SNAPSHOT, 這與上一步驟中執行安裝到本地倉庫的版本一致。
1)消息發送端的代碼
發送端的配置文件 application.properties:
發送端的 Java 代碼:
2)消息消費端代碼
消費端的配置文件 application.properties:
消費端的 Java 代碼:
這里只是簡單的介紹了使用 spring-boot 來編寫最基本的消息發送和接收的代碼,如果需要了解更多的調用方式,如: 異步發送,對象消息體,指定 tag 標簽以及指定事務消息,請參看 github 的說明文檔和詳細的代碼。我們后續還會對這些高級功能進行陸續的介紹。
作者簡介
遼天,阿里巴巴技術專家,Apache RocketMQ 內核控,擁有多年分布式系統研發經驗,對 Microservice、Messaging 和 Storage 等領域有深刻理解, 目前專注 RocketMQ 內核優化以及 Messaging 生態建設。
在 PC 端登錄 start.aliyun.com 知行動手實驗室,沉浸式體驗在線交互教程。