在實際的企業開發中,消息中間件是至關重要的組件之一。消息中間件主要解決應用解耦,異步消
息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。不同的中間件其實現方式,內
部結構是不一樣的。如常見的RabbitMQ和Kafka,由於這兩個消息中間件的架構上的不同,像
RabbitMQ有exchange,kafka有Topic,partitions分區,這些中間件的差異性導致我們實際項目開發
給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,后面的業務需求,我想往另外一種
消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的
系統耦合了,這時候 springcloud Stream 給我們提供了一種解耦合的方式。
概述
Spring Cloud Stream由一個中間件中立的核組成。應用通過Spring Cloud Stream插入的input(相當於
消費者consumer,它是從隊列中接收消息的)和output(相當於生產者producer,它是從隊列中發送消
息的。)通道與外界交流。通道通過指定中間件的Binder實現與外部代理連接。業務開發者不再關注具
體消息中間件,只需關注Binder對應用程序提供的抽象概念來使用消息中間件實現業務即可。 
說明:最底層是消息服務,中間層是綁定層,綁定層和底層的消息服務進行綁定,頂層是消息生產者和
消息消費者,頂層可以向綁定層生產消息和和獲取消息消費
綁定器
Binder 綁定器是Spring Cloud Stream中一個非常重要的概念。在沒有綁定器這個概念的情況下,我們
的Spring Boot應用要直接與消息中間件進行信息交互的時候,由於各消息中間件構建的初衷不同,它
們的實現細節上會有較大的差異性,這使得我們實現的消息交互邏輯就會非常笨重,因為對具體的中間
件實現細節有太重的依賴,當中間件有較大的變動升級、或是更換中間件的時候,我們就需要付出非常
大的代價來實施。
通過定義綁定器作為中間層,實現了應用程序與消息中間件(Middleware)細節之間的隔離。通過向應用
程序暴露統一的Channel通過,使得應用程序不需要再考慮各種不同的消息中間件的實現。當需要升級
消息中間件,或者是更換其他消息中間件產品時,我們需要做的就是更換對應的Binder綁定器而不需要
修改任何應用邏輯 。甚至可以任意的改變中間件的類型而不需要修改一行代碼。
Spring Cloud Stream支持各種binder實現,下表包含GitHub項目的鏈接。
RabbitMQ
Apache Kafka
Amazon Kinesis
Google PubSub (partner maintained)
Solace PubSub+ (partner maintained)
Azure Event Hubs (partner maintained)
通過配置把應用和spring cloud stream 的 binder 綁定在一起,之后我們只需要修改 binder 的配置來
達到動態修改topic、exchange、type等一系列信息而不需要修改一行代碼。
發布/訂閱模型
在Spring Cloud Stream中的消息通信方式遵循了發布-訂閱模式,當一條消息被投遞到消息中間件之
后,它會通過共享的 Topic 主題進行廣播,消息消費者在訂閱的主題中收到它並觸發自身的業務邏輯處
理。這里所提到的 Topic 主題是Spring Cloud Stream中的一個抽象概念,用來代表發布共享消息給消
費者的地方。在不同的消息中間件中, Topic 可能對應着不同的概念,比如:在RabbitMQ中的它對應
了Exchange、而在Kakfa中則對應了Kafka中的Topic。

案例中通過rabbitMQ作為消息中間件,完成SpringCloud Stream的案例。需要自行安裝
消息生產者
(1)創建工程引入依賴
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> </dependencies>
(2)定義bingding
發送消息時需要定義一個接口,不同的是接口方法的返回對象是 MessageChannel:
/** * 自定義的消息通道 */ public interface MyProcessor { /** * 消息生產者的配置 */ String MYOUTPUT = "myoutput"; @Output("myoutput") MessageChannel myoutput(); /** * 消息消費者的配置 */ String MYINPUT = "myinput"; @Input("myinput") SubscribableChannel myinput(); }
/** * 負責向中間件發送數據 */ @Component @EnableBinding(MyProcessor.class) public class MessageSender { @Autowired @Qualifier(value="myoutput") private MessageChannel myoutput; //發送消息 public void send(Object obj) { myoutput.send(MessageBuilder.withPayload(obj).build()); } }
yml清單
server: port: 7001 #服務端口 spring: application: name: stream_producer #指定服務名 rabbitmq: addresses: 192.168.180.137 username: guest password: guest cloud: stream: bindings: output: destination: topcheer-default #指定消息發送的目的地,在rabbitmq中,發送到一個topcheer-default的exchange中 myoutput: destination: topcheer-custom-output # producer: # partition-key-expression: payload #分區關鍵字 對象中的id,對象 # partition-count: 2 #分區大小 binders: #配置綁定器 defaultRabbit: type: rabbit
消息消費者
(1)創建工程引入依賴
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
2)定義bingding
同發送消息一致,在Spring Cloud Stream中接受消息,需要定義一個接口。
/** * 自定義的消息通道 */ public interface MyProcessor { /** * 消息生產者的配置 */ String MYOUTPUT = "myoutput"; @Output("myoutput") MessageChannel myoutput(); /** * 消息消費者的配置 */ String MYINPUT = "myinput"; @Input("myinput") SubscribableChannel myinput(); }
@Component @EnableBinding(MyProcessor.class) public class MessageListener { //監聽binding中的消息 @StreamListener(MyProcessor.MYINPUT) public void input(String message) { System.out.println("獲取到消息: "+message); } }
yml清單
server: port: 7002 #服務端口 spring: application: name: rabbitmq-consumer #指定服務名 rabbitmq: addresses: 192.168.180.137 username: guest password: guest cloud: stream: # instanceCount: 2 #消費者總數 # instanceIndex: 0 #當前消費者的索引 bindings: input: #內置的獲取消息的通道 , 從topcheer-default中獲取消息 destination: topcheer-default myinput: destination: topcheer-custom-output # group: group1 # consumer: # partitioned: true #開啟分區支持 binders: defaultRabbit: type: rabbit


測試類:
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = ProducerApplication.class) public class Test { @Autowired(required = false) private MessageSender messageSender; @org.junit.Test public void toSend(){ messageSender.send("測試"); } }

