Spring cloud stream【消息分區】


  在上篇文章中我們給大家介紹了Stream的消息分組,可以實現消息的重復消費的問題,但在某些場景下分組還不能滿足我們的需求,比如,同時有多條同一個用戶的數據,發送過來,我們需要根據用戶統計,但是消息被分散到了不同的集群節點上了,這時我們就可以考慮消息分區了。
  當生產者將消息數據發送給多個消費者實例時,保證同一消息數據始終是由同一個消費者實例接收和處理。

Stream 消息分區

創建項目

  將我們上篇文章中的分組的三個項目,拷貝一份修改名稱及服務名稱

在這里插入圖片描述

沒有分區的情況下演示

發送多條消息查看效果

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
	
	@Autowired
	private ISendeService sendService;

	@Test
	public void testStream(){
		Product p = new Product(999, "stream test ...999");
		// 將需要發送的消息封裝為Message對象
		Message message = MessageBuilder
								.withPayload(p)
								.build();
		for (int i = 0; i < 10; i++) {
			// 發送多條消息到隊列中
			sendService.send().send(message );
		}
		
	}
}

10條消息被隨機的分散到了兩個消費者中:

在這里插入圖片描述

在這里插入圖片描述
我們可以看到A中6條消息,B中4條消息,而且這是隨機的,下次執行的結果肯定不一樣。

分區

1.發送者中配置

spring.application.name=stream-partition-sender
server.port=9060
#設置服務注冊中心地址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 鏈接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 對應 MQ 是 exchange  outputProduct自定義的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct

#通過該參數指定了分區鍵的表達式規則
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
#指定了消息分區的數量。 
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2

2.消費者中配置

服務A

spring.application.name=stream-partition-receiverA
server.port=9070
#設置服務注冊中心地址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 鏈接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 對應 MQ 是 exchange  和消息發送者的 交換器是同一個
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具體分組 對應 MQ 是 隊列名稱 並且持久化隊列  inputProduct 自定義
spring.cloud.stream.bindings.inputProduct.group=groupProduct999

#開啟消費者分區功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了當前消費者的總實例數量
spring.cloud.stream.instanceCount=2 
#設置當前實例的索引號,從 0 開始
spring.cloud.stream.instanceIndex=0

服務B

spring.application.name=stream-partition-receiverB
server.port=9071
#設置服務注冊中心地址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 鏈接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 對應 MQ 是 exchange  和消息發送者的 交換器是同一個
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具體分組 對應 MQ 是 隊列名稱 並且持久化隊列  inputProduct 自定義
spring.cloud.stream.bindings.inputProduct.group=groupProduct999

#開啟消費者分區功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了當前消費者的總實例數量
spring.cloud.stream.instanceCount=2 
#設置當前實例的索引號,從 1 開始
spring.cloud.stream.instanceIndex=1

啟動服務測試

在這里插入圖片描述

10個消息都被消費者A給消費了,說明到達了我們需要的效果。
案例源碼:https://github.com/q279583842q/springcloud-e-book


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM