RocketMQ 介紹
RocketMQ 是一款開源的分布式消息系統,基於高可用分布式集群技術,提供低延時的、高可靠的消息發布與訂閱服務。同時,廣泛應用於多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用、手游、視頻、物聯網、車聯網等。
具有以下特點:
- 能夠保證嚴格的消息順序
- 提供豐富的消息拉取模式
- 高效的訂閱者水平擴展能力
- 實時的消息訂閱機制
- 億級消息堆積能力
RocketMQ 基本使用
- 下載 RocketMQ
下載 RocketMQ最新的二進制文件,並解壓
解壓后的目錄結構如下:
apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
- 啟動 NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
- 啟動 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
- 發送、接收消息
發送消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
發送成功后顯示:SendResult [sendStatus=SEND_OK, msgId= …
接收消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
接收成功后顯示:ConsumeMessageThread_%d Receive New Messages: [MessageExt…
- 關閉 Server
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
Spring Cloud Stream 介紹
Spring Cloud Stream 是一個用於構建基於消息的微服務應用框架。它基於 SpringBoot 來創建具有生產級別的單機 Spring 應用,並且使用 Spring Integration 與 Broker 進行連接。
Spring Cloud Stream 提供了消息中間件配置的統一抽象,推出了 publish-subscribe、consumer groups、partition 這些統一的概念。
Spring Cloud Stream 內部有兩個概念:Binder 和 Binding。
- Binder: 跟外部消息中間件集成的組件,用來創建 Binding,各消息中間件都有自己的 Binder 實現。
比如 Kafka 的實現 KafkaMessageChannelBinder,RabbitMQ 的實現 RabbitMessageChannelBinder 以及 RocketMQ 的實現 RocketMQMessageChannelBinder。
- Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋梁,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據即可,屏蔽了開發者與底層消息中間件的接觸。

Figure 1. Spring Cloud Stream
使用 Spring Cloud Stream 完成一段簡單的消息發送和消息接收代碼:
MessageChannel messageChannel = new DirectChannel();
// 消息訂閱
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("receive msg: " + message.getPayload());
}
});
// 消息發送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
這段代碼所有的消息類都是 spring-messaging 模塊里提供的。屏蔽具體消息中間件的底層實現,如果想用更換消息中間件,在配置文件里配置相關消息中間件信息以及修改 binder 依賴即可。
Spring Cloud Stream 底層基於這段代碼去做了各種抽象。
如何使用 Spring Cloud Alibaba RocketMQ Binder
如果要在您的項目中引入 RocketMQ Binder,需要引入如下 maven 依賴:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
或者可以使用 Spring Cloud Stream RocketMQ Starter:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
Spring Cloud Alibaba RocketMQ Binder 實現
RocketMQ Binder 的實現依賴於 RocketMQ-Spring 框架。
RocketMQ-Spring 框架是 RocketMQ 與 Spring Boot 的整合,RocketMQ Spring 主要提供了 3 個特性:
- 使用
RocketMQTemplate用來統一發送消息,包括同步、異步發送消息和事務消息 @RocketMQTransactionListener注解用來處理事務消息的監聽和回查@RocketMQMessageListener注解用來消費消息
RocketMQ Binder 的核心類 RocketMQMessageChannelBinder 實現了 Spring Cloud Stream 規范,內部構建會 RocketMQInboundChannelAdapter 和 RocketMQMessageHandler。
RocketMQMessageHandler 會基於 Binding 配置構造 RocketMQTemplate,RocketMQTemplate 內部會把 spring-messaging 模塊內 org.springframework.messaging.Message 消息類轉換成 RocketMQ 的消息類 org.apache.rocketmq.common.message.Message,然后發送出去。
RocketMQInboundChannelAdapter 也會基於 Binding 配置構造 RocketMQListenerBindingContainer,RocketMQListenerBindingContainer 內部會啟動 RocketMQ Consumer 接收消息。
| Note | 在使用 RocketMQ Binder 的同時也可以配置 rocketmq.** 用於觸發 RocketMQ Spring 相關的 AutoConfiguration |
|---|---|
目前 Binder 支持在 Header 中設置相關的 key 來進行 RocketMQ Message 消息的特性設置。
比如 TAGS、DELAY、TRANSACTIONAL_ARG、KEYS、WAIT_STORE_MSG_OK、FLAG 表示 RocketMQ 消息對應的標簽,
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, "binder")
.setHeader(RocketMQHeaders.KEYS, "my-key")
.setHeader("DELAY", "1");
Message message = builder.build();
output().send(message);
MessageSource 支持
目前 RocketMQ 已經支持 MessageSource,可以進行消息的拉取,例子如下:
@SpringBootApplication
@EnableBinding(MQApplication.PolledProcessor.class)
public class MQApplication {
private final Logger logger =
LoggerFactory.getLogger(MQApplication.class);
public static void main(String[] args) {
SpringApplication.run(MQApplication.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source,
MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(m -> {
String payload = (String) m.getPayload();
logger.info("Received: " + payload);
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
if (result) {
logger.info("Processed a message");
}
else {
logger.info("Nothing to do");
}
Thread.sleep(5_000);
}
};
}
public static interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
}
配置選項
RocketMQ Binder Properties
-
spring.cloud.stream.rocketmq.binder.name-server
RocketMQ NameServer 地址(老版本使用 namesrv-addr 配置項)。Default:
127.0.0.1:9876. -
spring.cloud.stream.rocketmq.binder.access-key
阿里雲賬號 AccessKey。Default: null.
-
spring.cloud.stream.rocketmq.binder.secret-key
阿里雲賬號 SecretKey。Default: null.
-
spring.cloud.stream.rocketmq.binder.enable-msg-trace
是否為 Producer 和 Consumer 開啟消息軌跡功能Default:
true. -
spring.cloud.stream.rocketmq.binder.customized-trace-topic
消息軌跡開啟后存儲的 topic 名稱。Default:
RMQ_SYS_TRACE_TOPIC.
RocketMQ Consumer Properties
下面的這些配置是以 spring.cloud.stream.rocketmq.bindings..consumer. 為前綴的 RocketMQ Consumer 相關的配置。
-
enable
是否啟用 Consumer。默認值:
true. -
tags
Consumer 基於 TAGS 訂閱,多個 tag 以
||分割。默認值: empty. -
sql
Consumer 基於 SQL 訂閱。默認值: empty.
-
broadcasting
Consumer 是否是廣播消費模式。如果想讓所有的訂閱者都能接收到消息,可以使用廣播模式。默認值:
false. -
orderly
Consumer 是否同步消費消息模式。默認值:
false. -
delayLevelWhenNextConsume
異步消費消息模式下消費失敗重試策略:-1,不重復,直接放入死信隊列0,broker 控制重試策略>0,client 控制重試策略默認值:
0. -
suspendCurrentQueueTimeMillis
同步消費消息模式下消費失敗后再次消費的時間間隔。默認值:
1000.
RocketMQ Provider Properties
下面的這些配置是以 spring.cloud.stream.rocketmq.bindings..producer. 為前綴的 RocketMQ Producer 相關的配置。
-
enable
是否啟用 Producer。默認值:
true. -
group
Producer group name。默認值: empty.
-
maxMessageSize
消息發送的最大字節數。默認值:
8249344. -
transactional
是否發送事務消息。默認值:
false. -
sync
是否使用同步得方式發送消息。默認值:
false. -
vipChannelEnabled
是否在 Vip Channel 上發送消息。默認值:
true. -
sendMessageTimeout
發送消息的超時時間(毫秒)。默認值:
3000. -
compressMessageBodyThreshold
消息體壓縮閥值(當消息體超過 4k 的時候會被壓縮)。默認值:
4096. -
retryTimesWhenSendFailed
在同步發送消息的模式下,消息發送失敗的重試次數。默認值:
2. -
retryTimesWhenSendAsyncFailed
在異步發送消息的模式下,消息發送失敗的重試次數。默認值:
2. -
retryNextServer
消息發送失敗的情況下是否重試其它的 broker。默認值:
false.
官方文檔
Spring Cloud Stream編寫生產者
加依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
加注解
@EnableBinding(Source.class)
加配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
# 指定topic
destination: stream-test-topic
創建消息
@GetMapping("test-stream")
public String testStream(){
this.source.output().send(
MessageBuilder.withPayload("消息!!").build()
);
return "success";
}
Spring Cloud Stream編寫消費者
加依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
加注解
@EnableBinding({Sink.class})
加配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
destination: stream-test-topic
# rocketmq一定要設置group(隨便寫) 其他的mq可留空
group: binding-group
消費消息
package cn.cicoding.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;
/**
* @author zhaokejin
* @description
* @date 2019/11/15
*/
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody){
log.info("通過stream收到消息,messageBody = {}", messageBody);
}
}
啟動測試!可以看到控制台收到的消息
2019-12-28 11:54:08.988 INFO 14568 --- [MessageThread_1] c.c.m.consumer.TestStreamConsumer : 通過stream收到消息,messageBody = RocketMQ消息!!
2019-12-28 11:54:11.814 INFO 14568 --- [MessageThread_1] c.c.m.consumer.TestStreamConsumer : 通過stream收到消息,messageBody = RocketMQ消息!!
2019-12-28 11:54:12.435 INFO 14568 --- [MessageThread_1] c.c.m.consumer.TestStreamConsumer : 通過stream收到消息,messageBody = RocketMQ消息!!
2019-12-28 11:54:12.612 INFO 14568 --- [MessageThread_1] c.c.m.consumer.TestStreamConsumer : 通過stream收到消息,messageBody = RocketMQ消息!!
