Spring Cloud Stream簡介
Spring cloud stream是一個構建與Spring Boot和Spring Integration之上的框架,方便開發人員快速構建基於Message-Driven的系統。
Spring Integration & Enterprise Integration Patterns簡介
Enterprise Integration Patterns 是由Gregor Hohpe和Bobby Woolf在 Enterprise Integration Patterns 一書中總結的企業應用開發實踐中使用到的各系統間數據交換的方式。
Spring Integration是Spring框架對Enterprise Integration Patterns的實現和適配。Spring Integration在基於Spring的應用程序中實現輕量級消息傳遞,並支持通過聲明適配器與外部系統集成。 與Spring對遠程處理,消息傳遞和調度的支持相比,這些適配器提供了更高級別的抽象。 Spring Integration的主要目標是提供一個簡單的模型來構建企業集成解決方案,同時保持關注點的分離,這對於生成可維護的可測試代碼至關重要。
常見的企業集成數據傳遞模式有以下幾種:
- 文件傳輸:系統A采用FTP輪詢等方式獲取系統B生成的文件等。
- 共享數據庫:系統A和系統B共用一個數據庫表,共用實體類。
- RPC調用:系統A和B暴露互相之間能調用的服務,例如SOAP、REST。
- 消息傳遞:系統A和系統B通過消息中間價交換數據。
Spring Cloud Stream 優點
- 和MQ中間件解耦:相較同樣是針對MQ中間價集成的Spring Message項目,提供了更高層的面向不同MQ中間件代理(RabbitMQ、Kafka等)的Binder抽象,為開發人員提供了統一的編程模型。例如RabbitMQ原生並不支持partition特性,如果想要從Kafaka遷移到RabbitMQ,就需要修改一堆代碼,但是如果是Spring Cloud Stream則有可能只需要修改幾個配置即可。
- 錯誤重試:集成Spring Retry提供了錯誤自動重試功能。
- Error Handler:提供application和system兩層的異常處理機制。
Spring Cloud Stream核心概念
Spring Cloud Stream官網的核心架構圖
Binder 層負責和MQ中間件的通信,應用程序 Application Core 通過 inputs 接收 Binder 包裝后的 Message,相當於是消費者Consumer;通過 outputs 投遞 Message給 Binder,然后由 Binder 轉換后投遞給MQ中間件,相當於是生產者Producer。
Channel
Channel
描述的是消息從應用程序和Binder
之間的流通的通道,也就是Application Model
中的input
和output
。
Binder
Binder
是Spring Cloud Stream中一個非常重要的概念,它是應用程序和消息中間件的中間層,完美屏蔽了不同消息中間件的實現差異,可以簡單的類比為Adapter
。
Spring Cloud Stream官方提供了spring-cloud-stream-binder-kafka
和spring-cloud-stream-binder-rabbit
兩款主流消息中間件的Binder
實現。並且還提供了專門用於測試的TestSupportBinder
,開發者可以直接使用它來對通道的接收內容進行斷言測試。
當然,Spring Cloud Stream也允許開發者通過它的SPI來實現其他MQ的Binder
。目前已有多款MQ產品提供了第三方Binder
實現,參考官方文檔Binder Implementions。如要實現自己的Binder
可以參考官方文檔Binder SPI。
Bindings
Binding
是用於描述MQ中間件到應用程序的橋梁模型,即是對於Binder
加上inputs
和outputs
各個channel
的綁定關系的描述。例如:RabbitMQ-Binder
+ channel-input1
。
Spring Cloud Stream通過spring.cloud.stream.bindings.<channelName>
來確定綁定關系。
Spring Cloud Stream已經包含了以下幾個Bindings
接口:
Source
-定義了應用程序作為生產者將消息投遞到一個名為output
的channel
中去。
public interface Source {
/**
* Name of the output channel.
*/
String OUTPUT = "output";
/**
* @return output channel
*/
@Output(Source.OUTPUT)
MessageChannel output();
}
Sink
-定義了應用程序作為消費者消費名為input
的channel
中的消息。
public interface Sink {
/**
* Input channel name.
*/
String INPUT = "input";
/**
* @return input channel.
*/
@Input(Sink.INPUT)
SubscribableChannel input();
}
Processor
-定義了應用程序同時作為生產者和消費者,生產消息到名為output
的通道,消費來自名為input
通道的消息。
public interface Processor extends Source, Sink {
}
當然,這幾個預定義的接口必然無法滿足復雜的業務邏輯,因此Spring Cloud Stream也支持開發人員自定義Bindings
接口。
Pub-sub
spring cloud stream支持的是共享topics
的publish-subscribe
模型,並沒有采用point-to-point
的queues
模型,因為pub-sub模型在微服務中更具有普適性。而且pub-sub模型也能通過只有一個消費者來變相支持p2p模型。
kafka是最典型的pub-sub主流MQ中間件,spring cloud stream在術語和特性支持上基本和kafka類似。
Consumer group
在普通的pub-sub關系中,多個consumer
在訂閱了同一個topic
時,這些consumer
之間是競爭關系,即topic
中的一條消息只會被其中一個consumer
消費。但如果這些consumer
不屬於同一個服務怎么辦,例如下單topic
的下游會有庫存服務、賬戶服務等多個服務的消費者同時存在,這些不同服務的消費者都需要獲取到下單topic
中的消息,否則就無法觸發相應的操作,難道需要給不同服務排個隊依次傳遞消息,那就變成了同步操作了。
在kafka中通過Consumer Group
消費者分組來處理上述問題。一個topic
中的每一條消息都會采取多副本的方式分發給所有訂閱的Consumer Group
,每個Consumer Group
中的Consumer
之間則競爭消費。即庫存服務和賬戶服務的消費組屬於不同的Consumer Group
,兩個服務都會得到下單topic
的消息,但是同一個服務只會有一個Consumer
實例會實際消費。
Spring Clous Stream也支持了kafka的這一特性,每個Consumer
可以通過spring.cloud.stream.bindings.<channelName>.group
屬性設置自己所屬的Consumer Group
。
默認情況下,如果我們沒有為Consumer
指定消費組的話,Spring Cloud Stream會為其分配一個獨立的匿名消費組。所以如果某topic
下的所有consumers
都未指定消費組時,當有消息發布后,所有的consumers
都會對其進行消費,因為它們各自屬於獨立的組。因此,我們建議在使用Spring Cloud Stream時最好都指定Consumer Group
,以防止對消息的重復消費,除非該行為是必要的(例如刷新所有consumer
的配置等)。
Polled Consumer
spring cloud stream 2.0之后開始支持定時拉取的消費模式,開發人員可以指定拉取頻率以及最大拉取消息數量來控制消費速率。
Partition
通過Consumer Group
我們已經能保障每條消息只會被組內的某個實例消費一次,但是我們無法控制消息會被哪一個實例消費。即多條消息到達后,它們可能是分別由不同的consumer
實例消費。
但是對於一些業務場景,就需要針對某些具有相同特征的消息每次都可以被同一個消費者實例消費,例如某些監控計數服務,需要針對相同uid的行為在內存中計數。因此,MQ中間件引入了消息分區的概念,消息根據特征寫入到不同的partition,不同的消費者實例指定消費不同分區的消息,於是保證相同特征的消息會被同一個消費者實例消費。
Spring Cloud Stream針對patition提供了一個通用的抽象,用來在消息中間件的上層實現分區處理,所以它對於消息中間件自身是否實現了消息分區並不關心,這使得Spring Cloud Stream為不具備分區功能的消息中間件也增加了分區功能擴展(例如RabbitMQ)。