RabbitMQ Node.js 示例


RabbitQM 處理和管理消息隊列的中間人(broker)。可簡單理解為郵局,你在程序中寫好消息,指定好收件人,剩下的事件就是 RabbitMQ 的工作了,它會保證收件人正確收到郵件。

任何發送郵件的程序都是 Producer,消息隊列可理解為郵筒,新件將堆積在此處。所有待處理的消息都以隊列形式存儲,總體上看來就是一個巨大的消息 buffer,至於存儲量與設置的內存及硬件有關。任何應用都可以向隊列添加消息,也可以多個消費者都在從隊列中獲取消息。

consumer 即是消息隊列中消息的應用,其處於等待接收來自 RabbitMQ 發送來的消息。

消息生產者,消費者及 RabbitMQ 這個中間人三者不必同時存在於同一機器上,實際運用時也確實大部分不會部署在同一機器上,比如有專門的機器作為 RabbitMQ 實體,而應用程序會部署在其他的集群。應用程序可以是同時負責生產消息的,也同時是消費者。

來自官方文檔中關於 RabbitMQ 消息列隊的示意圖

來自官方文檔中關於 RabbitMQ 消息列隊的示意圖

安裝

通過官網提供的地址下載相應平台的程序進行安裝,Mac 可通過 Homebrew 進行安裝

$ brew update && brew install rabbitmq

啟動

如果使用 Homebrew 安裝,可通過 brew services start rabbitmq 命令來啟動 RabbitMQ 服務。

$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

或直接運行 /usr/local/sbin/rabbitmq-server

啟動后,會有一個可視化的管理后台,可通過 http://localhost:15672/ 訪問,用戶名密碼皆為 guest

基於 Node.js 的 Hello World 示例

通過 amqp.node 展示 RabbitMQ 在 Node.js 中應用的一個示例。

RabbmitMQ 支持多種協議進行通信,amqp.node 使用的是 AMQP 0-9-1 這一開源協議,后者專門為處理消息而設計。作為客戶端消費消息,使用的是 amqp.node client 模塊,但 RabbitMQ 本身是支持多種客戶端的。

初始化一個 Node,js 項目然后通過以下命令安裝 amqp.node 模塊:

$ mkdir rabbitmq-demo && yarn init -y
$ yarn add amqplib

發送消息

創建 send.js 文件,在其中編寫發送消息的邏輯,它將連接到 RabbitMQ 發送消息然后退出。

首先建立到 RabbitMQ 服務的連接,

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {});

連接建立成功后,創建一個通道(channel),具體的發送將會在這個通道中進行。

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {});
});

發送消息前,需要先聲明一個隊列,然后將消息發送到該隊列:

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'hello';
    var msg = 'Hello world';
<span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
  durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
});

<span class="pl-smi">channel</span>.<span class="pl-en">sendToQueue</span>(queue, <span class="pl-smi">Buffer</span>.<span class="pl-en">from</span>(msg));
<span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Sent %s<span class="pl-pds">"</span></span>, msg);

});
});

隊列的創建是一個冪等操作,只該隊列不存在的情況才會新建。

最后關閉連接並退出。

setTimeout(function() {
    connection.close();
    process.exit(0);
}, 500);
完整的 send.js
#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}

    <span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;
    <span class="pl-k">var</span> msg <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>Hello World!<span class="pl-pds">'</span></span>;

    <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
        durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
    });
    <span class="pl-smi">channel</span>.<span class="pl-en">sendToQueue</span>(queue, <span class="pl-smi">Buffer</span>.<span class="pl-en">from</span>(msg));

    <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Sent %s<span class="pl-pds">"</span></span>, msg);
});
<span class="pl-c1">setTimeout</span>(<span class="pl-k">function</span>() {
    <span class="pl-smi">connection</span>.<span class="pl-c1">close</span>();
    <span class="pl-c1">process</span>.<span class="pl-en">exit</span>(<span class="pl-c1">0</span>);
}, <span class="pl-c1">500</span>);

});

接收消息

下面開始編寫消費者,消費者做的事情是監聽來自 RabbitMQ 的消息並處理。

創建 receive.js,引入 amqp.node 模塊,流程和發送者一樣,也是先創建連接,然后創建通道,在通道中聲明需要監聽的隊列:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = 'hello';

<span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
  durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
});

});
});

這里的隊列聲明不會與發送者那邊的沖突,因為上面提到過,隊列只在不存在的情況下才會重新生成。這里再次聲明可以保證監聽前隊列已經存在。並且實際場景下,消費者有可能是在發送者之前啟動的。

然后添加監聽的邏輯:

 console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);

channel.consume(queue, function(msg) {
console.log(" [x] Received %s", msg.content.toString());
}, {
noAck: true
});

完整的 receive.js
#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}

    <span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;

    <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
        durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
    });

    <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [*] Waiting for messages in %s. To exit press CTRL+C<span class="pl-pds">"</span></span>, queue);

    <span class="pl-smi">channel</span>.<span class="pl-en">consume</span>(queue, <span class="pl-k">function</span>(<span class="pl-smi">msg</span>) {
        <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Received %s<span class="pl-pds">"</span></span>, <span class="pl-smi">msg</span>.<span class="pl-c1">content</span>.<span class="pl-c1">toString</span>());
    }, {
        noAck<span class="pl-k">:</span> <span class="pl-c1">true</span>
    });
});

});

運行

分別在命令行啟動上面兩個程序,查看打印的信息。

$ node send.js
 [x] Sent Hello World!

$ node receive.js
[*] Waiting for messages in hello. To exit press CTRL+C
[x] Received Hello World!

另外,可通過 sudo rabbitmqctl list_queues 手動查看 RabbitMQ 中的消息。

$ /usr/local/sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	0

如果發現 rabbitmqctl 命令不可用,需要添加 /usr/local/sbin 到環境變量中,

export PATH=/usr/local/sbin:$PATH

其中 fish shell 通過添加如下命令到 fish 的配置文件即可:

set -gx PATH /usr/local/sbin $PATH

相關資源


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM