SpringCloud 整合RabbitMQ


 簡介:Spring Cloud Stream是一個構建消息驅動的微服務框架,應用程序通過input通道或者output通道來與Spring Cloud Stream中binder交互,通過配置來binding,而binder負責與消息中間件交互。Spring Cloud Stream是為了簡化開發人員對消息中間件的使用復雜度,讓我們有更多精力關注核心業務邏輯的處理。

1. Maven依賴

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  <version>2.0.1.RELEASE</version>
</dependency>                  

2. application.yml配置文件

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input-stream-test:
            consumer:
              acknowledge-mode: manual
              recovery-interval: 3000 #rabbitmq重連錯誤
          input-stream:
            consumer:
              acknowledge-mode: manual
              recovery-interval: 3000
              # 聲明為延遲交換機
              delayedExchange: true
          output:
            producer:
              delayedExchange: true
      binders:                 # 定義消息中間件的服務信息
        rabbitmqc:             # 表示定義的名稱,用於bindings整合
          type: rabbit         # 消息組件類型
          environment:
            spring:
              rabbitmq:
                host: rabbitmq
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings:                # 服務整合處理
        myInput:               # 通道名稱
      binder: rabbitmqc
          # 交換機名稱
          destination: a
          # 指定該應用實例屬於哪個消費組
          group: b
          consumer:
            # 開啟消費者分區功能
            partitioned: true
        myOutput:
          binder: rabbitmqc
          destination: a
          # 消息發送的格式
          content-type: text/plain
          producer:
            # 指定分區鍵的表達式規則
            partitionKeyExpression: payload
            partitionCount: 2 # 指定當前消費者的總實例數量 instance-count: 2 # 設置當前實例的索引號 instance-index: 0          

3. 注解

  A. @EnableBinding:將一個或多個接口作為參數,接口聲明輸入和輸出通道,Stream提供了Source、Sink和Processor接口,也可以自定義接口;

  B. @Input:標識一個輸入通道,接收到的消息通過該輸入通道進入應用程序;

  C. @Output:標識一個輸出通道,已發布的消息通過該輸出通道離開應用程序,如果未提供名稱,則使用方法的名稱;

  D. @StreamListener:基於內容的路由,方法的參數可以用@Payload、@Header、@Headers進行注釋,若是有返回值,必須使用@SendTo注解為該方法返回的數據指定輸出綁定目標。

4. 發布訂閱模式

  Stream中消息通訊方式遵守了發布訂閱模式,當一條消息被投遞出去時,它通過共享的Topic主題進行廣播,消息消費者在訂閱的主題中接收到它時,就會觸發自身的邏輯進行處理。

5. 消費組(Consumer Groups)

  通常在生產環境,我們的服務會以集群方式部署,目的是保證高可用和負載均衡,那么服務就會以多個實例運行着,而這些實例都會綁定到同一個消息通道的目標主題上。默認情況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每個消費者實例接收和處理。但是有時候我們希望消息只被一個消費者消費,那么就需要用到消費組,只需要在服務消費端設置group屬性即可,達到避免重復消費。同一組內,消費者會為生產者提供的消息發生競爭關系,只有其中一個實例可以消費。

spring.cloud.stream.bindings.<channelName>.group=組名

6. 延遲隊列

  延遲隊列操作的對象是延遲消息,所謂延遲消息是指當消息被發送出去后,並不想讓消費者立馬消費,而是等待特定時間后,消費者才能拿到消息進行消費,使用場景有:支付訂單、設備升級的超時處理,可以替代定時調度處理。

spring.cloud.stream.rabbit.bindings.<channelName>.producer.delayed-exchange=true # 延遲隊列
channel.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 30 * 60 * 1000).build());

7. 消息分區

   分區就是將數據分區,生產者生產的消息給多個消費者實例發送消息確保相同特征的數據被同一消費者實例處理。

 8. 消費者類型

  A. 消息驅動(異步):消息一旦可用,就會被傳遞,並且有現成可以處理它;

  B. 輪詢(同步):希望控制消息的處理速率。

9. 示例

 

 

可參考:Spring Cloud Stream(消息驅動)

      Spring Cloud中文文檔

      配置參數詳解

      Spring Cloud Stream進階配置高吞吐量之批量預期消息prefetch

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM