背景
消息中間件有多種,rabbitmq,rocketmq,activemq,kafka等。
不同的消息中間件具體細節不一樣。那么有沒有一種新的技術誕生,讓我們不再關注具體MQ細節,我們只需要用一種適配綁定的方式,自動給我們在各種MQ內切換。
一句話:屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。Spring Cloud Stream 因此誕生。
官方定義 Spring Cloud Stream是一個構建消息驅動微服務的框架。
應用程序使用inputs或者outputs來與springcloud stream中binder交互。
通過我們配置來bingding(綁定),而stream的binder對象負責與消息中間件交互。
所以,我們只需要搞清楚如何與springcloud stream交互就可以方便使用消息驅動方式。
SpringCloud Stream為一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用發布-訂閱、消費組、分區三個核心概念。
目前僅支持RabbitMQ和Kafka。
Stream處理架構:
通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件之間的隔離。
stream標准流程套路:
Binder:很方便的連接中間件,屏蔽差異。
channel:通道,是隊列Queue的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過channel對隊列進行配置。
source和sink:簡單理解為消息的輸入輸出。
編碼API和注解:
Stream消息生產者
pom依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
yml配置
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
bindings:
output:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.10.132
port: 5672
username: guest
password: guest
eureka:
client:
#是否將自己注冊到Eureka Server 默認為true
register-with-eureka: true
#是否從EurekaServer抓取已有的注冊信息,默認為true,單節點無所謂,集群必須設置true才能配合ribbon做負載均衡
fetch-registry: true
service-url:
#設置eureka server交互的地址查詢服務和注冊服務都需要依賴這個地址
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: send-8801.com
prefer-ip-address: true
發送消息的接口:
public interface IMessageProvider {
public String send();
}
發送消息的實現:
@EnableBinding(Source.class)//定義消息的推送管道
public class MessageProvider implements IMessageProvider {
@Autowired
private MessageChannel output;
@Override
public String send() {
String serial = IdUtil.simpleUUID();
System.out.println(serial+"============");
output.send(MessageBuilder.withPayload(serial).build());
return serial;
}
}
controller
@RestController
public class SendController {
@Autowired
private IMessageProvider messageProvider;
@GetMapping("sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
調用接口,觀察效果
Stream消息消費者
配置yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.10.132
port: 5672
username: guest
password: guest
eureka:
client:
#是否將自己注冊到Eureka Server 默認為true
register-with-eureka: true
#是否從EurekaServer抓取已有的注冊信息,默認為true,單節點無所謂,集群必須設置true才能配合ribbon做負載均衡
fetch-registry: true
service-url:
#設置eureka server交互的地址查詢服務和注冊服務都需要依賴這個地址
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: receive-8802.com
prefer-ip-address: true
接收消息:
@EnableBinding(Sink.class)
@RestController
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(value = Sink.INPUT)
public void input(Message<String> message){
System.out.println("消息:"+message.getPayload()+"serverPort:"+serverPort);
}
}
stream重復消費
下默認配置中,多個消費者存在,會存在重復消費問題
原因:默認分組group是不同的,組流水號不一樣,被認為是不同組,可以消費,所以要自定義配置分組。
yml配置:
cloud:
stream:
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
group: wen.jie
通過配置后,兩個消費者被分配到一組,就不存在重復消費的問題。