Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.
SpringCloud 用于构建高度可扩展的基于事件驱动的微服务在消息系统中。该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring习惯用法和最佳实践之上,包括对持久发布/子语义、消费者组和有状态分区的支持。
Spring Cloud Stream消息订阅发送
//消息通道 MessageChannel messageChannel = new DirectChannel(); //订阅消息 // Message subscription ((SubscribableChannel) messageChannel).subscribe(new MessageHandler() { @Override public void handleMessage(Message<? > message) throws MessagingException { System.out.println("receive msg: " + message.getPayload()); } }); // Message sending 消息发送 messageChannel.send(MessageBuilder.withPayload("simple msg").build());
代码中的所有消息类型都是由spring-messaging模块提供的,它屏蔽了消息中间件的底层实现。如果想要选用不同消息中间件,只需要在配置文件里面配置消息中间件信息和更换Binder 依赖。 以RocketMQ 消息队列为例,Binder 只需要引入 RocketMQ Binder 的maven依赖 和 在 application.yml 中配置 spring.cloud.stream.rocketmq 信息。
application.yml 配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
#具体配置的 RocketMQ 的消费者 Binding、生产者 Binding
bindings:
#接受管道信息
input:
destination: topic-product
content-type: application/json
group: distributed-group
#输入管道信息
output:
destination: topic-order
content-type: application/json
group: distributed-group
kafaka:
maven示例
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-stream-binder-rocketmq</artifactId> <version>2.1.0.RELEASE</version> </dependency>
如果想要更换为 Kafka 只需要导入 spring-cloud-stream-binder-kafka 依赖和配置 kafka 的 binder信息。
消息发送和消息生产
以下代码示例,说明创建消息生产者、消息消费者。Spring cloud Stream RocketMQ application.yml bindings配置文件如下:
server:
port: 9200
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
#具体配置的 RocketMQ 的消费者 Binding、生产者 Binding
bindings:
Topic-product:
destination: Topic-product
content-type: application/json
group: distributed-group2
Topic-order:
destination: Topic-order
content-type: application/json
group: distributed-group2
Topic-stock:
destination: Topic-stock
content-type: application/json
group: distributed-group2
生产者消息发送
设置 Binder 自定义
/** * 自定义通道 Binder * */ public interface CustomChannelBinder { /** * 接收产品 Channel 消息 * * @return */ @Input("Topic-product") MessageChannel productInputChannel(); /** * 发送消息到订单 Channel 消息 * * @return */ @Output("Topic-order") MessageChannel sendToOrderChannel(); /** * 发送消息到库存 Channel 消息 * * @return */ @Output("Topic-stock") MessageChannel sendToStockChannel(); }
启用自定义的 Binding 服务
/** * 分布式服务-产品服务 * */ @EnableBinding({CustomChannelBinder.class}) @SpringBootApplication public class ProductApplication { public static void main(String[] args) { SpringApplication.run(ProductApplication.class); } }
发送到指定的@Output("Topic-stock")
import com.github.shaylau.rocketmq.distributed.product.model.Product; import com.github.shaylau.rocketmq.distributed.product.mq.binder.CustomChannelBinder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; /** */ @Service public class ProductProducer { @Autowired private CustomChannelBinder channelBinder; /** * 发送消息到订单 * @param product */ public void sentToStock(Product product) { Message<Product> message = MessageBuilder.withPayload(product).build(); channelBinder.sendToStockChannel().send(message); } }
消费者消息消费
import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; @Service public class ProductConsumer { /** * 消费订单消息 * * @param object 订单消息体 */ @StreamListener("Topic-order") public void consumerForOrder(Object object) { System.out.println("收到订单消息:" + object); } }