思維導圖
一、什么是消息隊列
消息指的是兩個應用間傳遞的數據。數據的類型有很多種形式,可能只包含文本字符串,也可能包含嵌入對象。
“消息隊列(Message Queue)”是在消息的傳輸過程中保存消息的容器。在消息隊列中,通常有生產者和消費者兩個角色。生產者只負責發送數據到消息隊列,誰從消息隊列中取出數據處理,他不管。消費者只負責從消息隊列中取出數據處理,他不管這是誰發送的數據。
二、為什么使用消息隊列
主要有三個作用:
- 解耦。如圖所示。假設有系統B、C、D都需要系統A的數據,於是系統A調用三個方法發送數據到B、C、D。這時,系統D不需要了,那就需要在系統A把相關的代碼刪掉。假設這時有個新的系統E需要數據,這時系統A又要增加調用系統E的代碼。為了降低這種強耦合,就可以使用MQ,系統A只需要把數據發送到MQ,其他系統如果需要數據,則從MQ中獲取即可。
- 異步。如圖所示。一個客戶端請求發送進來,系統A會調用系統B、C、D三個系統,同步請求的話,響應時間就是系統A、B、C、D的總和,也就是800ms。如果使用MQ,系統A發送數據到MQ,然后就可以返回響應給客戶端,不需要再等待系統B、C、D的響應,可以大大地提高性能。對於一些非必要的業務,比如發送短信,發送郵件等等,就可以采用MQ。
- 削峰。如圖所示。這其實是MQ一個很重要的應用。假設系統A在某一段時間請求數暴增,有5000個請求發送過來,系統A這時就會發送5000條SQL進入MySQL進行執行,MySQL對於如此龐大的請求當然處理不過來,MySQL就會崩潰,導致系統癱瘓。如果使用MQ,系統A不再是直接發送SQL到數據庫,而是把數據發送到MQ,MQ短時間積壓數據是可以接受的,然后由消費者每次拉取2000條進行處理,防止在請求峰值時期大量的請求直接發送到MySQL導致系統崩潰。
三、RabbitMQ的特點
RabbitMQ是一款使用Erlang語言開發的,實現AMQP(高級消息隊列協議)的開源消息中間件。首先要知道一些RabbitMQ的特點,官網可查:
- 可靠性。支持持久化,傳輸確認,發布確認等保證了MQ的可靠性。
- 靈活的分發消息策略。這應該是RabbitMQ的一大特點。在消息進入MQ前由Exchange(交換機)進行路由消息。分發消息策略有:簡單模式、工作隊列模式、發布訂閱模式、路由模式、通配符模式。
- 支持集群。多台RabbitMQ服務器可以組成一個集群,形成一個邏輯Broker。
- 多種協議。RabbitMQ支持多種消息隊列協議,比如 STOMP、MQTT 等等。
- 支持多種語言客戶端。RabbitMQ幾乎支持所有常用編程語言,包括 Java、.NET、Ruby 等等。
- 可視化管理界面。RabbitMQ提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker。
- 插件機制。RabbitMQ提供了許多插件,可以通過插件進行擴展,也可以編寫自己的插件。
四、RabbitMQ初の體驗
4.1 安裝RabbitMQ (Win10系統)
由於只是學習需要,所以安裝在win10系統,就懶得開虛擬機。如果用Linux系統安裝的話,我建議用Docker拉一個RabbitMQ的鏡像下來,這樣會方便一點。
4.1.1 安裝erLang語言,配置環境變量
首先到erlang官網下載win10版安裝包。
下載完之后,就得到這個東西:
接着雙擊安裝,一直點next(下一步)就行了,安裝完之后,配置環境變量。
使用cmd命令,輸入 erl -version 驗證:
4.1.2 安裝RabbitMQ服務端
在RabbitMQ的gitHub項目中,下載window版本的服務端安裝包。
下載后,就得到這個東西:
接着到雙擊安裝,一直點下一步安裝即可,安裝完成后,找到安裝目錄:
在此目錄下打開cmd命令,輸入rabbitmq-plugins enable rabbitmq_management命令安裝管理頁面的插件:
然后雙擊rabbitmq-server.bat啟動腳本,然后打開服務管理可以看到RabbitMQ正在運行:
這時,打開瀏覽器輸入http://localhost:15672,賬號密碼默認是:guest/guest
到這一步,安裝就大功告成了!
4.2 永遠的Hello Word
服務端搭建好了之后肯定要用客戶端去操作,接下來就用Java做一個簡單的HelloWord演示。
因為我用的是SpringBoot,所以在生產者這邊加入對應的starter依賴即可:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
一般需要創建一個公共項目common,共享一些配置,比如隊列主題,交換機名稱,路由匹配鍵名稱等等。
首先在application.yml文件加上RabbitMQ的配置信息:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
然后再生產者這邊,加上common包的maven依賴,然后創建一個Direct交換機以及隊列的配置類:
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue rabbitmqDemoDirectQueue() {
/** * 1、name: 隊列名稱 * 2、durable: 是否持久化 * 3、exclusive: 是否獨享、排外的。如果設置為true,定義為排他隊列。則只有創建者可以使用此隊列。也就是private私有的。 * 4、autoDelete: 是否自動刪除。也就是臨時隊列。當最后一個消費者斷開連接后,會自動刪除。 * */
return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
}
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交換機
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding bindDirect() {
//鏈式寫法,綁定交換機和隊列,並設置匹配鍵
return BindingBuilder
//綁定隊列
.bind(rabbitmqDemoDirectQueue())
//到交換機
.to(rabbitmqDemoDirectExchange())
//並設置匹配鍵
.with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
}
}
然后再創建一個發送消息的Service類:
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
//日期格式化
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public String sendMsg(String msg) throws Exception {
try {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
然后根據業務放在需要用的地方,比如定時任務,或者接口。我這里就簡單一點使用Controller層進行發送:
@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
@Resource
private RabbitMQService rabbitMQService;
/** * 發送消息 * @author java技術愛好者 */
@PostMapping("/sendMsg")
public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception {
return rabbitMQService.sendMsg(msg);
}
}
生產者寫完之后,就寫消費者端的代碼,消費者很簡單。maven依賴,yml文件配置和生產者一樣。只需要創建一個類,@RabbitListener注解寫上監聽隊列的名稱,如圖所示:
這里有個小坑,一開始RabbitMQ服務器里還沒有創建隊列:
這時如果啟動消費者,會報錯:
要先啟動生產者,發送一條消息:
最后再啟動消費者,進行消費:
這時候就會持續監聽隊列的消息,只要生產者發送一條消息到MQ,消費者就消費一條。我這里嘗試發送4條:
由於隊列不存在,啟動消費者報錯的這個問題。最好的方法是生產者和消費者都嘗試創建隊列,怎么寫呢,有很多方式,我這里用一個相對簡單一點的:
生產者的配置類加點東西:
//實現BeanPostProcessor類,使用Bean的生命周期函數
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
//這是創建交換機和隊列用的rabbitAdmin對象
@Resource
private RabbitAdmin rabbitAdmin;
//初始化rabbitAdmin對象
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有設置為 true,spring 才會加載 RabbitAdmin 這個類
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
//實例化bean后,也就是Bean的后置處理器
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//創建交換機
rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange());
//創建隊列
rabbitAdmin.declareQueue(rabbitmqDemoDirectQueue());
return null;
}
}
這樣啟動生產者就會自動創建交換機和隊列,不用等到發送消息才創建。
消費者需要加一點代碼:
@Component
//使用queuesToDeclare屬性,如果不存在則會創建隊列
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class RabbitDemoConsumer {
//...省略
}
這樣,無論生產者還是消費者先啟動都不會出現問題了~
五、RabbitMQ中的組成部分
從上面的HelloWord例子中,我們大概也能體驗到一些,就是RabbitMQ的組成,它是有這幾部分:
- Broker:消息隊列服務進程。此進程包括兩個部分:Exchange和Queue。
- Exchange:消息隊列交換機。按一定的規則將消息路由轉發到某個隊列。
- Queue:消息隊列,存儲消息的隊列。
- Producer:消息生產者。生產方客戶端將消息同交換機路由發送到隊列中。
- Consumer:消息消費者。消費隊列中存儲的消息。
這些組成部分是如何協同工作的呢,大概的流程如下,請看下圖:
- 消息生產者連接到RabbitMQ Broker,創建connection,開啟channel。
- 生產者聲明交換機類型、名稱、是否持久化等。
- 生產者發送消息,並指定消息是否持久化等屬性和routing key。
- exchange收到消息之后,根據routing key路由到跟當前交換機綁定的相匹配的隊列里面。
- 消費者監聽接收到消息之后開始業務處理。
六、Exchange的四種類型以及用法
從上面的工作流程可以看出,實際上有個關鍵的組件Exchange,因為消息發送到RabbitMQ后首先要經過Exchange路由才能找到對應的Queue。
實際上Exchange類型有四種,根據不同的類型工作的方式也有所不同。在HelloWord例子中,我們就使用了比較簡單的Direct Exchange,翻譯就是直連交換機。其余三種分別是:Fanout exchange、Topic exchange、Headers exchange。
6.1 Direct Exchange
見文知意,直連交換機意思是此交換機需要綁定一個隊列,要求該消息與一個特定的路由鍵完全匹配。簡單點說就是一對一的,點對點的發送。
完整的代碼就是上面的HelloWord的例子,不再重復代碼。
6.2 Fanout exchange
這種類型的交換機需要將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。簡單點說就是發布訂閱。
代碼怎么寫呢,演示一下:
首先要先配置交換機和隊列的名稱:
public class RabbitMQConfig {
/** * RabbitMQ的FANOUT_EXCHANG交換機類型的隊列 A 的名稱 */
public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A";
/** * RabbitMQ的FANOUT_EXCHANG交換機類型的隊列 B 的名稱 */
public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B";
/** * RabbitMQ的FANOUT_EXCHANG交換機類型的名稱 */
public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name";
}
再配置FanoutExchange類型的交換機和A、B兩個隊列,並且綁定。這種類型不需要配置routing key:
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
@Resource
private RabbitAdmin rabbitAdmin;
@Bean
public Queue fanoutExchangeQueueA() {
//隊列A
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
}
@Bean
public Queue fanoutExchangeQueueB() {
//隊列B
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
}
@Bean
public FanoutExchange rabbitmqDemoFanoutExchange() {
//創建FanoutExchange類型交換機
return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
}
@Bean
public Binding bindFanoutA() {
//隊列A綁定到FanoutExchange交換機
return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
}
@Bean
public Binding bindFanoutB() {
//隊列B綁定到FanoutExchange交換機
return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//啟動項目即創建交換機和隊列
rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
rabbitAdmin.declareQueue(fanoutExchangeQueueB());
rabbitAdmin.declareQueue(fanoutExchangeQueueA());
return null;
}
}
創建service發布消息的方法:
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Resource
private RabbitTemplate rabbitTemplate;
//發布消息
@Override
public String sendMsgByFanoutExchange(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
//組裝消息體
private Map<String, Object> getMessage(String msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
return map;
}
}
Controller接口:
@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
/** * 發布消息 * * @author java技術愛好者 */
@PostMapping("/publish")
public String publish(@RequestParam(name = "msg") String msg) throws Exception {
return rabbitMQService.sendMsgByFanoutExchange(msg);
}
}
接着在消費者項目這邊,創建兩個隊列的監聽類,監聽隊列進行消費:
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("隊列A收到消息:" + map.toString());
}
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("隊列B收到消息:" + map.toString());
}
}
然后啟動生產者和消費者兩個項目,可以看到管理界面創建了一個FanoutExchange交換機和兩個隊列,並且綁定了:
使用POSTMAN進行發送消息,測試:
然后可以看到控制台,兩個隊列同時都收到了相同的消息,形成了發布訂閱的效果:
6.3 Topic Exchange
直接翻譯的話叫做主題交換機,如果從用法上面翻譯可能叫通配符交換機會更加貼切。這種交換機是使用通配符去匹配,路由到對應的隊列。通配符有兩種:"*" 、 "#"。需要注意的是通配符前面必須要加上"."符號。
*
符號:有且只匹配一個詞。比如 a.*
可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。
#
符號:匹配一個或多個詞。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。
廢話不多說,代碼演示一下:
依然是配置TopicExchange名稱和三個隊列的名稱:
/** * RabbitMQ的TOPIC_EXCHANGE交換機名稱 */
public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name";
/** * RabbitMQ的TOPIC_EXCHANGE交換機的隊列A的名稱 */
public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a";
/** * RabbitMQ的TOPIC_EXCHANGE交換機的隊列B的名稱 */
public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b";
/** * RabbitMQ的TOPIC_EXCHANGE交換機的隊列C的名稱 */
public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c";
然后還是老配方,配置交換機和隊列,然后綁定,創建:
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
//省略...
@Bean
public TopicExchange rabbitmqDemoTopicExchange() {
//配置TopicExchange交換機
return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
}
@Bean
public Queue topicExchangeQueueA() {
//創建隊列1
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
}
@Bean
public Queue topicExchangeQueueB() {
//創建隊列2
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
}
@Bean
public Queue topicExchangeQueueC() {
//創建隊列3
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
}
@Bean
public Binding bindTopicA() {
//隊列A綁定到FanoutExchange交換機
return BindingBuilder.bind(topicExchangeQueueB())
.to(rabbitmqDemoTopicExchange())
.with("a.*");
}
@Bean
public Binding bindTopicB() {
//隊列A綁定到FanoutExchange交換機
return BindingBuilder.bind(topicExchangeQueueC())
.to(rabbitmqDemoTopicExchange())
.with("a.*");
}
@Bean
public Binding bindTopicC() {
//隊列A綁定到FanoutExchange交換機
return BindingBuilder.bind(topicExchangeQueueA())
.to(rabbitmqDemoTopicExchange())
.with("rabbit.#");
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
rabbitAdmin.declareExchange(rabbitmqDemoTopicExchange());
rabbitAdmin.declareQueue(topicExchangeQueueA());
rabbitAdmin.declareQueue(topicExchangeQueueB());
rabbitAdmin.declareQueue(topicExchangeQueueC());
return null;
}
}
然后寫一個發送消息的service方法:
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Override
public String sendMsgByTopicExchange(String msg, String routingKey) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
//發送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
寫一個Controller接口:
@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
@Resource
private RabbitMQService rabbitMQService;
/** * 通配符交換機發送消息 * * @author java技術愛好者 */
@PostMapping("/topicSend")
public String topicSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws Exception {
return rabbitMQService.sendMsgByTopicExchange(msg, routingKey);
}
}
生產者這邊寫完,就寫消費端,消費端比較簡單,寫三個監聽類:
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A))
public class TopicExchangeConsumerA {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("隊列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A + "]收到消息:" + map.toString());
}
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B))
public class TopicExchangeConsumerB {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("隊列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B+ "]收到消息:" + map.toString());
}
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C))
public class TopicExchangeConsumerC {
@RabbitHandler
public void process(Map<String, Object> map) {
System.out.println("隊列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C + "]收到消息:" + map.toString());
}
}
大功告成,然后啟動項目開始調試。啟動成功后可以看到隊列和路由鍵綁定的關系:
通過POSTMAN進行測試,測試一下 rabbit.# 的路由鍵是否能夠匹配成功:
測試成功,隊列A消費到消息:
接着測試 a.* 路由鍵,發送 routingKey = a.b :
比較常用的就是以上三種:直連(DirectExchange),發布訂閱(FanoutExchange),通配符(TopicExchange)。熟練運用這三種交換機類型,基本上可以解決大部分的業務場景。
實際上稍微思考一下,可以發現通配符(TopicExchange)這種模式其實是可以達到直連(DirectExchange)和發布訂閱(FanoutExchange)這兩種的效果的。
FanoutExchange不需要綁定routingKey,所以性能相對TopicExchange會好一點。
6.4 Headers Exchange
這種交換機用的相對沒這么多。它跟上面三種有點區別,它的路由不是用routingKey進行路由匹配,而是在匹配請求頭中所帶的鍵值進行路由。如圖所示:
創建隊列需要設置綁定的頭部信息,有兩種模式:全部匹配和部分匹配。如上圖所示,交換機會根據生產者發送過來的頭部信息攜帶的鍵值去匹配隊列綁定的鍵值,路由到對應的隊列。代碼怎么實現呢,往下看演示代碼:
首先還是需要定義交換機名稱,隊列名稱:
/** * HEADERS_EXCHANGE交換機名稱 */
public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name";
/** * RabbitMQ的HEADERS_EXCHANGE交換機的隊列A的名稱 */
public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a";
/** * RabbitMQ的HEADERS_EXCHANGE交換機的隊列B的名稱 */
public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b";
然后設置交換機,隊列,進行綁定:
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
@Bean
public Queue headersQueueA() {
return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
}
@Bean
public Queue headersQueueB() {
return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true, false, false);
}
@Bean
public HeadersExchange rabbitmqDemoHeadersExchange() {
return new HeadersExchange(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true, false);
}
@Bean
public Binding bindHeadersA() {
Map<String, Object> map = new HashMap<>();
map.put("key_one", "java");
map.put("key_two", "rabbit");
//全匹配
return BindingBuilder.bind(headersQueueA())
.to(rabbitmqDemoHeadersExchange())
.whereAll(map).match();
}
@Bean
public Binding bindHeadersB() {
Map<String, Object> map = new HashMap<>();
map.put("headers_A", "coke");
map.put("headers_B", "sky");
//部分匹配
return BindingBuilder.bind(headersQueueB())
.to(rabbitmqDemoHeadersExchange())
.whereAny(map).match();
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
rabbitAdmin.declareExchange(rabbitmqDemoHeadersExchange());
rabbitAdmin.declareQueue(headersQueueA());
rabbitAdmin.declareQueue(headersQueueB());
return null;
}
}
再寫一個Service方法發送消息:
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception {
try {
MessageProperties messageProperties = new MessageProperties();
//消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
//添加消息
messageProperties.getHeaders().putAll(map);
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
再寫一個Controller接口:
@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
@Resource
private RabbitMQService rabbitMQService;
@PostMapping("/headersSend")
@SuppressWarnings("unchecked")
public String headersSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "json") String json) throws Exception {
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> map = mapper.readValue(json, Map.class);
return rabbitMQService.sendMsgByHeadersExchange(msg, map);
}
}
生產者這邊寫完了,再寫兩個隊列的監聽類進行消費:
@Component
public class HeadersExchangeConsumerA {
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A))
public void process(Message message) throws Exception {
MessageProperties messageProperties = message.getMessageProperties();
String contentType = messageProperties.getContentType();
System.out.println("隊列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A + "]收到消息:" + new String(message.getBody(), contentType));
}
}
@Component
public class HeadersExchangeConsumerB {
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B))
public void process(Message message) throws Exception {
MessageProperties messageProperties = message.getMessageProperties();
String contentType = messageProperties.getContentType();
System.out.println("隊列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "]收到消息:" + new String(message.getBody(), contentType));
}
}
大功告成~啟動項目,打開管理界面,我們可以看到交換機綁定隊列的信息:
跟上面示意圖一樣~證明沒有問題,一切盡在掌握之中。使用POSTMAN發送,測試全匹配的隊列A:
再測試部分匹配的隊列B:
總結
這篇文章就先寫到這里了。回顧一下學了哪些:
- 什么是消息隊列?為什么使用消息隊列?
- RabbitMQ的特點、組成部分、工作流程
- 安裝RabbitMQ,以及完成一個HelloWord小案例
- RabbitMQ交換機的四種類型的特點,以及使用方法
實際上RabbitMQ還有事務機制和負載均衡這些還沒講,因為篇幅實在有點長了,差不多5千字了。所以放在下期講吧,盡請期待一下。
上面所有例子的代碼都上傳github了:
如果你覺得這篇文章對你有用,點個贊吧~
你的點贊是我創作的最大動力~
想第一時間看到我更新的文章,可以微信搜索公眾號「java技術愛好者
」,拒絕做一條咸魚,我是一個努力讓大家記住的程序員。我們下期再見!!!
能力有限,如果有什么錯誤或者不當之處,請大家批評指正,一起學習交流!