上篇文章我們簡單的介紹了stream的使用,發現使用還是蠻方便的,但是在上個案例中,如果有多個消息接收者,那么消息生產者發送的消息會被多個消費者都接收到,這種情況在某些實際場景下是有很大問題的,比如在如下場景中,訂單系統我們做集群部署,都會從RabbitMQ中獲取訂單信息,那如果一個訂單同時被兩個服務獲取到,那么就會造成數據錯誤,我們得避免這種情況。這時我們就可以使用Stream中的消息分組來解決了!
Stream消息分組
消息分組的作用我們已經介紹了。注意在Stream中處於同一個group中的多個消費者是競爭關系。就能夠保證消息只會被其中一個應用消費一次。不同的組是可以消費的,同一個組內會發生競爭關系,只有其中一個可以消費。通過案例我們來演示看看,這里我們會創建3個服務,分別如下
服務 | 介紹 |
---|---|
stream-group-sender | 消息發送者服務 |
stream-group-receiverA | 消息接收者服務 |
stream-group-receiverB | 消息接收者服務 |
1.創建stream-group-sender 服務
1.1 創建項目
1.2 pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
</parent>
<groupId>com.bobo</groupId>
<artifactId>stream-group-sender</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
1.3 配置文件
配置中的“outputProduct”可以自定義,但是我們等會在消息接口中要使用到。
spring.application.name=stream-sender
server.port=9060
#設置服務注冊中心地址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 鏈接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 對應 MQ 是 exchange outputProduct自定義的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
1.4 發送接口
/**
* 發送消息的接口
* @author dengp
*
*/
public interface ISendeService {
String OUTPUT="outputProduct";
/**
* 指定輸出的交換器名稱
* @return
*/
@Output(OUTPUT)
SubscribableChannel send();
}
1.5 啟動類
@SpringBootApplication
@EnableEurekaClient
// 綁定我們剛剛創建的發送消息的接口類型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {
public static void main(String[] args) {
SpringApplication.run(StreamSenderStart.class, args);
}
}
1.6 創建pojo
在本案例中我們發送的消息是自定義的對象
package com.bobo.stream.pojo;
import java.io.Serializable;
public class Product implements Serializable{
private Integer id;
private String name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Product(Integer id, String name) {
super();
this.id = id;
this.name = name;
}
public Product() {
super();
}
@Override
public String toString() {
return "Product [id=" + id + ", name=" + name + "]";
}
}
2.創建stream-group-receiverA服務
2.1 創建項目
2.2 pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
</parent>
<groupId>com.bobo</groupId>
<artifactId>stream-group-receiverA</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.3 配置文件
配置文件中配置分組“groupProduct”
spring.application.name=stream-group-receiverA
server.port=9070
#設置服務注冊中心地址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 鏈接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 對應 MQ 是 exchange 和消息發送者的 交換器是同一個
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具體分組 對應 MQ 是 隊列名稱 並且持久化隊列 inputProduct 自定義
spring.cloud.stream.bindings.inputProduct.group=groupProduct
2.4 接收消息的接口
/**
* 接收消息的接口
* @author dengp
*
*/
public interface IReceiverService {
String INPUT = "inputProduct";
/**
* 指定接收的交換器名稱
* @return
*/
@Input(INPUT)
SubscribableChannel receiver();
}
2.5 消息的具體處理類
/**
* 具體接收消息的處理類
* @author dengp
*
*/
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
@StreamListener(IReceiverService.INPUT)
public void onReceiver(Product p){
System.out.println("消費者A:"+p);
}
}
注意同樣需要添加Product類
package com.bobo.stream.pojo;
import java.io.Serializable;
public class Product implements Serializable{
private Integer id;
private String name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Product(Integer id, String name) {
super();
this.id = id;
this.name = name;
}
public Product() {
super();
}
@Override
public String toString() {
return "Product [id=" + id + ", name=" + name + "]";
}
}
2.6 啟動類
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {
public static void main(String[] args) {
SpringApplication.run(StreamReceiverStart.class, args);
}
}
3.創建stream-group-receiverB服務
此服務和stream-group-receiverA一樣,復制一份只需修改application.properties中的服務名稱,端口。我們先將group設置不一樣,我們測試來看看
spring.application.name=stream-group-receiverB
server.port=9071
#設置服務注冊中心地址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 鏈接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 對應 MQ 是 exchange 和消息發送者的 交換器是同一個
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具體分組 對應 MQ 是 隊列名稱 並且持久化隊列 inputProduct 自定義
spring.cloud.stream.bindings.inputProduct.group=groupProduct1
4.測試代碼
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
@Autowired
private ISendeService sendService;
@Test
public void testStream(){
Product p = new Product(666, "stream test ...");
// 將需要發送的消息封裝為Message對象
Message message = MessageBuilder
.withPayload(p)
.build();
sendService.send().send(message );
}
}
在stream-group-receiverA和stream-group-receiverB服務的group不一致的情況下
改為同組的情況下
啟動服務,發送數據
通過結果可以看到只有其中一個受到消息。避免了消息重復消費。
案例代碼github:https://github.com/q279583842q/springcloud-e-book