Spring Cloud Stream是一個建立在Spring Boot和Spring Integration之上的框架,有助於創建事件驅動或消息驅動的微服務。
通過它可以更方便的訪問消息服務,如消費Rabbitmq的消息示例如下:
添加Spring Cloud Stream與RabbitMQ消息中間件的依賴。
<
dependency
>
<groupId>
org.springframework.cloud
</groupId>
<artifactId>
spring-cloud-starter-stream-rabbit
</artifactId>
</dependency>
配置通道關聯的destination,對應rabbitmq的exchange名稱。
spring:
cloud:
stream:
bindings:
input:
destination: mqTestDefault output:
destination: mqTestDefault
contentType: text/plain
destination:指定了消息獲取的目的地 exchange,這里的exchange就是 mqTestDefault。這里配置應用輸入、輸出的destination相同,實際應用是input或output中的一方。
@SpringBootApplication@EnableBinding(Processor.class) public class MyLoggerServiceApplication {public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }
啟動后,默認是會創建一個臨時隊列,臨時隊列綁定的exchange為 “mqTestDefault”,routing key為 “#”。
實際使用中,我們需要一個持久化的隊列,並且指定一個分組,用於保證應用服務的縮放。
只需要在消費者端的 binding 添加配置項 spring.cloud.stream.bindings.[channelName].group = XXX 。對應的隊列就是持久化,並且名稱為:mqTestOrder.XXX。
如果我們需要進一步根據 routing key 來進行區分消息投遞的目的地,或者消息接受,需要進一步配,Spring Cloud Stream 也提供了相關配置:
spring:
cloud:
stream:
bindings:
inputProductAdd:
destination: mqTestProduct
group: addProductHandler # 擁有 group 默認會持久化隊列
outputProductAdd:
destination: mqTestProduct
rabbit:
bindings:
inputProductAdd:
consumer:
bindingRoutingKey: addProduct.* # 用來綁定消費者的 routing key
outputProductAdd:
producer:
routing-key-expression: '''addProduct.*''' # 需要用這個來指定 RoutingKey
常用配置
給消費者設置消費組和主題
- 設置消費組:
spring.cloud.stream.bindings.<通道名>.group=<消費組名>
- 設置主題:
spring.cloud.stream.bindings.<通道名>.destination=<主題名>
給生產者指定通道的主題:spring.cloud.stream.bindings.<通道名>.destination=<主題名>
消費者開啟分區,指定實例數量與實例索引
- 開啟消費分區:
spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
- 消費實例數量:
spring.cloud.stream.instanceCount=1
(具體指定) - 實例索引:
spring.cloud.stream.instanceIndex=1
#設置當前實例的索引值
生產者指定分區鍵
- 分區鍵:
spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分區鍵>
- 分區數量:
spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分區數量>
一般最簡單的應用只要配置
spring.cloud.stream.bindings.開頭的項即可,如果涉及到