在前一篇文章中可伸縮架構簡短系列中提到過關於異步的問題。當時推薦使用RabbitMQ來做任務隊列的實現方案。本篇文章以Node.js為例子,來實際操作如何和RabbitMQ進行交互。
介紹
RabbitMQ是一個消息代理。它最初的思想特別簡單:接受並且轉發消息。你可以將它想象為郵局:當你將郵件放到信箱中,你可以非常肯定快件員最終會將郵件交到接受人手中。你可以把RabbitMQ比喻為信箱、郵局和快遞員。RabbitMQ和郵局之間主要的區別是它不處理紙張,而是接受、存儲和轉發二進制數據‒消息。
在RabbitMQ中,有一些基本術語:
- 生產者:就是發送信息這方。
- 任務隊列:雖然信息流在RabbitMQ和你的應用之間流動,它可以存儲在一個隊列中。隊列不受任何限制的約束,它可以存儲任意多的消息,它本質上是一個無限的緩沖區。許多生產者可以將消息發往到這個隊列中,許多消費者可以嘗試從這個隊列中接受數據。
- 消費者:消費者和信息接受者有相近的含義,一個消費者就是一個等待去接受信息的程序。
安裝RabbitMQ server
打開RabbitMQ的下載頁面(https://www.rabbitmq.com/download.html),下載安裝,這里以Mac OSX平台安裝為例:
RabbitMQ依賴Erlang,由於Mac OSX本身已經安裝了Erlang,所以可以直接通過Homebrew來進行安裝。
$ brew update
$ brew install rabbitmq
安裝完后,需要將/usr/local/sbin添加到$PATH,添加到./.bash_profile文件,然后
$ source ./.bash_profile
$ echo $PATH // 檢查環境變量是否已經成功加入
安裝完成后就可以啟動RabbitMQ server了。
至此,安裝就完成了。運行rabbitmq-server命令時可能會報錯誤:ERROR: epmd error for host localhost: timeout (timed out),如果遇到這種情況,可以打開/etc/hosts文件,在文件末尾加上 127.0.0.1 localhost即可解決問題。
Hello world
在這個部分,我會使用Javascript編寫兩個小程序。一個發送單個消息的生產者和接收消息並將其打印出來的消費者。我們將跳過在amqp.node API的一些細節,集中在這個非常簡單的事情。
在下圖中,P代表生產者,C代表消費者,中間紅色代表的是任務隊列-消息緩沖區
首先,使用npm安裝amqp.node
$ npm install amqplib
發送消息
這里我將消息的發送者稱作send.js,消息接受者稱作receive.js,消息發送者會連接到RabbitMQ,發送一個消息,最后退出。
首先引入amqplib這個模塊:
var amqp = require('amqplib/callback_api');
連接到 RabbitMQ server
amqp.connect('amqp://localhost', function(err, conn) {});
接下來創建一個通道,
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {});
});
為了發送消息,我們需要定義一個隊列,我們可以將消息發送到這個隊列中:
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'hello';
ch.assertQueue(q, {durable: false});
// Note: on Node 6 Buffer.from(msg) should be used
ch.sendToQueue(q, new Buffer('Hello World!'));
console.log(" [x] Sent 'Hello World!'");
});
});
最后,我們關閉連接,並且退出:
setTimeout(function() { conn.close(); process.exit(0) }, 500);
最終代碼參考:send.js
接受消息
建立一個接受者的方式和發送者是相同的。打開一個連接和通道,並且申明一個需要處理的隊列。注意:這里的隊列和發送者里面定義的隊列需要匹配。
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'hello';
ch.assertQueue(q, {durable: false});
});
});
這里也定義隊列的原因:接受者可能比發送者先開始執行。我們需要確保當接受者處理queue的時候,queue是存在的。
由於消息的發送是異步的,我們需要提供一個回調,這樣,當RabbitMQ發送消息給我們的消費者時,回調會執行。這個也是Channel.consume做的事情。
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function(msg) {
console.log(" [x] Received %s", msg.content.toString());
}, {noAck: true});
最終代碼參考:receive.js
運行代碼
// 先執行send.js
$ ./send.js
// 后執行receive.js
$ ./receive.js
最終結果如圖:
開源信息
- 騰訊NOW直播前端工程化解決方案feflow正式開源啦~: https://github.com/cpselvis/feflow-cli