1 RabbitMQ簡介
1.1 消息隊列中間件簡介
消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合,異步消息,流量 削鋒等問題實現高性能,高可用,可伸縮和最終一致性[架構] 使用較多的消息隊列有
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
消息隊列在實際應用中常用的使用場景:異步處理,應用解耦,流量削鋒和消息通訊
- 異步處理
比如注冊、需要處理業務並且還需要發短信、發郵件驗證
-
應用解耦
比如發短信何發郵件都是由專門的項目或者微服務來操作,比封裝成工具類耦合性要低
-
流量削鋒
比如現實生活中的漏斗、項目高峰期提高並發
1.2 什么是RabbitMQ
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放
標准,為面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不
受產品、開發語言等條件的限制。
RabbitMQ 最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展
性、高可用性等方面表現不俗。具體特點包括:
1.可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。
2.靈活的路由(Flexible Routing)
在消息進入隊列之前,通過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ
已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個
Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。
3.消息集群(Clustering)
多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。
4.高可用(Highly Available Queues)
隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
5.多種協議(Multi-protocol)
RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。
6.多語言客戶端(Many Clients)
RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。
7.管理界面(Management UI)
RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方
面。
8.跟蹤機制(Tracing)
如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。
9.插件機制(Plugin System)
RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。
1.3 架構圖與主要概念
1.3.1 架構圖

1.3.2 主要概念
RabbitMQ Server: 也叫broker server,它是一種傳輸服務。 他的角色就是維護一條
從Producer到Consumer的路線,保證數據能夠按照指定的方式進行傳輸。
Producer: 消息生產者,如圖A、B、C,數據的發送方。消息生產者連接RabbitMQ服
務器然后將消息投遞到Exchange。
Consumer:消息消費者,如圖1、2、3,數據的接收方。消息消費者訂閱隊列,
RabbitMQ將Queue中的消息發送到消息消費者。
Exchange:生產者將消息發送到Exchange(交換器),由Exchange將消息路由到一個
或多個Queue中(或者丟棄)。Exchange並不存儲消息。RabbitMQ中的Exchange有
direct、fanout、topic、headers四種類型,每種類型對應不同的路由規則。
Queue:(隊列)是RabbitMQ的內部對象,用於存儲消息。消息消費者就是通過訂閱
隊列來獲取消息的,RabbitMQ中的消息都只能存儲在Queue中,生產者生產消息並最終
投遞到Queue中,消費者可以從Queue中獲取消息並消費。多個消費者可以訂閱同一個
Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者
都收到所有的消息並處理。
RoutingKey:生產者在將消息發送給Exchange的時候,一般會指定一個routing key,
來指定這個消息的路由規則,而這個routing key需要與Exchange Type及binding key聯
合使用才能最終生效。在Exchange Type與binding key固定的情況下(在正常使用時一
般這些內容都是固定配置好的),我們的生產者就可以在發送消息給Exchange時,通過
指定routing key來決定消息流向哪里。RabbitMQ為routing key設定的長度限制為255
bytes。
Connection: (連接):Producer和Consumer都是通過TCP連接到RabbitMQ Server
的。以后我們可以看到,程序的起始處就是建立這個TCP連接。
Channels: (信道):它建立在上述的TCP連接中。數據流動都是在Channel中進行
的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。
VirtualHost:權限控制的基本單位,一個VirtualHost里面有若干Exchange和
MessageQueue,以及指定被哪些user使用
2 RabbitMQ安裝
2.1 windows安裝
略
2.2 docker安裝
略
安裝成功后訪問:ip:15672 看到下面頁面代表安裝成功
rabbitmq默認賬號:guest
默認密碼:guest

3 RabbitMQ模式
3.1 直接模式(direct)
3.1.1 概念
將消息發給唯一一個節點時使用這種模式,這是最簡單的一種形式。

任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue。
1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字符串,下
文稱其為default Exchange)。
2.這種模式下不需要將Exchange進行任何綁定(binding)操作
3.消息傳遞時需要一個“RouteKey”,可以簡單的理解為要發送到的隊列名字。
4.如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。
3.1.2 代碼實現
需求:將消息發給q1隊列(默認就是直接模式)
首先在RabbitMQ中創建一個q1隊列

然后在rabbitmq中就可以看到剛剛創建的隊列

項目搭建
(1)創建工程rabbitMq,引入依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
(2)編寫配置文件application.yml
spring:
rabbitmq:
host: localhost
(3)編寫啟動類:
@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class,args);
}
}
(4)編寫消息生產者測試類
@SpringBootTest(classes = RabbitMQApplication.class)
@RunWith(SpringRunner.class)
public class MqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
//第一個參數為隊列名稱,第二個參數為要發送的消息對象,這里傳的是一個字符串
rabbitTemplate.convertAndSend("q1","hello rabbit");
}
}
執行上面代碼就可以向消息隊列發送一個消息,可以看到rabbitmq中的消息變成1了。

(5)編寫消息消費者代碼
@Component
@RabbitListener(queues = {"q1"})//從哪個隊列取消息
public class Q1Listener {
@RabbitHandler//取到消息后的處理方法
public void receiverMsg(String msg){
System.out.println(msg);
}
}
執行上面代碼控制台會打印消息,並且隊列中的消息被消費了。

3.2 分列模式(fanout)
3.2.1 概念
將消息一次發給多個隊列時,需要使用這種模式。如下圖:

任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有
Queue上。
1.可以理解為路由表的模式
2.這種模式不需要RouteKey
3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個
Queue,一個Queue可以同多個Exchange進行綁定。
4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
3.2.2 代碼實現
需求:將消息發給q1和q2隊列
(1)在RabbitMQ中創建q2,q3隊列
(2)在RabbitMQ中創建路由

(3)在RabbitMQ中配置路由規則

(4)編寫生產者測試代碼
@Test
public void sendMsgFanout(){
//第一個參數為路由名稱,第三個參數為消息對象
rabbitTemplate.convertAndSend("q1&q2","","hello rabbit fanout");
}
(5)編寫消費者listener(q2,q3)
q2
@Component
@RabbitListener(queues = {"q2"})
public class Q2Listener {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("從q2中取消息:"+msg);
}
}
q3
@Component
@RabbitListener(queues = {"q3"})
public class Q3Listener {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("從q3中取消息:"+msg);
}
}
3.3 主題模式(Topic)
3.3.1 概念
分列模式的升級版,可以給每個隊列指定key
任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的Queue
上

如上圖所示
此類交換器使得來自不同的源頭的消息可以到達一個對列,其實說的更明白一點就是模
糊匹配的意思,例如:上圖中紅色對列的routekey為usa.#,#代表匹配任意字符,但是
要想消息能到達此對列,usa.必須匹配后面的#好可以隨意。圖中usa.news
usa.weather,都能找到紅色隊列,符號# 匹配一個或多個詞,符號* 匹配不多不少一個
詞。因此usa.# 能夠匹配到usa.news.XXX ,但是usa.* 只會匹配到usa.XXX 。
注:
交換器說到底是一個名稱與隊列綁定的列表。當消息發布到交換器時,實際上是由你所
連接的信道,將消息路由鍵同交換器上綁定的列表進行比較,最后路由消息。
任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的
Queue上
1.這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一
個“標題”(RouteKey),Exchange會將消息轉發到所有關注主題能與RouteKey模糊匹配的
隊列。
2.這種模式需要RouteKey,也許要提前綁定Exchange與Queue。
3.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及
log的消息(一個RouteKey為”MQ.log.error”的消息會被轉發到該隊列)。
4.“#”表示0個或若干個關鍵字,“”表示一個關鍵字。如“log.”能與“log.warn”匹配,無法
與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。
5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息
3.3.2 代碼實現
(1)創建交換器

(2)配置路由key

說明:
-
交換器會將消息轉發到所有關注主題能與RouteKey
模糊匹配的隊列,如果交換器沒有發現能夠與RouteKey匹配的隊列,則會拋棄此消息。 -
#表示多個字符*表示一個字符
(4)編寫生產者代碼
@Test
public void sendMsgTopic(){
/**
* rabbitTemplate.convertAndSend(args1,args2,args3);
* 第一個參數為交換器名稱
* 第二個參數為路由匹配規則
* 第三個參數為消息對象
*/
//只發送給q1
//rabbitTemplate.convertAndSend("topic_ex","haha.abc"," rabbit fanout");
//只發送給q2
//rabbitTemplate.convertAndSend("topic_ex","abc.hehe"," rabbit fanout");
//發送給q1、q2、q3
//rabbitTemplate.convertAndSend("topic_ex","haha.hehe"," rabbit fanout");
//只發送給q1、q2
rabbitTemplate.convertAndSend("topic_ex","haha.heihei.hehe","rabbit fanout");
}
