RabbitMQ基礎教程


RabbitMQ相關概念介紹

RabbitMQ整體上是一個生產者與消費者模型,主要負責接收、存儲和轉發消息。可以把消息傳遞的過程想象成:當你將一個包裹送到郵局,郵局會暫存並最終將郵件通過郵遞員送到收件人的手上,RabbitMQ就好比郵局、郵箱和郵遞員組成的一個系統。從計算機術語層面來說,RabbitMQ模型更像是一種交換機模型。

生產者和消費者

Producer:生產者,就是投遞消息的一方。

生產者創建消息,然后發布到RabbitMQ中。消息一般可以包含2個部分:消息體和標簽(Label)。消息體也可以稱之為payload,在實際應用中,消息體一般是一個帶有業務邏輯結構的數據,比如一個JSON字符串。當然可以進一步對這個消息體進行序列化操作。消息的標簽用來描述這條消息,比如一個交換器的名稱和一個路由鍵。生產者把消息交由RabbitMQ,RabbitMQ之后會根據標簽把消息發送給感興趣的消費者(Consumer)。

Consumer:消費者,就是接受消息的一方。

消費者連接到RabbitMQ服務器,並訂閱到隊列上。當消費者消費一條消息時,只是消費消息的消息體(payload)。在消息路由的過程中,消息的標簽會丟棄,存入到隊列中的消息只有消息體,消費者也只會消費到消息體,也就不知道消息的生產者是誰,當然消費者也不需要知道。

Broker:消息中間件的服務節點。

對於RabbitMQ來說,一個RabbitMQ Broker可以簡單地看做一個RabbitMQ服務節點,或者RabbitMQ服務實例。大多數情況下也可以將一個RabbitMQ Broker看做一台RabbitMQ服務器。

圖2-2展示了生產者將消息存入RabbitMQ Broker,以及消費者從Broker中消費數據的整個流程。

首先生產者將業務方數據進行可能的包裝,之后封裝成消息,發送(AMQP協議里這個動作對應的命令為Basic.Publish)到Broker中。消費者訂閱並接受消息(AMQP協議里這個動作對應的命令為Basic.Consume或者Basic.Get),經過可能的解包處理得到原始的數據,之后在進行業務處理邏輯。這個業務處理邏輯並不一定需要和接收消息的邏輯使用同一個線程。消費者進程可以使用一個線程去接收消息,存入到內存中,比如使用Java中的BlockingQueue。業務處理邏輯使用另一個線程從內存中讀取數據,這樣可以將應用進一步解耦,提高整個應用的處理效率。

隊列

Queue:隊列,是RabbitMQ的內部對象,用於存儲消息。參考圖2-1,隊列可以用圖2-3表示。

RabbitMQ中消息都只能存儲在隊列中,這一點和Kafka這種消息中間件相反。Kafka將消息存儲在topic(主題)這個邏輯層面,而相對應的隊列邏輯知識topic實際存儲文件中的位移標識。RabbitMQ的生產者生產消息並最終投遞到隊列中,消費者可以從隊列中獲取消息並消費。

多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤(Round-Robin,即輪詢)給多個消費者進行處理,而不是每個消費者都收到所有的消息並處理,如圖2-4所示。

RabbitMQ不支持隊列層面的廣播消息,如果需要廣播消費,需要在其上進行二次開發,處理邏輯會變得異常復雜,同時也不建議這么做。

交換器、路由鍵、綁定

Exchange:交換器。在圖2-4中我們暫時可以理解成生產者將消息投遞到隊列中,實際上這個在RabbitMQ中不會發生。真實情況是,生產者將消息發送到Exchange(交換器,通常也可以用大寫的"X"來表示),有交換器將消息路由到一個或者多個隊列中。如果路由不到,或許會返回給生產者,或許直接丟棄。這里可以將RabbitMQ中的交換器看做一個簡單的實體。

RabbitMQ中的交換器有四種類型,不同的類型有着不同的路由決策。

RoutingKey:路由鍵。生產者將消息發給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則,而這個RoutingKey需要與交換器類型和綁定鍵(BindingKey)聯合使用才能最終生效。

在交換器類型和綁定鍵(BindingKey)固定的情況下,生產者可以在發送消息給交換器時,通過指定RoutingKey來決定消息流向哪里。

Binding:綁定。RabbitMQ中通過綁定將交換器與隊列聯合起來,在綁定的時候一般會指定一個綁定鍵(BindingKey),這樣RabbitMQ就知道如何正確地將消息路由到隊列了,如果2-6所示。

生產者將消息發送給交換器時,需要一個RoutingKey,當BindingKey和RoutingKey相匹配時,消息會被路由到對應的隊列中。在綁定多個隊列到同一個交換器的時候,這些綁定允許使用相同的BindingKey。BindingKey並不是在所有的情況下都生效,它依賴於交換器類型,比如fanout類型的交換器就會無視BindingKey,而是將消息路由到所有綁定該交換器的隊列中。

對於初學者來說,交換器、路由鍵、綁定這幾個概念理解起來會有點晦澀,可以對照着代碼來加深理解。

沿用本章開頭的比喻,交換器相當於投遞包裹的郵箱,RoutingKey相當於填寫在包裹上面的地址,BindingKey相當於包裹的目的地,當填寫在包裹上的地址和實際想要投遞的地址相匹配時,那么這個包裹就會被正確投遞地目的地,最后這個目的地的“主人”--隊列可以保留這個包裹。如果填寫的地址出錯,郵遞員不能正確投遞到目的地,包裹可能會回退給寄件人,也可能被丟棄。

有經驗的讀者可能會發現,在某些情形下,RoutingKey和BindingKey可以看做同一個東西。

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

define('EXCHANGE_NAME', '交換器名');
define('QUEUE_NAME', '隊列名');
define('ROUTING_KEY', '路由鍵');

$con = new AMQPStreamConnection('39.105.15.108', 5672, 'root', 'root');
$channel = $con->channel();

$channel->exchange_declare(EXCHANGE_NAME, 'direct', false, false, false);
$channel->queue_declare(QUEUE_NAME, false, false, false, false);
$channel->queue_bind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, EXCHANGE_NAME, ROUTING_KEY);

echo '[x]發送hello world!'.PHP_EOL;

$channel->close();
$con->close();

以上代碼聲明了一個direct類型的交換器,然后將交換器和隊列綁定起來。注意這里使用的字樣是"ROUTING_KEY",在本該使用BindingKey的$channel.queue_bind方法中卻和$channel.basic_publish方法同樣使用了RoutingKey,這樣做的潛台詞是:這里的RoutingKey和BindingKey是同一個東西。在direct交換器類型下,RoutingKey和BindingKey需要完全匹配才能使用,所以上面代碼中采用了此種寫法會顯得方便許多。

但是在topic交換器類型下,RoutingKey和BindingKey之間需要做模糊匹配,兩者並不是相同的。BindingKey其實也屬於路由鍵中的一種,官方解釋為:the routing key to use for the binding。可以翻譯為:在綁定的時候使用的路由鍵。大多數時候,包括官方文檔和RabbitMQ PHP API中都把BindingKey和RoutingKey看做RoutingKey,為了避免混淆,可以這么理解:

  • 在使用綁定的時候,其中需要的路由鍵是BindingKey。涉及的客戶端方法如:$channel.exchange_bind、$channel.queue_bind,對應的AMQP命令為Exchange.Bind、Queue.bind。
  • 在發送消息的時候,其中需要的路由鍵是RoutingKey。涉及的客戶端方法如$channel.basic_publish,對應的AMQP命令為basic.publish。

由於某些歷史的原因,包括現存能搜集到的資料顯示:大多數情況下習慣性地將BindingKey寫成RoutingKey,尤其是在使用direct類型的交換器的時候。后面的內容會將兩者合稱為路由鍵,讀者需要注意區分其中的不同,可以根據上面的辨別方法進行有效的區分。

交換器類型

RabbitMQ常用的交換器類型有fanout、direct、topic、headers四種。AMQP協議里還提到另外兩種類型:System和自定義,這里不與描述。對於這四種類型下面一一闡述。

  • fanout
    它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。

  • direct
    direct類型的交換器路由規則也很簡單,它會把消息路由到那些BindingKey和RoutingKey完全匹配的隊列中。
    以圖2-7為例,交換器的類型為direct,如果我們發送一條消息,並在發送消息的時候設置了路由鍵為"warning",則消息會路由到Queue1和Queue2,對應的實例代碼如下:

    $channel->basic_publish($msg, EXCHANGE_NAME, 'waning');
    

    如果在發送消息的時候設置路由鍵為"info"或者"debug",消息只會路由到Queue2。如果以其它的路由鍵發送消息,則消息不會路由到這兩個隊列中。

  • topic
    前面講到direct類型的交換器路由規則是完全匹配BindingKey和RoutingKey,但是這種嚴格的匹配方式在很多情況下不能滿足實際業務的需求。topic類型的交換器在匹配規則上進行了擴展,它與direct類型的交換器類似,也是將消息路由到BindingKey和RoutingKey想匹配的隊列中,但這里的匹配規則有些不同,它約定:

    • RoutingKey為一個點號“.”分割的字符串(被點號“.”分隔開的每一段獨立的字符串稱為一個單詞),如“com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
    • BindingKey和RoutingKey一樣也是點號“.”分隔的字符串;
    • BindingKey中可以存在兩種特殊字符串“*”和“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多規則單詞(可以是零個)。

    已圖2-8中的配置為例:

    • 路由鍵為“com.rabbitmq.client”的消息會同時路由到Queue1和Queue2;
    • 路由鍵為“com.hidden.client”的消息只會路由到Queue2中;
    • 路由鍵為“com.hidden.demo”的消息只會路由到Queue2中;
    • 路由鍵為“java.rabbitmq.demo”的消息只會路由到Queue1中;
    • 路由鍵為“java.util.concurrent”的消息將會被丟棄或者返回給生產者(需要設置mandatory參數),因為它沒有匹配任何路由鍵。

  • headers
    headers類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。在綁定隊列和交換器時制定一組鍵值對,當發送消息到交換器時,RabbitMQ會獲取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配隊列和交換器綁定時指定的鍵值對,如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列。headers類型的交換器性能會很差,而且也不實用,基本上不會看到它的存在。

RabbitMQ運轉流程

了解了以上的RabbitMQ架構模型及相關術語,再來回顧整個消息隊列過程。

在最初狀態下,生產者發送消息的時候(可依照圖2-1):

(1)生產者連接到RabbitMQ Broker,建立一個連接(Connection),開啟一個信道(Channel)

(2)生產者聲明一個交換器,並設置相關屬性,比如交換器類型、是否持久化等

(3)生產者聲明一個隊列並設置相關屬性,比如是否排他、是否持久化、是否自動刪除等

(4)生產者通過路由鍵將交換器和隊列綁定起來

(5)生產者發送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息

(6)相應的交換器根據接收到的路由鍵查找相匹配的隊列。

(7)如果找到,則將從生產者發送過來的消息存入相應的隊列。

(8)如果沒有找到,則根據生產者配置的屬性選擇丟棄還是回退給生產者

(9)關閉信道

(10)關閉連接

消費者接收消息的過程:

(1)消費者連接到RabbitMQ Broker,建立一個連接(Connection),開啟一個信道(Channel)。

(2)消費者向RabbitMQ Broker請求消費相應隊列中的消息,可能會設置相應的回調函數,以及做一些准備工作。

(3)等待RabbitMQ Broker回應並投遞相應隊列中隊列的消息,消費者接收消息。

(4)消費者確認(ack)接收到的消息。

(5)RabbitMQ從隊列中刪除相應已經被確認的消息。

(6)關閉信道

(7)關閉連接

如圖2-9所示,我們又引入了兩個新的概念:ConnectionChannel。我們知道無論是生產者還是消費者,都需要和RabbitMQ Broker建立連接,這個連接就是一條TCP連接,也就是Connection。一旦TCP連接建立起來,客戶端緊接着可以創建一個AMQP信道(Channel),每個信道都會被指派一個唯一的ID。信道是建立在Connection之上的虛擬連接,RabbitMQ處理的每條AMQP指令都是通過信道完成的。

我們完全可以直接使用Connection就能完成信道的工作,為什么還要引入信道呢?試想這樣一個場景,一個應用程序中有很多個線程需要從RabbitMQ中消費消息,或者生產消息,那么必然要建立很多個Connection,也就是許多個TCP連接。然后對於操作系統而言,建立和銷毀TCP連接是非常昂貴的開銷,如果遇到使用高峰,性能瓶頸也隨之顯現。RabbitMQ采用類似NIO(Non-blocking I/O)的做法,選擇TCP連接復用,不僅可以減少性能開銷,同時也便於管理。

每個線程把持一個信道,所以信道復用了Connection的TCP連接。同時RabbitMQ可以確保每個線程的私密性,就像擁有獨立的連接一樣。當每個信道的流量不是很大時,復用單一的Connection可以在產生性能瓶頸的情況下有效地節省TCP連接資源。但是當信道本身的流量很大時,這時候多個信道復用一個Connection就會產生性能瓶頸,進而使整體的流量被限制了。此時就需要開辟多個Connection,將這些信道均攤到這些Connection中,至於這些相關的調優策略需要根據業務自身的實際情況進行調節。

信道在AMQP中是一個很重要的概念,大多數操作都是在信道這個層面展開的。在代碼中也可以看出一些端倪,比如$channel.exchange_declare、$channel.queue_declare、$channel.basic_publish和$channel.basic_consume等方法。RabbitMQ相關的API與AMQP緊密相連,比如$channel.basic_publish對應AMQP的Basic.Publish命令。

NIO,也稱為非阻塞I/O,包含三大核心部分:Channel(信道)、Buffer(緩沖區)和Selector(選擇器)。NIO基於Channel和Buffer進行操作,數據總是從信道讀取數據到緩沖區中,或者從緩存區寫入到信道中。Selector用於監聽多個信道的事件(比如連接打開,數據到達等)。因此,單線程可以監聽多個數據的信道。NIO中有一個很有名的Reactor模式,有興趣的讀者可以深入研究。

AMQP協議介紹

從前面的內容可惡意了解到RabbitMQ是遵從AMQP協議的,換句話說,RabbitMQ就是AMQP協議的Erlang實現(當然RabbitMQ還支持STOMP、MQTT等協議)。AMQP的模型架構和RabbitMQ的模型架構是一樣的,生產者將消息發送給交換器,交換器和隊列綁定。當生產者發送消息時所攜帶的RoutingKey與綁定時的BingingKey相匹配時,消息即被存入相應的隊列之中。消費者可以訂閱相應的隊列來獲取消息。

RabbitMQ中的交換器、交換器類型、隊列、綁定、路由鍵等都是遵循的AMQP協議中相應的概念。目前RabbitMQ最新版本默認支持的是AMQP0-9-1。

AMQP協議本身包括三層。

  • Module Layer:位於協議最高層,主要定義了一些供客戶端調用的命令,客戶端可以利用這些命令實現自己的業務邏輯。例如,客戶端可以使用Queue.Declare命令聲明一個隊列或者使用Basic.Consume訂閱消費一個隊列中的消息。
  • Session Layer:位於中間層,主要負責將客戶端的命令發送給服務器,再將服務端的應答返回給客戶端,主要為客戶端與服務器之間的通信提供可靠性同步機制和錯誤處理。
  • Transport Layer:位於最底層,主要傳輸二進制數據流,提供幀的處理、信道復用、錯誤檢測和數據表示等。

AMQP說到底還是一個通信協議,通信協議都會涉及報文交互,從low-level距離來說,AMQP本身是應用層的協議,其填充於TCP的協議層的數據部分。而從high-level來說,AMQP是通過協議命令進行交互的。AMQP協議可以看作一系列結構化命令的集合,這里的命令代表一種操作,類似於HTTP中的方法(GET、POST、PUT、DELETE等)。

AMQP生產者流轉過程

為了形象地說明AMQP協議命令的流轉過程,這里示例簡潔版生產者代碼說明:

//創建連接
$connection = new AMQPStreamConnection('39.105.15.108', 5672, 'root', 'root');

//創建信道
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

當客戶端與Broker建立連接的時候,會調用AMQPStreamConnection()的構造方法,這方法會進一步封裝成Protocol Header0-9-1的報文頭發送給Broker,一次通知Broker本次交互采用的是AMQP0-9-1協議,緊接着Broker返回Connection.Start來建立連接,在連接的過程中涉及Connection.Start/.Start-OK、Connection.Tune/.Tunk-Ok、Connection.Open/.Open-Ok這6個命令的交互。

當客戶端調用connection.createChannel方法准備開啟信道的時候、其包裝Channel.Open命令發送給Broker,等待Channel.Open-Ok命令。

當客戶端發送消息的時候,需要調用channel.basicPublish方法, 對應的AMQP命令為Basic.Publish,注意這個命令和前面涉及的命令略有不同,這個命令還包含了Content Header和Content Body。Content Header里面包含的是消息體的屬性,例如,投遞模式、優先級等,而Content Body包含消息體本身。

當客戶端發送完消息需要關閉資源時,涉及Channel.Close/.Close-Ok與Connection.Close/.Close-Ok的命令及哦啊胡。詳細流轉過程如下圖所示。

AMQP消費者流轉過程

我們繼續來看消費者的流轉過程,消費端的關鍵代碼如下所示:

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel    = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);
echo "[x] Waiting for messages. To exit press Ctrl+C".PHP_EOL;

$callback = function ($msg) {
    echo '[x] Received ', $msg->body, PHP_EOL;
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
    $channel->wait();
}

其詳細流轉過程如下圖所示:

消費者客戶端同樣需要與Broker建立連接,與生產者客戶端一樣,協議交互同樣設計Connection.Start/.Start-Ok、Connection.Tune/Tune-Ok和Connection.Open/.Open-Ok等,圖2-11中省略了這些步驟,可以參考圖2-10。

緊接着也少不了在Connection之上建立channel,和生產者客戶端一樣,協議涉及Channel.Open/Open-Ok。

如果消費者之前調用了channel.BasicQos(int prefetchCount)的方法來涉及消費者客戶端最大能”保持“的未確認的消息數,那么協議流轉會涉及Basic.Qos/.Qos-Ok這兩個AMQP命令。

在真正消費之前,消費者客戶端需要向Broker發送Basic.Consume命令(即調用channel.basicConsume方法)將Channel置為接收模式,之后Broker回執Basic.Consume-Ok以告訴消費者客戶端准備好消息消息。緊接着Broker向消費者客戶端推送(Push)消息,即Basic.Deliver命令,有意思的是這個和Basic.Publish命令一樣會攜帶Content Header和Content Body。

消費者接收到消息並正確消費之后,向Broker發送確認,即Basic.Ack命令。

在消費者停止消費的時候,主動關閉連接,這點和生產者一樣,設計Channel.Close/.Close-Ok和Connection.Close/.Close-Ok。

安裝RabbitMQ

安裝依賴

yum install -y make gcc gcc-c++ m4 openssl openssl-devel 
yum install -y ncurses-devel unixODBC unixODBC-devel java java-devel socat

安裝Erlang

Erlang RPM包下載地址:https://packagecloud.io/rabbitmq/erlang

image-20201015193536673

下載成功后,到下載的文件資源目錄執行以下命令

yum localinstall erlang-22.3.4.10-1.el7.x86_64.rpm

安裝成功后,可以以下運行命令來查看你安裝的erl版本

erl -version

安裝RabbitMQ

RabbitMQ RPM包下載地址:https://github.com/rabbitmq/rabbitmq-server/releases

當你下載完成后,你需要運行下面的命令來將key導入

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

使用 yum 進行本地安裝,運行命令

yum localinstall rabbitmq-server-3.8.8-1.el7.noarch.rpm

當安裝完成后,你可以使用命令來啟動 rabbitmq 服務器:

systemctl start rabbitmq-server

然后我們就可以添加web管理插件了

# 添加web管理插件
rabbitmq-plugins enable rabbitmq_management

安裝好web管理插件后記得重啟rabbitmq-server

我們通過IP:端口(http://172.16.93.128:15672)的形式,就可以訪問RabbitMQ的Web管理界面了

默認情況下,訪問RabbitMQ服務的用戶名和密碼都是"guest",這個賬戶有限制,默認只能通過本地網絡(如localhost)訪問,遠程網絡訪問受限,使用默認的用戶 guest / guest (此也為管理員用戶)登陸,會發現無法登陸,報錯:User can only log in via localhost。那是因為默認是限制了guest用戶只能在本機登陸,也就是只能登陸localhost:15672。所以在實現生產和消費消息之前,需要另外添加一個用戶,並設置相應的訪問權限

添加新用戶,用戶名為"root",密碼為"root"

rabbitmqctl add_user root root

為root用戶設置所有權限

rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

設置用戶為管理員角色

rabbitmqctl set_user_tags root administrator

我們通過該用戶就可以訪問了

RabbitMQ常用命令

服務啟動與停止

# 啟動
systemctl start rabbitmq-server

# 停止
systemctl stop rabbitmq-server

# 查看狀態
systemctl status rabbitmq-server

插件管理

# 插件列表
rabbitmq-plugins list 

# 啟動插件(XXX為插件名)
rabbitmq-plugins enable XXX

# 停用插件(XXX為插件名)
rabbitmq-plugins disable XXX

用戶管理

# 添加用戶
rabbitmqctl add_user username password

# 刪除用戶
rabbitmqctl delete_user username

# 修改密碼
rabbitmqctl change_password username newpassword

# 設置用戶角色
rabbitmqctl set_user_tags username tag

# 列出用戶
rabbitmqctl list_users

權限管理

# 列出所有用戶權限
rabbitmqctl list_permissions

# 查看指定用戶權限
rabbitmqctl list_user_permissions username

# 清除用戶權限
rabbitmqctl clear_permissions [-p vhostpath] username

# 設置用戶權限
rabbitmqctl set_permissions [-p vhostpath] username conf write read
	conf: 一個正則匹配哪些資源能被該用戶訪問
	write:一個正則匹配哪些資源能被該用戶寫入
	read: 一個正則匹配哪些資源能被該用戶讀取

Go操作RabbitMQ

Hello World

send.go

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func main() {
	//1.與RabbitMQ建立連接,該連接抽象了套接字連接,並為我們處理協議版本協商和認證等。
	conn, err := amqp.Dial("amqp://root:itbsl2021.@39.105.15.108:5672/")
	if err != nil {
		log.Fatalf("連接RabbitMQ失敗,錯誤為:%v\n", err)
	}
	defer conn.Close()
	fmt.Println("成功連接RabbitMQ!")

	//2.創建通道,大多數API都是通過該通道操作的
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("打開channel失敗,錯誤為:%v\n", err)
	}
	defer ch.Close()
	fmt.Println("成功打開channel!")

	//3.聲明隊列(要發送,我們必須得聲明消息要發送到的度列,然后我們可以將消息發布到隊列中)
	//聲明隊列是冪等的,僅當隊列不存在時才創建,消息內容是一個字節數組,因此您可以在此處編碼任何內容
	q, err := ch.QueueDeclare(
		"hello",  //隊列名稱
		false,  //消息持久化(不持久化重啟后未消費的消息會丟失)
		false,//是否自動刪除(當最后一個消費者斷開連接以后,是否把我們的消息從隊列中刪除)
		false, //是否具有排他性
		false,  //是否阻塞
		nil,      //額外參數
	)
	if err != nil {
		log.Fatalf("聲明隊列失敗,錯誤為:%v\n", err)
	}
	fmt.Println("成功聲明隊列!")

	//4.將消息發布到隊列
	body := "Hello World!"
	err = ch.Publish(
		"",
		q.Name,
		false, //如果為true,根據exchange類型和routingKey規則,如果無法找到符合條件的隊列,那么就會把發送的消息返還發送者
		false, //如果為true,當exchange發送消息到隊列后發現隊列上沒有綁定消費者,則會把消息返還給發送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	if err != nil {
		log.Fatalf("消息發送失敗,錯誤為:%v\n", err)
	}
	fmt.Println("成功發送消息!")
}

receive.go

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func main() {
	//1.與RabbitMQ建立連接,該連接抽象了套接字連接,並為我們處理協議版本協商和認證等。
	conn, err := amqp.Dial("amqp://root:itbsl2021.@39.105.15.108:5672/")
	if err != nil {
		log.Fatalf("連接RabbitMQ失敗,錯誤為:%v\n", err)
	}
	defer conn.Close()
	fmt.Println("成功連接RabbitMQ!")

	//2.創建通道,大多數API都是通過該通道操作的
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("打開channel失敗,錯誤為:%v\n", err)
	}
	defer ch.Close()
	fmt.Println("成功打開channel!")

	//3.聲明隊列
	//請注意,我們也在這里聲明隊列。因為我們可能在發布者之前啟動消費者,所以我們希望在嘗試使用隊列中的消息之前確保隊列存在
	q, err := ch.QueueDeclare(
		"hello",  //隊列名稱
		false,  //持久化隊列
		false,//是否自動刪除
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("聲明隊列失敗,錯誤為:%v\n", err)
	}
	fmt.Println("成功聲明隊列!")

	//4.注冊消費者
	//我們將告訴服務器將隊列中的消息傳遞給我們。
	//由於它將異步地向我們發送消息,因此我們將在goroutine中從通道(由amqp :: Consume返回)中讀取消息。
	msgs, err := ch.Consume(
		q.Name, //隊列名稱
		"", //用來區分多個消費者
		true, //是否自動應答
		false, //是否具有排他性
		false, //如果設置為true,表示不能將同一個connection中發送的消息傳遞給這個connection中的消費者
		false, //隊列消費是否阻塞
		nil, //額外參數
	)
	if err != nil {
		log.Fatalf("注冊消費者失敗,錯誤為:%v\n", err)
	}

	forever := make(chan bool)
	go func() {
		for data := range msgs {
			//實現我們要處理的邏輯函數
			fmt.Printf("收到一條消息:%s\n", data.Body)
		}
	}()
	fmt.Printf("[*] 等待消息,使用Ctrl+C退出\n")
	<-forever
}

Work queues

Publish/Subscribe

在上一教程中,我們創建了一個工作隊列。 工作隊列背后的假設是,每個任務都恰好交付給一個消費者。 在這一部分中,我們將做一些完全不同的事情-我們將消息傳達給多個消費者。 這種模式稱為“發布/訂閱”。

為了說明這種模式,我們將構建一個簡單的日志記錄系統。 它包含兩個程序-第一個程序將發出日志消息,第二個程序將接收並打印它們。

在我們的日志系統中,接收器程序的每個運行副本都將獲取消息。 這樣,我們將能夠運行一個接收器並將日志定向到磁盤。 同時我們將能夠運行其他接收器並在屏幕上查看日志。

本質上,已發布的日志消息將被廣播到所有接收者。

在本教程的前面部分中,我們向隊列發送消息和從隊列接收消息。 現在是時候在Rabbit中引入完整的消息傳遞模型了。

讓我們快速回顧一下先前教程中介紹的內容:

  • 生產者是發送消息的用戶應用程序。
  • 隊列是存儲消息的緩沖區。
  • 消費者是接收消息的用戶應用程序。

RabbitMQ消息傳遞模型中的核心思想是生產者從不將任何消息直接發送到隊列。 實際上,生產者經常甚至根本不知道是否將消息傳遞到任何隊列。

相反,生產者只能將消息發送到交換機。 交換機做的事很簡單。 一方面,它接收來自生產者的消息,另一方面,將它們推入隊列。 交換機必須確切知道如何處理收到的消息。 是否應將其附加到特定隊列? 是否應該將其附加到許多隊列中? 還是應該丟棄它。 規則由交換機類型定義。

前面已經介紹過交換機有四種類型,directtopicfanoutheaders。在發布/訂閱模式下,我們需要使用fanout類型。

扇型交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,而不理會綁定的路由鍵。如果N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這所有的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。

因為扇型交換機投遞消息的拷貝到所有綁定到它的隊列,所以他的應用案例都極其相似:

  • 大規模多用戶在線(MMO)游戲可以使用它來處理排行榜更新等全局事件
  • 體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
  • 分發系統使用它來廣播各種狀態和配置更新
  • 在群聊的時候,它被用來分發消息給參與群聊的用戶。(AMQP沒有內置presence的概念,因此XMPP可能會是個更好的選擇)

扇型交換機圖例:

在本教程的前面部分中,我們對交換機一無所知,但仍然能夠將消息發送到隊列。 這是可能的,因為我們使用的是默認交換機,該交換由空字符串(“”)標識。我們使用默認或無名稱交換:消息被路由到具有routing_key參數指定的名稱的隊列(如果存在)。

現在,我們可以改為發布到我們命名的交換機中:

集成/封裝

我們可以基於streadway/amqp自己封裝一下操作RabbitMQ的代碼,使其使用起來更方便,下面是我封裝的一個庫,使用起來很簡單,代碼地址是https://github.com/itbsl/rabbitmq

安裝

go get github.com/itbsl/rabbitmq

推薦使用go module

使用

github.com/itbsl/rabbitmq,使用起來很方便,只有8個方法,分別是創建五個不同類型的RabbitMQ實例的方法,一個關閉連接的方法,以及一個生產消息的方法和消費消息的方法。

如下:

創建RabbitMQ實例方法(5個)

//創建普通類型的rabbitmq實例,消息沒有持久化
rabbitmq.NewSimpleRabbitMQ()
//創建工作隊列模式的RabbitMQ實例:消息持久化,需要手動應答
rabbitmq.NewWorkQueueRabbitMQ()
//創建發布/訂閱模式的RabbitMQ實例:消息持久化,自動應答
rabbitmq.NewPubSubRabbitMQ()
//創建路由模式的RabbitMQ實例:消息持久化,手動應答
rabbitmq.NewRouteRabbitMQ()
//創建話題模式的RabbitMQ實例:消息持久化,手動應答
rabbitmq.NewTopicRabbitMQ()

生產者客戶端:

1.創建RabbitMQ實例

2.發送消息

package main

import (
	"github.com/itbsl/rabbitmq"
	"log"
)

func main() {
	//連接RabbitMQ
	url := "amqp://root:root@127.0.0.1:5672/"
	mq, err := rabbitmq.NewWorkQueueRabbitMQ(url, "hello")
	if err != nil {
		log.Fatalf("連接RabbitMQ出錯,錯誤為:%v\n", err)
	}
	defer mq.Close()

	//發送消息
	err = mq.Send("hello world!")
	if err != nil {
		log.Fatalf("發送消息失敗,錯誤為:%v\n", err)
	}
}

消費者客戶端:

1.創建所需類型的RabbitMQ實例

2.消費消息,業務代碼寫在回調函數里

package main

import (
	"github.com/itbsl/rabbitmq"
	"log"
)

func main() {
	//連接RabbitMQ
	url := "amqp://root:root@127.0.0.1:5672/"
	mq, err := rabbitmq.NewWorkQueueRabbitMQ(url, "hello")
	if err != nil {
		log.Fatalf("連接RabbitMQ出錯,錯誤為:%v\n", err)
	}
	defer mq.Close()

	//消費消息
	err = mq.Consume(func(msg string) bool {
		//這里寫業務處理代碼,業務處理成功,返回true用於手動應答,失敗返回false,消息會被重新放入隊列
		return true
	})
	if err != nil {
		log.Fatalf("消費失敗,錯誤為:%v\n", err)
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM