RocketMq灰皮書(三)------MQ使用
在使用MQ之前,我們回顧一下前兩篇博文的內容.
- 我們大致了解了
RocketMQ
的四個概念,分別是:Producer
,Consumer
,Message
和Broker
- 我們在本地的Windows10系統上,部署了
RocketMQ
和其后台系統
在本篇博文中,我們會使用使用SpringBoot構建兩個微服務,一個作為生產者,一個作為消費者,通過RocketMQ
傳遞消息,了解在Java
中使用RocketMQ的方法.
一. SpringBoot整合RocketMQ收發消息
在灰皮書第一篇文章中,我畫了下面這個圖:
現在我們本地的RocketMQ
也部署起來了,接下來我們創建兩個微服務通過MQ來收發消息,實現基本的流程.
1. 微服務構建
首先我們創建兩個基於SpringBoot
的微服務,分別是:
rocketmq-consumer
消息消費者rocketmq-producer
消息生產者
兩個服務里面,rocketmq-consumer
的端口號是2001,rocketmq-producer
的端口號是2002
2. 微服務啟動測試
分別在兩個微服務寫兩個測試方法,啟動測試:
rocketmq-consumer
@RestController
public class ConsumerController {
@GetMapping("/consumer")
public String index() {
return "rocketmq-consumer";
}
}
rocketmq-producer
@RestController
public class ProducerController {
@GetMapping("/producer")
public String index() {
return "rocketmq-producer";
}
}
啟動測試,兩個接口都成功訪問.
根據我們最上面的圖,服務A發送消息到服務B,在這里,我們用rocketmq-producer
來發送消息,消息發送到rocketmq
以后,由服務Brocketmq-consumer
消費消息.
3. 生產者發送消息
使用rocketmq發送消息有很多種方式,因為我們使用的是SpringBoot
,這里直接使用官方提供的rocketmq-spring-boot-starter
包來開發
在github
上有個項目:RocketMQ-Spring
它就是RocketMq官方提供的整合了SpringBoot
的rocketmq工具包,git地址如下:https://github.com/apache/rocketmq-spring
當然,你也可以使用原生的rocketmq-client
包,在官方的示例中,使用的就是這種方式,具體可以查看官方文檔,下面我們直接使用rocketmq-spring-boot-starter
來發送消息.
我們可以看到有很多的版本可以用:
這里我們使用2.0.3
這個版本吧,具體的官方細節可以查看https://github.com/apache/rocketmq-spring/blob/release-2.0.3/README_zh_CN.md
3.1發送String消息
首先是pom坐標:
<!--add dependency in pom.xml-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
然后再rocketmq-producer
的配置文件中配置rocketmq的name-server
和group
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=producer
rocketmq-spring-boot-starter
中提供了一個RocketMQTemplate
來方便我們發送消息,我們可以直接注入這個類來使用.
RocketMQTemplate
有send
方法和convertAndSend
方法,都可以用來發送消息,區別是,前者的方法入參是rocketmq
規定的Message
類型,而后者可以發送對象,並且幫我們轉換,源碼如下:
/**
* Send a message to the given destination.
* @param destination the target destination
* @param message the message to send
*/
void send(D destination, Message<?> message) throws MessagingException;
/**
* Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.converter.MessageConverter},
* wrap it as a message and send it to a default destination.
* @param payload the Object to use as payload
*/
void convertAndSend(Object payload) throws MessagingException;
下面我們直接發送消息到mq
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/producer")
public String index() {
rocketMQTemplate.convertAndSend("test-topic", "消息發送成功!");
return "rocketmq-producer";
}
convertAndSend
方法有兩個參數,第一個參數是消息要發送到的topic
,也就是目的地,第二個參數就是消息本身,至於topic到底是什么,這個我們后面詳細來說,我們只需要知道,我們的消息發送到了rocketmq
的一個叫做test-topic的地方即可.
並且,由於我們在灰皮書第二章的時候,啟動mq的時候,指定了autoCreateTopicEnable=true
,也就是說,我們使用RocketMQTemplate
發送的消息,就算topic之前不存在,rocket也會幫我們創建好.
編碼完成,重啟項目,我們只要訪問http://localhost:2002/producer
就會發送消息到mq,我們可以通過rocketmq-console
查看我們發送的消息
可以看到mq自動為我們創建了topic:
在message頁簽,可以查看到我們剛才發送的消息:
詳細的消息內容:
3.2發送對象
在上面的例子中,我們直接發送字符串到MQ,一般來說,我們發送的消息體是一個java對象,在這里也是可以的,我們改造一下代碼:
@GetMapping("/producer")
public String index() {
rocketMQTemplate.convertAndSend("test-topic", new User("張三", 20));
return "rocketmq-producer";
}
@Data
class User implements Serializable {
private static final long serialVersionUID = -3486413003967431764L;
private String name;
private Integer age;
User() {}
User(String name, Integer age) {
this.name = name;
this.age = age;
}
}
這樣我們發送了一個User對象到RocketMQ
中,我們再去rocketmq-console
查看:
可以看到,消息成功發送到了mq中,需要注意的是,這里我們發送的對象要實現Serializable
接口,不然會拋異常.
那么我們發送的消息的內容是怎么序列化的呢?
RocketMQ的消息體都是以
byte[]
方式存儲的,如果內容體是java.lang.String
類型時,統一按照UTF-8
編碼轉成byte[]
;如果消息內容不是String類型的,則采用jackson-databind
序列化成JSON格式的字符串后,再統一按照UTF-8
編碼轉換成byte[]
以上釋義源於RocketMQ
官方文檔,所以說,有問題多看看官方文檔能很大程度上解決我們的疑惑!
4. 消費消息
好了,我們的消息發送成功了,接下來我們在rocketmq-consumer
應用中消費之前發送出來的消息.
在開發之前我們先想一下: 消息的生產者隨着用戶的請求,不斷的往MQ中發送消息,那么消費者在消費消息的時候,是怎么知道它要取哪一條消息呢?
我們之前的文章中提到過一個topic
,生產者在發送消息的時候,會指定一個topic,消息會發送到某個topic下,那么自然而然的,消費者在獲取消息的時候,也是需要知道它要從哪個topic
里面去獲取消息的.
而獲取消息,則是通過監聽器
來完成的.
首先在rocketmq-consumer
項目的配置文件中指定mq的nameServer
的地址:rocketmq.name-server=127.0.0.1:9876
創建一個監聽器:
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer")
@Slf4j
public class Consumerlistener implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
log.info("收到消息 : {}", message);
}
}
@RocketMQMessageListener
注解中我們指定了2個參數:
- topic 指定監聽器要監聽的topic,監聽器運行以后,會一直監聽該topic下的消息
- consumerGroup 指定當前消費者是數據哪個消費組,這個概念我們后面會詳細說
其次,我們自定義的監聽器還要實現RocketMQListener<T>
接口,該接口的泛型類型就是我們生產者發送消息的消息類型,之前我們發送的是User
對象,因此這里也是User
對象
實現RocketMQListener
接口的onMessage
方法,方法的入參就是我們發送出來的消息,在這個方法中我們可以進行自己的業務處理.
啟動服務rocketmq-consumer
,可以看到正常消費到了消息:
以上,我們成功的在我們的微服務中使用RocketMQ
進行了消息的發送和消費.
不僅僅是簡單的消息,RocketMQ
還支持更高級的功能,比如事務消息
、消息軌跡
等,這些高級特效我們會下后面的進階文章中詳細講解.
結語:
在本篇博文中,我們使用RocketMQ
官方提供的pom包進行了消息的發送和接收,也成功的在rocketmq-console
中查看到了消息.
在這個工程中,我們接觸了很多新的概念:
- topic
- consumerGroup
以上這些概念,以及前面篇文章中遺留下來的概念,我們將在下一篇文章中詳細介紹.
個人公眾號<橙耘自留地>日前已經開通,后續博主發布的文章都會一並更新到公眾號,如有需要,自行查閱.
關於橙耘自留地,是我個人聚焦互聯網技術棧學習分享的一個平台,創立之初是因為目前業內各種技術課程資料層次不齊,褒貶不一,有時候一門課花費高價買入,其實內含草包,有時偶爾低價得之,卻又大有干貨.因此我會根據大家的意見和評價,選擇不同的技術棧去學習,一為提升我自己的技術,二為大家梳理出質量比較好的課程,以作參考.同時,相關的學習心得也會一並更新到博客和公眾號.