掌握Rabbitmq幾個重要概念,從一條消息說起


RabbitMQ 是功能強大的開源消息代理。根據官網稱:也是使用量最廣泛的消息隊列。就像他的口號“Messaging that just works”,開箱即用使用簡單,支持多種消息傳輸協議(AMQP、STOMP、MQTT)。

一個應用程序或者服務如何使用RabbitMq呢?

首先會有生產者和消費者兩個角色;生產者連接到rabbit代理服務,創建一條AMQP信道,然后把生成的消息,通過信道發布到交換器上,交換器根據路由規則(路由key)進行綁定到或者路由到隊列上面。最后消息到達隊列上中。消費者跟生產者一樣需要先和rabbit代理服務器創建連接,同時創建一個消息管道,並訂閱到隊列上,進而從隊列中獲取消息,進行處理。這里面涉及到消息、交換器、綁定、隊列幾個重要的概念,下面會一一講解。整個過程如圖所示

 

消息

生產者創建消息,這里的消息是指?消息包含兩個部分內容:有效載荷(payload)、標簽(label)。有效載荷就是你想要傳輸的數據。而標簽是描述了有效載荷,並且RabbitMQ用它來決定誰將獲得消息的拷貝。其實通過上圖你也會發現,不同於tcp協議,因為AMQP沒有明確的接收方,只會用標簽表述這條消息,然后把消息交給Rabbit。rabbit會根據標簽把消息發送給感興趣的接收方。

隊列

消息最終到達隊列中並等待消費。消費者通過AMQP的Basic.Consume命令訂閱。這樣做會將信道設置為接受模式,直到取消對隊列的訂閱為止。訂閱之后,消費者在消費(或者拒絕)最近的接收的那條消息之后,就能從隊列中自動的接收下一條消息。

注意:什么時候消息才會從隊列中刪除呢?這里涉及到一個消息確認的動作。消費者接收到的每一條消息都必須進行確認。才會從隊列中刪除。消費者可以通過AMQP的Basic.Ack命令顯式地向rabbtmq發送一個確認,或者在訂閱到隊列的時候就將autoAck屬性設置為true;如:autoAck: true,一旦消費者接收消息,rabbitmq會自動視其確認了消息。

如果消費者接收到消費1,然后在確認之前從rabbit斷開連接,rabbitmq會認為這條消息沒有分發,然后重新分發下一個訂閱的消費者。這樣做的好處,即使你的應用程序奔潰了,也可以確保消息會被發送給另一個消費者進行處理,或者等待你的程序恢復正常連接,繼續消費。假設消費者A程序與rabbit斷開了連接,消息進而會被消費者B進行消費處理。如下圖

只要消費者不進行確認,rabbit將不會給該消費者發送消息,因為在上一條消息被確認之前,rabbit會認為這個消費者並沒有准備好接收下一條消息的能力。

在沒有辦法正常確認消息,不能一直堵塞呀,比如消費者有bug。那就使用AMPQ的Basic.Reject命令;明確的拒絕這條消息,其中一個參數requeue如果設置了ture的話,Rabbit會把消息重新發給下一個訂閱的消費者。

如果你檢測到一條消息本身有錯誤而任何一個消費者都無法處理的時候,就可以把requeue設置為false,rabbitmq會把消息從隊里中移除,而不會把他發送給新的消費者。

注意:這里你可以使用對拒絕的消息進行特殊處理,比如發送到死信隊列或者專門收集的erro隊里中。

小結:隊列是amqp消息通信的基礎模塊

1.為消息提供的處所,消息在此等待消費
2.對負載均衡來說,隊列是絕佳方案。只需附加一堆消費者,並讓rabbitmq以循環的方式均勻地分配發來的消息。
3.隊列是rabbit中消息的最后的終點。

 

交換器、綁定

我們知道消費者如何獲取消息,那么現在的問題是,消息是如何到達隊列的呢?消息發送到交換器,會根據確定的規則,RabbitMQ將會決定消息該投遞到哪個隊列。這些規則稱為路由鍵(routing key)。隊列通過路由鍵綁定到交換器。當你發送消息到代理服務器時,消息將擁有一個路由鍵。如:AMPQ的Basic.Publish方法,有個參數routingKey通過他指定。即便是空的,RabbitMQ也會將其和綁定使用的路由鍵進行匹配。

交換器有四種類型:direct、fanout、topic和headers;每種類型實現了不同的路由算法,前三個比較常用。

1.direct

這種模式非常簡單:路由鍵匹配的話,消息就被投遞到對應的隊列。路由算法-使用路由鍵和隊列名稱同名進行路由消息。使用場景-直接把消息發送到指定隊列時使用。


默認的direct交換器,不需要進行聲明, 隊列聲明會自動綁定到默認的交換器上,並以隊列名稱作為路右鍵。使用以下代碼發送消息申明的隊列中。

channel.BasicPublish(exchange: "",
                               routingKey: "hello",
                               basicProperties: null,
                               body: body);

2.fanout交換器

這種模式下,可以忽略routing key,唯一需要做的就是為新的消費者寫一段代碼,然后聲明新的隊列並將其綁定到fanout交換器上。當你發送一條消息到fanout交換器上,他會把消息投遞給所有附加在此交換器的隊里上。路由算法-消息會路由到綁定到交換器上的所有隊列。使用場景-發布訂閱的廣播功能

 

2.topic交換器

這類交換器允許不同源頭的消息到達同一個隊列。路由算法-根據全部或部分路由鍵匹配將消息路由綁定的隊列上。使用場景-根據某些條件廣播到特定的隊列上。

小結:

本文主要總結了 AMQP幾個主要元素:交換器,綁定,隊列。以及一個消息創建到消費者讀取消費的過程。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM