nodejs操作消息隊列RabbitMQ


一. 什么是消息隊列

消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。
其主要用途:不同進程Process/線程Thread之間通信。

為什么會產生消息隊列?有幾個原因:

不同進程(process)之間傳遞消息時,兩個進程之間耦合程度過高,改動一個進程,引發必須修改另一個進程,為了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),所有兩進程之間傳遞的消息,都必須通過消息隊列來傳遞,單獨修改某一個進程,不會影響另一個;

不同進程(process)之間傳遞消息時,為了實現標准化,將消息的格式規范化了,並且,某一個進程接受的消息太多,一下子無法處理完,並且也有先后順序,必須對收到的消息進行排隊,因此誕生了事實上的消息隊列;

二. 常用的消息隊列

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq

三. 使用場景

異步處理

應用解耦

流量削峰

四 使用amqplib操作RabbitMQ

安裝 amqplib

npm install amqplib
生產者:

let amqp = require('amqplib');

class RabbitMQ {
constructor() {
this.hosts = [];
this.index = 0;
this.length = this.hosts.length;
this.open = amqp.connect(this.hosts[this.index]);
}
sendQueueMsg(queueName, msg, errCallBack) {
let self = this;

self.open
.then(function (conn) {
return conn.createChannel();
})
.then(function (channel) {
return channel.assertQueue(queueName).then(function (ok) {
return channel.sendToQueue(queueName, new Buffer(msg), {
persistent: true
});
})
.then(function (data) {
if (data) {
errCallBack && errCallBack("success");
channel.close();
}
})
.catch(function () {
setTimeout(() => {
if (channel) {
channel.close();
}
}, 500)
});
})
.catch(function () {
let num = self.index++;

if (num <= self.length - 1) {
self.open = amqp.connect(self.hosts[num]);
} else {
self.index == 0;
}
});
}
}

let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', '123', (error) => {
console.log(error)
})
消費者

let amqp = require('amqplib');

class RabbitMQ {
constructor() {
this.hosts = [];
this.index = 0;
this.length = this.hosts.length;
this.open = amqp.connect(this.hosts[this.index]);
}

receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
let self = this;

self.open
.then(function (conn) {
return conn.createChannel();
})
.then(function (channel) {
return channel.assertQueue(queueName)
.then(function (ok) {
return channel.consume(queueName, function (msg) {
if (msg !== null) {
let data = msg.content.toString();
channel.ack(msg);
receiveCallBack && receiveCallBack(data);
}
})
.finally(function () {
setTimeout(() => {
if (channel) {
channel.close();
}
}, 500)
});
})
})
.catch(function () {
let num = self.index++;
if (num <= self.length - 1) {
self.open = amqp.connect(self.hosts[num]);
} else {
self.index = 0;
self.open = amqp.connect(self.hosts[0]);
}
});
}
}

let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) =>
{
console.log(msg)//123
})
打開mq后台 http://127.0.0.1:15672/ 看到新增隊列,接受一條消息

 

 

當運行消費者代碼時輸入 123,消息隊列消息為0

 

 


---------------------


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM