說明該篇文章內容包括有rabbitMq相關的一些簡單理論介紹,provider消息推送實例,consumer消息消費實例,Direct、Topic、Fanout的使用。
在安裝完rabbitMq后,輸入http://ip:15672/ ,是可以看到一個簡單后台管理界面的。

在這個界面里面我們可以做些什么?
可以手動創建虛擬host,創建用戶,分配權限,創建交換機,創建隊列等等,還有查看隊列消息,消費效率,推送效率等等。
以上這些管理界面的操作在這篇暫時不做擴展描述,我想着重介紹后面實例里會使用到的。
首先先介紹一個簡單的一個消息推送到接收的流程,提供一個簡單的圖:

黃色的圈圈就是我們的消息推送服務,將消息推送到 中間方框里面也就是 rabbitMq的服務器,然后經過服務器里面的交換機、隊列等各種關系(后面會詳細講)將數據處理入列后,最終右邊的藍色圈圈消費者獲取對應監聽的消息。
常用的交換機有以下三種,因為消費者是從隊列獲取信息的,隊列是綁定交換機的(一般),所以對應的消息推送/接收模式也會有以下幾種:
Direct Exchange
直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。
大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然后當一個消息攜帶着路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。
Fanout Exchange
扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
Topic Exchange
主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。
簡單地介紹下規則:
* (星號) 用來表示一個單詞 (必須出現的)
# (井號) 用來表示任意數量(零個或多個)單詞
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
隊列Q1 綁定鍵為 *.TT.* 隊列Q2綁定鍵為 TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;
主題交換機是非常強大的,為啥這么膨脹?
當一個隊列的綁定鍵為 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收所有的消息。
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。
另外還有 Header Exchange 頭交換機 ,Default Exchange 默認交換機,Dead Letter Exchange 死信交換機,這幾個該篇暫不做講述。
本次實例教程需要創建2個springboot項目,一個生產者,一個消費者。
首先創建 生產者,
pom.xml里用到的jar依賴:
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml:
#本機端口
server:
port: 8082
spring:
#給項目來個名字
application:
name: rabbitmq-provider
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
然后先使用下direct exchange(直連型交換機),創建DirectRabbitConfig.java
package com.wx.test.rtmdemo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author : laz
* @CreateTime : 2021/11/14
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//隊列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
//隊列的三個參數講解
// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
// exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
// autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
// return new Queue("TestDirectQueue",true,true,false);
//一般設置一下隊列的持久化就好,其余兩個就是默認false
return new Queue("TestDirectQueue",true);
}
//Direct交換機 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
然后寫個簡單的接口進行消息推送:
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
將項目運行起來,調用這個接口:

此時,在rabbitMq管理頁面可以看到,有一條推送的消息。

然后有一個消息等待被消費。

此時,說明我們的生產者的消息已經生產成功,只需要等待消費者消費即可!
接下來創建消費者服務:
首先是pom.xml文件的依賴:
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后是application.yml:
server: port: 8083 spring: #給項目來個名字 application: name: rabbitmq-consumer #配置rabbitMq 服務器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
然后是創建消息接收監聽類,DirectReceiver.java:
package com.wx.test.consumer.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消費者收到消息 : " + testMessage.toString());
}
}
然后啟動項目,可以看到生產者的消息,在這邊被成功消費了。

接下來是Topic Exchange交換機,配置文件以及依賴包不變,新建一個TopicRabbitConfig.java:
package com.wx.test.rtmdemo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : laz
* @CreateTime : 2021/11/14
* @Description :
**/
@Configuration
public class TopicRabbitConfig {
//綁定鍵
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man
//這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#
// 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
然后在創建兩個接口,用於測試:
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman123", womanMap);
return "ok";
}
然后在創建一個消費者TopicManReceiver.java::
package com.wx.test.consumer.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : laz
* @CreateTime : 2021/11/14
* @Description :
**/
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicManReceiver消費者收到消息 : " + testMessage.toString());
}
}
然后再創建一個消費者TopicTotalReceiver.java::
package com.wx.test.consumer.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : laz
* @CreateTime : 2021/11/14
* @Description :
**/
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicTotalReceiver消費者收到消息 : " + testMessage.toString());
}
}
啟動項目,調用sendTopicMessage1這個接口:

可以看到,兩個消費者都消費了這條消息。
注意:
TopicManReceiver監聽隊列1,綁定鍵為:topic.man
TopicTotalReceiver監聽隊列2,綁定鍵為:topic.#
而當前推送的消息,攜帶的路由鍵為:topic.man
再調用sendTopicMessage2這個接口:

這時,只有TopicTotalReceiver成功消費了。
然后是Fanout Exchang交換機:
首先,創建生產者:
package com.wx.test.rtmdemo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : laz
* @CreateTime : 2021/11/14
* @Description :
**/
@Configuration
public class FanoutRabbitConfig {
/**
* 創建三個隊列 :fanout.A fanout.B fanout.C
* 將三個隊列都綁定在交換機 fanoutExchange 上
* 因為是扇型交換機, 路由鍵無需配置,配置也不起作用
*/
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
同樣寫一個接口,用於生產一條消息:
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: testFanoutMessage ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}
接下來在消費者里面創建一個類,用於接收消息:
FanoutReceiverA.java:
package com.wx.test.consumer.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : laz
* @CreateTime : 2021/11/14
* @Description :
**/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverA消費者收到消息 : " +testMessage.toString());
}
}
FanoutReceiverB.java:
package com.wx.test.consumer.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : laz
* @CreateTime : 2021/11/14
* @Description :
**/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverB消費者收到消息 : " +testMessage.toString());
}
}
FanoutReceiverC.java:
package com.wx.test.consumer.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author : laz
* @CreateTime : 2021/11/14
* @Description :
**/
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverC消費者收到消息 : " +testMessage.toString());
}
}
最后啟動項目,調用我們生產者的接口,查看消息消費情況:

可以看到,這三個類都消費到消息。
好了,這篇Springboot整合rabbitMq教程就暫且到此。本文若有錯誤,還請各路大佬指正指正!
