在實際開發過程中,服務與服務之間通信經常會使用到消息中間件,而以往使用了中間件比如RabbitMQ,那么該中間件和系統的耦合性就會非常高,如果我們要替換為Kafka那么變動會比較大,這時我們可以使用SpringCloudStream來整合我們的消息中間件,來降低系統和中間件的耦合性。
一、消息中間的幾大應用場景
1、異步處理
比如用戶在電商網站下單,下單完成后會給用戶推送短信或郵件,發短信和郵件的過程就可以異步完成。因為下單付款是核心業務,發郵件和短信並不屬於核心功能,並且可能耗時較長,所以針對這種業務場景可以選擇先放到消息隊列中,有其他服務來異步處理。
2、應用解耦:
假設公司有幾個不同的系統,各系統在某些業務有聯動關系,比如 A 系統完成了某些操作,需要觸發 B 系統及 C 系統。如果 A 系統完成操作,主動調用 B 系統的接口或 C 系統的接口,可以完成功能,但是各個系統之間就產生了耦合。用消息中間件就可以完成解耦,當 A 系統完成操作將數據放進消息隊列,B 和 C 系統去訂閱消息就可以了。這樣各系統只要約定好消息的格式就好了。
3、流量削峰
比如秒殺活動,一下子進來好多請求,有的服務可能承受不住瞬時高並發而崩潰,所以針對這種瞬時高並發的場景,在中間加一層消息隊列,把請求先入隊列,然后再把隊列中的請求平滑的推送給服務,或者讓服務去隊列拉取。
4、日志處理
kafka 最開始就是專門為了處理日志產生的。
當碰到上面的幾種情況的時候,就要考慮用消息隊列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同樣也是在使用 Spring Cloud ,那可以考慮下用 Spring Cloud Stream。
二、什么是SpringCloudStream
官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。
應用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與消息中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅動的方式。
通過使用Spring Integration來連接消息代理中間件以實現消息事件驅動。Spring Cloud Stream 為一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發布-訂閱、消費組、分區的三個核心概念。目前僅支持RabbitMQ、Kafka。
三、Stream 解決了什么問題?
Stream解決了開發人員無感知的使用消息中間件的問題,因為Stream對消息中間件的進一步封裝,可以做到代碼層面對中間件的無感知,甚至於動態的切換中間件(rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程
官網結構圖
組成 | 說明 |
---|---|
Middleware | 中間件,目前只支持RabbitMQ和Kafka |
Binder | Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現 |
@Input | 注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序 |
@Output | 注解標識輸出通道,發布的消息將通過該通道離開應用程序 |
@StreamListener | 監聽隊列,用於消費者的隊列的消息接收 |
@EnableBinding | 指信道channel和exchange綁定在一起 |
以下實戰代碼是基於RabbitMQ的,不清楚如何安裝RabbitMQ請查看我的另一篇文章最簡單的RabbitMQ消息隊列搭建(windows環境下安裝),項目的三個模塊如下:
(一)創建消息生產者【service-sender-stream-8089】
pom.xml文件
pom.xml
application.yml
IMessageSender.java
MessageSenderImpl.java
TestController.java
附上swagger部分代碼
SwaggerApp.java
項目的啟動類如下,沒有什么特殊的處理:
啟動項目后,輸入地址http://localhost:8089/swagger-ui.html打開swagger頁面,然后點擊try it out發送消息
在后台我們可以看到發送消息成功了
(二)消息消費者【service-consumer-stream-8090】
pom.xml文件如下
application.yml
定義一個消息接收接口
ReceviceMsg.java
ReceviceMsgImpl.java
啟動項目,然后再次用上面的消息發送控制器TestController.java發送消息到消息隊列,然后可以看到消息消費端也收到了隊列的消息如下:
(三)消息消費者【service-consumer-stream-8091】
這是復制上面的消費者創建的另一個消息消費者,基本配置跟上面service-consumer-stream-8090基本一模一樣,只是application.yml中的部分配置略有不同(主要是端口不同),如下:
application.yml
這里我們就用默認的通道完成了消息的發送和接收,下一篇我們將說一下自定義通道實現消息的發送和接收,下次再見!
===============================================================================