【SpringCloud】Spring Cloud Stream 消費組和消息持久化(二十四)


  本章介紹Spring Cloud Stream 消費組和消息持久化,Spring Cloud Stream 入門參考:【SpringCloud】Spring Cloud Stream 消息驅動(二十三)

Spring Cloud Stream消費組

  多數情況,生產者發送消息給某具體微服務時只希望被消費一次,按照上面我們啟動兩個應用的例子,雖然它們同屬一個應用,但是這個消息出現了被重復消費兩次的情況。為了解決這個問題,在Spring Cloud Stream中提供了消費組的概念

案例說明

  本章案例在也是使用上一章的案例,架構如下,其中消息消費者8802與消息消費者8803,應用名稱和代碼一樣,

  

  消息消費者(8802|8803)配置:

 1 # 端口
 2 server:
 3   port: 8802
 4 
 5 spring:
 6   application:
 7     name: cloud-stream-consumer
 8   cloud:
 9     stream:
10       binders:
11         # 表示定義的名稱,用於binding的服務信息
12         defaultRabbit:
13           # 消息組件類型
14           type: rabbit
15           # 設置rabbitmq的相關配置的環境配置
16           environment:
17             spring:
18               rabbitmq:
19                 host: 127.0.0.1
20                 port: 5672
21                 username: guest
22                 password: guest
23       # 服務的整合處理
24       bindings:
25         # 這個名字是一個通道的名稱
26         input:
27           # 表示要使用的Exchange 名稱定義
28           destination: studyExchange
29           # 設置消息類型,本次為json,文本則設置"text/plain"
30           content-type: application/json
31           # 設置要綁定的消息服務的具體設置
32           binder: defaultRabbit
33           # 消費分組
34 #          group: testA
35 
36 eureka:
37   client:
38     service-url:
39       defaultZone: http://localhost:8761/eureka

  同一消息被同一服務多處消費現象

  測試流程:

  1、啟動相關項目

  2、查看Eureka注冊中心,同一服務CLOUD-STREAM-CONSUMER,有2個節點

    

  3、查看RabbitMQ的Web后台,發現有2個不同的Queue,綁定了studyExchange

    

  4、訪問地址:http://localhost:8801/sendMessage,發送消息

    發現消息消費者(8802|8803)2個節點,都收到同一條消息,並進行了處理

    說明:出現消息被同一種服務重復消費的問題

  同一消息被同一服務分組消費

  1、在消息消費者(8802|8803)2個節點的配置文件中配置組屬性

1 spring.cloud.stream.bindings.input.group = testA

  2、重新相關項目

  3、查看RabbitMQ的Web后台,發現只有一個studyExchange.testA的Queue,

    且studyExchange.testA綁定了studyExchange,里面有2個Chanal

    

  4、訪問地址:http://localhost:8801/sendMessage,發送消息

    發現一條消息只會被1個消費者,獲取並消費。

    發送多條消息,消息有消費者輪詢的方式消費

Spring Cloud Stream消息持久化

  消費者設置分組后

  1、關閉消費者服務

  2、查看RabbitMQ的Web后台,發現studyExchange.testA的Queue,並沒有被刪除

  3、訪問地址:http://localhost:8801/sendMessage,發送消息,

    發現消息繼續發送到了studyExchange.testA中

  4、啟動消費者服務,發現消費者服務能夠消費到,服務未啟動時提供者發送的消息

    說明已到到消息持久化的效果

  注:消費者未設置分組時,關閉消費者服務,對應的queue會理解被刪除,那么提供者也就無法方隊列中發送消息了

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM