Spring Could Stream 基本用法


Spring Cloud Stream是一個建立在Spring Boot和Spring Integration之上的框架,有助於創建事件驅動或消息驅動的微服務。
通過它可以更方便的訪問消息服務,如消費Rabbitmq的消息示例如下:
添加Spring Cloud Stream與RabbitMQ消息中間件的依賴。
< dependency >
<groupId> org.springframework.cloud </groupId> <artifactId> spring-cloud-starter-stream-rabbit </artifactId> </dependency>
配置通道關聯的destination,對應rabbitmq的exchange名稱。
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: mqTestDefault 
        output:
          destination: mqTestDefault
          contentType: text/plain
destination:指定了消息獲取的目的地 exchange,這里的exchange就是 mqTestDefault。這里配置應用輸入、輸出的destination相同,實際應用是input或output中的一方。
@SpringBootApplication@EnableBinding(Processor.class) public class MyLoggerServiceApplication {public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }
啟動后,默認是會創建一個臨時隊列,臨時隊列綁定的exchange為 “mqTestDefault”,routing key為 “#”。
實際使用中,我們需要一個持久化的隊列,並且指定一個分組,用於保證應用服務的縮放。

只需要在消費者端的 binding 添加配置項 spring.cloud.stream.bindings.[channelName].group = XXX 。對應的隊列就是持久化,並且名稱為:mqTestOrder.XXX

如果我們需要進一步根據 routing key 來進行區分消息投遞的目的地,或者消息接受,需要進一步配,Spring Cloud Stream 也提供了相關配置:

spring:
  cloud:
    stream:
      bindings:
        inputProductAdd:
          destination: mqTestProduct
          group: addProductHandler      # 擁有 group 默認會持久化隊列
        outputProductAdd:
          destination: mqTestProduct
      rabbit:
        bindings:
          inputProductAdd:
            consumer:
              bindingRoutingKey: addProduct.*       # 用來綁定消費者的 routing key
          outputProductAdd:
            producer:
              routing-key-expression: '''addProduct.*'''  # 需要用這個來指定 RoutingKey

常用配置

給消費者設置消費組和主題

  1. 設置消費組: spring.cloud.stream.bindings.<通道名>.group=<消費組名>
  2. 設置主題: spring.cloud.stream.bindings.<通道名>.destination=<主題名>

給生產者指定通道的主題:spring.cloud.stream.bindings.<通道名>.destination=<主題名>

消費者開啟分區,指定實例數量與實例索引

  1. 開啟消費分區: spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
  2. 消費實例數量: spring.cloud.stream.instanceCount=1 (具體指定)
  3. 實例索引: spring.cloud.stream.instanceIndex=1 #設置當前實例的索引值

生產者指定分區鍵

  1. 分區鍵: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分區鍵>
  2. 分區數量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分區數量>
一般最簡單的應用只要配置 spring.cloud.stream.bindings.開頭的項即可,如果涉及到


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM