一、使用rabbitmq時的系統架構圖

通過路由鍵將交換機和隊列進行綁定,從而實現消息的發送和接收。
二、rabbitmq基本概念
rabbitmq是AMQP協議的一個開源實現,所以其內部實際上也是AMQP中的基本概念,如下圖所示:

1、Message(消息)
消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(傳輸模式,指出該消息可能需要持久化存儲)等。
2、Publisher
消息生產者,也是一個向交換器發布消息的客戶端應用程序,就是投遞消息的程序。
3、Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。消息交換機,它指定消息按什么規則,路由到哪個隊列。
4、Routing Key
路由關鍵字,exchange根據這個關鍵字進行消息投遞。
5、Binding(綁定)
用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
它的作用就是把exchange和queue按照路由規則綁定起來。
綁定其實就是關聯了exchange和queue,或者這么說:queue對exchange的內容感興趣,exchange要把它的Message deliver到queue。
6、Queue(消息隊列)
消息的載體,每個消息都會被投到一個或多個隊列,等待消費者連接到這個隊列將其取走。它是消息的容器,也是消息的終點。
7、Connection
網絡連接,例如一個TCP連接。
8、Channel(信道,通道)
消息通道,在客戶端的每個連接里,可建立多個channel。
多路復用連接中的一條獨立雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀TCP都是非常昂貴的開銷,所以引入了信道的概念以達到復用一條TCP連接的目的。
9、Consumer
消息消費者,表示一個從消息隊列中取得消息的客戶端應用程序,就是接受消息的程序。
10、Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。一個broker里可以有多個vhost,用作不同用戶的權限分離。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個vhost本質上就是一個mini版的rabbitmq服務器,擁有自己的隊列、交換器、綁定和權限機制。
vhost是AMQP概念的基礎,必須在連接時指定,rabbitmq默認的vhost是 / 。
11、Broker
表示消息隊列服務器實體。它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸。
三、AMQP中的消息路由
生產者把消息發布到Exchange上,消息最終到達隊列並被消費者接收,而Binding決定交換器的消息應該發送到那個隊列。如下圖所示:

四、Exchange類型
Exchange分發消息時根據類型的不同分發策略有區別,目前共有四種類型:direct、fanout、topic、headers。headers匹配AMQP消息的header而不是路由鍵,此外headers交換器和direct交換器完全一致,但性能差很多,目前幾乎用不到。且看direct、fanout、topic這三種類型。
1、direct類型
消息中的路由鍵routing key如果和Binding中的binding key一致,交換器就將消息發到對應的隊列中去。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換器要求路由鍵為“dog”,則只轉發routing key標記為“dog”的消息,不會轉發“dog.puppy”等等。它是完全匹配、單傳播的模式。
Driect exchange的路由算法非常簡單:通過bindingkey的完全匹配,可以用下圖來說明:


Exchange和兩個隊列綁定在一起,Q1的bindingkey是orange,Q2的binding key是black和green。
當Producer publish key是orange時,exchange會把它放到Q1上,如果是black或green就會到Q2上,其余的Message被丟棄。
2、fanout類型
每個發到fanout類型交換器的消息都會分到所有綁定的隊列上去。fanout交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。類似於子網廣播,每台子網內的主機都獲得了一份復制的消息。fanout類型轉發消息是最快的。 如下圖所示:

3、topic類型
topic交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:#和*,#匹配0個或多個單詞,*只能匹配一個單詞。
對於Message的routing_key是有限制的,不能是任意的。格式是以點號“.”分割的字符表。比如:”stock.usd.nyse”,“nyse.vmw”, “quick.orange.rabbit”。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。對於routing_key,有兩個特殊字符#和*,#匹配0個或多個單詞,*只能匹配一個單詞。如下圖所示:


Producer發送消息時需要設置routing_key,routing_key包含三個單詞和兩個點號,第一個key描述了celerity(靈巧),第二個是color(色彩),第三個是物種。
在這里我們創建了兩個綁定: Q1 的binding key 是”.orange.“; Q2 是 “..rabbit” 和 “lazy.#”:Q1感興趣所有orange顏色的動物;Q2感興趣所有rabbits和所有的lazy的。
例如:rounting_key 為 “quick.orange.rabbit”將會發送到Q1和Q2中。rounting_key 為”lazy.orange.rabbit.hujj.ddd”會被投遞到Q2中,#匹配0個或多個單詞。
五、ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
1、Connection
Connection是Rabbitmq的socket連接,它封裝了socket協議相關部分邏輯。
2、ConnectionFactory
ConnectionFactory是connection的制造工廠。
3、Channel
Channel是我們與rabbitmq打交道的最重要的一個接口,大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
六、任務分發機制
1、Round-robin dispathching 循環分發
RabbbitMQ的分發機制非常適合擴展,而且它是專門為並發程序設計的,如果現在load加重,那么只需要創建更多的Consumer來進行任務處理。
2、Message acknowledgment 消息確認
為了保證數據不被丟失,RabbitMQ支持消息確認機制,為了保證數據能被正確處理而不僅僅是被Consumer收到,這就需要在處理完數據之后發送一個確認ack。
在處理完數據之后發送ack,就是告訴RabbitMQ數據已經被接收並且處理完成,RabbitMQ可以將消息從隊列中移除了。如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer,這樣就保證在Consumer異常退出情況下數據也不會丟失。
RabbitMQ沒有用到超時機制,它僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理,一個消費者處理消息的時間再長也不會導致該消息被發送給其他消費者,即RabbitMQ給了Consumer足夠長的時間來做數據處理。如果忘記ack,那么當Consumer退出時,Mesage會被重新分發,從而導致隊列中的累積的消息越來越多,然后RabbitMQ會占用越來越多的內存。
3、Message durability 消息持久化
如果我們希望即使在rabbitmq服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置成可持久化的(durable),這樣就可以保證絕大部分情況下我們的rabbitmq消息不會丟失。但依然解決不了小概率丟失事件的發生(例如rabbitmq服務器已經接收到了生產者的消息,但還沒來得及持久化該消息時rabbitmq服務器就斷電了)。如果也要將這種小概率事件管理起來就需要使用到事務了。要持久化隊列需要在聲明時指定durable=True;這里要注意,隊列的名字一定要是Broker中不存在的,不然不能改變此隊列的任何屬性。隊列和交換機有一個創建時候指定的標志durable,durable的唯一含義就是讓具有這個標志的隊列和交換機會在重啟之后重新建立。
消息持久化包括3部分
(1)exchange持久化,在聲明時指定durable => true
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久的
(2)queue持久化,在聲明時指定durable => true
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//聲明消息隊列,且為可持久的
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)。
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的;如果exchange和queue兩者之間有一個持久化,一個非持久化,則不允許建立綁定。
注意:一旦創建了隊列和交換機,就不能修改其標志了。例如創建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重新創建。
4、Fair dispath 公平分發
你可能也注意到了,分發機制不是那么優雅,默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。n是取余后的,它不管Consumer是否還有unacked Message,只是按照這個默認的機制進行分發。那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻毫無休息的機會,那么Rabbit是如何處理這種問題呢?

通過basic.qos方法設置prefetch_count=1,如下設置
channel.basic_qos(prefetch_count=1)
這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它。但是這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。
5、分發到多個Consumer
Direct Exchange:直接匹配,通過Exchange名稱+RountingKey來發送與接收消息。
Fanout Exchange:廣播訂閱,向所有的消費者發布消息,但是只有消費者將隊列綁定到該路由器才能收到消息,忽略Routing Key。
Topic Exchange:主題匹配訂閱,這里的主題指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用英文句點來分隔多個詞,只有消息將隊列綁定到該路由器且指定RoutingKey符合匹配規則時才能收到消息。
Headers Exchange:消息頭訂閱,消息發布前為消息定義一個或多個鍵值對的消息頭,然后消費者接收消息,同時需要定義類似的鍵值對請求頭(如
x-mactch=all或者x_match=any),只有請求頭與消息頭匹配,才能接收消息,忽略RoutingKey。
默認的exchange:如果用空字符串去聲明一個exchange,那么系統就會使用”amq.direct”這個exchange。我們創建一個queue時,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去。如下:
channel.BasicPublish("", "TaskQueue", properties, bytes);
因為在第一個參數選擇了默認的exchange,而我們聲明的隊列叫TaskQueue,所以默認的,它要新建一個也叫TaskQueue的routingKey,並綁定在默認的exchange上,導致了我們可以在第二個參數routingKey中寫TaskQueue,這樣它就會找到定義的同名的queue並把消息放進去。
如果有兩個接收程序都是用了同一個的queue和相同的routingKey去綁定direct exchange的話,分發的行為是負載均衡的,也就是說第一個是程序1收到,第二個是程序2收到,以此類推。
如果有兩個接收程序用了各自的queue,但使用相同的routingKey去綁定direct exchange的話,分發的行為是復制的,即每個程序都會收到這個消息的副本。行為相當於fanout類型的exchange。
多個queue綁定同一個key也是可以的,對於下圖的例子,Q1和Q2都綁定了black,對於routing key是black的Message,會被deliver到Q1和Q2,其余的Message都會被丟棄。

七、RPC遠程過程調用
MQ本身是基於異步的消息處理,前面的示例中所有的生產者(P)將消息發送到RabbitMQ后不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。 但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的消息處理完成后再進行下一步處理。這相當於RPC(Remote Procedure Call,遠程過程調用)。在RabbitMQ中也支持RPC。
RabbitMQ中實現RPC的機制如下圖所示:

客戶端發送請求(消息)時,在消息的屬性(MessageProperties ,在AMQP 協議中定義了14種properties ,這些屬性會隨着消息一起發送)中設置兩個值replyTo (一個Queue 名稱,用於告訴服務器處理完成后將通知我的消息發送到這個Queue 中)和correlationId (此次請求的標識號,服務器處理完成后需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行或執行失敗)。
服務器端收到消息並處理,服務器端處理完消息后,將生成一條應答消息到replyTo 指定的Queue中 ,同時帶上correlationId 屬性,客戶端之前已訂閱replyTo 指定的Queue ,從中收到服務器的應答消息后,根據其中的correlationId 屬性分析哪條請求被執行了,然后根據執行結果進行后續業務處理。
參考博文:
(1) https://segmentfault.com/a/1190000019122774?utm_source=tag-newest
(2) https://blog.csdn.net/whoamiyang/article/details/54954780
(3) https://blog.csdn.net/yuanlong122716/article/details/104488654/
