在實際的企業開發中,消息中間件是至關重要的組件之一。消息中間件主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。不同的中間件其實現方式,內部結構是不一樣的。如常見的RabbitMQ和Kafka,由於這兩個消息中間件的架構上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分區,這些中間件的差異性導致我們實際項目開發造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,后面的業務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候 springcloud Stream 給我們提供了一種解耦合的方式。
Spring Cloud Stream由一個中間件中立的核組成。應用通過Spring Cloud Stream插入的input(相當於消費者consumer,它是從隊列中接收消息的)和output(相當於生產者producer,它是從隊列中發送消息的。)通道與外界交流。通道通過指定中間件的Binder實現與外部代理連接。業務開發者不再關注具體消息中間件,只需關注Binder對應用程序提供的抽象概念來使用消息中間件實現業務即可。
入門案例:通過rabbitMQ作為消息中間件
消息生產者:
(1)創建工程引入依賴
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> </dependencies>
(2)定義bingding
發送消息時需要定義一個接口,不同的是接口方法的返回對象是 MessageChannel,下面是 SpringCloud Stream 內置的接口:
public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); }
接口聲明了一個 binding 命名為 “output”。這個binding 聲明了一個消息輸出流,也就是消息的生產者。
(3)配置application.yml
server:
port: 7001 #服務端口
spring:
application:
name: stream_producer #指定服務名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output:
destination: fan-default #指定了消息發送的目的地,對應 RabbitMQ,會發送到 exchange 是 fan-default 的所有消息隊列中。
contentType: text/plain #用於指定消息的類型。
binders:
defaultRabbit:
type: rabbit
(4)消息發送工具類
// 負責向中間件發送數據 @Component @EnableBinding(Source.class) public class MessageSender { @Autowired private MessageChannel output; // 接口來源於Source.class public void send(Object obj) { //發送MQ消息 output.send(MessageBuilder.withPayload(obj).build()); } }
(5)啟動類
@SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class); } }
(6)測試類
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class ProducerTest { @Autowired private MessageSender messageSender; @Test public void testSend() { messageSender.send("hello word"); } }
(7)運行啟動類,訪問RabbitMQ地址 http://127.0.0.1:15672/ 即可看到發送的消息
消息消費者:
(1)創建工程引入依賴
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>
(2)定義bingding
同發送消息一致,在Spring Cloud Stream中接收消息,需要定義一個接口,如下是內置的一個接口。
public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); }
接口聲明了一個 binding 命名為 “input” 。注釋 @Input 對應的方法,需要返回 SubscribableChannel
(3)配置application.yml
server:
port: 7002 #服務端口
spring:
application:
name: stream_consumer #指定服務名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: fan-default
binders:
defaultRabbit:
type: rabbit
(4)消息監聽類
@Component @EnableBinding(Sink.class) public class MessageListener { // 監聽 binding 為 Sink.INPUT 的消息 @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("監聽收到:" + message.getPayload()); } }
(5)啟動類
@SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class); } }
(6) 運行啟動類后,再去運行消息生產者的測試類,即可在消息消費者控制台接收到消息
自定義消息通道:
public interface OrderProcessor { String INPUT_ORDER = "inputOrder"; String OUTPUT_ORDER = "outputOrder"; @Input(INPUT_ORDER) SubscribableChannel inputOrder(); @Output(OUTPUT_ORDER) MessageChannel outputOrder(); }
Spring Cloud Stream 內置了兩種接口,分別定義了 binding 為 “input” 的輸入流,和 “output” 的輸出流,而在我們實際使用中,往往是需要定義各種輸入輸出流
一個接口中,可以定義無數個輸入輸出流,可以根據實際業務情況划分。上述的接口,定義了一個訂單輸入和訂單輸出兩個 binding。
使用時,需要在 @EnableBinding 注解中,添加自定義的接口。
使用 @StreamListener 做監聽的時候,需要指定 OrderProcessor.INPUT_ORDER
消息發送工具類要指定注入的通道:
配置文件也要修改:
消息分組:
通常在生產環境,我們的每個服務都不會以單節點的方式運行在生產環境,當同一個服務啟動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。默認情況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每個消費者實例接收和處理,但是有些業務場景之下,我們希望生產者產生的消息只被其中一個實例消費,這個時候我們需要為這些消費者設置消費組來實現這樣的功能。
只需要在每個服務消費者端設置spring.cloud.stream.bindings.input.group 屬性即可
消息分區:
有一些場景需要滿足, 同一個特征的數據被同一個實例消費, 比如同一個id的傳感器監測數據必須被同一個實例統計計算分析, 否則可能無法獲取全部的數據。又比如部分異步任務,首次請求啟動task,二次請求取消task,此場景就必須保證兩次請求至同一實例。
消息生產者配置:
1. spring.cloud.stream.bindings.output.producer.partitionKeyExpression :通過該參數指定了分區鍵的表達式規則,我們可以根據實際的輸出消息規則來配置SpEL來生成合適的分區鍵;
2. spring.cloud.stream.bindings.output.producer.partitionCount :該參數指定了消息分區的數量。
消息消費者配置:
1. spring.cloud.stream.bindings.input.consumer.partitioned :通過該參數開啟消費者分區功能;
2. spring.cloud.stream.instanceCount :該參數指定了當前消費者的總實例數量;
3. spring.cloud.stream.instanceIndex :該參數設置當前實例的索引號,從0開始,最大值為spring.cloud.stream.instanceCount 參數 - 1。