在上一篇文章在Node.js中使用RabbitMQ系列一 Hello world我有使用一個任務隊列,不過當時的場景是將消息發送給一個消費者,本篇文章我將討論有多個消費者的場景。
其實,任務隊列最核心解決的問題是避免立即處理那些耗時的任務,也就是避免請求-響應的這種同步模式。取而代之的是我們通過調度算法,讓這些耗時的任務之后再執行,也就是采用異步的模式。我們需要將一條消息封裝成一個任務,並且將它添加到任務隊列里面。后台會運行多個工作進程(worker process),通過調度算法,將隊列里的任務依次彈出來,並交給其中的一個工作進程進行處理執行。這個概念尤其適合那些HTTP短連接的web應用,它們無法在短時間內處理這種復雜的任務。
准備工作
我們將字符串類型的消息看作是耗時的任務,並且每個字符串消息最后帶上一些點。每個點代表該任務需要消耗的秒數。在worker進程處理的時候可以采用setTimeout函數來進行模擬。舉個例子:一個偽造的耗時任務是Hello.,則這個任務會消耗1秒,一個偽造的耗時任務是Hello..,則這個任務會消耗2秒,一個偽造的耗時任務是Hello... ,並且這個任務的處理時間會耗費3秒。
這里稍微對上篇文章的send.js文件修改,讓它可以發送用戶自定義的任意的消息,這個程序會將任務交給任務隊列,我們用new_task.js來進行命名:
var q = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";
ch.assertQueue(q, {durable: true});
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);
我們的receive.js文件也要進行修改,它需要偽造任務的處理時間,讓字符串類型的任務看起來需要幾秒鍾時間(具體取決於 . 的個數)。
ch.consume(q, function(msg) {
var secs = msg.content.toString().split('.').length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
}, secs * 1000);
}, {noAck: true});
這里以一個worker工作進程為例子,我們先在左側的shell窗口開啟一個worker進程,之后在右側shell窗口發送自定義的消息,執行看看效果:
輪詢調度算法(Round-robin)
使用任務隊列的一個優勢是能夠將任務並行執行,如果需要處理大量的積壓任務,我們只需要像上面運行worker進程的方式,增加更多的worker,這個讓可伸縮變得更加的容易。
首先,我們使用item2開啟2個shell窗口,並在里面運行兩個worker進程,但是到底是哪個worker會對消息進行處理呢?這里我們可以做個簡單的實驗來看看,如下圖所示,左側是兩個worker進程,右側是消息發送端:
默認情況下,RabbitMQ會采用Round-robin算法來分發任務隊列中的任務,每次分發的時候都會將任務派發給下一個消費者,這樣每個消費者(worker進程)處理的任務數量其實是一樣多的。
消息確認
處理一個復雜的任務需要耗費很長時間,這個時間段里面,可能我們的worker進程由於某種原因掛掉了,這種異常情況是需要考慮的。但是我們現有的代碼里面並沒有做這種異常的處理,當RabbitMQ將任務派發給worker進程之后,我們立即將這個任務從內存中剔除掉了,設想下,假設worker收到消息之后,我們馬上將進程殺死掉,這個時候任務並沒有被成功執行的。同時,我們也會丟失所有派發到這個worker進程但是還沒有被處理的任務信息。
但是,我們並不想丟掉任何一個任務,如果一個worker進程掛掉,我們更希望能夠將這個任務派發給其它的worker來處理。
為了避免任務信息丟失的情況,RabbitMQ支持消息確認。在一個任務發送到了worker進程並且被成功處理完畢之后,一個ack (消息確認)的標識會從消費者發回來告訴RabbitMQ這個任務已經被處理完了,可以將它刪除了。
如果一個消費者掛掉了(常見的原因如消息通道關閉了,連接丟失,TCP連接丟失),沒有向RabbitMQ發送消息確認這個ack的標識,這個時候RabbitMQ會將它從新加入到隊列中,如果有其它消費者存在,那么RabbitMQ會馬上將這個任務重新派發下去。之前的例子里面我們並沒有開啟消息確認這個選項,現在我們可以通過{noAck: false}來開啟:
ch.consume(q, function(msg) {
var secs = msg.content.toString().split('.').length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000);
}, {noAck: false}); // 開啟消息確認標識
可以用CTRL + C來做個實驗看看效果。
消息持久化
剛剛談到,如果一個worker進程掛掉了,不讓消息丟失的做法。但是,如果整個RabbitMQ的服務器掛掉了呢?當一個RabbitMQ服務退出或者中斷的情況下,它會忘記任務隊列里面的消息除非你告訴它不要丟掉,即我們通知RabbitMQ任務隊列和這些任務都是需要持久化的。
首先,我們需要確保RabbitMQ永遠不會丟失掉我們的任務隊列。
ch.assertQueue('hello', {durable: true});
但是,你會發現這樣並沒有效果,那是因為hello這個隊列我們已經定義過,並且指定了它不需要持久化。RabbitMQ不允許我們通過改變參數配置的方式對已經存在的任務隊列進行重新定義,因此我們需要定義一個新的任務隊列。
ch.assertQueue('task_queue', {durable: true});
這行代碼需要同時在生產者和消費者里面的相關代碼的地方進行修改。
接下來,我們需要通過配置persistent 選項讓我們發送的消息也是持久化的。
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
公平調度
前面的例子,我們討論了RabbitMQ的調度方式,即采用Round-robin輪詢調度算法,因此它會將消息均勻的分配給每個worker進程。RabbitMQ並不會關注每個worker進程有多少個消息沒有確認,它只會不斷的給你派發任務,不管你能不能處理的過來。這個時候,問題就出現了,設想下,假設有2個worker,其中1個worker剛好很不幸被分配了一個非常復雜的任務,可能需要耗費好幾個小時的時間,另外一個worker被分配的任務都比較簡單,只需要幾分鍾就能處理完,由於RabbitMQ的任務分配問題,有很多新的任務依然會分配到那個正在處理很耗時任務的worker上面,這個worker后面的任務都會處於等待狀態。幸好,RabbitMQ可以通過prefetch(1)來指定某個worker同時最多只會派發到1個任務,一旦任務處理完成發送了確認通知,才會有新的任務派發過來。
ch.prefetch(1);
最終的代碼
new_task.js 代碼:
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";
ch.assertQueue(q, {durable: true});
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);
});
setTimeout(function() { conn.close(); process.exit(0) }, 500);
});
worker.js:
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'task_queue';
ch.assertQueue(q, {durable: true});
ch.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function(msg) {
var secs = msg.content.toString().split('.').length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000);
}, {noAck: false});
});
});