Spring Cloud Stream 是消息中間件組件,它集成了 kafka 和 rabbitmq 。本篇文章以 Rabbit MQ 為消息中間件系統為基礎,介紹 Spring Cloud Stream 的使用。如果你沒有用過消息中間件,可以到 RabbitMQ 的官網看一下,或者參考這個 http://rabbitmq.mr-ping.com/。理解了消息中間件的設計,才能更好的使用它。
消息中間的幾大應用場景
1、異步處理
比如用戶在電商網站下單,下單完成后會給用戶推送短信或郵件,發短信和郵件的過程就可以異步完成。因為下單付款是核心業務,發郵件和短信並不屬於核心功能,並且可能耗時較長,所以針對這種業務場景可以選擇先放到消息隊列中,有其他服務來異步處理。
2、應用解耦:
假設公司有幾個不同的系統,各系統在某些業務有聯動關系,比如 A 系統完成了某些操作,需要觸發 B 系統及 C 系統。如果 A 系統完成操作,主動調用 B 系統的接口或 C 系統的接口,可以完成功能,但是各個系統之間就產生了耦合。用消息中間件就可以完成解耦,當 A 系統完成操作將數據放進消息隊列,B 和 C 系統去訂閱消息就可以了。這樣各系統只要約定好消息的格式就好了。
3、流量削峰
比如秒殺活動,一下子進來好多請求,有的服務可能承受不住瞬時高並發而崩潰,所以針對這種瞬時高並發的場景,在中間加一層消息隊列,把請求先入隊列,然后再把隊列中的請求平滑的推送給服務,或者讓服務去隊列拉取。
4、日志處理
kafka 最開始就是專門為了處理日志產生的。
當碰到上面的幾種情況的時候,就要考慮用消息隊列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同樣也是在使用 Spring Cloud ,那可以考慮下用 Spring Cloud Stream。
使用 Spring Cloud Stream && RabbitMQ
介紹下面的例子之前,假定你已經對 RabbitMQ 有一定的了解。
首先來認識一下 Spring Cloud Stream 中的幾個重要概念。
Destination Binders:目標綁定器,目標指的是 kafka 還是 RabbitMQ,綁定器就是封裝了目標中間件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
Destination Bindings:外部消息傳遞系統和應用程序之間的橋梁,提供消息的“生產者”和“消費者”(由目標綁定器創建)
Message:一種規范化的數據結構,生產者和消費者基於這個數據結構通過外部消息系統與目標綁定器和其他應用程序通信。
可能看完了上面的三個概念仍然是一頭霧水,沒有關系,實踐過程中自然就明白了。
先來一個最簡單的例子
因為用到的是 rabbitmq,所以在本地搭好 rabbitmq 環境,然后裝好 rabbitmq-management 插件,這樣就可以訪問 web UI 界面了,默認是 15672 端口。
1、引用對應 rabbitmq 的 stream 包
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2、在 application.yml 中增加配置
spring:
profiles: stream-rabbit-customer-group1
cloud:
stream:
bindings:
input:
destination: default.messages
binder: local_rabbit
output:
destination: default.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 32775
username: guest
password: guest
server:
port: 8201
理解配置文件很重要,基本上理解清楚了配置,也就明白 spring cloud stream 是怎么回事了。
spring.cloud.stream.binders
,上面提到了 stream 的 3 個重要概念的第一個 「Destination binders」。上面的配置文件中就配置了一個 binder,命名為 local_rabbit,指定 type 為 rabbit ,表示使用的是 rabbitmq 消息中間件,如果用的是 kafka ,則 type 設置為 kafka。environment 就是設置使用的消息中間件的配置信息,包括 host、port、用戶名、密碼等。可以設置多了個 binder,適配不同的場景。
spring.cloud.stream.bindings
,對應上面提到到 「Destination Bindings」。這里面可以配置多個 input 或者 output,分別表示消息的接收通道和發送通道,對應到 rabbitmq 上就是不同的 exchange。這個配置文件里定義了兩個input 、兩個output,名稱分別為 input、log_input、output、log_output。這個名稱不是亂起的,在我們的程序代碼中會用到,用來標示某個方法接收哪個 exchange 或者發送到哪個 exchange 。
每個通道下的 destination 屬性指 exchange 的名稱,binder 指定在 binders 里設置的 binder,上面配置中指定了 local_rabbit 。
可以看到 input、output 對應的 destination 是相同的,log_input、log_output 對應的 destination 也相同, 也就是對應相同的 exchange。一個表示消息來源,一個表示消息去向。
另外還可以設置 group 。因為服務很可能不止一個實例,如果啟動多個實例,那么沒必要每個實例都消費同一個消息,只要把功能相同的實例的 group 設置為同一個,那么就會只有一個實例來消費消息,避免重復消費的情況。如果設置了 group,那么 group 名稱就會成為 queue 的名稱,如果沒有設置 group ,那么 queue 就會根據 destination + 隨機字符串的方式命名。
3、接下來做一個最簡單的例子,來演示如何接收消息。
首先來介紹一下 stream 內置的簡單消息通道(消息通道也就是指消息的來源和去向)接口定義,一個 Source 和 一個 Sink 。
Source.java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
消息發送通道定義,定義了一個 MessageChannel 類型的 output() 方法,用 @Output
注解標示,並指定了 binding 的名稱為 output。
Sink.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
消息接收通道定義,定義了一個 SubscribableChannel 類型的 input() 方法,表示訂閱一個消息的方法,並用 @Input
注解標識,並且指定了 binging 的名稱為 input 。
創建一個簡單的消息接收方法:
@SpringBootApplication
@EnableBinding(value = {Processor.class})
@Slf4j
public class DefaultApplication {
public static void main(String[] args) {
SpringApplication.run(DefaultApplication.class, args);
}
}
在項目啟動類上加上注解 @EnableBinding(value = {Processor.class})
,表明啟用 stream ,並指定定義的 Channel 定義接口類。
然后,創建一個 service 服務類,用來訂閱消息,並對消息進行處理。
@Slf4j
@Component
public class DefaultMessageListener {
@StreamListener(Processor.INPUT)
public void processMyMessage(String message) {
log.info("接收到消息:" + message);
}
}
在方法 processMyMessage()
上使用 @StreamListener
注解,表示對消息進行訂閱監控,指定 binding 的名稱,其中 Processor.INPUT 就是 Sink 的 input ,也就是字符串 input
,對應的上面的配置文件,就是 spring.cloud.stream.bindings.input。
啟動 DefaultApplication ,可以在 rabbitmq 管理控制台的 exchanges 中看到增加的這幾個 bindings 。
可以看到 exchange 的名稱對應的就是 bindings 的兩個 input 和 兩個 output 的 destination 的值。
用 rabbitmq UI 控制台發送消息測試
點擊上圖的 default.input.messages 進入 exchange 詳請頁面,在 publish message 部分填寫上 Payload ,然后點擊 Publish message 按鈕。
之后回到 DefaultApplication 的輸出控制台,會看到消息已經被接收。
模擬一個日志處理
接下來模擬生產者和消費者處理消息的過程,模擬一個日志處理的過程。
- 原始日志發送到 kite.log.messages exchange
- 接收器在 kite.log.messages exchange 接收原始日志,經過處理格式化,發送到 kite.log.format.messages exchange
- 接收器在 kite.log.format.messages exchange 接收格式化后的日志
1、自定義消息通道接口,上面介紹了 stream 自帶的 Sink 和 Source,也僅僅能做個演示,真正的業務中還是需要自己定義更加靈活的接口。
@Component
public interface MyProcessor {
String MESSAGE_INPUT = "log_input";
String MESSAGE_OUTPUT = "log_output";
String LOG_FORMAT_INPUT = "log_format_input";
String LOG_FORMAT_OUTPUT = "log_format_output";
@Input(MESSAGE_INPUT)
SubscribableChannel logInput();
@Output(MESSAGE_OUTPUT)
MessageChannel logOutput();
@Input(LOG_FORMAT_INPUT)
SubscribableChannel logFormatInput();
@Output(LOG_FORMAT_OUTPUT)
MessageChannel logFormatOutput();
}
2、創建消費者應用
**配置文件如下 **:
spring:
profiles: stream-rabbit-customer-group1
cloud:
stream:
bindings:
log_input:
destination: kite.log.messages
binder: local_rabbit
group: logConsumer-group1
log_output:
destination: kite.log.messages
binder: local_rabbit
group: logConsumer-group1
log_format_input:
destination: kite.log.format.messages
binder: local_rabbit
group: logFormat-group1
log_format_input:
destination: kite.log.format.messages
binder: local_rabbit
group: logFormat-group1
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 32775
username: guest
password: guest
server:
port: 8201
此配置文件要參照 MyProcessor 接口查看,定義了 4 個 binding,但是 destination 兩兩相同,也就是兩個 exchange。
創建 spring boot 啟動類
@SpringBootApplication
@EnableBinding(value = {MyProcessor.class})
@Slf4j
public class CustomerApplication {
public static void main(String[] args) {
SpringApplication.run(CustomerApplication.class, args);
}
}
用 @EnableBinding(value = {MyProcessor.class}) 注解引入 MyProcessor
創建消息接收處理服務
@Slf4j
@Component
public class LogMessageListener {
/**
* 通過 MyProcessor.MESSAGE_INPUT 接收消息
* 然后通過 SendTo 將處理后的消息發送到 MyProcessor.LOG_FORMAT_OUTPUT
* @param message
* @return
*/
@StreamListener(MyProcessor.MESSAGE_INPUT)
@SendTo(MyProcessor.LOG_FORMAT_OUTPUT)
public String processLogMessage(String message) {
log.info("接收到原始消息:" + message);
return "「" + message +"」";
}
/**
* 接收來自 MyProcessor.LOG_FORMAT_INPUT 的消息
* 也就是加工后的消息,也就是通過上面的 SendTo 發送來的
* 因為 MyProcessor.LOG_FORMAT_OUTPUT 和 MyProcessor.LOG_FORMAT_INPUT 是指向同一 exchange
* @param message
*/
@StreamListener(MyProcessor.LOG_FORMAT_INPUT)
public void processFormatLogMessage(String message) {
log.info("接收到格式化后的消息:" + message);
}
}
3、創建一個消息生產者,用於發送原始日志消息
配置文件:
spring:
cloud:
stream:
bindings:
log_output:
destination: kite.log.messages
binder: local_rabbit
group: logConsumer-group1
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 32775
username: guest
password: guest
server:
port: 8202
僅僅指定了一個 binding log_output,用來發送消息,如果只做生產者就不要指定 log_input,如果指定了 log_input ,應用就會認為這個生產者服務也會消費消息,如果這時沒有在此服務中訂閱消息,當消息被發送到這個服務時,因為並沒有訂閱消息,也就是沒有 @StreamListener 注解的方法,就會出現如下異常:
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel
創建 spring boot 啟動類
@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MyMessageController {
@Autowired
private MyProcessor myProcessor;
@GetMapping(value = "sendLogMessage")
public void sendLogMessage(String message){
Message<String> stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build();
myProcessor.logOutput().send(stringMessage);
}
}
同樣的引入 @EnableBinding(value = {MyProcessor.class})
創建一個 Controller 用來發送消息
@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MyMessageController {
@Autowired
private MyProcessor myProcessor;
@GetMapping(value = "sendLogMessage")
public void sendLogMessage(String message){
Message<String> stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build();
myProcessor.logOutput().send(stringMessage);
}
}
之后,訪問鏈接:
http://localhost:8202/sendLogMessage?message=原始日志
可以在消費服務端看到如下輸出:
其他
消息除了可以是字符串類型,還可以是其他類型,也可以是實體類型,例如
@GetMapping(value = "sendObjectLogMessage")
public void sendObjectLogMessage() {
LogInfo logInfo = new LogInfo();
logInfo.setClientIp("192.168.1.111");
logInfo.setClientVersion("1.0");
logInfo.setUserId("198663383837434");
logInfo.setTime(Date.from(Instant.now()));
Message < LogInfo > stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(logInfo).build();
myProcessor.logOutput().send(stringMessage);
}
上面代碼發送了一個 LogInfo 實體對象,在消費者端依然可以用字符串類型接收,因為 @StreamListener 注解會默認把實體轉為 json 字符串。
另外,可以試着啟動兩個消費者端,把 group 設置成相同的,這時,發送的消息只會被一個消費者接收。
如果把 group 設置成不一樣的,那么發送的消息會被兩個消費者接收。
還可以看其他 Spring Cloud 系列:
Spring Cloud Eureka 實現高可用服務發現注冊中心
Spring Cloud Config 配置中心 看這一篇就夠了
服務注冊發現、配置中心集一體的 Spring Cloud Consul
不要吝惜你的「推薦」呦
歡迎關注,不定期更新本系列和其他文章
古時的風箏
,進入公眾號可以加入交流群