1. 概述
在本文中,我們將向您介紹Spring Cloud Stream,這是一個用於構建消息驅動的微服務應用程序的框架,這些應用程序由一個常見的消息傳遞代理(如RabbitMQ、Apache Kafka等)連接。
Spring Cloud Stream構建在現有Spring框架(如Spring Messaging和Spring Integration)之上。盡管這些框架經過了實戰測試,工作得非常好,但是實現與使用的message broker緊密耦合。此外,有時對某些用例進行擴展是困難的。
Spring Cloud Stream背后的想法是一個非常典型的Spring Boot概念——抽象地講,讓Spring根據配置和依賴關系管理在運行時找出實現自動注入。這意味着您可以通過更改依賴項和配置文件來更改message broker。可以在這里找到目前已經支持的各種消息代理。
本文將使用RabbitMQ作為message broker。在此之前,讓我們了解一下broker(代理)的一些基本概念,以及為什么要在面向微服務的體系架構中需要它。
在微服務體系架構中,我們有許多相互通信以完成請求的小型應用程序—它們的主要優點之一是改進了的可伸縮性。一個請求從多個下游微服務傳遞到完成是很常見的。例如,假設我們有一個Service-A內部調用Service-B和Service-C來完成一個請求:
[外鏈圖片轉存失敗(img-jzvHHRXw-1562549429195)(https://user-gold-cdn.xitu.io/2019/7/7/16bccd47c4051b28?w=511&h=347&f=png&s=11713)]
是的,還會有其他組件,比如Spring Cloud Eureka、Spring Cloud Zuul等等,但我們還是專注關心這類架構的特有問題。
假設由於某種原因Service-B需要更多的時間來響應。也許它正在執行I/O操作或長時間的DB事務,或者進一步調用其它導致Service-B變得更慢的服務,這些都使其無法更具效率。
現在,我們可以啟動更多的Service-B實例來解決這個問題,這樣很好,但是Service-A實際上是響應很快的,它需要等待Service-B的響應來進一步處理。這將導致Service-A無法接收更多的請求,這意味着我們還必須啟動Service-A的多個實例。
另一種方法解決類似情況的是使用事件驅動的微服務體系架構。這基本上意味着Service-A不直接通過HTTP調用Service-B或Service-C,而是將請求或事件發布給message broker(消息代理)。Service-B和Service-C將成為message broker(消息代理)上此事件的訂閱者。
與依賴HTTP調用的傳統微服務體系架構相比,這有許多優點:
- 提高可伸縮性和可靠性——現在我們知道哪些服務是整個應用程序中的真正瓶頸。
- 鼓勵松散耦合——
Service-A不需要了解Service-B和Service-C。它只需要連接到message broker並發布事件。事件如何進一步編排取決於代理設置。通過這種方式,Service-A可以獨立地運行,這是微服務的核心概念之一。 - 與遺留系統交互——通常我們不能將所有東西都移動到一個新的技術堆棧中。我們仍然必須使用遺留系統,雖然速度很慢,但是很可靠。
高級消息隊列協議(AMQP)是RabbitMQ用於消息傳遞的協議。雖然RabbitMQ支持其他一些協議,但是AMQP由於兼容性和它提供的大量特性而更受歡迎。

因此發布者將消息發布到RabbitMQ中稱為Exchange(交換器)。Exchange(交換器)接收消息並將其路由到一個或多個Queues(隊列)。路由算法依賴於Exchange(交換器)類型和routing(路由)key/header(與消息一起傳遞)。將Exchange(交換器)連接到Queues(隊列)的這些規則稱為bindings(綁定)。
綁定可以有4種類型:
- Direct: 它根據
routing key(路由鍵)將Exchange(交換器)類型直接路由到特定的Queues(隊列)。 - Fanout:它將消息路由到綁定
Exchange(交換器)中的所有Queues(隊列)。 - Topic:它根據完全匹配或部分據
routing key(路由鍵)匹配將消息路由到(0、1或更多)的Queues(隊列)。 - Headers:它類似於
Topic(主題)交換類型,但是它是基routing header(路由頭)而不是routing key(路由鍵)來路由的。

通過Exchange(交換器)和Queues(隊列)發布和消費消息的整個過程是通過一個Channel(通道)完成的。
有關路由的詳細信息,請訪問此鏈接。
我們可以從這里下載並安裝基於我們的操作系統的二進制文件。
然而,在本文中,我們將使用cloudamqp.com提供的免費雲安裝。只需注冊服務並登錄即可。
在主儀表板上單擊創建新實例:

然后給你的實例起個名字,然后進入下一步:

然后選擇一個可用區:

最后,查看實例信息,點擊右下角的創建實例:

就是這樣。現在在雲中運行了一個RabbitMQ實例。有關實例的更多信息,請轉到您的儀表板並單擊新創建的實例:

我們可以看到我們可以訪問RabbitMQ實例的主機,比如從我們的項目連接所需的用戶名和密碼:

我們將在Spring應用程序中使用AMQP URL連接到這個實例,所以請在某個地方記下它。
您還可以通過單擊左上角的RabbitMQ manager來查看管理器控制台。這將采用它來管理的您的RabbitMQ實例。

現在我們的設置已經准備好了,讓我們創建我們的服務:
- cloud-stream-producer-rabbitmq: 作為一個發布者,將消息推送到
RabbitMQ - cloud-stream-consumer-rabbitmq: 消費者消費消息
使用Spring Initializr創建一個腳手架項目。這將是我們的producer項目,我們將使用REST端點發布消息。
選擇您喜歡的Spring Boot版本,添加Web和Cloud Stream依賴項,生成Maven項目:
注意:
請注意cloud-stream依賴項。這也需要像RabbitMQ、Kafka等綁定器依賴項才能工作。
由於我們將使用RabbitMQ,添加以下Maven依賴項:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
或者,我們也可以將兩者結合起來使用spring-cloud-starter-stream-rabbit:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
使用同樣的方法,創建消費者項目,但僅使用spring-cloud-starter-stream-rabbit依賴項。
如前所述,將消息從發布者傳遞到隊列的整個過程是通過通道完成的。因此,讓我們創建一個HelloBinding接口,其中包含我們的消息機制greetingChannel:
interface HelloBinding { @Output("greetingChannel") MessageChannel greeting(); }
因為這將發布消息,所以我們使用@Output注解。方法名可以是我們想要的任意名稱,當然,我們可以在一個接口中有多個Channel(通道)。
現在,讓我們創建一個REST,它將消息推送到這個Channel(通道)
@RestController public class ProducerController { private MessageChannel greet; public ProducerController(HelloBinding binding) { greet = binding.greeting(); } @GetMapping("/greet/{name}") public void publish(@PathVariable String name) { String greeting = "Hello, " + name + "!"; Message<String> msg = MessageBuilder.withPayload(greeting) .build(); this.greet.send(msg); } }
上面,我們創建了一個ProducerController類,它有一個MessageChannel類型的屬性 greet。這是通過我們前面聲明的方法在構造函數中初始化的。
注意: 我們可以用簡潔的方式做同樣的事情,但是我們使用不同的名稱來讓您更清楚地了解事物是如何連接的。
然后,我們有一個簡單的REST接口,它接收PathVariable的name,並使用MessageBuilder創建一個String類型的消息。最后,我們使用MessageChannel上的.send()方法來發布消息。
現在,我們將在的主類中添加@EnableBinding注解,傳入HelloBinding告訴Spring加載。
@EnableBinding(HelloBinding.class) @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }

最后,我們必須告訴Spring如何連接到RabbitMQ(通過前面的AMQP URL),並將greetingChannel連接到一可用的消費者。
這兩個都是在application.properties中定義的:
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8080
現在,我們需要監聽之前創建的通道greetingChannel。讓我們為它創建一個綁定:
public interface HelloBinding { String GREETING = "greetingChannel"; @Input(GREETING) SubscribableChannel greeting(); }
與生產者綁定的兩個非常明顯區別。因為我們正在消費消息,所以我們使用SubscribableChannel和@Input注解連接到greetingChannel,消息數據將被推送這里。
現在,讓我們創建處理數據的方法:
@EnableBinding(HelloBinding.class) public class HelloListener { @StreamListener(target = HelloBinding.GREETING) public void processHelloChannelGreeting(String msg) { System.out.println(msg); } }
在這里,我們創建了一個HelloListener類,在processHelloChannelGreeting方法上添加@StreamListener注解。這個方法需要一個字符串作為參數,我們剛剛在控制台打印了這個參數。我們還在類添加@EnableBinding啟用了HelloBinding。
同樣,我們在這里使用@EnableBinding,而不是主類,以便告訴我們如何使用。
看看我們的主類,我們沒有任何修改:
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
在application.properties配置文件中,我們需要定義與生產者一樣的屬性,除了修改端口之外
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090
讓我們同時啟動生產者和消費者服務。首先,讓我們通過點擊端點http://localhost:8080/greet/john來生產消息。
在消費者日志中看到消息內容:
我們使用以下命令啟動另一個消費者服務實例(在另一個端口(9091)上):
$ mvn spring-boot:run -Dserver.port=9091
現在,當我們點擊生產者REST端點生產消息時,我們看到兩個消費者都收到了消息:

這可能是我們在一些用例中想要的。但是,如果我們只想讓一個消費者消費一條消息呢?為此,我們需要在application.properties中創建一個消費者組。消費者的配置文件:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
現在,再次在不同的端口上運行消費者的2個實例,並通過生產者生產消息再次查看:

這一切也可以在RabbitMQ管理器控制台看到:


在本文中,我們解釋了消息傳遞的主要概念、它在微服務中的角色以及如何使用Spring Cloud Stream實現它。我們使用RabbitMQ作為消息代理,但是我們也可以使用其他流行的代理,比如Kafka,只需更改配置和依賴項。
與往常一樣,本文使用的示例代碼可以在GitHub獲得完整的源代碼。

