SpringBoot集成RabbitMQ
跟着純潔的微笑大佬一起學習的,有興趣的可以搜一下
RabbitMQ介紹
RabbitMQ 是一個開源的 AMQP 實現,服務器端用 Erlang 語言編寫,支持多種客戶端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP 的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
相關概念
消息隊列服務會有三個概念: 發消息者、隊列、消息接收者. RabbitMQ在基本概念上多做了一層抽象,在發消息者和隊列之間加入了交換器(Exchange)。這樣發消息者和隊列就沒有直接聯系,轉而變成發消息者把消息給交換器,交換器根據調度策略再把消息再給隊列。
- 左側 P 代表生產者,也就是往 RabbitMQ 發消息的程序
- 中間即是 RabbitMQ,其中包括了交換機和隊列。
- 右側 C 代表消費者,也就是往 RabbitMQ 拿消息的程序。
比較重要的概念有四個: 虛擬主機、交換機、隊列和綁定.
- 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定,為什么需要多個虛擬主機呢?很簡單,RabbitMQ 當中,用戶只能在虛擬主機的粒度進行權限控制。因此,如果需要禁止 A 組訪問 B 組的交換機/隊列/綁定,必須為 A 和 B 分別創建一個虛擬主機,每一個 RabbitMQ 服務器都有一個默認的虛擬主機“/”。
- 交換機:Exchange 用於轉發消息,但是它不會做存儲,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。
- 綁定:也就是交換機需要和隊列相綁定,這其中如上圖所示,是多對多的關系。
交換機(Exchange)
交換機的功能主要是接收消息並且轉發到綁定的隊列,交換機不存儲消息,在啟用 ack 模式后,交換機找不到隊列,會返回錯誤。交換機有四種類型:Direct、topic、Headers and Fanout。
- Direct:其類型的行為是“先匹配、再投送”,即在綁定時設定一個
routing_key
,消息的routing_key
匹配時,才會被交換器投送到綁定的隊列中去。 - Topic:按規則轉發消息(最靈活)。
- Headers:設置 header attribute 參數類型的交換機。
- Fanout:轉發消息到所有綁定隊列。
Direct Exchange
Direct Exchange 是 RabbitMQ 默認的交換機模式,也是最簡單的模式,根據 key 全文匹配去尋找隊列。
第一個 X - Q1 就有一個 binding key,名字為 orange;X - Q2 就有 2 個 binding key,名字為 black 和 green。當消息中的路由鍵和這個 binding key 對應上的時候,那么就知道了該消息去到哪一個隊列中。
注意:為什么 X 到 Q2 要有 black、green,2 個 binding key 呢,一個不就行了嗎?這個主要是因為可能又有 Q3,而 Q3 只接收 black 的信息,而 Q2 不僅接收 black 的信息,還接收 green 的信息。
Topic Exchange
Topic Exchange 轉發消息主要是根據通配符。在這種交換機下,隊列和交換機的綁定會定義一種路由模式,那么,通配符就要在這種路由模式和路由鍵之間匹配后交換機才能轉發消息。
在這種交換機模式下:
- 路由鍵必須是一串字符,用句號(.)隔開,比如 agreements.us,或者 agreements.eu.stockholm 等;
- 路由模式必須包含一個星號
(*),主要用於匹配路由鍵指定位置的一個單詞,比如,一個路由模式是這樣子,agreements..b.*
,那么就只能匹配路由鍵是這樣子的,第一個單詞是 agreements,第四個單詞是 b;井號(#
)就表示相當於一個或者多個單詞,例如一個匹配模式是agreements.eu.berlin.#
,那么,以 agreements.eu.berlin 開頭的路由鍵都是可以的。
具體代碼發送的時候還是一樣,第一個參數表示交換機,第二個參數表示 routing key,第三個參數即消息。如下:
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!");
Topic 和 Direct 類似, 只是匹配上支持了“模式”,在“點分”的 routing_key 形式中, 可以使用兩個通配符:
*
表示一個詞#
表示零個或多個詞
Headers Exchange
Headers 也是根據規則匹配,相較於 Direct 和 Topic 固定地使用 routing_key,headers 則是一個自定義匹配規則的類型。
在隊列與交換器綁定時,會設定一組鍵值對規則,消息中也包括一組鍵值對(headers 屬性),當這些鍵值對有一對或全部匹配時,消息被投送到對應隊列。
Fanout Exchange
Fanout Exchange 消息廣播的模式,不管路由鍵或者是路由模式,會把消息發給綁定給它的全部隊列,如果配置了 routing_key 會被忽略。
集成實踐
- 引入依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置文件:
spring:
application:
name: Spring-boot-rabbitmq
rabbitmq:
host: 39.105.167.131
port: 5762
username: guest
password: guest
- 定義隊列
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
}
- 發送者
@Component
public class MessageSender {
@Autowired
private AmqpTemplate rabbitTemplate;
private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void send(){
String context = "hello " + format.format(new Date());
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("Hello", context);
}
}
- 接受者
@Component
@RabbitListener(queues = "hello")
public class MessageReceive {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
- 測試
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private MessageSender messageSender;
@Test
public void testMessageSend() throws InterruptedException {
messageSender.send();
Thread.sleep(1000);
}
}
注意,發送者和接收者的 queue name 必須一致,不然不能接收。
執行測試方法,就可以發送消息了.
多方測試
一個發送者和 N 個接收者或者 N 個發送者和 N 個接收者會出現什么情況呢?
一對多發送:
新創建一個發送者和兩個接受者:
發送者:
@Component
public class OneToManySender {
@Autowired
private AmqpTemplate rabbitmq;
public void send(int i){
String context = "OneToMany Spring Boot queue + " + " *********** " + i;
System.out.println("Sender : " + context);
rabbitmq.convertAndSend("oneToMany", context);
}
}
接受者
@Component
@RabbitListener(queues = "oneToMany")
public class OneToManyReceive1 {
@RabbitHandler
public void process(String context){
System.out.println("oneToMany 1 : " + context);
}
}
@Component
@RabbitListener(queues = "oneToMany")
public class OneToManyReceive2 {
@RabbitHandler
public void process(String context){
System.out.println("oneToMany 2 : " + context);
}
}
測試:
@Autowired
private OneToManySender oneToManySender;
@Test
public void oneToMany() throws InterruptedException {
for (int i = 0; i < 100; i++) {
oneToManySender.send(i);
}
Thread.sleep(10000l);
}
```> 注意,在進行測試的時候,必須先創建隊列,也就是在RabbitConfig添加隊列

> 得出的結論是: 一個發送者,N 個接收者,經過測試接收端均勻接收到消息,也說明接收端自動進行了均衡負載,也就是流量分發。
##### 多對多發送
復用前面的代碼,再創建一個發送者.
@Component
public class ManyToMany {
@Autowired
private AmqpTemplate rabbitmqTemplate;
public void send(int i){
String context = "ManyToMany Spring Boot queue + " + " *********** " + i;
System.out.println("Sender3 : " + context);
this.rabbitmqTemplate.convertAndSend("manyToMany", context);
}
}
其他代碼不懂,測試如下:
@Autowired
private ManyToMany manyToMany;
@Test
public void manyToMany() throws InterruptedException {
for (int i = 0; i < 100; i++) {
oneToManySender.send(i);
manyToMany.send(i);
}
Thread.sleep(10000l);
}
> 結論:發送端交替發送消息,接收端仍然會均勻接收到消息
##### 高級使用 - 對象的支持
SpringBoot已經完美的支持對象的發送和接受,不需要額外的配置.
發送者
@Component
public class ObjectSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(User user) {
System.out.println("Sender object: " + user.toString());
this.rabbitTemplate.convertAndSend("object", user);
}
}
接受者
@Component
@RabbitListener(queues = "object")
public class ObjectReceiver {
@RabbitHandler
public void process(User user) {
System.out.println("Receiver object : " + user);
}
}
測試
@Autowired
private ObjectSender objectSend;
@Test
public void sendObject() throws InterruptedException {
User user = new User();
user.setName("王智");
user.setPass("123456");
objectSend.send(user);
Thread.sleep(1000l);
}
> 注意在配置文件中添加隊列的注冊(RabbitConfig類中)
@Bean
public Queue ObjectQueue() {
return new Queue("object");
}
##### Topic Exchange
Topic 是 RabbitMQ 中最靈活的一種方式,可以根據 routing_key 自由的綁定不同的隊列。首先對 Topic 規則配置,這里使用兩個隊列來測試:
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
//定義隊列
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
//交換機
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
//將隊列和交換機綁定
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
queueMessages 同時匹配兩個隊列,queueMessage 只匹配“topic.message”隊列。
發送者:
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}
}
接受者1:
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver1 : " + message);
}
}
接受者2
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
}
}
測試:
@Autowired
private TopicSender topicSender;
@Test
public void topic1() throws Exception {
topicSender.send1();
Thread.sleep(1000l);
}
@Test
public void topic2() throws Exception {
topicSender.send2();
Thread.sleep(1000l);
}
測試topic1的時候會有兩個接收者,測試topic2的時候只有一個接收者
##### Fanout Exchange
Fanout 就是我們熟悉的廣播模式或者訂閱模式,給 Fanout 交換機發送消息,綁定了這個交換機的所有隊列都收到這個消息。
Fanout的配置類:
@Configuration
public class FanoutRabbitConfig {
//定義隊列
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
//定義交換機
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//分部進行綁定
@Bean
Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
發送者:
@Configuration
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
}
接收者A:
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver A: " + message);
}
}
接收B:
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver B: " + message);
}
}
接收C
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver C: " + message);
}
}
測試:
@Autowired
private FanoutSender fanoutSender;
@Test
public void fanoutSender() throws Exception {
fanoutSender.send();
Thread.sleep(1000l);
}
> RabbitMQ 一個非常高效的消息隊列組件,使用 RabbitMQ 可以方便的解耦項目之間的依賴,同時利用 RabbitMQ 的特性可以做很多的解決方案。Spring Boot 為 RabbitMQ 提供了支持組件 spring-boot-starter-amqp,加載的時候會自動進行配置,並且預置了 RabbitTemplate,可以讓我們在項目中方便的調用。在測試使用的過程中發現,RabbitMQ 非常的靈活,可以使用各種策略將不同的發送者和接收者綁定在一起,這些特性在實際項目使用中非常的高效便利。
源碼: https://github.com/MissWangLove/SpringBoot