RabbitQM 處理和管理消息隊列的中間人(broker)。可簡單理解為郵局,你在程序中寫好消息,指定好收件人,剩下的事件就是 RabbitMQ 的工作了,它會保證收件人正確收到郵件。
任何發送郵件的程序都是 Producer,消息隊列可理解為郵筒,新件將堆積在此處。所有待處理的消息都以隊列形式存儲,總體上看來就是一個巨大的消息 buffer,至於存儲量與設置的內存及硬件有關。任何應用都可以向隊列添加消息,也可以多個消費者都在從隊列中獲取消息。
而 consumer 即是消息隊列中消息的應用,其處於等待接收來自 RabbitMQ 發送來的消息。
消息生產者,消費者及 RabbitMQ 這個中間人三者不必同時存在於同一機器上,實際運用時也確實大部分不會部署在同一機器上,比如有專門的機器作為 RabbitMQ 實體,而應用程序會部署在其他的集群。應用程序可以是同時負責生產消息的,也同時是消費者。
來自官方文檔中關於 RabbitMQ 消息列隊的示意圖
安裝
通過官網提供的地址下載相應平台的程序進行安裝,Mac 可通過 Homebrew 進行安裝:
$ brew update && brew install rabbitmq
啟動
如果使用 Homebrew 安裝,可通過 brew services start rabbitmq 命令來啟動 RabbitMQ 服務。
$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)
或直接運行 /usr/local/sbin/rabbitmq-server。
啟動后,會有一個可視化的管理后台,可通過 http://localhost:15672/ 訪問,用戶名密碼皆為 guest。
基於 Node.js 的 Hello World 示例
通過 amqp.node 展示 RabbitMQ 在 Node.js 中應用的一個示例。
RabbmitMQ 支持多種協議進行通信,amqp.node 使用的是 AMQP 0-9-1 這一開源協議,后者專門為處理消息而設計。作為客戶端消費消息,使用的是 amqp.node client 模塊,但 RabbitMQ 本身是支持多種客戶端的。
初始化一個 Node,js 項目然后通過以下命令安裝 amqp.node 模塊:
$ mkdir rabbitmq-demo && yarn init -y
$ yarn add amqplib
發送消息
創建 send.js 文件,在其中編寫發送消息的邏輯,它將連接到 RabbitMQ 發送消息然后退出。
首先建立到 RabbitMQ 服務的連接,
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {});
連接建立成功后,創建一個通道(channel),具體的發送將會在這個通道中進行。
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {});
});
發送消息前,需要先聲明一個隊列,然后將消息發送到該隊列:
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'hello';
var msg = 'Hello world';
<span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
});
<span class="pl-smi">channel</span>.<span class="pl-en">sendToQueue</span>(queue, <span class="pl-smi">Buffer</span>.<span class="pl-en">from</span>(msg));
<span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Sent %s<span class="pl-pds">"</span></span>, msg);
});
});
隊列的創建是一個冪等操作,只該隊列不存在的情況才會新建。
最后關閉連接並退出。
setTimeout(function() {
connection.close();
process.exit(0);
}, 500);
完整的 send.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
<span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;
<span class="pl-k">var</span> msg <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>Hello World!<span class="pl-pds">'</span></span>;
<span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
});
<span class="pl-smi">channel</span>.<span class="pl-en">sendToQueue</span>(queue, <span class="pl-smi">Buffer</span>.<span class="pl-en">from</span>(msg));
<span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Sent %s<span class="pl-pds">"</span></span>, msg);
});
<span class="pl-c1">setTimeout</span>(<span class="pl-k">function</span>() {
<span class="pl-smi">connection</span>.<span class="pl-c1">close</span>();
<span class="pl-c1">process</span>.<span class="pl-en">exit</span>(<span class="pl-c1">0</span>);
}, <span class="pl-c1">500</span>);
});
接收消息
下面開始編寫消費者,消費者做的事情是監聽來自 RabbitMQ 的消息並處理。
創建 receive.js,引入 amqp.node 模塊,流程和發送者一樣,也是先創建連接,然后創建通道,在通道中聲明需要監聽的隊列:
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'hello';
<span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
});
});
});
這里的隊列聲明不會與發送者那邊的沖突,因為上面提到過,隊列只在不存在的情況下才會重新生成。這里再次聲明可以保證監聽前隊列已經存在。並且實際場景下,消費者有可能是在發送者之前啟動的。
然后添加監聽的邏輯:
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.consume(queue, function(msg) {
console.log(" [x] Received %s", msg.content.toString());
}, {
noAck: true
});
完整的 receive.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
<span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;
<span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
});
<span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [*] Waiting for messages in %s. To exit press CTRL+C<span class="pl-pds">"</span></span>, queue);
<span class="pl-smi">channel</span>.<span class="pl-en">consume</span>(queue, <span class="pl-k">function</span>(<span class="pl-smi">msg</span>) {
<span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Received %s<span class="pl-pds">"</span></span>, <span class="pl-smi">msg</span>.<span class="pl-c1">content</span>.<span class="pl-c1">toString</span>());
}, {
noAck<span class="pl-k">:</span> <span class="pl-c1">true</span>
});
});
});
運行
分別在命令行啟動上面兩個程序,查看打印的信息。
$ node send.js
[x] Sent Hello World!
$ node receive.js
[*] Waiting for messages in hello. To exit press CTRL+C
[x] Received Hello World!
另外,可通過 sudo rabbitmqctl list_queues 手動查看 RabbitMQ 中的消息。
$ /usr/local/sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 0
如果發現 rabbitmqctl 命令不可用,需要添加 /usr/local/sbin 到環境變量中,
export PATH=/usr/local/sbin:$PATH
其中 fish shell 通過添加如下命令到 fish 的配置文件即可:
set -gx PATH /usr/local/sbin $PATH

