AMQP
定義
AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)是一個進程間傳遞異步消息的網絡協議。
模型圖
工作過程
發布者(Publisher)發布消息(Message),經由交換機(Exchange)。
交換機根據路由規則將收到的消息分發給與該交換機綁定的隊列(Queue)。
最后 AMQP 代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。
應用場景
對於一個大型的軟件系統來說,它會有很多的組件或者說模塊或者說子系統或者(subsystem or Component or submodule)。那么這些模塊的如何通信?這和傳統的IPC有很大的區別。傳統的IPC很多都是在單一系統上的,模塊耦合性很大,不適合擴展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機器上,但是還是有很多問題需要解決。比如:
1)信息的發送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數據如何方式丟失?
2)如何降低發送者和接收者的耦合度? 首先是異步化的,可以大大提高系統的抗峰值能力。然后就是解耦
3)如何讓Priority高的接收者先接到數據?
4)如何做到load balance?有效均衡接收者的負載?
5)如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不同的數據,如何做有效的filter。
6)如何做到可擴展,甚至將這個通信模塊發到cluster上?
7)如何保證接收者接收到了完整,正確的數據?
AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP。
參考文檔
https://blog.csdn.net/weixin_37641832/article/details/83270778
https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
https://blog.csdn.net/anzhsoft/article/details/19563091
http://rabbitmq.mr-ping.com/
RabbitMQ
定義
RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現
https://www.rabbitmq.com/
生產者(Product)
發布消息到消息隊列服務中。
交換機(Exchange)
生產者的消息並不能直接到Queue中,而是經過交換機分配的。一個消息可以分配給一個或者多個queue,分配通過bind進行的。
在綁定(Binding)Exchange和Queue的同時,一般會指定一個Binding Key,生產者將消息發送給Exchange的時候,一般會產生一個Routing Key,當Routing Key和Binding Key對應上的時候,消息就會發送到對應的Queue中去。那么Exchange有四種類型,不同的類型有着不同的策略。也就是表明不同的類型將決定綁定的Queue不同,換言之就是說生產者發送了一個消息,Routing Key的規則是A,那么生產者會將Routing Key=A的消息推送到Exchange中,這時候Exchange中會有自己的規則,對應的規則去篩選生產者發來的消息,如果能夠對應上Exchange的內部規則就將消息推送到對應的Queue中去。那么接下來就來詳細講解下Exchange里面類型。
Exchange Type
我來用表格來描述下類型以及類型之間的區別。
- fanout
fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
上圖所示,生產者(P)生產消息1將消息1推送到Exchange,由於Exchange Type=fanout這時候會遵循fanout的規則將消息推送到所有與它綁定Queue,也就是圖上的兩個Queue最后兩個消費者消費。
- direct
direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中
當生產者(P)發送消息時Rotuing key=booking時,這時候將消息傳送給Exchange,Exchange獲取到生產者發送過來消息后,會根據自身的規則進行與匹配相應的Queue,這時發現Queue1和Queue2都符合,就會將消息傳送給這兩個隊列,如果我們以Rotuing key=create和Rotuing key=confirm發送消息時,這時消息只會被推送到Queue2隊列中,其他Routing Key的消息將會被丟棄。
- topic
前面提到的direct規則是嚴格意義上的匹配,換言之Routing Key必須與Binding Key相匹配的時候才將消息傳送給Queue,那么topic這個規則就是模糊匹配,可以通過通配符滿足一部分規則就可以傳送。它的約定是:
- routing key為一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key與routing key一樣也是句點號“. ”分隔的字符串
- binding key中可以存在兩種特殊字符“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)
當生產者發送消息Routing Key=F.C.E的時候,這時候只滿足Queue1,所以會被路由到Queue中,如果Routing Key=A.C.E這時候會被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B時,這里只會發送一條消息到Queue2中。
- headers
headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對;如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。
該類型的Exchange沒有用到過(不過也應該很有用武之地),所以不做介紹。
這里在對其進行簡要的表格整理:
類型名稱 | 類型描述 |
fanout | 把所有發送到該Exchange的消息路由到所有與它綁定的Queue中 |
direct | Routing Key==Binding Key |
topic | 我這里自己總結的簡稱模糊匹配 |
headers | Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。 |
隊列Queue
Queue(隊列)RabbitMQ的作用是存儲消息,隊列的特性是先進先出。上圖可以清晰地看到Client A和Client B是生產者,生產者生產消息最終被送到RabbitMQ的內部對象Queue中去,而消費者則是從Queue隊列中取出數據。可以簡化成表示為:
生產者Send Message “A”被傳送到Queue中,消費者發現消息隊列Queue中有訂閱的消息,就會將這條消息A讀取出來進行一些列的業務操作。這里只是一個消費正對應一個隊列Queue,也可以多個消費者訂閱同一個隊列Queue,當然這里就會將Queue里面的消息平分給其他的消費者,但是會存在一個一個問題就是如果每個消息的處理時間不同,就會導致某些消費者一直在忙碌中,而有的消費者處理完了消息后一直處於空閑狀態,因為前面已經提及到了Queue會平分這些消息給相應的消費者。這里我們就可以使用prefetchCount來限制每次發送給消費者消息的個數。詳情見下圖所示:
這里的prefetchCount=1是指每次從Queue中發送一條消息來。等消費者處理完這條消息后Queue會再發送一條消息給消費者。
Virtual Hosts
在RabbitMQ中可以虛擬消息服務器VirtualHost,每個VirtualHost相當月一個相對獨立的RabbitMQ服務器,每個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。
一個broker里可以開設多個vhost。 borker簡單來說就是消息隊列服務器實體
基礎對象
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯。ConnectionFactory為Connection的制造工廠。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
Connection就是建立一個TCP連接,生產者和消費者的都是通過TCP的連接到RabbitMQ Server中的,這個后續會再程序中體現出來。
Channel虛擬連接,建立在上面TCP連接的基礎上,數據流動都是通過Channel來進行的。為什么不是直接建立在TCP的基礎上進行數據流動呢?如果建立在TCP的基礎上進行數據流動,建立和關閉TCP連接有代價。頻繁的建立關閉TCP連接對於系統的性能有很大的影響,而且TCP的連接數也有限制,這也限制了系統處理高並發的能力。但是,在TCP連接中建立Channel是沒有上述代價的。