本章介紹Spring Cloud Stream 消費組和消息持久化,Spring Cloud Stream 入門參考:【SpringCloud】Spring Cloud Stream 消息驅動(二十三)
Spring Cloud Stream消費組
多數情況,生產者發送消息給某具體微服務時只希望被消費一次,按照上面我們啟動兩個應用的例子,雖然它們同屬一個應用,但是這個消息出現了被重復消費兩次的情況。為了解決這個問題,在Spring Cloud Stream中提供了消費組的概念
案例說明
本章案例在也是使用上一章的案例,架構如下,其中消息消費者8802與消息消費者8803,應用名稱和代碼一樣,
消息消費者(8802|8803)配置:
1 # 端口 2 server: 3 port: 8802 4 5 spring: 6 application: 7 name: cloud-stream-consumer 8 cloud: 9 stream: 10 binders: 11 # 表示定義的名稱,用於binding的服務信息 12 defaultRabbit: 13 # 消息組件類型 14 type: rabbit 15 # 設置rabbitmq的相關配置的環境配置 16 environment: 17 spring: 18 rabbitmq: 19 host: 127.0.0.1 20 port: 5672 21 username: guest 22 password: guest 23 # 服務的整合處理 24 bindings: 25 # 這個名字是一個通道的名稱 26 input: 27 # 表示要使用的Exchange 名稱定義 28 destination: studyExchange 29 # 設置消息類型,本次為json,文本則設置"text/plain" 30 content-type: application/json 31 # 設置要綁定的消息服務的具體設置 32 binder: defaultRabbit 33 # 消費分組 34 # group: testA 35 36 eureka: 37 client: 38 service-url: 39 defaultZone: http://localhost:8761/eureka
同一消息被同一服務多處消費現象
測試流程:
1、啟動相關項目
2、查看Eureka注冊中心,同一服務CLOUD-STREAM-CONSUMER,有2個節點
3、查看RabbitMQ的Web后台,發現有2個不同的Queue,綁定了studyExchange
4、訪問地址:http://localhost:8801/sendMessage,發送消息
發現消息消費者(8802|8803)2個節點,都收到同一條消息,並進行了處理
說明:出現消息被同一種服務重復消費的問題
同一消息被同一服務分組消費
1、在消息消費者(8802|8803)2個節點的配置文件中配置組屬性
1 spring.cloud.stream.bindings.input.group = testA
2、重新相關項目
3、查看RabbitMQ的Web后台,發現只有一個studyExchange.testA的Queue,
且studyExchange.testA綁定了studyExchange,里面有2個Chanal
4、訪問地址:http://localhost:8801/sendMessage,發送消息
發現一條消息只會被1個消費者,獲取並消費。
發送多條消息,消息有消費者輪詢的方式消費
Spring Cloud Stream消息持久化
消費者設置分組后
1、關閉消費者服務
2、查看RabbitMQ的Web后台,發現studyExchange.testA的Queue,並沒有被刪除
3、訪問地址:http://localhost:8801/sendMessage,發送消息,
發現消息繼續發送到了studyExchange.testA中
4、啟動消費者服務,發現消費者服務能夠消費到,服務未啟動時提供者發送的消息
說明已到到消息持久化的效果
注:消費者未設置分組時,關閉消費者服務,對應的queue會理解被刪除,那么提供者也就無法方隊列中發送消息了