目錄
1 環境
- 系統環境:win10
- 編輯器:IDEA
- springcloud:H版
- 安裝rabbitmq文章參考
2 簡介
Spring Cloud Stream是一個用於構建消息驅動的微服務應用的框架,其提供的一系列抽象屏蔽了不同類型消息中間件使用上的差異,同時也大大簡化了Spring在整合消息中間件時的使用復雜度。
Spring Cloud Stream 提供了Binder(負責與消息中間件進行交互)
3 初見
1 創建項目 添加web rabbitmq stream依賴
2 rabbitmq配置
# 其他參數默認配置
spring.rabbitmq.host=你的host
3 消息接收器
// 該注解表示綁定Sink消息通道
@EnableBinding(Sink.class)
public class MsgReceiver {
private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
// 自帶 消費者
@StreamListener(Sink.INPUT)
public void receive(Object payload){
logger.info("received: " + payload);
}
}
4 在rabbitmq中發送消息
5 查看結果
4 自定義消息通道
1 自定義接口
public interface MyChannel {
String INPUT = "test-input";
String OUTPUT = "test-output";
// 收
@Input(INPUT)
SubscribableChannel input();
// 發
@Output(OUTPUT)
MessageChannel output();
}
2 自定義接收器
// 綁定自定義消息通道
@EnableBinding(MyChannel.class)
public class MsgReceiver1 {
private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class);
// 收
@StreamListener(MyChannel.INPUT)
public void receive(Object payload){
logger.info("received1: " + payload + ":" + new Date());
}
}
3 controller進行測試
package com.sundown.stream.controller;
import com.sundown.stream.bean.ChatMessage;
import com.sundown.stream.msg.MyChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@RestController
public class HelloController {
@Autowired
MyChannel myChannel;
@GetMapping("/hello")
public void hello(){
String message = "welcome spring cloud stream";
myChannel.output().send(MessageBuilder.withPayload(message).build());
}
}
4 消息輸入輸出(通道對接)
spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic
5 啟動、訪問
5 消息分組
- 消息分組(肥水不留外人田 你可能不知道流向哪家田 但是確實是自己人)
1 打包 訪問(未使用消息分組)
- 啟動
java -jar stream-0.0.1-SNAPSHOT.jar
和
java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081
運行訪問http://localhost:8080/hello
- 結果(多次消費)
- 現在我不想一條消息被多次消費(假設消費者是一個集群 --> 多個人做同一件事 題外話:分布式 --> 一件事分給多個人做) 是否有什么辦法呢
消息分組幫我們解決(指定輸入 輸出 有么有負載均衡的味道)
2 消息分組配置
spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic
spring.cloud.stream.bindings.test-input.group=gg
spring.cloud.stream.bindings.test-output.group=gg
- 為了驗證是否能成功 重新打包運行 和上面一樣 訪問接口
- 清空2個控制台的信息 再次訪問接口
6 消息分區
- 為一些具有相同特征的消息設置每次都被同一個消費實例進行消費。
1 消息分區配置
- properties配置
spring.cloud.stream.bindings.test-input.destination=test-topic
spring.cloud.stream.bindings.test-output.destination=test-topic
spring.cloud.stream.bindings.test-input.group=gg
spring.cloud.stream.bindings.test-output.group=gg
# 開啟消費分區(消費者上配置)
spring.cloud.stream.bindings.test-input.consumer.partitioned=true
# 消費者實例個數(消費者上配置)
spring.cloud.stream.instance-count=2
# 當前實例下標(消費者上配置)
spring.cloud.stream.instance-index=0
2 controller配置
@RestController
public class HelloController {
@Autowired
MyChannel myChannel;
@GetMapping("/hello")
public void hello(){
String message = "welcome spring cloud stream";
// 先寫死
int whichPart = 1;
System.out.println("發送消息:" + message + ",發往分區:" + whichPart);
myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build());
}
}
3 訪問
- 打包運行
java -jar stream-0.0.1-SNAPSHOT.jar --spring.cloud.stream.instance-index=0
和java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081 --spring.cloud.stream.instance-index=0
(別忘了先關閉啟動類 不然打包會報錯)
-訪問http://localhost:8080/hello
4 若是隨機訪問呢
@GetMapping("/hello")
public void hello(){
String message = "welcome spring cloud stream";
int whichPart = new Random().nextInt(2);
System.out.println("發送消息:" + message + ",發往分區:" + whichPart);
myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build());
}
- 和上面一樣打包訪問
7 定時器
雖然定時任務可以用cron表達式 但是對於一些特殊的定時任務 可以使用stream+rabbitmq更合適 比如幾分鍾后執行
rabbitmq插件安裝
1 配置
- properties
spring.rabbitmq.host=xxx
spring.cloud.stream.bindings.test-input.destination=topic
spring.cloud.stream.bindings.test-output.destination=topic
spring.cloud.stream.rabbit.bindings.test-input.consumer.delayed-exchange=true
spring.cloud.stream.rabbit.bindings.test-output.producer.delayed-exchange=true
#spring.cloud.stream.bindings.test-input.destination=test-topic
#spring.cloud.stream.bindings.test-output.destination=test-topic
#
#spring.cloud.stream.bindings.test-input.group=gg
#spring.cloud.stream.bindings.test-output.group=gg
#
## 開啟消費分區(消費者上配置)
#spring.cloud.stream.bindings.test-input.consumer.partitioned=true
## 消費者實例個數(消費者上配置)
#spring.cloud.stream.instance-count=2
## 當前實例下標(消費者上配置)
#spring.cloud.stream.instance-index=0
#
## 生產者配置
#spring.cloud.stream.bindings.test-output.producer.partition-key-expression=headers['whichPart']
## 消費節點數量
#spring.cloud.stream.bindings.test-output.producer.partition-count=2
- 自定義通道
// 綁定自定義消息通道
@EnableBinding(MyChannel.class)
public class MsgReceiver1 {
private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class);
// 收
@StreamListener(MyChannel.INPUT)
public void receive(Object payload){
// 添加日期 一會好對比
logger.info("received1: " + payload + ":" + new Date());
}
}
- controller
@RestController
public class HelloController {
private static final Logger logger = LoggerFactory.getLogger(HelloController.class);
@Autowired
MyChannel myChannel;
@GetMapping("/delay")
public void delay(){
String message = "welcome spring cloud stream";
logger.info("send msg:" + new Date());
// x-delay --> 延遲3s
myChannel.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 3000).build());
}
}
2 啟動 訪問
- 打開rabbitmq查看
- 查看idea控制台
8 小結
stream自帶的與自定義(添加destination=xxx)之間的類似和區別
解決重復消費 分組(group)
消息分組單個實例訪問(開啟消費分區 實例個數 實例下標 生產者配置 消費節點數)
定時器 rabbitmq相關的插件安裝運行 后端代碼實現(配置delayed-exchange和destination以及controller 發送時添加setHeader("x-delay", 3000) 3s延時)