RabbitMQ


 

官方網址:

建議讀懂六種場景:http://www.rabbitmq.com/getstarted.html

針對mqtt的支持: https://www.rabbitmq.com/mqtt.html 

一、基本概念

1、名詞解釋

消息隊列服務(Message Queue Service,MQS)是一種分布式、高可用的在線消息隊列服務。它能為分布式應用系統提供異步解耦,具有高吞吐量,低延遲等優點。MQS服務基於RabbitMQ實現,不僅提供所有RabbitMQ的原生功能,還提供實例自動化部署、實例的維護、實例狀態的監控與告警、賬號和vHost的相關管理等功能。同時,MQS提供多種實例規格,方便用戶按需選擇。

RabbitMQ:AMQP的一種實現方式,服務器用Erlang語言編寫,支持多種客戶端,主要用於在分布式系統中存儲轉發消息。

節點:一個雲主機實例。

節點組:具有相同配置的雲主機實例的集合。

組件:可在操作系統上獨立安裝的軟件系統包,如RabbitMQ等。

user:使用IOP平台和RabbitMQ的用戶。

vhost:針對queues、exchanges、channels等資源的邏輯分組,用戶可以通過創建vHost的方式來實現上述資源的分組。

鏡像模式:RabbitMQ的集群模式包括普通模式和鏡像模式兩種,鏡像模式可以將指定隊列在其它節點上進行備份,當某一節點出現故障時,該隊列在其它節點上的備份仍可以為用戶正常提供服務。

connection:通過身份驗證完成的AMQP連接。

channel:共享一個TCP連接的通道。

exchange:從channel接收消息,並將消息路由到零或多個隊列中。

queue:用來存儲未被消費的消息。

binding:交換機路由消息到隊列中的路由規則。

producer:發送消息到RabbitMQ的生產者。

consumer:從RabbitMQ接收並消費消息的消費者。

2、運行原理

利用RabbitMQ進行消息傳遞前,需要在生產者、RabbitMQ和消費者之間建立完整的連接通道。

系統結構圖:

圖中:

兩個生產者發送的消息經過RabbitMQ中間件后,傳遞給了三個消費者;

RabbitMQ中存在交換機和消息隊列,交換機用來接收消息,並按照一定的規則路由消息到指定的隊列;消息隊列用來存儲未被消費者消費的消息。

3、Work Queues

創建工作隊列(Work queues)的主要目的是:執行相對比較耗時的任務時,可以將該任務分給多個消費者共同完成。

生產者發送的任務,被封裝成消息存儲到隊列中,如果該任務屬於密集型任務,可以利用兩個消費者去處理同一任務的不同部分,當然消費者可以不止兩個。

3.1、生產者發送任務

這個例子,

首先利用for循環生成一個資源相對比較豐富的任務,然后將其封裝成消息存儲在聲明的隊列中並發送;

同時為了防止在RabbitMQ出現宕機或重啟的狀況下,創建的消息隊列和消息丟失,將隊列和消息都設置為持久化。

3.2、消費者接收任務

這個例子相比hello world中1.2【消費者接收任務】,有以下不同:

  • 利用channel.basicQos(1)語句實現消息的公平分發:假設只有兩個消費者,並且RabbitMQ發送的所有奇數消息比較耗時,偶數消息卻較易處理,那么處理奇數消息的消費者會非常忙碌,同時處理偶數消息的消費者大部分時間內無事可做。盡管如此,RabbitMQ依然會均勻的分發任務。為了避免類似狀況的發生,利用channel.basicQos(1)語句實現在前一個消息沒有被處理並發回確認之前,消費者不會收到RabbitMQ發送的下一條消息的方法;或者將消息發送給其它消費者。
  • 用手動方式進行消息的確認:該確認方法用於相對耗時的任務中。首先利用channel.basicConsume(queue,false,consumer)語句關閉消息的自動確認機制,然后在工作完成后,利用channel.basicAck(envelope.getDeliveryTag(), false)命令手動發回確認消息。
  • 消費者處理消息的過程中使用了Thread.sleep()函數。Thread.sleep()函數可以模擬消費者始終處於一個忙碌的場景中

3.3、公平分發驗證

公平分發驗證即驗證利用channel.basicQos(1)語句進行消息分發后的結果。下圖RabbitMQ把消息同時分發給三個消費者(即打開三個控制台)的結果,效果表明實現了公平分發。

 3.4、消息確認驗證

采用手動方式確認消息的驗證結果:

有消息確認機制時,消費者對生產者發送的消息不能重復接收,無消息確認機制時,消費者對生產者發送的消息可以重復接收。

有消費確認機制時:消費者每次接收到消息時會向RabbitMQ發送確認消息,之后RabbitMQ就不會再發送同樣的消息給消費者;

無消息確認機制時,消費者接收消息之后不給RabbitMQ反饋確認信息,RabbitMQ沒有收到確認信息,就不會從隊列中刪除該消息,因此消息仍然能從隊列中獲取。

3.5、消息持久化驗證

 

消息持久化被設置后,RabbitMQ將需要發送的消息寫入到了磁盤中,而非內存中;

下次RabbitMQ啟動后,即使不運行發送者,消費者也可以從指定的隊列中取出存儲在磁盤中的消息。

4、hello world

假設一個生產者只向一個已命名的隊列中發送消息,發送的消息為Hello World,一個消費者從該隊列中接收消息並打印,交換機為默認類型。

消息傳遞的流程圖如5-2所示。在圖5-2中P為生產者,C為消費者。

4.1、生產者和消費者

4.2、代碼分析 - 創建鏈接

生產者還是消費者,在處理消息之前都需要和RabbitMQ中間件建立完整的連接。RabbitMQ連接的建立方式。

    程序中利用channel.queueDeclare命令聲明隊列,同時在聲明的隊列中設置了相關屬性值。

  • Queue: 隊列名。
  • Durable: 隊列是否持久化,即RabbitMQ宕機或重啟后,隊列和隊列中存儲的消息是否依然存在。
  • Exclusive:隊列是否排他。如果一個隊列被聲明為排他隊列,同一連接的不同通道可以同時訪問該隊列,並在連接斷開時自動刪除;這種隊列適用於只限一個客戶端發送讀取消息的實例。
  • Auto-delete: 隊列是否自動刪除,如果該隊列沒有任何訂閱的消費者,則會被自動刪除。這種隊列適用於臨時隊列。
  • Arguments: 隊列的其它屬性。

注意:

        如果只有消費者聲明隊列,則隊列被聲明之前,生產者發送的消息存在被丟棄的可能性,所以需要生產者首先聲明隊列;

        為了防止程序先運行消費者,消費者也需再次聲明隊列,目的為在RabbitMQ中創建隊列。

4.3、代碼分析 - 生產者發送消息

生產者使用channel.basicPublish命令向RabbitMQ發送消息,設置了相關屬性值。

  • Exchange: 交換機。此處用空字符串表示默認類型的交換機。
  • RoutingKey: 路由關鍵字。
  • Props: 消息的其它屬性。此處設置為null。
  • Body: 生產者發送的消息體。

4.4、代碼分析 - 消費者接收消息

消費者被創建之后,在接收消息的過程中,使用了方法名為handleDelivery的方法,並在該方法中傳入了各種相關的參數。

  • ConsumerTag: 第i個消費者的標簽。
  • Envelope: 利用AMQP基本方法封裝的一些參數,包括:表示接收到第幾條消息的DeliveryTag;若消息確認失敗,決定是否重發消息的Redeliver;選用的Exchange類型;路由關鍵字RoutingKey。
  • Properties: AMQP協議中預先確定的14條消息屬性。大部分屬性並不常用,常用的屬性會在之后的某些實例中進行詳細說明,此處的消息屬性全部為null。
  • Body: 消費者獲得的消息體。在該程序中使用UTF-8編碼將消息體轉換為英文的形式進行打印並輸出。

消息被接收並打印之后,一般需要在RabbitMQ中進行確認。

  • 如果消費者發送確認信號,RabbitMQ便知道消息被成功接收,然后將其從隊里中刪除;
  • 如果消費者沒有發送確認信號,RabbitMQ會認為這個消息沒有處理成功,然后把它傳遞給其它消費者。

消息確認的方法有兩種。

  • 一種只采用channel.basicConsume(queue,true,consumer)語句,使消息被接收后立即確認;
  • 另一種針對相對耗時的工作,先利用channel.basicConsume(queue,false,consumer)語句關閉消息的自動確認,在工作完成后,通過channel.basicACK命令手動發回確認。

4.5、代碼分析 - 關閉連接

消息被處理完后可以利用channel.close()和connection.close()語句將整個通道和連接關閉。如果連接沒有關閉,會導致RabbitMQ的資源無法釋放。

二、發布訂閱的三種消息路由機制

1、Publish/Subscribe 《---》fanout 廣播

1.1、概念

在工作隊列的實例中,RabbitMQ將同一任務的不同部分發送給不同的消費者,即每個消費者接收到的消息都是唯一的。但在該部分,我們將同一條消息發送給多個消費者,也就是說發布的消息將會被廣播給所有的接收者,這就是“發布/訂閱模式(Publish/Subscribe)”。

舉例:為了描述這種模式,需要建立一個簡單的日志系統,該日志系統包括發送日志信息和接收並打印信息兩部分。一個生產者P發送消息到RabbitMQ的交換機中,然后交換機將消息處理后,發送消息到兩個不同的隊列,最后消費者C1和C2分別去接收相應隊列的消息。顯然,兩個消費者接收到的消息是相同的。

相比於前面的例子,這個例子加入了對交換機的表示。因為生產者發送的消息並不會直接發送到隊列中,而是先發送給交換機;

交換機對接收到的消息進行處理,比如決定將消息發送給一個隊列還是多個隊列,或者選擇是否拋棄該消息;

交換機對消息處理完后,如果繼續將消息發送給隊列,對應的消費者才可以接收此消息。前兩種實例采用了默認類型的交換機,所以沒有加入對交換機的表示。

1.2、faout交換機與臨時隊列

常見的交換機類型有direct、topic、headers和fanout。

fanout類型的交換機可以把接收到的消息路由到所有與它綁定的隊列中。聲明交換機類型並與隊列綁定的方法如圖:

因為我們既想要監聽所有的消息,又只對當前的消息感興趣,所以放棄了使用已命名隊列,選則了臨時隊列。

臨時隊列可以實現:任何時候連接RabbitMQ,隊列都是新的、空的;一旦消費者和隊列不再連接,那么隊列可以自動刪除。

臨時隊列一般隨機命名,雖然它可以自行創建,但是利用RabbitMQ為我們選擇一個隨機分配的隊列卻是更好的方式。

1.3、消息的發送和接收結果

2、Routing 《---》direct

2.1、bindingkey

 

此處的綁定關鍵字(BindingKey)就是路由關鍵字(RoutingKey),但因發送消息的basicPublish命令中也含有路由關鍵字參數,所以為了避免混淆,將路由關鍵字又命名為綁定關鍵字。

在發布和訂閱模式中,盡管fanout類型的交換機接收到的消息,總能被路由到所有與之綁定的隊列中,但我們有時並不會對生產者發送的所有消息感興趣,而且希望消費者只接收所有消息中的某個子集。比如生產者發送的任務中包含error、warning和info類型的消息,某個消費者可以只輸出並打印error類型的消息,當然其它消費者也可以只輸出並打印warning類型的消息或info類型的消息,甚至所有類型的消息。

若果滿足上述需求,首先需要將交換機和隊列之間通過關鍵字進行綁定,同時利用該綁定關鍵字決定輸出並打印哪種類型的消息。圖5-16中交換機為direct型,交換機和隊列之間的綁定關鍵字分別為error和warning。

2.2、direct類型的交換機

 因為生產者發送的消息只路由到路由關鍵字和綁定關鍵字完全匹配的隊列中,所以相對fanout類型的交換機,direct類型的交換機功能更靈活。

direct類型交換機的聲明和與該交換機有關的關鍵字綁定方式如圖:

2.3、消費者獲取綁定關鍵字

 消費者在接收消息的過程中,首先需要獲得綁定關鍵字;

如果綁定關鍵字和生產者發送的路由關鍵字完全匹配,則輸出並打印消息。消費者獲取關鍵字的方法如圖:

在envelope中含有利用AMQP基本方法封裝的一些參數,其中一項即為綁定關鍵字。如果消費者希望在控制台輸出並打印該關鍵字,只需要利用getRoutingKey()命令。

2.4、消息的發送和接收結果

3、topics 《---》正則匹配路由

3.1、概念

盡管direct交換機相對fanout交換機來說已經足夠靈活,但它依然無法依據多重准則將消息路由到隊列中,而topic類型的交換機卻實現了這一功能。

Topic類型的交換機可以將接收到的消息路由到綁定關鍵字與路由關鍵字完全匹配的隊列中,但需要滿足以下條件:

(1)路由關鍵字必須由一組通過點“.”分隔的單詞組成 ;

(2)綁定關鍵字也必須由一組通過點“.”分隔的單詞組成;

(3)綁定關鍵字可以利用“*”和“#”兩種比較特殊的字符來進行模糊匹配,其中“*”可以匹配一個單詞,“#”可以匹配零個或多個單詞。

對比fanout、direct和topic類型的交換機發現:

如果綁定關鍵字為“#”,那么topic類型的交換機實質是fanout類型;

如果綁定關鍵字為“*”,那么topic類型的交換機實質則是direct類型。接下來利用圖5-20的具體實例流程對topic類型進行說明。

圖中:生產者P發送用來描述動物的信息;路由關鍵字由依次用來形容速度、顏色和物種的三個單詞和兩個點組成,比如路由關鍵字為"quick.orange.rabbit"和"lazy.brown.fox"等。

交換機和隊列1的綁定關鍵字為“*.orange.*”,和隊列2的綁定關鍵字分別為“lazy.#”、“*.*.rabbit”。我們可以形象的將此理解為:隊列1對橘色的動物更感興趣;隊列2更喜歡比較懶惰的動物,同時會監聽關於兔子的一切特征。依據topic交換機的條件進行分析,如果路由關鍵字為"quick.orange.rabbit",消息會被路由到兩個隊列中;如果路由關鍵字為"quick.orange.rabbit",消息只能被路由到隊列2中;如果生產者發送了“quick.orange.male.rabbit”的路由關鍵字,消息因無法匹配而被拋棄。

3.2、消息的發送和接收結果舉例

三、rabbitmq 遠程過程調用

1、RPC的基礎知識

遠程過程調用(Remote Procedure Call,RPC)是一種進程間的通信方式。

rabbitmq 的RPC使用client/server模型:

  • 首先客戶端發送請求消息到RabbitMQ,
  • 然后服務器從RabbitMQ中獲取消息並進行處理,
  • 最后處理的消息重新通過RabbitMQ傳遞給客戶端。

消息傳遞的流程圖:

為了更形象地理解RPC,這里進一步說明:

(1)客戶端將請求消息發送到RabbitMQ的隊列中進行存儲,此時客戶端和生產者類似;

(2)服務器從RabbitMQ中取出消息並處理,此時服務器和消費者類似;

(3)服務器將消息的處理結果發送到RabbitMQ的隊列中進行存儲,此時服務器類似於生產者;

(4)客戶端從RabbitMQ的隊列中獲取消息結果后,判斷該消息的響應和請求是否匹配,此時客戶端類似消費者。

2、reply-to和correlation-id

客戶端在發送消息給RabbitMQ時,會同時發送消息的reply-to和correlation-id屬性;獲取消息的處理結果時,客戶端也會接收到correlation-id屬性。下面對兩種屬性的功能進行說明。

reply-to即對消息的回調(callback)隊列進行命名。服務器對客戶端發送的請求消息接收並處理后,將結果反饋給客戶端,而客戶端為了接收該結果,需要在發送請求消息的同時,為服務器提供一個回調的隊列地址。

為每個RPC請求創建單獨的回調隊列,其效率非常低。比如客戶端發送大量的請求消息給服務器,服務器為了獲取消息的處理結果,必須為每條請求消息創建回調隊列,這樣不僅花費時間,而且占用了RabbitMQ的資源。因此為避免類似狀況,選擇只創建一個回調隊列,但這又造成客戶端接收到的結果無法確定是屬於哪條請求消息的問題。比如客戶端發送了請求消息A和請求消息B,就會分別接收到處理后的消息結果A和消息結果B,但是之前這些結果存儲在唯一的回調隊列中,我們當然不希望請求消息A和消息結果B匹配,同樣不希望請求消息B和消息結果A匹配。

為解決上述問題,在消息中加入correlation-id,用它與相關聯的請求RPC響應。客戶端發送請求消息時,同時發送唯一的correlation-id;如果接收消息的處理結果時,也接收到相同的correlation-id,則認為請求和響應是匹配的,否則拋棄該結果。圖5-23為生成並發送reply-to和correlation-id的過程。

3、RPC系統工作流程及結果

結合之前的分析,RPC的工作流程可以總結為:

(1)客戶端啟動時,會創建一個匿名的回調隊列;
(2)客戶端在發送的請求消息中設置兩個屬性:reply-to和correlation-id;

(3)服務器等候隊列中的請求,當請求出現,服務器處理請求並將請求結果發送到客戶端,使用的隊列名即消息屬性reply-to;

(4)客戶端等待回調隊列中的結果,當消息結果出現,便檢查correlation-id屬性,若correlation-id和請求中的correlation-id一致,則接收該結果。

 客戶端發送斐波納契數列fib(30)的請求消息,服務器處理后,客戶端接收到了正確的處理結果。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM