說明
對Spring Boot 和 Spring Integration的整合,通過Spring Cloud Stream能夠簡化消息中間件使用的復雜難度!讓業務人員更多的精力能夠花在業務層面
簡單例子
consumer
1.創建一個一個項目名為spring-cloud-stream-consumer
2.引入pom依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
3.yml配置
spring:
application:
name: streamConsumer
rabbitmq: #更多mq配置看書331頁
username: liqiang
password: liqiang
host: localhost
port: 5672
server:
port: 8081
5.創建消息監聽類
//實現對定義了多個@Input和@Output的接口實現對通道的綁定 Sink定義了@Input 我們自己處理時是自己定義接口 @EnableBinding(Sink.class) public class SkinReceiverService { private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class); //對input的消息監聽處理 @StreamListener(Sink.INPUT) public void receiver(Object message){ logger.info(message.toString()); } }
6.啟動rabbitmq
7.啟動項目
8.通過mq管理頁面發送消息
控制台打印 表示成功接收到消息
核心概念
綁定器
應用程序與消息中間件的抽象層。應用程序中間件的解耦。應用程序不需要考慮用的什么類型的消息中間件。當我們需要更換消息中間件 只需要替換綁定器
發布訂閱
spring cloud stream 完全遵循發布訂閱模式 當一條消息被發布到消息中間件后 將會以topic主題模式進行廣播,消費者對訂閱的topic主題進行相應的邏輯處理。topic是spring cloud stream的一個抽象概念,不同消息中間件topic概念可能不同 rabbitMq對應exchage
如rabbitMQ的topic
發布訂閱模式能夠有效避免點對點的耦合 當一種消息要增加一種處理方式時只需要增加一個消息訂閱者
消費組
一般我們的消費組都會集群部署 但是我們再集群部署的情況下 會形成多個訂閱者 導致消息被消費多次, 消費組則是解決一個消息只能被一個實例消費者消費
消費分區
指定統一特征的消息被指定服務實例消費 spring cloud stream消費分區提供通用的抽象實現 使不支持分區的中間件也能支持消費分區
自定義輸入和輸出
定義輸入
Sink 是spring cloud stream 的默認實現 我們可以通過查看源碼
public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); }
通過@Input注冊參數為通道名字 同時需要返回SubscribableChannel
我們通過參考Sink定義一個輸入通道 比如處理訂單保存的通道
1.定義第一個通道
public interface OrderMQInputChannel { String saveOrderChannelName="saveOrder";//定義通道的名字 @Input(saveOrderChannelName)//定義為輸入通道 public SubscribableChannel saveOrder(); }
2.綁定通道並監聽
//通過綁定器 對OrderMQInputChannel通道進行綁定 @EnableBinding(OrderMQInputChannel.class) public class OrderMQReceiverService { private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class); //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理 @StreamListener(OrderMQInputChannel.saveOrderChannelName) public void receiver(Object message){ logger.info(message.toString()); } }
定義輸出
1.創建一個測試消費提供者的項目
2.引入pom依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>
2.yml配置文件配置
spring:
application:
name: streamProvider
rabbitmq: #更多mq配置看書331頁
username: liqiang
password: liqiang
host: localhost
port: 5672
server:
port: 8082
3.定義一個輸出通道
public interface OrderMQOutputChannel { String saveOrderChannelName="saveOrder"; @Output(saveOrderChannelName)//定義輸出管道的名字 MessageChannel saveOrder(); }
4.綁定通道
@EnableBinding(OrderMQOutputChannel.class) //綁定通道OrderMQOutputChannel public class OrderChannelBindConfig { }
5.添加測試contorller
@Controller public class TestContorller { @Autowired OrderMQOutputChannel orderMQOutputChannel; @RequestMapping("/saveOrder") @ResponseBody public boolean saveOrder(){ //發送一條保存訂單的命令 return orderMQOutputChannel.saveOrder().send(MessageBuilder.withPayload("fff").build()); } }
或者
@Controller public class TestContorller { //直接注入對應通道是的實例 @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName) MessageChannel messageChannel; @RequestMapping("/saveOrder") @ResponseBody public boolean saveOrder(){ //發送一條保存訂單的命令 return messageChannel.send(MessageBuilder.withPayload("fff").build()); } }
6.訪問
http://127.0.0.1:8082/saveOrder
7.consumer打印 表示消息被消費
spring intergration原生支持
spring cloud stream 是通過spring boot和spring intergreation的整合 所以也可以使用原生的用法實現相同的功能
provider
@EnableBinding(OrderMQOutputChannel.class) //綁定通道OrderMQOutputChannel public class OriginalOrderMQOutPutChannelService { //定義2秒發送一次消息 @Bean @InboundChannelAdapter(value = OrderMQOutputChannel.saveOrderChannelName, poller = @Poller(fixedDelay = "2000")) public MessageSource<Date> timerMessageSource() { return () -> new GenericMessage<>(new Date()); } }
consumer
//通過綁定器 對OrderMQInputChannel通道進行綁定 @EnableBinding(OrderMQInputChannel.class) public class OriginalOrderMQReceiverService { private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class); //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理 @ServiceActivator(inputChannel = OrderMQInputChannel.saveOrderChannelName) public void receiver(Object message) { logger.info(message.toString()); } //定義消息轉換器 轉換saveOrderChannelName 通道的的消息 @Transformer(inputChannel = OrderMQInputChannel.saveOrderChannelName, outputChannel = OrderMQInputChannel.saveOrderChannelName) public Object transform(Date message) { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message); } }
啟動之后conusmer2秒則會受到一條消息 更多用法查看spring intergration文檔
消息轉換
通過上面我們可以看到原生通過@Transformer實現消息轉換 spring cloud stream 只需要定義消息通道的消息類型
spring.cloud.stream.bindings.[inputname].content-type=application/json
spring:
application:
name: streamConsumer
rabbitmq: #更多mq配置看書331頁
username: liqiang
password: liqiang
host: localhost
port: 5672
cloud:
stream:
bindings:
saveOrder:
content-type: application/json
server:
port: 8081
消費者
//通過綁定器 對OrderMQInputChannel通道進行綁定 @EnableBinding(OrderMQInputChannel.class) public class OrderMQReceiverService { private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class); //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理 @StreamListener(OrderMQInputChannel.saveOrderChannelName) public void receiver(Order order){ logger.info(order.getId()+"-"+order.getOrderCode()); } }
消息提供者
@Controller public class TestContorller { //直接注入對應通道是的實例 @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName) MessageChannel messageChannel; @RequestMapping("/saveOrder") @ResponseBody public boolean saveOrder(){ com.liqiang.entity.Order order=new com.liqiang.entity.Order(); order.setId(1L); order.setOrderCode("201901020001"); //發送一條保存訂單的命令 return messageChannel.send(MessageBuilder.withPayload(order).build()); } }
消息反饋
用於將消息交給別的應用處理 處理后再回傳 或者異步請求 接收處理結果
provider
public interface OrderMQOutputChannel { String saveOrderChannelName="saveOrder"; String saveOrderCallbackChannelName="saveOrderCallback";//定義回調通道的名字 @Output(saveOrderChannelName)//定義輸出管道的名字 MessageChannel saveOrder(); @Input(saveOrderCallbackChannelName)//定義為輸入通道 public SubscribableChannel saveOrderCallback(); }
@EnableBinding(OrderMQOutputChannel.class) //綁定通道OrderMQOutputChannel public class OrderChannelBindConfig { private static Logger logger = LoggerFactory.getLogger(OrderMQOutputChannel.class); //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理 @StreamListener(OrderMQOutputChannel.saveOrderCallbackChannelName) public void receiver(boolean boo){ logger.info(String.valueOf(boo)); } }
consumer
public interface OrderMQInputChannel { String saveOrderChannelName="saveOrder";//定義通道的名字 String saveOrderCallbackChannelName="saveOrderCallback";//定義回調通道的名字 @Input(saveOrderChannelName)//定義為輸入通道 public SubscribableChannel saveOrder(); }
//通過綁定器 對OrderMQInputChannel通道進行綁定 @EnableBinding(OrderMQInputChannel.class) public class OrderMQReceiverService { private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class); //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理 @StreamListener(OrderMQInputChannel.saveOrderChannelName) @SendTo(OrderMQInputChannel.saveOrderCallbackChannelName)//反饋的通道名字 public boolean receiver(Order order){ logger.info(order.getId()+"-"+order.getOrderCode()); return true; } }
消息分組
多實例情況下 只需要指定spring.cloud.stream.bindings.[channelname].group=gorupname 當同一組實例對同一個主題的消息只能會有一個實例消費
1.測試 創建2個配置文件 分別為
application-peer1.yml
spring: application: name: streamConsumer rabbitmq: #更多mq配置看書331頁 username: liqiang password: liqiang host: localhost port: 5672 cloud: stream: bindings: saveOrder: group: groupA content-type: application/json server: port: 8081
application-peer2.yml
spring:
application:
name: streamConsumer
rabbitmq: #更多mq配置看書331頁
username: liqiang
password: liqiang
host: localhost
port: 5672
cloud:
stream:
bindings:
saveOrder:
group: groupA
content-type: application/json
server:
port: 8083
通過啟動2個消費者
java -jar /Users/liqiang/Desktop/java開發環境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer1
java -jar /Users/liqiang/Desktop/java開發環境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer2
只要一個實例消費了
消費分區
再某些場景 我需要指定某一類消息只能被哪些實例消費
消費者
application-peer1.yml
spring:
application:
name: streamConsumer
rabbitmq: #更多mq配置看書331頁
username: liqiang
password: liqiang
host: localhost
port: 5672
cloud:
stream:
instanceCount: 2 #跟分區一起使用 有多少實例
instanceIndex: 0 #分區當前實例編號 從0開始
bindings:
saveOrder:
group: streamConsumer
content-type: application/json
consumer:
partitioned: true #開啟消息分區的功能
server:
port: 8081
application-peer2.yml
spring:
application:
name: streamConsumer
rabbitmq: #更多mq配置看書331頁
username: liqiang
password: liqiang
host: localhost
port: 5672
cloud:
stream:
instanceCount: 2 #跟分區一起使用 有多少實例
instanceIndex: 1 #當前實例編號 從0開始
bindings:
saveOrder:
group: streamConsumer
content-type: application/json
consumer:
partitioned: true #開啟消息分區的功能
server:
port: 8083
生產者
spring:
application:
name: streamProvider
rabbitmq: #更多mq配置看書331頁
username: liqiang
password: liqiang
host: localhost
port: 5672
cloud:
stream:
bindings:
saveOrder:
producer:
partitionKeyExpression: '0' #表示只有實例索引為0的才能收到消息 支持SpEL表達式
partitionCount: 2
server:
port: 8082
當啟動2個消費者 和生產者 當前生產者 生產的消息只能被實例編號為0的消費
這里限制死了當前實例生產的消息被某個實例消費。如果我們需要指定 當前生產者生產的某一類服務被指定實例消費呢可以通過SpEL表達式設置
生產者yml
spring:
application:
name: streamProvider
rabbitmq: #更多mq配置看書331頁
username: liqiang
password: liqiang
host: localhost
port: 5672
cloud:
stream:
bindings:
saveOrder:
producer:
partitionKeyExpression: headers['partitionKey'] #SpEL表達式 通過讀取消息hearder的partitionKey屬性動態指定
partitionCount: 2 #消息分區數量
server:
port: 8082
消息生產通過header動態指定
package com.liqiang.springcloudstreamprovider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.annotation.Order; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class TestContorller { //直接注入對應通道是的實例 @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName) MessageChannel messageChannel; private static int index=0; @RequestMapping("/saveOrder") @ResponseBody public boolean saveOrder(){ com.liqiang.entity.Order order=new com.liqiang.entity.Order(); order.setId(1L); order.setOrderCode("201901020001"); //發送一條保存訂單的命令 return messageChannel.send(MessageBuilder.withPayload(order).setHeader("partitionKey",(index++)%2==0?0:1).build()); } }